diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-24 13:38:27 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-24 13:38:27 -0600 |
| commit | 863ab07212e4044dab656609f1c8fdf578134e84 (patch) | |
| tree | 2add50a0043bccd9e5246da786bcb87681d1c15d | |
| parent | db82a14aa4942293b2b9726d297bfeb71f2bf79b (diff) | |
feat: publish an event after a new sparkle is saved to the database
| -rw-r--r-- | app/controllers/sparkles/controller_test.go | 5 | ||||
| -rw-r--r-- | app/db/in_memory_repository.go | 14 | ||||
| -rw-r--r-- | app/db/in_memory_repository_test.go | 25 | ||||
| -rw-r--r-- | app/init.go | 3 | ||||
| -rw-r--r-- | vendor/github.com/xlgmokha/x/pkg/event/aggregator.go | 35 | ||||
| -rw-r--r-- | vendor/github.com/xlgmokha/x/pkg/event/event.go | 3 | ||||
| -rw-r--r-- | vendor/github.com/xlgmokha/x/pkg/event/subscription.go | 3 | ||||
| -rw-r--r-- | vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go | 39 | ||||
| -rw-r--r-- | vendor/modules.txt | 1 |
9 files changed, 117 insertions, 11 deletions
diff --git a/app/controllers/sparkles/controller_test.go b/app/controllers/sparkles/controller_test.go index d2469a7..e825343 100644 --- a/app/controllers/sparkles/controller_test.go +++ b/app/controllers/sparkles/controller_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xlgmokha/x/pkg/event" "github.com/xlgmokha/x/pkg/serde" "github.com/xlgmokha/x/pkg/test" "gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/cfg" @@ -35,7 +36,7 @@ func (f *FailingResponseWriter) Write([]byte) (int, error) { func TestSparkles(t *testing.T) { t.Run("GET /sparkles", func(t *testing.T) { sparkle, _ := domain.NewSparkle("@tanuki for helping me") - store := db.NewRepository[*domain.Sparkle]() + store := db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]()) store.Save(t.Context(), sparkle) mux := http.NewServeMux() @@ -64,7 +65,7 @@ func TestSparkles(t *testing.T) { t.Run("POST /sparkles", func(t *testing.T) { t.Run("when a user is authenticated", func(t *testing.T) { currentUser := domain.NewUser(domain.WithID[*domain.User](domain.ID("1"))) - repository := db.NewRepository[*domain.Sparkle]() + repository := db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]()) t.Run("when the user is authorized", func(t *testing.T) { mux := http.NewServeMux() diff --git a/app/db/in_memory_repository.go b/app/db/in_memory_repository.go index 2aa1fed..1177662 100644 --- a/app/db/in_memory_repository.go +++ b/app/db/in_memory_repository.go @@ -5,20 +5,23 @@ import ( "sort" "sync" + "github.com/xlgmokha/x/pkg/event" "github.com/xlgmokha/x/pkg/x" "gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/domain" "gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/pkg/pls" ) type inMemoryRepository[T domain.Entity] struct { - items []T - mu sync.RWMutex + aggregator *event.TypedAggregator[T] + items []T + mu sync.RWMutex } -func NewRepository[T domain.Entity]() domain.Repository[T] { +func NewRepository[T domain.Entity](aggregator *event.TypedAggregator[T]) domain.Repository[T] { return &inMemoryRepository[T]{ - items: []T{}, - mu: sync.RWMutex{}, + aggregator: aggregator, + items: []T{}, + mu: sync.RWMutex{}, } } @@ -50,5 +53,6 @@ func (r *inMemoryRepository[T]) Save(ctx context.Context, item T) error { sort.Slice(r.items, func(i, j int) bool { return r.items[i].GetID() > r.items[j].GetID() }) + r.aggregator.Publish("after.create", item) return nil } diff --git a/app/db/in_memory_repository_test.go b/app/db/in_memory_repository_test.go index cfbab41..8e1e017 100644 --- a/app/db/in_memory_repository_test.go +++ b/app/db/in_memory_repository_test.go @@ -7,11 +7,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xlgmokha/x/pkg/event" "gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/domain" ) func TestInMemoryRepository(t *testing.T) { - storage := NewRepository[*domain.Sparkle]() + aggregator := event.New[*domain.Sparkle]() + storage := NewRepository[*domain.Sparkle](aggregator) t.Run("Save", func(t *testing.T) { t.Run("an invalid Sparkle", func(t *testing.T) { @@ -30,8 +32,25 @@ func TestInMemoryRepository(t *testing.T) { assert.Equal(t, "because", sparkles[0].Reason) }) + t.Run("publishes an event", func(t *testing.T) { + called := false + var payload *domain.Sparkle + + aggregator.SubscribeTo("after.create", func(item *domain.Sparkle) { + called = true + payload = item + }) + + sparkle := &domain.Sparkle{Sparklee: "@tanuki", Reason: "because"} + require.NoError(t, storage.Save(t.Context(), sparkle)) + + require.True(t, called) + require.NotNil(t, payload) + assert.Equal(t, sparkle, payload) + }) + t.Run("prevents race conditions", func(t *testing.T) { - repository := NewRepository[*domain.Sparkle]() + repository := NewRepository[*domain.Sparkle](aggregator) ctx := context.Background() numGoroutines := 100 @@ -92,7 +111,7 @@ func TestInMemoryRepository(t *testing.T) { }) t.Run("All", func(t *testing.T) { - repository := NewRepository[*domain.Sparkle]() + repository := NewRepository[*domain.Sparkle](aggregator) require.NoError(t, repository.Save(t.Context(), &domain.Sparkle{ Sparklee: "@tanuki", Reason: "because", diff --git a/app/init.go b/app/init.go index 960102a..444d991 100644 --- a/app/init.go +++ b/app/init.go @@ -8,6 +8,7 @@ import ( "github.com/authzed/authzed-go/v1" "github.com/rs/zerolog" "github.com/xlgmokha/x/pkg/env" + "github.com/xlgmokha/x/pkg/event" "github.com/xlgmokha/x/pkg/ioc" "github.com/xlgmokha/x/pkg/log" "github.com/xlgmokha/x/pkg/mapper" @@ -34,7 +35,7 @@ func init() { return ioc.MustResolve[*authzed.Client](ioc.Default) }) ioc.RegisterSingleton[domain.Repository[*domain.Sparkle]](ioc.Default, func() domain.Repository[*domain.Sparkle] { - return db.NewRepository[*domain.Sparkle]() + return db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]()) }) ioc.RegisterSingleton[*http.ServeMux](ioc.Default, func() *http.ServeMux { return http.NewServeMux() diff --git a/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go new file mode 100644 index 0000000..302ad51 --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go @@ -0,0 +1,35 @@ +package event + +import ( + "sync" + + "github.com/xlgmokha/x/pkg/x" +) + +type Aggregator struct { + mu sync.RWMutex + subscriptions map[Event][]Subscription +} + +func WithDefaults() x.Option[*Aggregator] { + return x.With(func(item *Aggregator) { + item.mu = sync.RWMutex{} + item.subscriptions = map[Event][]Subscription{} + }) +} + +func (a *Aggregator) Subscribe(event Event, f Subscription) { + a.mu.Lock() + defer a.mu.Unlock() + + a.subscriptions[event] = append(a.subscriptions[event], f) +} + +func (a *Aggregator) Publish(event Event, message any) { + a.mu.RLock() + defer a.mu.RUnlock() + + for _, subscription := range a.subscriptions[event] { + subscription(message) + } +} diff --git a/vendor/github.com/xlgmokha/x/pkg/event/event.go b/vendor/github.com/xlgmokha/x/pkg/event/event.go new file mode 100644 index 0000000..a1ff812 --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/event.go @@ -0,0 +1,3 @@ +package event + +type Event any diff --git a/vendor/github.com/xlgmokha/x/pkg/event/subscription.go b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go new file mode 100644 index 0000000..0a76efc --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go @@ -0,0 +1,3 @@ +package event + +type Subscription func(any) diff --git a/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go new file mode 100644 index 0000000..9295e7d --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go @@ -0,0 +1,39 @@ +package event + +import "github.com/xlgmokha/x/pkg/x" + +type TypedAggregator[T any] struct { + aggregator *Aggregator +} + +func New[T any]() *TypedAggregator[T] { + return x.New[*TypedAggregator[T]]( + WithAggregator[T]( + x.New( + WithDefaults(), + ), + ), + ) +} + +func WithAggregator[T any](aggregator *Aggregator) x.Option[*TypedAggregator[T]] { + return x.With(func(item *TypedAggregator[T]) { + item.aggregator = aggregator + }) +} + +func (a *TypedAggregator[T]) SubscribeTo(event Event, f func(T)) { + a.aggregator.Subscribe(event, a.mapFrom(f)) +} + +func (a *TypedAggregator[T]) Publish(event Event, message T) { + a.aggregator.Publish(event, message) +} + +func (a *TypedAggregator[T]) mapFrom(f func(T)) Subscription { + return func(message any) { + if item, ok := message.(T); ok { + f(item) + } + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index f83eedd..03fec83 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -939,6 +939,7 @@ github.com/xlgmokha/minit github.com/xlgmokha/x/pkg/context github.com/xlgmokha/x/pkg/convert github.com/xlgmokha/x/pkg/env +github.com/xlgmokha/x/pkg/event github.com/xlgmokha/x/pkg/ioc github.com/xlgmokha/x/pkg/log github.com/xlgmokha/x/pkg/mapper |
