summaryrefslogtreecommitdiff
path: root/pkg/event/aggregator.go
blob: 738211e1c3b37439eb62ad9aa0eea6e50a8e05ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package event

import (
	"sync"

	"github.com/xlgmokha/x/pkg/x"
)

// Aggregator keeps, for each Event, the list of Subscription callbacks that
// Publish invokes for that event.
//
// mu guards subscriptions: Subscribe takes the write lock to add a callback
// and Publish takes the read lock to look up the callbacks for an event.
// Because the struct contains a mutex it must not be copied; every method
// uses a pointer receiver.
type Aggregator struct {
	mu            sync.RWMutex
	subscriptions map[Event][]Subscription
}

// WithDefaults returns the option built by x.With from a function that gives
// an Aggregator an empty subscription table, replacing any table it already
// had.
//
// The mutex is deliberately left untouched. The zero value of sync.RWMutex is
// already an unlocked, ready-to-use mutex, and overwriting a mutex that some
// goroutine currently holds would discard its lock state and make the
// matching Unlock/RUnlock fail.
func WithDefaults() x.Option[*Aggregator] {
	return x.With(func(item *Aggregator) {
		item.subscriptions = make(map[Event][]Subscription)
	})
}

// WithSubscriptions returns the option built by x.With from a function that
// installs subscriptions as the Aggregator's subscription table.
//
// The map is stored as-is, not copied, so it remains shared with the caller;
// the caller should not touch it concurrently with the Aggregator's methods.
// A nil map is replaced by an empty one, because Subscribe writes into the
// table and a write to a nil map panics.
func WithSubscriptions(subscriptions map[Event][]Subscription) x.Option[*Aggregator] {
	if subscriptions == nil {
		subscriptions = make(map[Event][]Subscription)
	}

	return x.With(func(item *Aggregator) {
		item.subscriptions = subscriptions
	})
}

// Subscribe registers f to be invoked by Publish for event. Callbacks for the
// same event are appended, so they are kept in registration order.
//
// Subscribe holds the write lock for the duration of the update and is safe
// to call on a zero-value Aggregator.
func (a *Aggregator) Subscribe(event Event, f Subscription) {
	a.mu.Lock()
	defer a.mu.Unlock()

	// An Aggregator that was never given a subscription table has a nil map;
	// allocate it on first use instead of panicking on the write below.
	if a.subscriptions == nil {
		a.subscriptions = make(map[Event][]Subscription)
	}

	a.subscriptions[event] = append(a.subscriptions[event], f)
}

// Publish invokes every callback subscribed to event, in registration order,
// passing message to each. Publishing an event with no subscribers is a no-op.
//
// The read lock is held only while the callback list is looked up and is
// released before any callback runs. sync.RWMutex prohibits recursive read
// locking and a goroutine cannot take the write lock while it holds the read
// lock, so running callbacks under the lock would deadlock any subscriber
// that calls Subscribe (and potentially Publish) on the same Aggregator.
// Callbacks registered while a Publish is in progress are first invoked by
// the next Publish.
func (a *Aggregator) Publish(event Event, message any) {
	a.mu.RLock()
	// Snapshot the slice header only. Subscribe only ever appends, so the
	// elements within this length are never rewritten after the lock is
	// released.
	subscribers := a.subscriptions[event]
	a.mu.RUnlock()

	for _, notify := range subscribers {
		notify(message)
	}
}