diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
| commit | 20ef0d92694465ac86b550df139e8366a0a2b4fa (patch) | |
| tree | 3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/authzed/spicedb/internal/middleware | |
| parent | 44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff) | |
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/middleware')
12 files changed, 433 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/chain.go b/vendor/github.com/authzed/spicedb/internal/middleware/chain.go new file mode 100644 index 0000000..de08ffc --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/chain.go @@ -0,0 +1,58 @@ +package middleware + +import ( + "context" + + "google.golang.org/grpc" +) + +// Vendored from grpc-go-middleware +// These were removed in v2, see: https://github.com/grpc-ecosystem/go-grpc-middleware/pull/385 + +// ChainUnaryServer creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three +// will see context changes of one and two. +func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor { + n := len(interceptors) + + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler { + return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) { + return currentInter(currentCtx, currentReq, info, currentHandler) + } + } + + chainedHandler := handler + for i := n - 1; i >= 0; i-- { + chainedHandler = chainer(interceptors[i], chainedHandler) + } + + return chainedHandler(ctx, req) + } +} + +// ChainStreamServer creates a single interceptor out of a chain of many interceptors. +// +// Execution is done in left-to-right order, including passing of context. +// For example ChainUnaryServer(one, two, three) will execute one before two before three. +// If you want to pass context between interceptors, use WrapServerStream. +func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor { + n := len(interceptors) + + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + chainer := func(currentInter grpc.StreamServerInterceptor, currentHandler grpc.StreamHandler) grpc.StreamHandler { + return func(currentSrv interface{}, currentStream grpc.ServerStream) error { + return currentInter(currentSrv, currentStream, info, currentHandler) + } + } + + chainedHandler := handler + for i := n - 1; i >= 0; i-- { + chainedHandler = chainer(interceptors[i], chainedHandler) + } + + return chainedHandler(srv, ss) + } +} diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go new file mode 100644 index 0000000..8c321b3 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go @@ -0,0 +1,85 @@ +package datastore + +import ( + "context" + + middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2" + "google.golang.org/grpc" + + "github.com/authzed/spicedb/pkg/datastore" +) + +type ctxKeyType struct{} + +var datastoreKey ctxKeyType = struct{}{} + +type datastoreHandle struct { + datastore datastore.Datastore +} + +// ContextWithHandle adds a placeholder to a context that will later be +// filled by the datastore +func ContextWithHandle(ctx context.Context) context.Context { + return context.WithValue(ctx, datastoreKey, &datastoreHandle{}) +} + +// FromContext reads the selected datastore out of a context.Context +// and returns nil if it does not exist. +func FromContext(ctx context.Context) datastore.Datastore { + if c := ctx.Value(datastoreKey); c != nil { + handle := c.(*datastoreHandle) + return handle.datastore + } + return nil +} + +// MustFromContext reads the selected datastore out of a context.Context and panics if it does not exist +func MustFromContext(ctx context.Context) datastore.Datastore { + datastore := FromContext(ctx) + if datastore == nil { + panic("datastore middleware did not inject datastore") + } + + return datastore +} + +// SetInContext adds a datastore to the given context +func SetInContext(ctx context.Context, datastore datastore.Datastore) error { + handle := ctx.Value(datastoreKey) + if handle == nil { + return nil + } + handle.(*datastoreHandle).datastore = datastore + return nil +} + +// ContextWithDatastore adds the handle and datastore in one step +func ContextWithDatastore(ctx context.Context, datastore datastore.Datastore) context.Context { + return context.WithValue(ctx, datastoreKey, &datastoreHandle{datastore: datastore}) +} + +// UnaryServerInterceptor returns a new unary server interceptor that adds the +// datastore to the context +func UnaryServerInterceptor(datastore datastore.Datastore) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + newCtx := ContextWithHandle(ctx) + if err := SetInContext(newCtx, datastore); err != nil { + return nil, err + } + + return handler(newCtx, req) + } +} + +// StreamServerInterceptor returns a new stream server interceptor that adds the +// datastore to the context +func StreamServerInterceptor(datastore datastore.Datastore) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + wrapped := middleware.WrapServerStream(stream) + wrapped.WrappedContext = ContextWithHandle(wrapped.WrappedContext) + if err := SetInContext(wrapped.WrappedContext, datastore); err != nil { + return err + } + return handler(srv, wrapped) + } +} diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go new file mode 100644 index 0000000..a4d0cf0 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go @@ -0,0 +1,2 @@ +// Package datastore defines middleware that injects the datastore into the context. +package datastore diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/doc.go new file mode 100644 index 0000000..b11553e --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/doc.go @@ -0,0 +1,2 @@ +// Package middleware defines various custom middlewares. +package middleware diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go new file mode 100644 index 0000000..2d9aa01 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go @@ -0,0 +1,2 @@ +// Package handwrittenvalidation defines middleware that runs custom-made validations on incoming requests. +package handwrittenvalidation diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go new file mode 100644 index 0000000..2adc4b3 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go @@ -0,0 +1,54 @@ +package handwrittenvalidation + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type handwrittenValidator interface { + HandwrittenValidate() error +} + +// UnaryServerInterceptor returns a new unary server interceptor that runs the handwritten validation +// on the incoming request, if any. +func UnaryServerInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + validator, ok := req.(handwrittenValidator) + if ok { + err := validator.HandwrittenValidate() + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "%s", err) + } + } + + return handler(ctx, req) +} + +// StreamServerInterceptor returns a new stream server interceptor that runs the handwritten validation +// on the incoming request messages, if any. +func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + wrapper := &recvWrapper{stream} + return handler(srv, wrapper) +} + +type recvWrapper struct { + grpc.ServerStream +} + +func (s *recvWrapper) RecvMsg(m interface{}) error { + if err := s.ServerStream.RecvMsg(m); err != nil { + return err + } + + validator, ok := m.(handwrittenValidator) + if ok { + err := validator.HandwrittenValidate() + if err != nil { + return err + } + } + + return nil +} diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go new file mode 100644 index 0000000..7e3ae0f --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go @@ -0,0 +1,2 @@ +// Package servicespecific defines middleware that injects other middlewares. +package servicespecific diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go new file mode 100644 index 0000000..10fe753 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go @@ -0,0 +1,39 @@ +package servicespecific + +import ( + "context" + + "google.golang.org/grpc" +) + +// ExtraUnaryInterceptor is an interface for a service which has its own bundled +// unary interceptors that must be run. +type ExtraUnaryInterceptor interface { + UnaryInterceptor() grpc.UnaryServerInterceptor +} + +// ExtraStreamInterceptor is an interface for a service which has its own bundled +// stream interceptors that must be run. +type ExtraStreamInterceptor interface { + StreamInterceptor() grpc.StreamServerInterceptor +} + +// UnaryServerInterceptor returns a new unary server interceptor that runs bundled interceptors. +func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if hasExtraInterceptor, ok := info.Server.(ExtraUnaryInterceptor); ok { + interceptor := hasExtraInterceptor.UnaryInterceptor() + return interceptor(ctx, req, info, handler) + } + + return handler(ctx, req) +} + +// StreamServerInterceptor returns a new stream server interceptor that runs bundled interceptors. +func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if hasExtraInterceptor, ok := srv.(ExtraStreamInterceptor); ok { + interceptor := hasExtraInterceptor.StreamInterceptor() + return interceptor(srv, stream, info, handler) + } + + return handler(srv, stream) +} diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go new file mode 100644 index 0000000..9eceb4d --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go @@ -0,0 +1,2 @@ +// Package streamtimeout defines middleware that cancels the context after a timeout if no new data has been received. +package streamtimeout diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go new file mode 100644 index 0000000..8f09fdb --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go @@ -0,0 +1,57 @@ +package streamtimeout + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +// MustStreamServerInterceptor returns a new stream server interceptor that cancels the context +// after a timeout if no new data has been received. +func MustStreamServerInterceptor(timeout time.Duration) grpc.StreamServerInterceptor { + if timeout <= 0 { + panic("timeout must be >= 0 for streaming timeout interceptor") + } + + return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ctx := stream.Context() + withCancel, internalCancelFn := context.WithCancelCause(ctx) + timer := time.AfterFunc(timeout, func() { + internalCancelFn(spiceerrors.WithCodeAndDetailsAsError(fmt.Errorf("operation took longer than allowed %v to complete", timeout), codes.DeadlineExceeded)) + }) + wrapper := &sendWrapper{stream, withCancel, timer, timeout} + return handler(srv, wrapper) + } +} + +type sendWrapper struct { + grpc.ServerStream + + ctx context.Context + timer *time.Timer + timeout time.Duration +} + +func (s *sendWrapper) Context() context.Context { + return s.ctx +} + +func (s *sendWrapper) SetTrailer(_ metadata.MD) { + s.timer.Stop() +} + +func (s *sendWrapper) SendMsg(m any) error { + err := s.ServerStream.SendMsg(m) + if err != nil { + s.timer.Stop() + } else { + s.timer.Reset(s.timeout) + } + return err +} diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go new file mode 100644 index 0000000..c05cacc --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go @@ -0,0 +1,2 @@ +// Package usagemetrics defines middleware that adds usage data (e.g. dispatch counts) to the response. +package usagemetrics diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go new file mode 100644 index 0000000..32f5676 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go @@ -0,0 +1,128 @@ +package usagemetrics + +import ( + "context" + "strconv" + "time" + + "github.com/authzed/authzed-go/pkg/responsemeta" + "github.com/authzed/grpcutil" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + + log "github.com/authzed/spicedb/internal/logging" + dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" +) + +var ( + // DispatchedCountLabels are the labels that DispatchedCountHistogram will + // have by default. + DispatchedCountLabels = []string{"method", "cached"} + + // DispatchedCountHistogram is the metric that SpiceDB uses to keep track + // of the number of downstream dispatches that are performed to answer a + // single query. + DispatchedCountHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "spicedb", + Subsystem: "services", + Name: "dispatches", + Help: "Histogram of cluster dispatches performed by the instance.", + Buckets: []float64{1, 5, 10, 25, 50, 100, 250}, + }, DispatchedCountLabels) +) + +type reporter struct{} + +func (r *reporter) ServerReporter(ctx context.Context, callMeta interceptors.CallMeta) (interceptors.Reporter, context.Context) { + _, methodName := grpcutil.SplitMethodName(callMeta.FullMethod()) + ctx = ContextWithHandle(ctx) + return &serverReporter{ctx: ctx, methodName: methodName}, ctx +} + +type serverReporter struct { + interceptors.NoopReporter + ctx context.Context + methodName string +} + +func (r *serverReporter) PostCall(_ error, _ time.Duration) { + responseMeta := FromContext(r.ctx) + if responseMeta == nil { + responseMeta = &dispatch.ResponseMeta{} + } + + err := annotateAndReportForMetadata(r.ctx, r.methodName, responseMeta) + // if context is cancelled, the stream will be closed, and gRPC will return ErrIllegalHeaderWrite + // this prevents logging unnecessary error messages + if r.ctx.Err() != nil { + return + } + if err != nil { + log.Ctx(r.ctx).Warn().Err(err).Msg("usagemetrics: could not report metadata") + } +} + +// UnaryServerInterceptor implements a gRPC Middleware for reporting usage metrics +// in both the trailer of the request, as well as to the registered prometheus +// metrics. +func UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return interceptors.UnaryServerInterceptor(&reporter{}) +} + +// StreamServerInterceptor implements a gRPC Middleware for reporting usage metrics +// in both the trailer of the request, as well as to the registered prometheus +// metrics +func StreamServerInterceptor() grpc.StreamServerInterceptor { + return interceptors.StreamServerInterceptor(&reporter{}) +} + +func annotateAndReportForMetadata(ctx context.Context, methodName string, metadata *dispatch.ResponseMeta) error { + DispatchedCountHistogram.WithLabelValues(methodName, "false").Observe(float64(metadata.DispatchCount)) + DispatchedCountHistogram.WithLabelValues(methodName, "true").Observe(float64(metadata.CachedDispatchCount)) + + return responsemeta.SetResponseTrailerMetadata(ctx, map[responsemeta.ResponseMetadataTrailerKey]string{ + responsemeta.DispatchedOperationsCount: strconv.Itoa(int(metadata.DispatchCount)), + responsemeta.CachedOperationsCount: strconv.Itoa(int(metadata.CachedDispatchCount)), + }) +} + +// Create a new type to prevent context collisions +type responseMetaKey string + +var metadataCtxKey responseMetaKey = "dispatched-response-meta" + +type metaHandle struct{ metadata *dispatch.ResponseMeta } + +// SetInContext should be called in a gRPC handler to correctly set the response metadata +// for the dispatched request. +func SetInContext(ctx context.Context, metadata *dispatch.ResponseMeta) { + possibleHandle := ctx.Value(metadataCtxKey) + if possibleHandle == nil { + return + } + + handle := possibleHandle.(*metaHandle) + handle.metadata = metadata +} + +// FromContext returns any metadata that was stored in the context. +// +// This is useful for testing that a handler is properly setting the context. +func FromContext(ctx context.Context) *dispatch.ResponseMeta { + possibleHandle := ctx.Value(metadataCtxKey) + if possibleHandle == nil { + return nil + } + return possibleHandle.(*metaHandle).metadata +} + +// ContextWithHandle creates a new context with a location to store metadata +// returned from a dispatched request. +// +// This should only be called in middleware or testing functions. +func ContextWithHandle(ctx context.Context) context.Context { + var handle metaHandle + return context.WithValue(ctx, metadataCtxKey, &handle) +} |
