summaryrefslogtreecommitdiff
path: root/vendor/github.com/authzed/spicedb/internal/middleware
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/middleware')
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/chain.go58
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go85
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go54
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go39
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go57
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go2
-rw-r--r--vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go128
12 files changed, 433 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/chain.go b/vendor/github.com/authzed/spicedb/internal/middleware/chain.go
new file mode 100644
index 0000000..de08ffc
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/chain.go
@@ -0,0 +1,58 @@
+package middleware
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+)
+
+// Vendored from grpc-go-middleware
+// These were removed in v2, see: https://github.com/grpc-ecosystem/go-grpc-middleware/pull/385
+
+// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
+// will see context changes of one and two.
+func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
+ n := len(interceptors)
+
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ chainer := func(currentInter grpc.UnaryServerInterceptor, currentHandler grpc.UnaryHandler) grpc.UnaryHandler {
+ return func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
+ return currentInter(currentCtx, currentReq, info, currentHandler)
+ }
+ }
+
+ chainedHandler := handler
+ for i := n - 1; i >= 0; i-- {
+ chainedHandler = chainer(interceptors[i], chainedHandler)
+ }
+
+ return chainedHandler(ctx, req)
+ }
+}
+
+// ChainStreamServer creates a single interceptor out of a chain of many interceptors.
+//
+// Execution is done in left-to-right order, including passing of context.
+// For example ChainUnaryServer(one, two, three) will execute one before two before three.
+// If you want to pass context between interceptors, use WrapServerStream.
+func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
+ n := len(interceptors)
+
+ return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ chainer := func(currentInter grpc.StreamServerInterceptor, currentHandler grpc.StreamHandler) grpc.StreamHandler {
+ return func(currentSrv interface{}, currentStream grpc.ServerStream) error {
+ return currentInter(currentSrv, currentStream, info, currentHandler)
+ }
+ }
+
+ chainedHandler := handler
+ for i := n - 1; i >= 0; i-- {
+ chainedHandler = chainer(interceptors[i], chainedHandler)
+ }
+
+ return chainedHandler(srv, ss)
+ }
+}
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go
new file mode 100644
index 0000000..8c321b3
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/datastore.go
@@ -0,0 +1,85 @@
+package datastore
+
+import (
+ "context"
+
+ middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
+ "google.golang.org/grpc"
+
+ "github.com/authzed/spicedb/pkg/datastore"
+)
+
+type ctxKeyType struct{}
+
+var datastoreKey ctxKeyType = struct{}{}
+
+type datastoreHandle struct {
+ datastore datastore.Datastore
+}
+
+// ContextWithHandle adds a placeholder to a context that will later be
+// filled by the datastore
+func ContextWithHandle(ctx context.Context) context.Context {
+ return context.WithValue(ctx, datastoreKey, &datastoreHandle{})
+}
+
+// FromContext reads the selected datastore out of a context.Context
+// and returns nil if it does not exist.
+func FromContext(ctx context.Context) datastore.Datastore {
+ if c := ctx.Value(datastoreKey); c != nil {
+ handle := c.(*datastoreHandle)
+ return handle.datastore
+ }
+ return nil
+}
+
+// MustFromContext reads the selected datastore out of a context.Context and panics if it does not exist
+func MustFromContext(ctx context.Context) datastore.Datastore {
+ datastore := FromContext(ctx)
+ if datastore == nil {
+ panic("datastore middleware did not inject datastore")
+ }
+
+ return datastore
+}
+
+// SetInContext adds a datastore to the given context
+func SetInContext(ctx context.Context, datastore datastore.Datastore) error {
+ handle := ctx.Value(datastoreKey)
+ if handle == nil {
+ return nil
+ }
+ handle.(*datastoreHandle).datastore = datastore
+ return nil
+}
+
+// ContextWithDatastore adds the handle and datastore in one step
+func ContextWithDatastore(ctx context.Context, datastore datastore.Datastore) context.Context {
+ return context.WithValue(ctx, datastoreKey, &datastoreHandle{datastore: datastore})
+}
+
+// UnaryServerInterceptor returns a new unary server interceptor that adds the
+// datastore to the context
+func UnaryServerInterceptor(datastore datastore.Datastore) grpc.UnaryServerInterceptor {
+ return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ newCtx := ContextWithHandle(ctx)
+ if err := SetInContext(newCtx, datastore); err != nil {
+ return nil, err
+ }
+
+ return handler(newCtx, req)
+ }
+}
+
+// StreamServerInterceptor returns a new stream server interceptor that adds the
+// datastore to the context
+func StreamServerInterceptor(datastore datastore.Datastore) grpc.StreamServerInterceptor {
+ return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ wrapped := middleware.WrapServerStream(stream)
+ wrapped.WrappedContext = ContextWithHandle(wrapped.WrappedContext)
+ if err := SetInContext(wrapped.WrappedContext, datastore); err != nil {
+ return err
+ }
+ return handler(srv, wrapped)
+ }
+}
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go
new file mode 100644
index 0000000..a4d0cf0
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/datastore/doc.go
@@ -0,0 +1,2 @@
+// Package datastore defines middleware that injects the datastore into the context.
+package datastore
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/doc.go
new file mode 100644
index 0000000..b11553e
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/doc.go
@@ -0,0 +1,2 @@
+// Package middleware defines various custom middlewares.
+package middleware
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go
new file mode 100644
index 0000000..2d9aa01
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/doc.go
@@ -0,0 +1,2 @@
+// Package handwrittenvalidation defines middleware that runs custom-made validations on incoming requests.
+package handwrittenvalidation
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go
new file mode 100644
index 0000000..2adc4b3
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/handwrittenvalidation/handwrittenvalidation.go
@@ -0,0 +1,54 @@
+package handwrittenvalidation
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type handwrittenValidator interface {
+ HandwrittenValidate() error
+}
+
+// UnaryServerInterceptor returns a new unary server interceptor that runs the handwritten validation
+// on the incoming request, if any.
+func UnaryServerInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ validator, ok := req.(handwrittenValidator)
+ if ok {
+ err := validator.HandwrittenValidate()
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err)
+ }
+ }
+
+ return handler(ctx, req)
+}
+
+// StreamServerInterceptor returns a new stream server interceptor that runs the handwritten validation
+// on the incoming request messages, if any.
+func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ wrapper := &recvWrapper{stream}
+ return handler(srv, wrapper)
+}
+
+type recvWrapper struct {
+ grpc.ServerStream
+}
+
+func (s *recvWrapper) RecvMsg(m interface{}) error {
+ if err := s.ServerStream.RecvMsg(m); err != nil {
+ return err
+ }
+
+ validator, ok := m.(handwrittenValidator)
+ if ok {
+ err := validator.HandwrittenValidate()
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go
new file mode 100644
index 0000000..7e3ae0f
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/doc.go
@@ -0,0 +1,2 @@
+// Package servicespecific defines middleware that injects other middlewares.
+package servicespecific
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go
new file mode 100644
index 0000000..10fe753
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/servicespecific/servicespecific.go
@@ -0,0 +1,39 @@
+package servicespecific
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+)
+
+// ExtraUnaryInterceptor is an interface for a service which has its own bundled
+// unary interceptors that must be run.
+type ExtraUnaryInterceptor interface {
+ UnaryInterceptor() grpc.UnaryServerInterceptor
+}
+
+// ExtraStreamInterceptor is an interface for a service which has its own bundled
+// stream interceptors that must be run.
+type ExtraStreamInterceptor interface {
+ StreamInterceptor() grpc.StreamServerInterceptor
+}
+
+// UnaryServerInterceptor returns a new unary server interceptor that runs bundled interceptors.
+func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+ if hasExtraInterceptor, ok := info.Server.(ExtraUnaryInterceptor); ok {
+ interceptor := hasExtraInterceptor.UnaryInterceptor()
+ return interceptor(ctx, req, info, handler)
+ }
+
+ return handler(ctx, req)
+}
+
+// StreamServerInterceptor returns a new stream server interceptor that runs bundled interceptors.
+func StreamServerInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ if hasExtraInterceptor, ok := srv.(ExtraStreamInterceptor); ok {
+ interceptor := hasExtraInterceptor.StreamInterceptor()
+ return interceptor(srv, stream, info, handler)
+ }
+
+ return handler(srv, stream)
+}
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go
new file mode 100644
index 0000000..9eceb4d
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/doc.go
@@ -0,0 +1,2 @@
+// Package streamtimeout defines middleware that cancels the context after a timeout if no new data has been received.
+package streamtimeout
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
new file mode 100644
index 0000000..8f09fdb
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/streamtimeout/streamtimeout.go
@@ -0,0 +1,57 @@
+package streamtimeout
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+
+ "github.com/authzed/spicedb/pkg/spiceerrors"
+)
+
+// MustStreamServerInterceptor returns a new stream server interceptor that cancels the context
+// after a timeout if no new data has been received.
+func MustStreamServerInterceptor(timeout time.Duration) grpc.StreamServerInterceptor {
+ if timeout <= 0 {
+ panic("timeout must be >= 0 for streaming timeout interceptor")
+ }
+
+ return func(srv any, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+ ctx := stream.Context()
+ withCancel, internalCancelFn := context.WithCancelCause(ctx)
+ timer := time.AfterFunc(timeout, func() {
+ internalCancelFn(spiceerrors.WithCodeAndDetailsAsError(fmt.Errorf("operation took longer than allowed %v to complete", timeout), codes.DeadlineExceeded))
+ })
+ wrapper := &sendWrapper{stream, withCancel, timer, timeout}
+ return handler(srv, wrapper)
+ }
+}
+
+type sendWrapper struct {
+ grpc.ServerStream
+
+ ctx context.Context
+ timer *time.Timer
+ timeout time.Duration
+}
+
+func (s *sendWrapper) Context() context.Context {
+ return s.ctx
+}
+
+func (s *sendWrapper) SetTrailer(_ metadata.MD) {
+ s.timer.Stop()
+}
+
+func (s *sendWrapper) SendMsg(m any) error {
+ err := s.ServerStream.SendMsg(m)
+ if err != nil {
+ s.timer.Stop()
+ } else {
+ s.timer.Reset(s.timeout)
+ }
+ return err
+}
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go
new file mode 100644
index 0000000..c05cacc
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/doc.go
@@ -0,0 +1,2 @@
+// Package usagemetrics defines middleware that adds usage data (e.g. dispatch counts) to the response.
+package usagemetrics
diff --git a/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go
new file mode 100644
index 0000000..32f5676
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/middleware/usagemetrics/usagemetrics.go
@@ -0,0 +1,128 @@
+package usagemetrics
+
+import (
+ "context"
+ "strconv"
+ "time"
+
+ "github.com/authzed/authzed-go/pkg/responsemeta"
+ "github.com/authzed/grpcutil"
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+ "google.golang.org/grpc"
+
+ log "github.com/authzed/spicedb/internal/logging"
+ dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
+)
+
+var (
+ // DispatchedCountLabels are the labels that DispatchedCountHistogram will
+ // have by default.
+ DispatchedCountLabels = []string{"method", "cached"}
+
+ // DispatchedCountHistogram is the metric that SpiceDB uses to keep track
+ // of the number of downstream dispatches that are performed to answer a
+ // single query.
+ DispatchedCountHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: "spicedb",
+ Subsystem: "services",
+ Name: "dispatches",
+ Help: "Histogram of cluster dispatches performed by the instance.",
+ Buckets: []float64{1, 5, 10, 25, 50, 100, 250},
+ }, DispatchedCountLabels)
+)
+
+type reporter struct{}
+
+func (r *reporter) ServerReporter(ctx context.Context, callMeta interceptors.CallMeta) (interceptors.Reporter, context.Context) {
+ _, methodName := grpcutil.SplitMethodName(callMeta.FullMethod())
+ ctx = ContextWithHandle(ctx)
+ return &serverReporter{ctx: ctx, methodName: methodName}, ctx
+}
+
+type serverReporter struct {
+ interceptors.NoopReporter
+ ctx context.Context
+ methodName string
+}
+
+func (r *serverReporter) PostCall(_ error, _ time.Duration) {
+ responseMeta := FromContext(r.ctx)
+ if responseMeta == nil {
+ responseMeta = &dispatch.ResponseMeta{}
+ }
+
+ err := annotateAndReportForMetadata(r.ctx, r.methodName, responseMeta)
+ // if context is cancelled, the stream will be closed, and gRPC will return ErrIllegalHeaderWrite
+ // this prevents logging unnecessary error messages
+ if r.ctx.Err() != nil {
+ return
+ }
+ if err != nil {
+ log.Ctx(r.ctx).Warn().Err(err).Msg("usagemetrics: could not report metadata")
+ }
+}
+
+// UnaryServerInterceptor implements a gRPC Middleware for reporting usage metrics
+// in both the trailer of the request, as well as to the registered prometheus
+// metrics.
+func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
+ return interceptors.UnaryServerInterceptor(&reporter{})
+}
+
+// StreamServerInterceptor implements a gRPC Middleware for reporting usage metrics
+// in both the trailer of the request, as well as to the registered prometheus
+// metrics
+func StreamServerInterceptor() grpc.StreamServerInterceptor {
+ return interceptors.StreamServerInterceptor(&reporter{})
+}
+
+func annotateAndReportForMetadata(ctx context.Context, methodName string, metadata *dispatch.ResponseMeta) error {
+ DispatchedCountHistogram.WithLabelValues(methodName, "false").Observe(float64(metadata.DispatchCount))
+ DispatchedCountHistogram.WithLabelValues(methodName, "true").Observe(float64(metadata.CachedDispatchCount))
+
+ return responsemeta.SetResponseTrailerMetadata(ctx, map[responsemeta.ResponseMetadataTrailerKey]string{
+ responsemeta.DispatchedOperationsCount: strconv.Itoa(int(metadata.DispatchCount)),
+ responsemeta.CachedOperationsCount: strconv.Itoa(int(metadata.CachedDispatchCount)),
+ })
+}
+
+// Create a new type to prevent context collisions
+type responseMetaKey string
+
+var metadataCtxKey responseMetaKey = "dispatched-response-meta"
+
+type metaHandle struct{ metadata *dispatch.ResponseMeta }
+
+// SetInContext should be called in a gRPC handler to correctly set the response metadata
+// for the dispatched request.
+func SetInContext(ctx context.Context, metadata *dispatch.ResponseMeta) {
+ possibleHandle := ctx.Value(metadataCtxKey)
+ if possibleHandle == nil {
+ return
+ }
+
+ handle := possibleHandle.(*metaHandle)
+ handle.metadata = metadata
+}
+
+// FromContext returns any metadata that was stored in the context.
+//
+// This is useful for testing that a handler is properly setting the context.
+func FromContext(ctx context.Context) *dispatch.ResponseMeta {
+ possibleHandle := ctx.Value(metadataCtxKey)
+ if possibleHandle == nil {
+ return nil
+ }
+ return possibleHandle.(*metaHandle).metadata
+}
+
+// ContextWithHandle creates a new context with a location to store metadata
+// returned from a dispatched request.
+//
+// This should only be called in middleware or testing functions.
+func ContextWithHandle(ctx context.Context) context.Context {
+ var handle metaHandle
+ return context.WithValue(ctx, metadataCtxKey, &handle)
+}