summaryrefslogtreecommitdiff
path: root/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
diff options
context:
space:
mode:
authormo khan <mo@mokhan.ca>2025-07-22 17:35:49 -0600
committermo khan <mo@mokhan.ca>2025-07-22 17:35:49 -0600
commit20ef0d92694465ac86b550df139e8366a0a2b4fa (patch)
tree3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
parent44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff)
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go')
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go57
1 files changed, 57 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
new file mode 100644
index 0000000..8f09fdb
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
@@ -0,0 +1,57 @@
+package streamtimeout
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/authzed/spicedb/pkg/spiceerrors"
+)
+
+// MustStreamServerInterceptor returns a new stream server interceptor that cancels the context
+// after a timeout if no new data has been received.
+func MustStreamServerInterceptor(timeout time.Duration) grpc.StreamServerInterceptor {
+ if timeout <= 0 {
+ panic("timeout must be >= 0 for streaming timeout interceptor")
+ }
+
+ return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ ctx := stream.Context()
+ withCancel, internalCancelFn := context.WithCancelCause(ctx)
+ timer := time.AfterFunc(timeout, func() {
+ internalCancelFn(spiceerrors.WithCodeAndDetailsAsError(fmt.Errorf("operation took longer than allowed %v to complete", timeout), codes.DeadlineExceeded))
+ })
+ wrapper := &sendWrapper{stream, withCancel, timer, timeout}
+ return handler(srv, wrapper)
+ }
+}
+
+type sendWrapper struct {
+ grpc.ServerStream
+
+ ctx context.Context
+ timer *time.Timer
+ timeout time.Duration
+}
+
+func (s *sendWrapper) Context() context.Context {
+ return s.ctx
+}
+
+func (s *sendWrapper) SetTrailer(_ metadata.MD) {
+ s.timer.Stop()
+}
+
+func (s *sendWrapper) SendMsg(m any) error {
+ err := s.ServerStream.SendMsg(m)
+ if err != nil {
+ s.timer.Stop()
+ } else {
+ s.timer.Reset(s.timeout)
+ }
+ return err
+}