diff options
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/dispatch')
7 files changed, 932 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go b/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go new file mode 100644 index 0000000..95a231a --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go @@ -0,0 +1,98 @@ +package dispatch + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + + log "github.com/authzed/spicedb/internal/logging" + v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" +) + +// ReadyState represents the ready state of the dispatcher. +type ReadyState struct { + // Message is a human-readable status message for the current state. + Message string + + // IsReady indicates whether the datastore is ready. + IsReady bool +} + +// Dispatcher interface describes a method for passing subchecks off to additional machines. +type Dispatcher interface { + Check + Expand + LookupSubjects + LookupResources2 + + // Close closes the dispatcher. + Close() error + + // ReadyState returns true when dispatcher is able to respond to requests + ReadyState() ReadyState +} + +// Check interface describes just the methods required to dispatch check requests. +type Check interface { + // DispatchCheck submits a single check request and returns its result. + DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) +} + +// Expand interface describes just the methods required to dispatch expand requests. +type Expand interface { + // DispatchExpand submits a single expand request and returns its result. + // If an error is returned, DispatchExpandResponse will still contain Metadata. + DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) +} + +type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response] + +type LookupResources2 interface { + DispatchLookupResources2( + req *v1.DispatchLookupResources2Request, + stream LookupResources2Stream, + ) error +} + +// LookupSubjectsStream is an alias for the stream to which found subjects will be written. +type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse] + +// LookupSubjects interface describes just the methods required to dispatch lookup subjects requests. +type LookupSubjects interface { + // DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream. + DispatchLookupSubjects( + req *v1.DispatchLookupSubjectsRequest, + stream LookupSubjectsStream, + ) error +} + +// DispatchableRequest is an interface for requests. +type DispatchableRequest interface { + zerolog.LogObjectMarshaler + + GetMetadata() *v1.ResolverMeta +} + +// CheckDepth returns ErrMaxDepth if there is insufficient depth remaining to dispatch. +func CheckDepth(ctx context.Context, req DispatchableRequest) error { + metadata := req.GetMetadata() + if metadata == nil { + log.Ctx(ctx).Warn().Object("request", req).Msg("request missing metadata") + return fmt.Errorf("request missing metadata") + } + + if metadata.DepthRemaining == 0 { + return NewMaxDepthExceededError(req) + } + + return nil +} + +// AddResponseMetadata adds the metadata found in the incoming metadata to the existing +// metadata, *modifying it in place*. +func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta) { + existing.DispatchCount += incoming.DispatchCount + existing.CachedDispatchCount += incoming.CachedDispatchCount + existing.DepthRequired = max(existing.DepthRequired, incoming.DepthRequired) +} diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go b/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go new file mode 100644 index 0000000..8b88bb0 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go @@ -0,0 +1,2 @@ +// Package dispatch contains logic to dispatch requests locally or to other nodes. +package dispatch diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go b/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go new file mode 100644 index 0000000..17cec3f --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go @@ -0,0 +1,39 @@ +package dispatch + +import ( + "fmt" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/authzed/spicedb/pkg/spiceerrors" +) + +// MaxDepthExceededError is an error returned when the maximum depth for dispatching has been exceeded. +type MaxDepthExceededError struct { + error + + // Request is the request that exceeded the maximum depth. + Request DispatchableRequest +} + +// NewMaxDepthExceededError creates a new MaxDepthExceededError. +func NewMaxDepthExceededError(req DispatchableRequest) error { + return MaxDepthExceededError{ + fmt.Errorf("max depth exceeded: this usually indicates a recursive or too deep data dependency. See: https://spicedb.dev/d/debug-max-depth"), + req, + } +} + +// GRPCStatus implements retrieving the gRPC status for the error. +func (err MaxDepthExceededError) GRPCStatus() *status.Status { + return spiceerrors.WithCodeAndDetails( + err, + codes.ResourceExhausted, + spiceerrors.ForReason( + v1.ErrorReason_ERROR_REASON_MAXIMUM_DEPTH_EXCEEDED, + map[string]string{}, + ), + ) +} diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go new file mode 100644 index 0000000..ecaf59a --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go @@ -0,0 +1,77 @@ +package graph + +import ( + "fmt" + + "github.com/rs/zerolog" +) + +// NamespaceNotFoundError occurs when a namespace was not found. +type NamespaceNotFoundError struct { + error + namespaceName string +} + +// NotFoundNamespaceName returns the name of the namespace that was not found. +func (err NamespaceNotFoundError) NotFoundNamespaceName() string { + return err.namespaceName +} + +// MarshalZerologObject implements zerolog.LogObjectMarshaler +func (err NamespaceNotFoundError) MarshalZerologObject(e *zerolog.Event) { + e.Err(err.error).Str("namespace", err.namespaceName) +} + +// DetailsMetadata returns the metadata for details for this error. +func (err NamespaceNotFoundError) DetailsMetadata() map[string]string { + return map[string]string{ + "definition_name": err.namespaceName, + } +} + +// NewNamespaceNotFoundErr constructs a new namespace not found error. +func NewNamespaceNotFoundErr(nsName string) error { + return NamespaceNotFoundError{ + error: fmt.Errorf("object definition `%s` not found", nsName), + namespaceName: nsName, + } +} + +// RelationNotFoundError occurs when a relation was not found under a namespace. +type RelationNotFoundError struct { + error + namespaceName string + relationName string +} + +// NamespaceName returns the name of the namespace in which the relation was not found. +func (err RelationNotFoundError) NamespaceName() string { + return err.namespaceName +} + +// NotFoundRelationName returns the name of the relation not found. +func (err RelationNotFoundError) NotFoundRelationName() string { + return err.relationName +} + +// MarshalZerologObject implements zerolog.LogObjectMarshaler +func (err RelationNotFoundError) MarshalZerologObject(e *zerolog.Event) { + e.Err(err.error).Str("namespace", err.namespaceName).Str("relation", err.relationName) +} + +// DetailsMetadata returns the metadata for details for this error. +func (err RelationNotFoundError) DetailsMetadata() map[string]string { + return map[string]string{ + "definition_name": err.namespaceName, + "relation_or_permission_name": err.relationName, + } +} + +// NewRelationNotFoundErr constructs a new relation not found error. +func NewRelationNotFoundErr(nsName string, relationName string) error { + return RelationNotFoundError{ + error: fmt.Errorf("relation/permission `%s` not found under definition `%s`", relationName, nsName), + namespaceName: nsName, + relationName: relationName, + } +} diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go new file mode 100644 index 0000000..232b1e7 --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go @@ -0,0 +1,437 @@ +package graph + +import ( + "context" + "errors" + "fmt" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/authzed/spicedb/internal/dispatch" + "github.com/authzed/spicedb/internal/graph" + log "github.com/authzed/spicedb/internal/logging" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" + caveattypes "github.com/authzed/spicedb/pkg/caveats/types" + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/middleware/nodeid" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1" + "github.com/authzed/spicedb/pkg/tuple" +) + +const errDispatch = "error dispatching request: %w" + +var tracer = otel.Tracer("spicedb/internal/dispatch/local") + +// ConcurrencyLimits defines per-dispatch-type concurrency limits. +// +//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . ConcurrencyLimits +type ConcurrencyLimits struct { + Check uint16 `debugmap:"visible"` + ReachableResources uint16 `debugmap:"visible"` + LookupResources uint16 `debugmap:"visible"` + LookupSubjects uint16 `debugmap:"visible"` +} + +const defaultConcurrencyLimit = 50 + +// WithOverallDefaultLimit sets the overall default limit for any unspecified limits +// and returns a new struct. +func (cl ConcurrencyLimits) WithOverallDefaultLimit(overallDefaultLimit uint16) ConcurrencyLimits { + return limitsOrDefaults(cl, overallDefaultLimit) +} + +func (cl ConcurrencyLimits) MarshalZerologObject(e *zerolog.Event) { + e.Uint16("concurrency-limit-check-permission", cl.Check) + e.Uint16("concurrency-limit-lookup-resources", cl.LookupResources) + e.Uint16("concurrency-limit-lookup-subjects", cl.LookupSubjects) + e.Uint16("concurrency-limit-reachable-resources", cl.ReachableResources) +} + +func limitsOrDefaults(limits ConcurrencyLimits, overallDefaultLimit uint16) ConcurrencyLimits { + limits.Check = limitOrDefault(limits.Check, overallDefaultLimit) + limits.LookupResources = limitOrDefault(limits.LookupResources, overallDefaultLimit) + limits.LookupSubjects = limitOrDefault(limits.LookupSubjects, overallDefaultLimit) + limits.ReachableResources = limitOrDefault(limits.ReachableResources, overallDefaultLimit) + return limits +} + +func limitOrDefault(limit uint16, defaultLimit uint16) uint16 { + if limit <= 0 { + return defaultLimit + } + return limit +} + +// SharedConcurrencyLimits returns a ConcurrencyLimits struct with the limit +// set to that provided for each operation. +func SharedConcurrencyLimits(concurrencyLimit uint16) ConcurrencyLimits { + return ConcurrencyLimits{ + Check: concurrencyLimit, + ReachableResources: concurrencyLimit, + LookupResources: concurrencyLimit, + LookupSubjects: concurrencyLimit, + } +} + +// NewLocalOnlyDispatcher creates a dispatcher that consults with the graph to formulate a response. +func NewLocalOnlyDispatcher(typeSet *caveattypes.TypeSet, concurrencyLimit uint16, dispatchChunkSize uint16) dispatch.Dispatcher { + return NewLocalOnlyDispatcherWithLimits(typeSet, SharedConcurrencyLimits(concurrencyLimit), dispatchChunkSize) +} + +// NewLocalOnlyDispatcherWithLimits creates a dispatcher thatg consults with the graph to formulate a response +// and has the defined concurrency limits per dispatch type. +func NewLocalOnlyDispatcherWithLimits(typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher { + d := &localDispatcher{} + + concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit) + chunkSize := dispatchChunkSize + if chunkSize == 0 { + chunkSize = 100 + log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) + } + + d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize) + d.expander = graph.NewConcurrentExpander(d) + d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize) + d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, typeSet, concurrencyLimits.LookupResources, chunkSize) + + return d +} + +// NewDispatcher creates a dispatcher that consults with the graph and redispatches subproblems to +// the provided redispatcher. +func NewDispatcher(redispatcher dispatch.Dispatcher, typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher { + concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit) + chunkSize := dispatchChunkSize + if chunkSize == 0 { + chunkSize = 100 + log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize) + } + + checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize) + expander := graph.NewConcurrentExpander(redispatcher) + lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize) + lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, typeSet, concurrencyLimits.LookupResources, chunkSize) + + return &localDispatcher{ + checker: checker, + expander: expander, + lookupSubjectsHandler: lookupSubjectsHandler, + lookupResourcesHandler2: lookupResourcesHandler2, + } +} + +type localDispatcher struct { + checker *graph.ConcurrentChecker + expander *graph.ConcurrentExpander + lookupSubjectsHandler *graph.ConcurrentLookupSubjects + lookupResourcesHandler2 *graph.CursoredLookupResources2 +} + +func (ld *localDispatcher) loadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*core.NamespaceDefinition, error) { + ds := datastoremw.MustFromContext(ctx).SnapshotReader(revision) + + // Load namespace and relation from the datastore + ns, _, err := ds.ReadNamespaceByName(ctx, nsName) + if err != nil { + return nil, rewriteNamespaceError(err) + } + + return ns, err +} + +func (ld *localDispatcher) parseRevision(ctx context.Context, s string) (datastore.Revision, error) { + ds := datastoremw.MustFromContext(ctx) + return ds.RevisionFromString(s) +} + +func (ld *localDispatcher) lookupRelation(_ context.Context, ns *core.NamespaceDefinition, relationName string) (*core.Relation, error) { + var relation *core.Relation + for _, candidate := range ns.Relation { + if candidate.Name == relationName { + relation = candidate + break + } + } + + if relation == nil { + return nil, NewRelationNotFoundErr(ns.Name, relationName) + } + + return relation, nil +} + +// DispatchCheck implements dispatch.Check interface +func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) { + resourceType := tuple.StringCoreRR(req.ResourceRelation) + spanName := "DispatchCheck → " + resourceType + "@" + req.Subject.Namespace + "#" + req.Subject.Relation + + nodeID, err := nodeid.FromContext(ctx) + if err != nil { + log.Err(err).Msg("failed to get node ID") + } + + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes( + attribute.String("resource-type", resourceType), + attribute.StringSlice("resource-ids", req.ResourceIds), + attribute.String("subject", tuple.StringCoreONR(req.Subject)), + attribute.String("node-id", nodeID), + )) + defer span.End() + + if err := dispatch.CheckDepth(ctx, req); err != nil { + if req.Debug != v1.DispatchCheckRequest_ENABLE_BASIC_DEBUGGING { + return &v1.DispatchCheckResponse{ + Metadata: &v1.ResponseMeta{ + DispatchCount: 0, + }, + }, rewriteError(ctx, err) + } + + // NOTE: we return debug information here to ensure tooling can see the cycle. + nodeID, nerr := nodeid.FromContext(ctx) + if nerr != nil { + log.Err(nerr).Msg("failed to get nodeID from context") + } + + return &v1.DispatchCheckResponse{ + Metadata: &v1.ResponseMeta{ + DispatchCount: 0, + DebugInfo: &v1.DebugInformation{ + Check: &v1.CheckDebugTrace{ + Request: req, + SourceId: nodeID, + }, + }, + }, + }, rewriteError(ctx, err) + } + + revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision) + if err != nil { + return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err) + } + + ns, err := ld.loadNamespace(ctx, req.ResourceRelation.Namespace, revision) + if err != nil { + return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err) + } + + relation, err := ld.lookupRelation(ctx, ns, req.ResourceRelation.Relation) + if err != nil { + return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err) + } + + // If the relation is aliasing another one and the subject does not have the same type as + // resource, load the aliased relation and dispatch to it. We cannot use the alias if the + // resource and subject types are the same because a check on the *exact same* resource and + // subject must pass, and we don't know how many intermediate steps may hit that case. + if relation.AliasingRelation != "" && req.ResourceRelation.Namespace != req.Subject.Namespace { + relation, err := ld.lookupRelation(ctx, ns, relation.AliasingRelation) + if err != nil { + return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err) + } + + // Rewrite the request over the aliased relation. + validatedReq := graph.ValidatedCheckRequest{ + DispatchCheckRequest: &v1.DispatchCheckRequest{ + ResourceRelation: &core.RelationReference{ + Namespace: req.ResourceRelation.Namespace, + Relation: relation.Name, + }, + ResourceIds: req.ResourceIds, + Subject: req.Subject, + Metadata: req.Metadata, + Debug: req.Debug, + CheckHints: req.CheckHints, + }, + Revision: revision, + OriginalRelationName: req.ResourceRelation.Relation, + } + + resp, err := ld.checker.Check(ctx, validatedReq, relation) + return resp, rewriteError(ctx, err) + } + + resp, err := ld.checker.Check(ctx, graph.ValidatedCheckRequest{ + DispatchCheckRequest: req, + Revision: revision, + }, relation) + return resp, rewriteError(ctx, err) +} + +// DispatchExpand implements dispatch.Expand interface +func (ld *localDispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) { + nodeID, err := nodeid.FromContext(ctx) + if err != nil { + log.Err(err).Msg("failed to get node ID") + } + + ctx, span := tracer.Start(ctx, "DispatchExpand", trace.WithAttributes( + attribute.String("start", tuple.StringCoreONR(req.ResourceAndRelation)), + attribute.String("node-id", nodeID), + )) + defer span.End() + + if err := dispatch.CheckDepth(ctx, req); err != nil { + return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err + } + + revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision) + if err != nil { + return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err + } + + ns, err := ld.loadNamespace(ctx, req.ResourceAndRelation.Namespace, revision) + if err != nil { + return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err + } + + relation, err := ld.lookupRelation(ctx, ns, req.ResourceAndRelation.Relation) + if err != nil { + return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err + } + + return ld.expander.Expand(ctx, graph.ValidatedExpandRequest{ + DispatchExpandRequest: req, + Revision: revision, + }, relation) +} + +func (ld *localDispatcher) DispatchLookupResources2( + req *v1.DispatchLookupResources2Request, + stream dispatch.LookupResources2Stream, +) error { + nodeID, err := nodeid.FromContext(stream.Context()) + if err != nil { + log.Err(err).Msg("failed to get node ID") + } + + ctx, span := tracer.Start(stream.Context(), "DispatchLookupResources2", trace.WithAttributes( + attribute.String("resource-type", tuple.StringCoreRR(req.ResourceRelation)), + attribute.String("subject-type", tuple.StringCoreRR(req.SubjectRelation)), + attribute.StringSlice("subject-ids", req.SubjectIds), + attribute.String("terminal-subject", tuple.StringCoreONR(req.TerminalSubject)), + attribute.String("node-id", nodeID), + )) + defer span.End() + + if err := dispatch.CheckDepth(ctx, req); err != nil { + return err + } + + revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision) + if err != nil { + return err + } + + return ld.lookupResourcesHandler2.LookupResources2( + graph.ValidatedLookupResources2Request{ + DispatchLookupResources2Request: req, + Revision: revision, + }, + dispatch.StreamWithContext(ctx, stream), + ) +} + +// DispatchLookupSubjects implements dispatch.LookupSubjects interface +func (ld *localDispatcher) DispatchLookupSubjects( + req *v1.DispatchLookupSubjectsRequest, + stream dispatch.LookupSubjectsStream, +) error { + nodeID, err := nodeid.FromContext(stream.Context()) + if err != nil { + log.Err(err).Msg("failed to get node ID") + } + + resourceType := tuple.StringCoreRR(req.ResourceRelation) + subjectRelation := tuple.StringCoreRR(req.SubjectRelation) + spanName := "DispatchLookupSubjects → " + resourceType + "@" + subjectRelation + + ctx, span := tracer.Start(stream.Context(), spanName, trace.WithAttributes( + attribute.String("resource-type", resourceType), + attribute.String("subject-type", subjectRelation), + attribute.StringSlice("resource-ids", req.ResourceIds), + attribute.String("node-id", nodeID), + )) + defer span.End() + + if err := dispatch.CheckDepth(ctx, req); err != nil { + return err + } + + revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision) + if err != nil { + return err + } + + return ld.lookupSubjectsHandler.LookupSubjects( + graph.ValidatedLookupSubjectsRequest{ + DispatchLookupSubjectsRequest: req, + Revision: revision, + }, + dispatch.StreamWithContext(ctx, stream), + ) +} + +func (ld *localDispatcher) Close() error { + return nil +} + +func (ld *localDispatcher) ReadyState() dispatch.ReadyState { + return dispatch.ReadyState{IsReady: true} +} + +func rewriteNamespaceError(original error) error { + nsNotFound := datastore.NamespaceNotFoundError{} + + switch { + case errors.As(original, &nsNotFound): + return NewNamespaceNotFoundErr(nsNotFound.NotFoundNamespaceName()) + case errors.As(original, &NamespaceNotFoundError{}): + fallthrough + case errors.As(original, &RelationNotFoundError{}): + return original + default: + return fmt.Errorf(errDispatch, original) + } +} + +// rewriteError transforms graph errors into a gRPC Status +func rewriteError(ctx context.Context, err error) error { + if err == nil { + return nil + } + + // Check if the error can be directly used. + if st, ok := status.FromError(err); ok { + return st.Err() + } + + switch { + case errors.Is(err, context.DeadlineExceeded): + return status.Errorf(codes.DeadlineExceeded, "%s", err) + case errors.Is(err, context.Canceled): + err := context.Cause(ctx) + if err != nil { + if _, ok := status.FromError(err); ok { + return err + } + } + + return status.Errorf(codes.Canceled, "%s", err) + default: + log.Ctx(ctx).Err(err).Msg("received unexpected graph error") + return err + } +} + +var emptyMetadata = &v1.ResponseMeta{ + DispatchCount: 0, +} diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go new file mode 100644 index 0000000..9a0a7fc --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go @@ -0,0 +1,92 @@ +// Code generated by github.com/ecordell/optgen. DO NOT EDIT. +package graph + +import ( + defaults "github.com/creasty/defaults" + helpers "github.com/ecordell/optgen/helpers" +) + +type ConcurrencyLimitsOption func(c *ConcurrencyLimits) + +// NewConcurrencyLimitsWithOptions creates a new ConcurrencyLimits with the passed in options set +func NewConcurrencyLimitsWithOptions(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits { + c := &ConcurrencyLimits{} + for _, o := range opts { + o(c) + } + return c +} + +// NewConcurrencyLimitsWithOptionsAndDefaults creates a new ConcurrencyLimits with the passed in options set starting from the defaults +func NewConcurrencyLimitsWithOptionsAndDefaults(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits { + c := &ConcurrencyLimits{} + defaults.MustSet(c) + for _, o := range opts { + o(c) + } + return c +} + +// ToOption returns a new ConcurrencyLimitsOption that sets the values from the passed in ConcurrencyLimits +func (c *ConcurrencyLimits) ToOption() ConcurrencyLimitsOption { + return func(to *ConcurrencyLimits) { + to.Check = c.Check + to.ReachableResources = c.ReachableResources + to.LookupResources = c.LookupResources + to.LookupSubjects = c.LookupSubjects + } +} + +// DebugMap returns a map form of ConcurrencyLimits for debugging +func (c ConcurrencyLimits) DebugMap() map[string]any { + debugMap := map[string]any{} + debugMap["Check"] = helpers.DebugValue(c.Check, false) + debugMap["ReachableResources"] = helpers.DebugValue(c.ReachableResources, false) + debugMap["LookupResources"] = helpers.DebugValue(c.LookupResources, false) + debugMap["LookupSubjects"] = helpers.DebugValue(c.LookupSubjects, false) + return debugMap +} + +// ConcurrencyLimitsWithOptions configures an existing ConcurrencyLimits with the passed in options set +func ConcurrencyLimitsWithOptions(c *ConcurrencyLimits, opts ...ConcurrencyLimitsOption) *ConcurrencyLimits { + for _, o := range opts { + o(c) + } + return c +} + +// WithOptions configures the receiver ConcurrencyLimits with the passed in options set +func (c *ConcurrencyLimits) WithOptions(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits { + for _, o := range opts { + o(c) + } + return c +} + +// WithCheck returns an option that can set Check on a ConcurrencyLimits +func WithCheck(check uint16) ConcurrencyLimitsOption { + return func(c *ConcurrencyLimits) { + c.Check = check + } +} + +// WithReachableResources returns an option that can set ReachableResources on a ConcurrencyLimits +func WithReachableResources(reachableResources uint16) ConcurrencyLimitsOption { + return func(c *ConcurrencyLimits) { + c.ReachableResources = reachableResources + } +} + +// WithLookupResources returns an option that can set LookupResources on a ConcurrencyLimits +func WithLookupResources(lookupResources uint16) ConcurrencyLimitsOption { + return func(c *ConcurrencyLimits) { + c.LookupResources = lookupResources + } +} + +// WithLookupSubjects returns an option that can set LookupSubjects on a ConcurrencyLimits +func WithLookupSubjects(lookupSubjects uint16) ConcurrencyLimitsOption { + return func(c *ConcurrencyLimits) { + c.LookupSubjects = lookupSubjects + } +} diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go b/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go new file mode 100644 index 0000000..1d6636c --- /dev/null +++ b/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go @@ -0,0 +1,187 @@ +package dispatch + +import ( + "context" + "sync" + "sync/atomic" + + grpc "google.golang.org/grpc" +) + +// Stream defines the interface generically matching a streaming dispatch response. +type Stream[T any] interface { + // Publish publishes the result to the stream. + Publish(T) error + + // Context returns the context for the stream. + Context() context.Context +} + +type grpcStream[T any] interface { + grpc.ServerStream + Send(T) error +} + +// WrapGRPCStream wraps a gRPC result stream with a concurrent-safe dispatch stream. This is +// necessary because gRPC response streams are *not concurrent safe*. +// See: https://groups.google.com/g/grpc-io/c/aI6L6M4fzQ0?pli=1 +func WrapGRPCStream[R any, S grpcStream[R]](grpcStream S) Stream[R] { + return &concurrentSafeStream[R]{ + grpcStream: grpcStream, + mu: sync.Mutex{}, + } +} + +type concurrentSafeStream[T any] struct { + grpcStream grpcStream[T] // GUARDED_BY(mu) + mu sync.Mutex +} + +func (s *concurrentSafeStream[T]) Context() context.Context { + return s.grpcStream.Context() +} + +func (s *concurrentSafeStream[T]) Publish(result T) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.grpcStream.Send(result) +} + +// NewCollectingDispatchStream creates a new CollectingDispatchStream. +func NewCollectingDispatchStream[T any](ctx context.Context) *CollectingDispatchStream[T] { + return &CollectingDispatchStream[T]{ + ctx: ctx, + results: nil, + mu: sync.Mutex{}, + } +} + +// CollectingDispatchStream is a dispatch stream that collects results in memory. +type CollectingDispatchStream[T any] struct { + ctx context.Context + results []T // GUARDED_BY(mu) + mu sync.Mutex +} + +func (s *CollectingDispatchStream[T]) Context() context.Context { + return s.ctx +} + +func (s *CollectingDispatchStream[T]) Results() []T { + return s.results +} + +func (s *CollectingDispatchStream[T]) Publish(result T) error { + s.mu.Lock() + defer s.mu.Unlock() + s.results = append(s.results, result) + return nil +} + +// WrappedDispatchStream is a dispatch stream that wraps another dispatch stream, and performs +// an operation on each result before puppeting back up to the parent stream. +type WrappedDispatchStream[T any] struct { + Stream Stream[T] + Ctx context.Context + Processor func(result T) (T, bool, error) +} + +func (s *WrappedDispatchStream[T]) Publish(result T) error { + if s.Processor == nil { + return s.Stream.Publish(result) + } + + processed, ok, err := s.Processor(result) + if err != nil { + return err + } + if !ok { + return nil + } + + return s.Stream.Publish(processed) +} + +func (s *WrappedDispatchStream[T]) Context() context.Context { + return s.Ctx +} + +// StreamWithContext returns the given dispatch stream, wrapped to return the given context. +func StreamWithContext[T any](context context.Context, stream Stream[T]) Stream[T] { + return &WrappedDispatchStream[T]{ + Stream: stream, + Ctx: context, + Processor: nil, + } +} + +// HandlingDispatchStream is a dispatch stream that executes a handler for each item published. +// It uses an internal mutex to ensure it is thread safe. +type HandlingDispatchStream[T any] struct { + ctx context.Context + processor func(result T) error // GUARDED_BY(mu) + mu sync.Mutex +} + +// NewHandlingDispatchStream returns a new handling dispatch stream. +func NewHandlingDispatchStream[T any](ctx context.Context, processor func(result T) error) Stream[T] { + return &HandlingDispatchStream[T]{ + ctx: ctx, + processor: processor, + mu: sync.Mutex{}, + } +} + +func (s *HandlingDispatchStream[T]) Publish(result T) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.processor == nil { + return nil + } + + return s.processor(result) +} + +func (s *HandlingDispatchStream[T]) Context() context.Context { + return s.ctx +} + +// CountingDispatchStream is a dispatch stream that counts the number of items published. +// It uses an internal atomic int to ensure it is thread safe. +type CountingDispatchStream[T any] struct { + Stream Stream[T] + count *atomic.Uint64 +} + +func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T] { + return &CountingDispatchStream[T]{ + Stream: wrapped, + count: &atomic.Uint64{}, + } +} + +func (s *CountingDispatchStream[T]) PublishedCount() uint64 { + return s.count.Load() +} + +func (s *CountingDispatchStream[T]) Publish(result T) error { + err := s.Stream.Publish(result) + if err != nil { + return err + } + + s.count.Add(1) + return nil +} + +func (s *CountingDispatchStream[T]) Context() context.Context { + return s.Stream.Context() +} + +// Ensure the streams implement the interface. +var ( + _ Stream[any] = &CollectingDispatchStream[any]{} + _ Stream[any] = &WrappedDispatchStream[any]{} + _ Stream[any] = &CountingDispatchStream[any]{} +) |
