diff options
Diffstat (limited to 'vendor/github.com')
4 files changed, 80 insertions, 0 deletions
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go new file mode 100644 index 0000000..302ad51 --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go @@ -0,0 +1,35 @@ +package event + +import ( + "sync" + + "github.com/xlgmokha/x/pkg/x" +) + +type Aggregator struct { + mu sync.RWMutex + subscriptions map[Event][]Subscription +} + +func WithDefaults() x.Option[*Aggregator] { + return x.With(func(item *Aggregator) { + item.mu = sync.RWMutex{} + item.subscriptions = map[Event][]Subscription{} + }) +} + +func (a *Aggregator) Subscribe(event Event, f Subscription) { + a.mu.Lock() + defer a.mu.Unlock() + + a.subscriptions[event] = append(a.subscriptions[event], f) +} + +func (a *Aggregator) Publish(event Event, message any) { + a.mu.RLock() + defer a.mu.RUnlock() + + for _, subscription := range a.subscriptions[event] { + subscription(message) + } +} diff --git a/vendor/github.com/xlgmokha/x/pkg/event/event.go b/vendor/github.com/xlgmokha/x/pkg/event/event.go new file mode 100644 index 0000000..a1ff812 --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/event.go @@ -0,0 +1,3 @@ +package event + +type Event any diff --git a/vendor/github.com/xlgmokha/x/pkg/event/subscription.go b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go new file mode 100644 index 0000000..0a76efc --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go @@ -0,0 +1,3 @@ +package event + +type Subscription func(any) diff --git a/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go new file mode 100644 index 0000000..9295e7d --- /dev/null +++ b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go @@ -0,0 +1,39 @@ +package event + +import "github.com/xlgmokha/x/pkg/x" + +type TypedAggregator[T any] struct { + aggregator *Aggregator +} + +func New[T any]() *TypedAggregator[T] { + return x.New[*TypedAggregator[T]]( + WithAggregator[T]( + x.New( + WithDefaults(), + ), + ), + ) +} + +func WithAggregator[T any](aggregator *Aggregator) x.Option[*TypedAggregator[T]] { + return x.With(func(item *TypedAggregator[T]) { + item.aggregator = aggregator + }) +} + +func (a *TypedAggregator[T]) SubscribeTo(event Event, f func(T)) { + a.aggregator.Subscribe(event, a.mapFrom(f)) +} + +func (a *TypedAggregator[T]) Publish(event Event, message T) { + a.aggregator.Publish(event, message) +} + +func (a *TypedAggregator[T]) mapFrom(f func(T)) Subscription { + return func(message any) { + if item, ok := message.(T); ok { + f(item) + } + } +} |
