summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-24 13:38:27 -0600
committermo khan <mo@mokhan.ca>2025-07-24 13:38:27 -0600
commit863ab07212e4044dab656609f1c8fdf578134e84 (patch)
tree2add50a0043bccd9e5246da786bcb87681d1c15d
parentdb82a14aa4942293b2b9726d297bfeb71f2bf79b (diff)
feat: publish an event after a new sparkle is saved to the database
-rw-r--r--app/controllers/sparkles/controller_test.go5
-rw-r--r--app/db/in_memory_repository.go14
-rw-r--r--app/db/in_memory_repository_test.go25
-rw-r--r--app/init.go3
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/aggregator.go35
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/event.go3
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/subscription.go3
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go39
-rw-r--r--vendor/modules.txt1
9 files changed, 117 insertions, 11 deletions
diff --git a/app/controllers/sparkles/controller_test.go b/app/controllers/sparkles/controller_test.go
index d2469a7..e825343 100644
--- a/app/controllers/sparkles/controller_test.go
+++ b/app/controllers/sparkles/controller_test.go
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/xlgmokha/x/pkg/event"
"github.com/xlgmokha/x/pkg/serde"
"github.com/xlgmokha/x/pkg/test"
"gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/cfg"
@@ -35,7 +36,7 @@ func (f *FailingResponseWriter) Write([]byte) (int, error) {
func TestSparkles(t *testing.T) {
t.Run("GET /sparkles", func(t *testing.T) {
sparkle, _ := domain.NewSparkle("@tanuki for helping me")
- store := db.NewRepository[*domain.Sparkle]()
+ store := db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]())
store.Save(t.Context(), sparkle)
mux := http.NewServeMux()
@@ -64,7 +65,7 @@ func TestSparkles(t *testing.T) {
t.Run("POST /sparkles", func(t *testing.T) {
t.Run("when a user is authenticated", func(t *testing.T) {
currentUser := domain.NewUser(domain.WithID[*domain.User](domain.ID("1")))
- repository := db.NewRepository[*domain.Sparkle]()
+ repository := db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]())
t.Run("when the user is authorized", func(t *testing.T) {
mux := http.NewServeMux()
diff --git a/app/db/in_memory_repository.go b/app/db/in_memory_repository.go
index 2aa1fed..1177662 100644
--- a/app/db/in_memory_repository.go
+++ b/app/db/in_memory_repository.go
@@ -5,20 +5,23 @@ import (
"sort"
"sync"
+ "github.com/xlgmokha/x/pkg/event"
"github.com/xlgmokha/x/pkg/x"
"gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/domain"
"gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/pkg/pls"
)
type inMemoryRepository[T domain.Entity] struct {
- items []T
- mu sync.RWMutex
+ aggregator *event.TypedAggregator[T]
+ items []T
+ mu sync.RWMutex
}
-func NewRepository[T domain.Entity]() domain.Repository[T] {
+func NewRepository[T domain.Entity](aggregator *event.TypedAggregator[T]) domain.Repository[T] {
return &inMemoryRepository[T]{
- items: []T{},
- mu: sync.RWMutex{},
+ aggregator: aggregator,
+ items: []T{},
+ mu: sync.RWMutex{},
}
}
@@ -50,5 +53,6 @@ func (r *inMemoryRepository[T]) Save(ctx context.Context, item T) error {
sort.Slice(r.items, func(i, j int) bool {
return r.items[i].GetID() > r.items[j].GetID()
})
+ r.aggregator.Publish("after.create", item)
return nil
}
diff --git a/app/db/in_memory_repository_test.go b/app/db/in_memory_repository_test.go
index cfbab41..8e1e017 100644
--- a/app/db/in_memory_repository_test.go
+++ b/app/db/in_memory_repository_test.go
@@ -7,11 +7,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/xlgmokha/x/pkg/event"
"gitlab.com/gitlab-org/software-supply-chain-security/authorization/sparkled/app/domain"
)
func TestInMemoryRepository(t *testing.T) {
- storage := NewRepository[*domain.Sparkle]()
+ aggregator := event.New[*domain.Sparkle]()
+ storage := NewRepository[*domain.Sparkle](aggregator)
t.Run("Save", func(t *testing.T) {
t.Run("an invalid Sparkle", func(t *testing.T) {
@@ -30,8 +32,25 @@ func TestInMemoryRepository(t *testing.T) {
assert.Equal(t, "because", sparkles[0].Reason)
})
+ t.Run("publishes an event", func(t *testing.T) {
+ called := false
+ var payload *domain.Sparkle
+
+ aggregator.SubscribeTo("after.create", func(item *domain.Sparkle) {
+ called = true
+ payload = item
+ })
+
+ sparkle := &domain.Sparkle{Sparklee: "@tanuki", Reason: "because"}
+ require.NoError(t, storage.Save(t.Context(), sparkle))
+
+ require.True(t, called)
+ require.NotNil(t, payload)
+ assert.Equal(t, sparkle, payload)
+ })
+
t.Run("prevents race conditions", func(t *testing.T) {
- repository := NewRepository[*domain.Sparkle]()
+ repository := NewRepository[*domain.Sparkle](aggregator)
ctx := context.Background()
numGoroutines := 100
@@ -92,7 +111,7 @@ func TestInMemoryRepository(t *testing.T) {
})
t.Run("All", func(t *testing.T) {
- repository := NewRepository[*domain.Sparkle]()
+ repository := NewRepository[*domain.Sparkle](aggregator)
require.NoError(t, repository.Save(t.Context(), &domain.Sparkle{
Sparklee: "@tanuki",
Reason: "because",
diff --git a/app/init.go b/app/init.go
index 960102a..444d991 100644
--- a/app/init.go
+++ b/app/init.go
@@ -8,6 +8,7 @@ import (
"github.com/authzed/authzed-go/v1"
"github.com/rs/zerolog"
"github.com/xlgmokha/x/pkg/env"
+ "github.com/xlgmokha/x/pkg/event"
"github.com/xlgmokha/x/pkg/ioc"
"github.com/xlgmokha/x/pkg/log"
"github.com/xlgmokha/x/pkg/mapper"
@@ -34,7 +35,7 @@ func init() {
return ioc.MustResolve[*authzed.Client](ioc.Default)
})
ioc.RegisterSingleton[domain.Repository[*domain.Sparkle]](ioc.Default, func() domain.Repository[*domain.Sparkle] {
- return db.NewRepository[*domain.Sparkle]()
+ return db.NewRepository[*domain.Sparkle](event.New[*domain.Sparkle]())
})
ioc.RegisterSingleton[*http.ServeMux](ioc.Default, func() *http.ServeMux {
return http.NewServeMux()
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go
new file mode 100644
index 0000000..302ad51
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go
@@ -0,0 +1,35 @@
+package event
+
+import (
+ "sync"
+
+ "github.com/xlgmokha/x/pkg/x"
+)
+
+type Aggregator struct {
+ mu sync.RWMutex
+ subscriptions map[Event][]Subscription
+}
+
+func WithDefaults() x.Option[*Aggregator] {
+ return x.With(func(item *Aggregator) {
+ item.mu = sync.RWMutex{}
+ item.subscriptions = map[Event][]Subscription{}
+ })
+}
+
+func (a *Aggregator) Subscribe(event Event, f Subscription) {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
+ a.subscriptions[event] = append(a.subscriptions[event], f)
+}
+
+func (a *Aggregator) Publish(event Event, message any) {
+ a.mu.RLock()
+ defer a.mu.RUnlock()
+
+ for _, subscription := range a.subscriptions[event] {
+ subscription(message)
+ }
+}
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/event.go b/vendor/github.com/xlgmokha/x/pkg/event/event.go
new file mode 100644
index 0000000..a1ff812
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/event.go
@@ -0,0 +1,3 @@
+package event
+
+type Event any
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/subscription.go b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go
new file mode 100644
index 0000000..0a76efc
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go
@@ -0,0 +1,3 @@
+package event
+
+type Subscription func(any)
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go
new file mode 100644
index 0000000..9295e7d
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go
@@ -0,0 +1,39 @@
+package event
+
+import "github.com/xlgmokha/x/pkg/x"
+
+type TypedAggregator[T any] struct {
+ aggregator *Aggregator
+}
+
+func New[T any]() *TypedAggregator[T] {
+ return x.New[*TypedAggregator[T]](
+ WithAggregator[T](
+ x.New(
+ WithDefaults(),
+ ),
+ ),
+ )
+}
+
+func WithAggregator[T any](aggregator *Aggregator) x.Option[*TypedAggregator[T]] {
+ return x.With(func(item *TypedAggregator[T]) {
+ item.aggregator = aggregator
+ })
+}
+
+func (a *TypedAggregator[T]) SubscribeTo(event Event, f func(T)) {
+ a.aggregator.Subscribe(event, a.mapFrom(f))
+}
+
+func (a *TypedAggregator[T]) Publish(event Event, message T) {
+ a.aggregator.Publish(event, message)
+}
+
+func (a *TypedAggregator[T]) mapFrom(f func(T)) Subscription {
+ return func(message any) {
+ if item, ok := message.(T); ok {
+ f(item)
+ }
+ }
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f83eedd..03fec83 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -939,6 +939,7 @@ github.com/xlgmokha/minit
github.com/xlgmokha/x/pkg/context
github.com/xlgmokha/x/pkg/convert
github.com/xlgmokha/x/pkg/env
+github.com/xlgmokha/x/pkg/event
github.com/xlgmokha/x/pkg/ioc
github.com/xlgmokha/x/pkg/log
github.com/xlgmokha/x/pkg/mapper