summaryrefslogtreecommitdiff
path: root/pkg/event/aggregator.go
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-24 13:09:22 -0600
committermo khan <mo@mokhan.ca>2025-07-24 13:09:22 -0600
commit63793f9eb4bb90c7919f26d9f020c96072373f03 (patch)
treefc7a34582e61e5eae801c8a79bd6ad075595cd55 /pkg/event/aggregator.go
parentc383121498ffb9778eb32e6ebcf89ea2925cd9c6 (diff)
feat: use mutually exclusive lock for pub/sub
Diffstat (limited to 'pkg/event/aggregator.go')
-rw-r--r--pkg/event/aggregator.go20
1 files changed, 19 insertions, 1 deletions
diff --git a/pkg/event/aggregator.go b/pkg/event/aggregator.go
index 996a550..7ff5061 100644
--- a/pkg/event/aggregator.go
+++ b/pkg/event/aggregator.go
@@ -1,11 +1,23 @@
package event
-import "github.com/xlgmokha/x/pkg/x"
+import (
+ "sync"
+
+ "github.com/xlgmokha/x/pkg/x"
+)
type Aggregator struct {
+ mu sync.RWMutex
subscriptions map[Event][]Subscription
}
+func WithDefaults() x.Option[*Aggregator] {
+ return x.With(func(item *Aggregator) {
+ item.mu = sync.RWMutex{}
+ item.subscriptions = map[Event][]Subscription{}
+ })
+}
+
func WithoutSubscriptions() x.Option[*Aggregator] {
return WithSubscriptions(map[Event][]Subscription{})
}
@@ -17,10 +29,16 @@ func WithSubscriptions(subscriptions map[Event][]Subscription) x.Option[*Aggrega
}
func (a *Aggregator) Subscribe(event Event, f Subscription) {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
a.subscriptions[event] = append(a.subscriptions[event], f)
}
func (a *Aggregator) Publish(event Event, message any) {
+ a.mu.RLock()
+ defer a.mu.RUnlock()
+
for _, subscription := range a.subscriptions[event] {
subscription(message)
}