diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
| commit | 20ef0d92694465ac86b550df139e8366a0a2b4fa (patch) | |
| tree | 3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/samber/lo/channel.go | |
| parent | 44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff) | |
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/samber/lo/channel.go')
| -rw-r--r-- | vendor/github.com/samber/lo/channel.go | 314 |
1 files changed, 314 insertions, 0 deletions
diff --git a/vendor/github.com/samber/lo/channel.go b/vendor/github.com/samber/lo/channel.go new file mode 100644 index 0000000..a1e2cdd --- /dev/null +++ b/vendor/github.com/samber/lo/channel.go @@ -0,0 +1,314 @@ +package lo + +import ( + "context" + "sync" + "time" + + "github.com/samber/lo/internal/rand" +) + +type DispatchingStrategy[T any] func(msg T, index uint64, channels []<-chan T) int + +// ChannelDispatcher distributes messages from input channels into N child channels. +// Close events are propagated to children. +// Underlying channels can have a fixed buffer capacity or be unbuffered when cap is 0. +func ChannelDispatcher[T any](stream <-chan T, count int, channelBufferCap int, strategy DispatchingStrategy[T]) []<-chan T { + children := createChannels[T](count, channelBufferCap) + + roChildren := channelsToReadOnly(children) + + go func() { + // propagate channel closing to children + defer closeChannels(children) + + var i uint64 = 0 + + for { + msg, ok := <-stream + if !ok { + return + } + + destination := strategy(msg, i, roChildren) % count + children[destination] <- msg + + i++ + } + }() + + return roChildren +} + +func createChannels[T any](count int, channelBufferCap int) []chan T { + children := make([]chan T, 0, count) + + for i := 0; i < count; i++ { + children = append(children, make(chan T, channelBufferCap)) + } + + return children +} + +func channelsToReadOnly[T any](children []chan T) []<-chan T { + roChildren := make([]<-chan T, 0, len(children)) + + for i := range children { + roChildren = append(roChildren, children[i]) + } + + return roChildren +} + +func closeChannels[T any](children []chan T) { + for i := 0; i < len(children); i++ { + close(children[i]) + } +} + +func channelIsNotFull[T any](ch <-chan T) bool { + return cap(ch) == 0 || len(ch) < cap(ch) +} + +// DispatchingStrategyRoundRobin distributes messages in a rotating sequential manner. +// If the channel capacity is exceeded, the next channel will be selected and so on. +func DispatchingStrategyRoundRobin[T any](msg T, index uint64, channels []<-chan T) int { + for { + i := int(index % uint64(len(channels))) + if channelIsNotFull(channels[i]) { + return i + } + + index++ + time.Sleep(10 * time.Microsecond) // prevent CPU from burning 🔥 + } +} + +// DispatchingStrategyRandom distributes messages in a random manner. +// If the channel capacity is exceeded, another random channel will be selected and so on. +func DispatchingStrategyRandom[T any](msg T, index uint64, channels []<-chan T) int { + for { + i := rand.IntN(len(channels)) + if channelIsNotFull(channels[i]) { + return i + } + + time.Sleep(10 * time.Microsecond) // prevent CPU from burning 🔥 + } +} + +// DispatchingStrategyWeightedRandom distributes messages in a weighted manner. +// If the channel capacity is exceeded, another random channel will be selected and so on. +func DispatchingStrategyWeightedRandom[T any](weights []int) DispatchingStrategy[T] { + seq := []int{} + + for i := 0; i < len(weights); i++ { + for j := 0; j < weights[i]; j++ { + seq = append(seq, i) + } + } + + return func(msg T, index uint64, channels []<-chan T) int { + for { + i := seq[rand.IntN(len(seq))] + if channelIsNotFull(channels[i]) { + return i + } + + time.Sleep(10 * time.Microsecond) // prevent CPU from burning 🔥 + } + } +} + +// DispatchingStrategyFirst distributes messages in the first non-full channel. +// If the capacity of the first channel is exceeded, the second channel will be selected and so on. +func DispatchingStrategyFirst[T any](msg T, index uint64, channels []<-chan T) int { + for { + for i := range channels { + if channelIsNotFull(channels[i]) { + return i + } + } + + time.Sleep(10 * time.Microsecond) // prevent CPU from burning 🔥 + } +} + +// DispatchingStrategyLeast distributes messages in the emptiest channel. +func DispatchingStrategyLeast[T any](msg T, index uint64, channels []<-chan T) int { + seq := Range(len(channels)) + + return MinBy(seq, func(item int, min int) bool { + return len(channels[item]) < len(channels[min]) + }) +} + +// DispatchingStrategyMost distributes messages in the fullest channel. +// If the channel capacity is exceeded, the next channel will be selected and so on. +func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) int { + seq := Range(len(channels)) + + return MaxBy(seq, func(item int, max int) bool { + return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item]) + }) +} + +// SliceToChannel returns a read-only channels of collection elements. +func SliceToChannel[T any](bufferSize int, collection []T) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + for i := range collection { + ch <- collection[i] + } + + close(ch) + }() + + return ch +} + +// ChannelToSlice returns a slice built from channels items. Blocks until channel closes. +func ChannelToSlice[T any](ch <-chan T) []T { + collection := []T{} + + for item := range ch { + collection = append(collection, item) + } + + return collection +} + +// Generator implements the generator design pattern. +func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + // WARNING: infinite loop + generator(func(t T) { + ch <- t + }) + + close(ch) + }() + + return ch +} + +// Buffer creates a slice of n elements from a channel. Returns the slice and the slice length. +// @TODO: we should probably provide an helper that reuse the same buffer. +func Buffer[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { + buffer := make([]T, 0, size) + index := 0 + now := time.Now() + + for ; index < size; index++ { + item, ok := <-ch + if !ok { + return buffer, index, time.Since(now), false + } + + buffer = append(buffer, item) + } + + return buffer, index, time.Since(now), true +} + +// Batch creates a slice of n elements from a channel. Returns the slice and the slice length. +// +// Deprecated: Use [Buffer] instead. +func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { + return Buffer(ch, size) +} + +// BufferWithContext creates a slice of n elements from a channel, with context. Returns the slice and the slice length. +// @TODO: we should probably provide an helper that reuse the same buffer. +func BufferWithContext[T any](ctx context.Context, ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { + buffer := make([]T, 0, size) + now := time.Now() + + for index := 0; index < size; index++ { + select { + case item, ok := <-ch: + if !ok { + return buffer, index, time.Since(now), false + } + + buffer = append(buffer, item) + + case <-ctx.Done(): + return buffer, index, time.Since(now), true + } + } + + return buffer, size, time.Since(now), true +} + +// BufferWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +func BufferWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return BufferWithContext(ctx, ch, size) +} + +// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +// +// Deprecated: Use [BufferWithTimeout] instead. +func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { + return BufferWithTimeout(ch, size, timeout) +} + +// FanIn collects messages from multiple input channels into a single buffered channel. +// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes. +func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T { + out := make(chan T, channelBufferCap) + var wg sync.WaitGroup + + // Start an output goroutine for each input channel in upstreams. + wg.Add(len(upstreams)) + for i := range upstreams { + go func(index int) { + for n := range upstreams[index] { + out <- n + } + wg.Done() + }(i) + } + + // Start a goroutine to close out once all the output goroutines are done. + go func() { + wg.Wait() + close(out) + }() + return out +} + +// ChannelMerge collects messages from multiple input channels into a single buffered channel. +// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes. +// +// Deprecated: Use [FanIn] instead. +func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T { + return FanIn(channelBufferCap, upstreams...) +} + +// FanOut broadcasts all the upstream messages to multiple downstream channels. +// When upstream channel reach EOF, downstream channels close. If any downstream +// channels is full, broadcasting is paused. +func FanOut[T any](count int, channelsBufferCap int, upstream <-chan T) []<-chan T { + downstreams := createChannels[T](count, channelsBufferCap) + + go func() { + for msg := range upstream { + for i := range downstreams { + downstreams[i] <- msg + } + } + + // Close out once all the output goroutines are done. + for i := range downstreams { + close(downstreams[i]) + } + }() + + return channelsToReadOnly(downstreams) +} |
