summaryrefslogtreecommitdiff
path: root/vendor/github.com
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com')
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/aggregator.go35
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/event.go3
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/subscription.go3
-rw-r--r--vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go39
4 files changed, 80 insertions, 0 deletions
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go
new file mode 100644
index 0000000..302ad51
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/aggregator.go
@@ -0,0 +1,35 @@
+package event
+
+import (
+ "sync"
+
+ "github.com/xlgmokha/x/pkg/x"
+)
+
+type Aggregator struct {
+ mu sync.RWMutex
+ subscriptions map[Event][]Subscription
+}
+
+func WithDefaults() x.Option[*Aggregator] {
+ return x.With(func(item *Aggregator) {
+ item.mu = sync.RWMutex{}
+ item.subscriptions = map[Event][]Subscription{}
+ })
+}
+
+func (a *Aggregator) Subscribe(event Event, f Subscription) {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
+ a.subscriptions[event] = append(a.subscriptions[event], f)
+}
+
+func (a *Aggregator) Publish(event Event, message any) {
+ a.mu.RLock()
+ defer a.mu.RUnlock()
+
+ for _, subscription := range a.subscriptions[event] {
+ subscription(message)
+ }
+}
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/event.go b/vendor/github.com/xlgmokha/x/pkg/event/event.go
new file mode 100644
index 0000000..a1ff812
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/event.go
@@ -0,0 +1,3 @@
+package event
+
+type Event any
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/subscription.go b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go
new file mode 100644
index 0000000..0a76efc
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/subscription.go
@@ -0,0 +1,3 @@
+package event
+
+type Subscription func(any)
diff --git a/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go
new file mode 100644
index 0000000..9295e7d
--- /dev/null
+++ b/vendor/github.com/xlgmokha/x/pkg/event/typed_aggregator.go
@@ -0,0 +1,39 @@
+package event
+
+import "github.com/xlgmokha/x/pkg/x"
+
+type TypedAggregator[T any] struct {
+ aggregator *Aggregator
+}
+
+func New[T any]() *TypedAggregator[T] {
+ return x.New[*TypedAggregator[T]](
+ WithAggregator[T](
+ x.New(
+ WithDefaults(),
+ ),
+ ),
+ )
+}
+
+func WithAggregator[T any](aggregator *Aggregator) x.Option[*TypedAggregator[T]] {
+ return x.With(func(item *TypedAggregator[T]) {
+ item.aggregator = aggregator
+ })
+}
+
+func (a *TypedAggregator[T]) SubscribeTo(event Event, f func(T)) {
+ a.aggregator.Subscribe(event, a.mapFrom(f))
+}
+
+func (a *TypedAggregator[T]) Publish(event Event, message T) {
+ a.aggregator.Publish(event, message)
+}
+
+func (a *TypedAggregator[T]) mapFrom(f func(T)) Subscription {
+ return func(message any) {
+ if item, ok := message.(T); ok {
+ f(item)
+ }
+ }
+}