diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
| commit | 20ef0d92694465ac86b550df139e8366a0a2b4fa (patch) | |
| tree | 3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout | |
| parent | 44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff) | |
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout')
| -rw-r--r-- | vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go | 2 | ||||
| -rw-r--r-- | vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go | 57 |
2 files changed, 59 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go new file mode 100644 index 0000000..9eceb4d --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go @@ -0,0 +1,2 @@ +// Package streamtimeout defines middleware that cancels the context after a timeout if no new data has been received. +package streamtimeout diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go new file mode 100644 index 0000000..8f09fdb --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go @@ -0,0 +1,57 @@ +package streamtimeout + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +// MustStreamServerInterceptor returns a new stream server interceptor that cancels the context +// after a timeout if no new data has been received. +func MustStreamServerInterceptor(timeout time.Duration) grpc.StreamServerInterceptor { + if timeout <= 0 { + panic("timeout must be >= 0 for streaming timeout interceptor") + } + + return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := stream.Context() + withCancel, internalCancelFn := context.WithCancelCause(ctx) + timer := time.AfterFunc(timeout, func() { + internalCancelFn(spiceerrors.WithCodeAndDetailsAsError(fmt.Errorf("operation took longer than allowed %v to complete", timeout), codes.DeadlineExceeded)) + }) + wrapper := &sendWrapper{stream, withCancel, timer, timeout} + return handler(srv, wrapper) + } +} + +type sendWrapper struct { + grpc.ServerStream + + ctx context.Context + timer *time.Timer + timeout time.Duration +} + +func (s *sendWrapper) Context() context.Context { + return s.ctx +} + +func (s *sendWrapper) SetTrailer(_ metadata.MD) { + s.timer.Stop() +} + +func (s *sendWrapper) SendMsg(m any) error { + err := s.ServerStream.SendMsg(m) + if err != nil { + s.timer.Stop() + } else { + s.timer.Reset(s.timeout) + } + return err +} |
