diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-24 13:09:22 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-24 13:09:22 -0600 |
| commit | 63793f9eb4bb90c7919f26d9f020c96072373f03 (patch) | |
| tree | fc7a34582e61e5eae801c8a79bd6ad075595cd55 /pkg/event/aggregator.go | |
| parent | c383121498ffb9778eb32e6ebcf89ea2925cd9c6 (diff) | |
feat: use mutually exclusive lock for pub/sub
Diffstat (limited to 'pkg/event/aggregator.go')
| -rw-r--r-- | pkg/event/aggregator.go | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/pkg/event/aggregator.go b/pkg/event/aggregator.go index 996a550..7ff5061 100644 --- a/pkg/event/aggregator.go +++ b/pkg/event/aggregator.go @@ -1,11 +1,23 @@ package event -import "github.com/xlgmokha/x/pkg/x" +import ( + "sync" + + "github.com/xlgmokha/x/pkg/x" +) type Aggregator struct { + mu sync.RWMutex subscriptions map[Event][]Subscription } +func WithDefaults() x.Option[*Aggregator] { + return x.With(func(item *Aggregator) { + item.mu = sync.RWMutex{} + item.subscriptions = map[Event][]Subscription{} + }) +} + func WithoutSubscriptions() x.Option[*Aggregator] { return WithSubscriptions(map[Event][]Subscription{}) } @@ -17,10 +29,16 @@ func WithSubscriptions(subscriptions map[Event][]Subscription) x.Option[*Aggrega } func (a *Aggregator) Subscribe(event Event, f Subscription) { + a.mu.Lock() + defer a.mu.Unlock() + a.subscriptions[event] = append(a.subscriptions[event], f) } func (a *Aggregator) Publish(event Event, message any) { + a.mu.RLock() + defer a.mu.RUnlock() + for _, subscription := range a.subscriptions[event] { subscription(message) } |
