diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
| commit | 20ef0d92694465ac86b550df139e8366a0a2b4fa (patch) | |
| tree | 3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/rs/zerolog | |
| parent | 44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff) | |
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/rs/zerolog')
7 files changed, 673 insertions, 0 deletions
diff --git a/vendor/github.com/rs/zerolog/diode/diode.go b/vendor/github.com/rs/zerolog/diode/diode.go new file mode 100644 index 0000000..45a8910 --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/diode.go @@ -0,0 +1,114 @@ +// Package diode provides a thread-safe, lock-free, non-blocking io.Writer +// wrapper. +package diode + +import ( + "context" + "io" + "sync" + "time" + + "github.com/rs/zerolog/diode/internal/diodes" +) + +var bufPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 500) + }, +} + +type Alerter func(missed int) + +type diodeFetcher interface { + diodes.Diode + Next() diodes.GenericDataType +} + +// Writer is a io.Writer wrapper that uses a diode to make Write lock-free, +// non-blocking and thread safe. +type Writer struct { + w io.Writer + d diodeFetcher + c context.CancelFunc + done chan struct{} +} + +// NewWriter creates a writer wrapping w with a many-to-one diode in order to +// never block log producers and drop events if the writer can't keep up with +// the flow of data. +// +// Use a diode.Writer when +// +// wr := diode.NewWriter(w, 1000, 0, func(missed int) { +// log.Printf("Dropped %d messages", missed) +// }) +// log := zerolog.New(wr) +// +// If pollInterval is greater than 0, a poller is used otherwise a waiter is +// used. +// +// See code.cloudfoundry.org/go-diodes for more info on diode. +func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer { + ctx, cancel := context.WithCancel(context.Background()) + dw := Writer{ + w: w, + c: cancel, + done: make(chan struct{}), + } + if f == nil { + f = func(int) {} + } + d := diodes.NewManyToOne(size, diodes.AlertFunc(f)) + if pollInterval > 0 { + dw.d = diodes.NewPoller(d, + diodes.WithPollingInterval(pollInterval), + diodes.WithPollingContext(ctx)) + } else { + dw.d = diodes.NewWaiter(d, + diodes.WithWaiterContext(ctx)) + } + go dw.poll() + return dw +} + +func (dw Writer) Write(p []byte) (n int, err error) { + // p is pooled in zerolog so we can't hold it passed this call, hence the + // copy. + p = append(bufPool.Get().([]byte), p...) + dw.d.Set(diodes.GenericDataType(&p)) + return len(p), nil +} + +// Close releases the diode poller and call Close on the wrapped writer if +// io.Closer is implemented. +func (dw Writer) Close() error { + dw.c() + <-dw.done + if w, ok := dw.w.(io.Closer); ok { + return w.Close() + } + return nil +} + +func (dw Writer) poll() { + defer close(dw.done) + for { + d := dw.d.Next() + if d == nil { + return + } + p := *(*[]byte)(d) + dw.w.Write(p) + + // Proper usage of a sync.Pool requires each entry to have approximately + // the same memory cost. To obtain this property when the stored type + // contains a variably-sized buffer, we add a hard limit on the maximum buffer + // to place back in the pool. + // + // See https://golang.org/issue/23199 + const maxSize = 1 << 16 // 64KiB + if cap(p) <= maxSize { + bufPool.Put(p[:0]) + } + } +} diff --git a/vendor/github.com/rs/zerolog/diode/internal/diodes/README b/vendor/github.com/rs/zerolog/diode/internal/diodes/README new file mode 100644 index 0000000..6c4ec5f --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/internal/diodes/README @@ -0,0 +1 @@ +Copied from https://github.com/cloudfoundry/go-diodes to avoid test dependencies. diff --git a/vendor/github.com/rs/zerolog/diode/internal/diodes/many_to_one.go b/vendor/github.com/rs/zerolog/diode/internal/diodes/many_to_one.go new file mode 100644 index 0000000..477eddd --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/internal/diodes/many_to_one.go @@ -0,0 +1,130 @@ +package diodes + +import ( + "log" + "sync/atomic" + "unsafe" +) + +// ManyToOne diode is optimal for many writers (go-routines B-n) and a single +// reader (go-routine A). It is not thread safe for multiple readers. +type ManyToOne struct { + writeIndex uint64 + readIndex uint64 + buffer []unsafe.Pointer + alerter Alerter +} + +// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode +// is optimized for many writers (on go-routines B-n) and a single reader +// (on go-routine A). The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. +func NewManyToOne(size int, alerter Alerter) *ManyToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + d := &ManyToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } + + // Start write index at the value before 0 + // to allow the first write to use AddUint64 + // and still have a beginning index of 0 + d.writeIndex = ^d.writeIndex + return d +} + +// Set sets the data in the next slot of the ring buffer. +func (d *ManyToOne) Set(data GenericDataType) { + for { + writeIndex := atomic.AddUint64(&d.writeIndex, 1) + idx := writeIndex % uint64(len(d.buffer)) + old := atomic.LoadPointer(&d.buffer[idx]) + + if old != nil && + (*bucket)(old) != nil && + (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + newBucket := &bucket{ + data: data, + seq: writeIndex, + } + + if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) { + log.Println("Diode set collision: consider using a larger diode") + continue + } + + return + } +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is no data available, it will return (nil, false). +func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. + // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. + // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + // + d.readIndex++ + return result.data, true +} diff --git a/vendor/github.com/rs/zerolog/diode/internal/diodes/one_to_one.go b/vendor/github.com/rs/zerolog/diode/internal/diodes/one_to_one.go new file mode 100644 index 0000000..6c454eb --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/internal/diodes/one_to_one.go @@ -0,0 +1,129 @@ +package diodes + +import ( + "sync/atomic" + "unsafe" +) + +// GenericDataType is the data type the diodes operate on. +type GenericDataType unsafe.Pointer + +// Alerter is used to report how many values were overwritten since the +// last write. +type Alerter interface { + Alert(missed int) +} + +// AlertFunc type is an adapter to allow the use of ordinary functions as +// Alert handlers. +type AlertFunc func(missed int) + +// Alert calls f(missed) +func (f AlertFunc) Alert(missed int) { + f(missed) +} + +type bucket struct { + data GenericDataType + seq uint64 // seq is the recorded write index at the time of writing +} + +// OneToOne diode is meant to be used by a single reader and a single writer. +// It is not thread safe if used otherwise. +type OneToOne struct { + writeIndex uint64 + readIndex uint64 + buffer []unsafe.Pointer + alerter Alerter +} + +// NewOneToOne creates a new diode is meant to be used by a single reader and +// a single writer. The alerter is invoked on the read's go-routine. It is +// called when it notices that the writer go-routine has passed it and wrote +// over data. A nil can be used to ignore alerts. +func NewOneToOne(size int, alerter Alerter) *OneToOne { + if alerter == nil { + alerter = AlertFunc(func(int) {}) + } + + return &OneToOne{ + buffer: make([]unsafe.Pointer, size), + alerter: alerter, + } +} + +// Set sets the data in the next slot of the ring buffer. +func (d *OneToOne) Set(data GenericDataType) { + idx := d.writeIndex % uint64(len(d.buffer)) + + newBucket := &bucket{ + data: data, + seq: d.writeIndex, + } + d.writeIndex++ + + atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket)) +} + +// TryNext will attempt to read from the next slot of the ring buffer. +// If there is no data available, it will return (nil, false). +func (d *OneToOne) TryNext() (data GenericDataType, ok bool) { + // Read a value from the ring buffer based on the readIndex. + idx := d.readIndex % uint64(len(d.buffer)) + result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) + + // When the result is nil that means the writer has not had the + // opportunity to write a value into the diode. This value must be ignored + // and the read head must not increment. + if result == nil { + return nil, false + } + + // When the seq value is less than the current read index that means a + // value was read from idx that was previously written but since has + // been dropped. This value must be ignored and the read head must not + // increment. + // + // The simulation for this scenario assumes the fast forward occurred as + // detailed below. + // + // 5. The reader reads again getting seq 5. It then reads again expecting + // seq 6 but gets seq 2. This is a read of a stale value that was + // effectively "dropped" so the read fails and the read head stays put. + // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 + // + if result.seq < d.readIndex { + return nil, false + } + + // When the seq value is greater than the current read index that means a + // value was read from idx that overwrote the value that was expected to + // be at this idx. This happens when the writer has lapped the reader. The + // reader needs to catch up to the writer so it moves its write head to + // the new seq, effectively dropping the messages that were not read in + // between the two values. + // + // Here is a simulation of this scenario: + // + // 1. Both the read and write heads start at 0. + // `| nil | nil | nil | nil |` r: 0, w: 0 + // 2. The writer fills the buffer. + // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 + // 3. The writer laps the read head. + // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 + // 4. The reader reads the first value, expecting a seq of 0 but reads 4, + // this forces the reader to fast forward to 5. + // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 + // + if result.seq > d.readIndex { + dropped := result.seq - d.readIndex + d.readIndex = result.seq + d.alerter.Alert(int(dropped)) + } + + // Only increment read index if a regular read occurred (where seq was + // equal to readIndex) or a value was read that caused a fast forward + // (where seq was greater than readIndex). + d.readIndex++ + return result.data, true +} diff --git a/vendor/github.com/rs/zerolog/diode/internal/diodes/poller.go b/vendor/github.com/rs/zerolog/diode/internal/diodes/poller.go new file mode 100644 index 0000000..dc51fd7 --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/internal/diodes/poller.go @@ -0,0 +1,80 @@ +package diodes + +import ( + "context" + "time" +) + +// Diode is any implementation of a diode. +type Diode interface { + Set(GenericDataType) + TryNext() (GenericDataType, bool) +} + +// Poller will poll a diode until a value is available. +type Poller struct { + Diode + interval time.Duration + ctx context.Context +} + +// PollerConfigOption can be used to setup the poller. +type PollerConfigOption func(*Poller) + +// WithPollingInterval sets the interval at which the diode is queried +// for new data. The default is 10ms. +func WithPollingInterval(interval time.Duration) PollerConfigOption { + return func(c *Poller) { + c.interval = interval + } +} + +// WithPollingContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithPollingContext(ctx context.Context) PollerConfigOption { + return func(c *Poller) { + c.ctx = ctx + } +} + +// NewPoller returns a new Poller that wraps the given diode. +func NewPoller(d Diode, opts ...PollerConfigOption) *Poller { + p := &Poller{ + Diode: d, + interval: 10 * time.Millisecond, + ctx: context.Background(), + } + + for _, o := range opts { + o(p) + } + + return p +} + +// Next polls the diode until data is available or until the context is done. +// If the context is done, then nil will be returned. +func (p *Poller) Next() GenericDataType { + for { + data, ok := p.Diode.TryNext() + if !ok { + if p.isDone() { + return nil + } + + time.Sleep(p.interval) + continue + } + return data + } +} + +func (p *Poller) isDone() bool { + select { + case <-p.ctx.Done(): + return true + default: + return false + } +} diff --git a/vendor/github.com/rs/zerolog/diode/internal/diodes/waiter.go b/vendor/github.com/rs/zerolog/diode/internal/diodes/waiter.go new file mode 100644 index 0000000..a4a8e97 --- /dev/null +++ b/vendor/github.com/rs/zerolog/diode/internal/diodes/waiter.go @@ -0,0 +1,88 @@ +package diodes + +import ( + "context" + "sync" +) + +// Waiter will use a conditional mutex to alert the reader to when data is +// available. +type Waiter struct { + Diode + mu sync.Mutex + c *sync.Cond + ctx context.Context +} + +// WaiterConfigOption can be used to setup the waiter. +type WaiterConfigOption func(*Waiter) + +// WithWaiterContext sets the context to cancel any retrieval (Next()). It +// will not change any results for adding data (Set()). Default is +// context.Background(). +func WithWaiterContext(ctx context.Context) WaiterConfigOption { + return func(c *Waiter) { + c.ctx = ctx + } +} + +// NewWaiter returns a new Waiter that wraps the given diode. +func NewWaiter(d Diode, opts ...WaiterConfigOption) *Waiter { + w := new(Waiter) + w.Diode = d + w.c = sync.NewCond(&w.mu) + w.ctx = context.Background() + + for _, opt := range opts { + opt(w) + } + + go func() { + <-w.ctx.Done() + + // Mutex is strictly necessary here to avoid a race in Next() (between + // w.isDone() and w.c.Wait()) and w.c.Broadcast() here. + w.mu.Lock() + w.c.Broadcast() + w.mu.Unlock() + }() + + return w +} + +// Set invokes the wrapped diode's Set with the given data and uses Broadcast +// to wake up any readers. +func (w *Waiter) Set(data GenericDataType) { + w.Diode.Set(data) + w.c.Broadcast() +} + +// Next returns the next data point on the wrapped diode. If there is not any +// new data, it will Wait for set to be called or the context to be done. +// If the context is done, then nil will be returned. +func (w *Waiter) Next() GenericDataType { + w.mu.Lock() + defer w.mu.Unlock() + + for { + data, ok := w.Diode.TryNext() + if !ok { + if w.isDone() { + return nil + } + + w.c.Wait() + continue + } + return data + } +} + +func (w *Waiter) isDone() bool { + select { + case <-w.ctx.Done(): + return true + default: + return false + } +} diff --git a/vendor/github.com/rs/zerolog/log/log.go b/vendor/github.com/rs/zerolog/log/log.go new file mode 100644 index 0000000..a96ec50 --- /dev/null +++ b/vendor/github.com/rs/zerolog/log/log.go @@ -0,0 +1,131 @@ +// Package log provides a global logger for zerolog. +package log + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/rs/zerolog" +) + +// Logger is the global logger. +var Logger = zerolog.New(os.Stderr).With().Timestamp().Logger() + +// Output duplicates the global logger and sets w as its output. +func Output(w io.Writer) zerolog.Logger { + return Logger.Output(w) +} + +// With creates a child logger with the field added to its context. +func With() zerolog.Context { + return Logger.With() +} + +// Level creates a child logger with the minimum accepted level set to level. +func Level(level zerolog.Level) zerolog.Logger { + return Logger.Level(level) +} + +// Sample returns a logger with the s sampler. +func Sample(s zerolog.Sampler) zerolog.Logger { + return Logger.Sample(s) +} + +// Hook returns a logger with the h Hook. +func Hook(h zerolog.Hook) zerolog.Logger { + return Logger.Hook(h) +} + +// Err starts a new message with error level with err as a field if not nil or +// with info level if err is nil. +// +// You must call Msg on the returned event in order to send the event. +func Err(err error) *zerolog.Event { + return Logger.Err(err) +} + +// Trace starts a new message with trace level. +// +// You must call Msg on the returned event in order to send the event. +func Trace() *zerolog.Event { + return Logger.Trace() +} + +// Debug starts a new message with debug level. +// +// You must call Msg on the returned event in order to send the event. +func Debug() *zerolog.Event { + return Logger.Debug() +} + +// Info starts a new message with info level. +// +// You must call Msg on the returned event in order to send the event. +func Info() *zerolog.Event { + return Logger.Info() +} + +// Warn starts a new message with warn level. +// +// You must call Msg on the returned event in order to send the event. +func Warn() *zerolog.Event { + return Logger.Warn() +} + +// Error starts a new message with error level. +// +// You must call Msg on the returned event in order to send the event. +func Error() *zerolog.Event { + return Logger.Error() +} + +// Fatal starts a new message with fatal level. The os.Exit(1) function +// is called by the Msg method. +// +// You must call Msg on the returned event in order to send the event. +func Fatal() *zerolog.Event { + return Logger.Fatal() +} + +// Panic starts a new message with panic level. The message is also sent +// to the panic function. +// +// You must call Msg on the returned event in order to send the event. +func Panic() *zerolog.Event { + return Logger.Panic() +} + +// WithLevel starts a new message with level. +// +// You must call Msg on the returned event in order to send the event. +func WithLevel(level zerolog.Level) *zerolog.Event { + return Logger.WithLevel(level) +} + +// Log starts a new message with no level. Setting zerolog.GlobalLevel to +// zerolog.Disabled will still disable events produced by this method. +// +// You must call Msg on the returned event in order to send the event. +func Log() *zerolog.Event { + return Logger.Log() +} + +// Print sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Print. +func Print(v ...interface{}) { + Logger.Debug().CallerSkipFrame(1).Msg(fmt.Sprint(v...)) +} + +// Printf sends a log event using debug level and no extra field. +// Arguments are handled in the manner of fmt.Printf. +func Printf(format string, v ...interface{}) { + Logger.Debug().CallerSkipFrame(1).Msgf(format, v...) +} + +// Ctx returns the Logger associated with the ctx. If no logger +// is associated, a disabled logger is returned. +func Ctx(ctx context.Context) *zerolog.Logger { + return zerolog.Ctx(ctx) +} |
