Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/dispatch')
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go                      98
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/doc.go                            2
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/errors.go                         39
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go                   77
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go                   437
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go     92
-rw-r--r--  vendor/github.com/authzed/spicedb/internal/dispatch/stream.go                        187
7 files changed, 932 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go b/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go
new file mode 100644
index 0000000..95a231a
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/dispatch.go
@@ -0,0 +1,98 @@
+package dispatch
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/rs/zerolog"
+
+ log "github.com/authzed/spicedb/internal/logging"
+ v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
+)
+
+// ReadyState represents the ready state of the dispatcher.
+type ReadyState struct {
+ // Message is a human-readable status message for the current state.
+ Message string
+
+ // IsReady indicates whether the dispatcher is ready.
+ IsReady bool
+}
+
+// Dispatcher interface describes a method for passing subchecks off to additional machines.
+type Dispatcher interface {
+ Check
+ Expand
+ LookupSubjects
+ LookupResources2
+
+ // Close closes the dispatcher.
+ Close() error
+
+ // ReadyState returns the current ready state, indicating whether the dispatcher is able to respond to requests.
+ ReadyState() ReadyState
+}
+
+// Check interface describes just the methods required to dispatch check requests.
+type Check interface {
+ // DispatchCheck submits a single check request and returns its result.
+ DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error)
+}
+
+// Expand interface describes just the methods required to dispatch expand requests.
+type Expand interface {
+ // DispatchExpand submits a single expand request and returns its result.
+ // If an error is returned, DispatchExpandResponse will still contain Metadata.
+ DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error)
+}
+
+type LookupResources2Stream = Stream[*v1.DispatchLookupResources2Response]
+
+type LookupResources2 interface {
+ DispatchLookupResources2(
+ req *v1.DispatchLookupResources2Request,
+ stream LookupResources2Stream,
+ ) error
+}
+
+// LookupSubjectsStream is an alias for the stream to which found subjects will be written.
+type LookupSubjectsStream = Stream[*v1.DispatchLookupSubjectsResponse]
+
+// LookupSubjects interface describes just the methods required to dispatch lookup subjects requests.
+type LookupSubjects interface {
+ // DispatchLookupSubjects submits a single lookup subjects request, writing its results to the specified stream.
+ DispatchLookupSubjects(
+ req *v1.DispatchLookupSubjectsRequest,
+ stream LookupSubjectsStream,
+ ) error
+}
+
+// DispatchableRequest is the interface implemented by all dispatchable request messages.
+type DispatchableRequest interface {
+ zerolog.LogObjectMarshaler
+
+ GetMetadata() *v1.ResolverMeta
+}
+
+// CheckDepth returns ErrMaxDepth if there is insufficient depth remaining to dispatch.
+func CheckDepth(ctx context.Context, req DispatchableRequest) error {
+ metadata := req.GetMetadata()
+ if metadata == nil {
+ log.Ctx(ctx).Warn().Object("request", req).Msg("request missing metadata")
+ return fmt.Errorf("request missing metadata")
+ }
+
+ if metadata.DepthRemaining == 0 {
+ return NewMaxDepthExceededError(req)
+ }
+
+ return nil
+}
+
+// AddResponseMetadata adds the metadata found in the incoming metadata to the existing
+// metadata, *modifying it in place*.
+func AddResponseMetadata(existing *v1.ResponseMeta, incoming *v1.ResponseMeta) {
+ existing.DispatchCount += incoming.DispatchCount
+ existing.CachedDispatchCount += incoming.CachedDispatchCount
+ existing.DepthRequired = max(existing.DepthRequired, incoming.DepthRequired)
+}
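For illustration, a minimal in-package sketch (a hypothetical _test.go file; the field names used here all appear in the code above) showing how AddResponseMetadata folds subresponse metadata into a running total:

package dispatch

import (
	"fmt"
	"testing"

	v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)

func TestAddResponseMetadataSketch(t *testing.T) {
	// Fold metadata from two hypothetical subresponses into one total.
	total := &v1.ResponseMeta{}
	AddResponseMetadata(total, &v1.ResponseMeta{DispatchCount: 3, CachedDispatchCount: 1, DepthRequired: 2})
	AddResponseMetadata(total, &v1.ResponseMeta{DispatchCount: 5, DepthRequired: 4})

	// Dispatch counts are summed; DepthRequired keeps the maximum.
	fmt.Println(total.DispatchCount, total.CachedDispatchCount, total.DepthRequired) // 8 1 4
}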
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go b/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go
new file mode 100644
index 0000000..8b88bb0
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/doc.go
@@ -0,0 +1,2 @@
+// Package dispatch contains logic to dispatch requests locally or to other nodes.
+package dispatch
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go b/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go
new file mode 100644
index 0000000..17cec3f
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/errors.go
@@ -0,0 +1,39 @@
+package dispatch
+
+import (
+ "fmt"
+
+ v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/authzed/spicedb/pkg/spiceerrors"
+)
+
+// MaxDepthExceededError is an error returned when the maximum depth for dispatching has been exceeded.
+type MaxDepthExceededError struct {
+ error
+
+ // Request is the request that exceeded the maximum depth.
+ Request DispatchableRequest
+}
+
+// NewMaxDepthExceededError creates a new MaxDepthExceededError.
+func NewMaxDepthExceededError(req DispatchableRequest) error {
+ return MaxDepthExceededError{
+ fmt.Errorf("max depth exceeded: this usually indicates a recursive or too deep data dependency. See: https://spicedb.dev/d/debug-max-depth"),
+ req,
+ }
+}
+
+// GRPCStatus implements retrieving the gRPC status for the error.
+func (err MaxDepthExceededError) GRPCStatus() *status.Status {
+ return spiceerrors.WithCodeAndDetails(
+ err,
+ codes.ResourceExhausted,
+ spiceerrors.ForReason(
+ v1.ErrorReason_ERROR_REASON_MAXIMUM_DEPTH_EXCEEDED,
+ map[string]string{},
+ ),
+ )
+}
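A minimal sketch (hypothetical test in this package; fakeRequest is a stand-in, not a real type) of why the GRPCStatus method matters: grpc-go's status.FromError recognizes it, so callers see ResourceExhausted rather than Unknown:

package dispatch

import (
	"fmt"
	"testing"

	"github.com/rs/zerolog"
	"google.golang.org/grpc/status"

	v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)

// fakeRequest is a stand-in DispatchableRequest used only for this sketch.
type fakeRequest struct{}

func (fakeRequest) MarshalZerologObject(e *zerolog.Event) {}
func (fakeRequest) GetMetadata() *v1.ResolverMeta         { return &v1.ResolverMeta{DepthRemaining: 0} }

func TestMaxDepthStatusSketch(t *testing.T) {
	err := NewMaxDepthExceededError(fakeRequest{})

	// status.FromError picks up the GRPCStatus() method defined above.
	st, _ := status.FromError(err)
	fmt.Println(st.Code()) // ResourceExhausted
}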
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go
new file mode 100644
index 0000000..ecaf59a
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/errors.go
@@ -0,0 +1,77 @@
+package graph
+
+import (
+ "fmt"
+
+ "github.com/rs/zerolog"
+)
+
+// NamespaceNotFoundError occurs when a namespace was not found.
+type NamespaceNotFoundError struct {
+ error
+ namespaceName string
+}
+
+// NotFoundNamespaceName returns the name of the namespace that was not found.
+func (err NamespaceNotFoundError) NotFoundNamespaceName() string {
+ return err.namespaceName
+}
+
+// MarshalZerologObject implements zerolog.LogObjectMarshaler
+func (err NamespaceNotFoundError) MarshalZerologObject(e *zerolog.Event) {
+ e.Err(err.error).Str("namespace", err.namespaceName)
+}
+
+// DetailsMetadata returns the metadata for details for this error.
+func (err NamespaceNotFoundError) DetailsMetadata() map[string]string {
+ return map[string]string{
+ "definition_name": err.namespaceName,
+ }
+}
+
+// NewNamespaceNotFoundErr constructs a new namespace not found error.
+func NewNamespaceNotFoundErr(nsName string) error {
+ return NamespaceNotFoundError{
+ error: fmt.Errorf("object definition `%s` not found", nsName),
+ namespaceName: nsName,
+ }
+}
+
+// RelationNotFoundError occurs when a relation was not found under a namespace.
+type RelationNotFoundError struct {
+ error
+ namespaceName string
+ relationName string
+}
+
+// NamespaceName returns the name of the namespace in which the relation was not found.
+func (err RelationNotFoundError) NamespaceName() string {
+ return err.namespaceName
+}
+
+// NotFoundRelationName returns the name of the relation not found.
+func (err RelationNotFoundError) NotFoundRelationName() string {
+ return err.relationName
+}
+
+// MarshalZerologObject implements zerolog.LogObjectMarshaler
+func (err RelationNotFoundError) MarshalZerologObject(e *zerolog.Event) {
+ e.Err(err.error).Str("namespace", err.namespaceName).Str("relation", err.relationName)
+}
+
+// DetailsMetadata returns the metadata for details for this error.
+func (err RelationNotFoundError) DetailsMetadata() map[string]string {
+ return map[string]string{
+ "definition_name": err.namespaceName,
+ "relation_or_permission_name": err.relationName,
+ }
+}
+
+// NewRelationNotFoundErr constructs a new relation not found error.
+func NewRelationNotFoundErr(nsName string, relationName string) error {
+ return RelationNotFoundError{
+ error: fmt.Errorf("relation/permission `%s` not found under definition `%s`", relationName, nsName),
+ namespaceName: nsName,
+ relationName: relationName,
+ }
+}
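As a minimal sketch (hypothetical test in this package) of how these typed errors are consumed — errors.As recovers the concrete type so callers can report which definition or relation was missing:

package graph

import (
	"errors"
	"fmt"
	"testing"
)

func TestTypedErrorsSketch(t *testing.T) {
	err := NewRelationNotFoundErr("document", "viewer")

	var relErr RelationNotFoundError
	if errors.As(err, &relErr) {
		// Both the accessors and the details metadata expose the missing names.
		fmt.Println(relErr.NamespaceName(), relErr.NotFoundRelationName())   // document viewer
		fmt.Println(relErr.DetailsMetadata()["relation_or_permission_name"]) // viewer
	}
}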
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go
new file mode 100644
index 0000000..232b1e7
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go
@@ -0,0 +1,437 @@
+package graph
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/rs/zerolog"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/trace"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/authzed/spicedb/internal/dispatch"
+ "github.com/authzed/spicedb/internal/graph"
+ log "github.com/authzed/spicedb/internal/logging"
+ datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
+ caveattypes "github.com/authzed/spicedb/pkg/caveats/types"
+ "github.com/authzed/spicedb/pkg/datastore"
+ "github.com/authzed/spicedb/pkg/middleware/nodeid"
+ core "github.com/authzed/spicedb/pkg/proto/core/v1"
+ v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
+ "github.com/authzed/spicedb/pkg/tuple"
+)
+
+const errDispatch = "error dispatching request: %w"
+
+var tracer = otel.Tracer("spicedb/internal/dispatch/local")
+
+// ConcurrencyLimits defines per-dispatch-type concurrency limits.
+//
+//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . ConcurrencyLimits
+type ConcurrencyLimits struct {
+ Check uint16 `debugmap:"visible"`
+ ReachableResources uint16 `debugmap:"visible"`
+ LookupResources uint16 `debugmap:"visible"`
+ LookupSubjects uint16 `debugmap:"visible"`
+}
+
+const defaultConcurrencyLimit = 50
+
+// WithOverallDefaultLimit sets the overall default limit for any unspecified limits
+// and returns a new struct.
+func (cl ConcurrencyLimits) WithOverallDefaultLimit(overallDefaultLimit uint16) ConcurrencyLimits {
+ return limitsOrDefaults(cl, overallDefaultLimit)
+}
+
+func (cl ConcurrencyLimits) MarshalZerologObject(e *zerolog.Event) {
+ e.Uint16("concurrency-limit-check-permission", cl.Check)
+ e.Uint16("concurrency-limit-lookup-resources", cl.LookupResources)
+ e.Uint16("concurrency-limit-lookup-subjects", cl.LookupSubjects)
+ e.Uint16("concurrency-limit-reachable-resources", cl.ReachableResources)
+}
+
+func limitsOrDefaults(limits ConcurrencyLimits, overallDefaultLimit uint16) ConcurrencyLimits {
+ limits.Check = limitOrDefault(limits.Check, overallDefaultLimit)
+ limits.LookupResources = limitOrDefault(limits.LookupResources, overallDefaultLimit)
+ limits.LookupSubjects = limitOrDefault(limits.LookupSubjects, overallDefaultLimit)
+ limits.ReachableResources = limitOrDefault(limits.ReachableResources, overallDefaultLimit)
+ return limits
+}
+
+func limitOrDefault(limit uint16, defaultLimit uint16) uint16 {
+ if limit <= 0 {
+ return defaultLimit
+ }
+ return limit
+}
+
+// SharedConcurrencyLimits returns a ConcurrencyLimits struct with the limit
+// set to that provided for each operation.
+func SharedConcurrencyLimits(concurrencyLimit uint16) ConcurrencyLimits {
+ return ConcurrencyLimits{
+ Check: concurrencyLimit,
+ ReachableResources: concurrencyLimit,
+ LookupResources: concurrencyLimit,
+ LookupSubjects: concurrencyLimit,
+ }
+}
+
+// NewLocalOnlyDispatcher creates a dispatcher that consults with the graph to formulate a response.
+func NewLocalOnlyDispatcher(typeSet *caveattypes.TypeSet, concurrencyLimit uint16, dispatchChunkSize uint16) dispatch.Dispatcher {
+ return NewLocalOnlyDispatcherWithLimits(typeSet, SharedConcurrencyLimits(concurrencyLimit), dispatchChunkSize)
+}
+
+// NewLocalOnlyDispatcherWithLimits creates a dispatcher that consults with the graph to formulate a response
+// and has the defined concurrency limits per dispatch type.
+func NewLocalOnlyDispatcherWithLimits(typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
+ d := &localDispatcher{}
+
+ concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+ chunkSize := dispatchChunkSize
+ if chunkSize == 0 {
+ chunkSize = 100
+ log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+ }
+
+ d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize)
+ d.expander = graph.NewConcurrentExpander(d)
+ d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize)
+ d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, typeSet, concurrencyLimits.LookupResources, chunkSize)
+
+ return d
+}
+
+// NewDispatcher creates a dispatcher that consults with the graph and redispatches subproblems to
+// the provided redispatcher.
+func NewDispatcher(redispatcher dispatch.Dispatcher, typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
+ concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+ chunkSize := dispatchChunkSize
+ if chunkSize == 0 {
+ chunkSize = 100
+ log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+ }
+
+ checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize)
+ expander := graph.NewConcurrentExpander(redispatcher)
+ lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize)
+ lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, typeSet, concurrencyLimits.LookupResources, chunkSize)
+
+ return &localDispatcher{
+ checker: checker,
+ expander: expander,
+ lookupSubjectsHandler: lookupSubjectsHandler,
+ lookupResourcesHandler2: lookupResourcesHandler2,
+ }
+}
+
+type localDispatcher struct {
+ checker *graph.ConcurrentChecker
+ expander *graph.ConcurrentExpander
+ lookupSubjectsHandler *graph.ConcurrentLookupSubjects
+ lookupResourcesHandler2 *graph.CursoredLookupResources2
+}
+
+func (ld *localDispatcher) loadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*core.NamespaceDefinition, error) {
+ ds := datastoremw.MustFromContext(ctx).SnapshotReader(revision)
+
+ // Load namespace and relation from the datastore
+ ns, _, err := ds.ReadNamespaceByName(ctx, nsName)
+ if err != nil {
+ return nil, rewriteNamespaceError(err)
+ }
+
+ return ns, err
+}
+
+func (ld *localDispatcher) parseRevision(ctx context.Context, s string) (datastore.Revision, error) {
+ ds := datastoremw.MustFromContext(ctx)
+ return ds.RevisionFromString(s)
+}
+
+func (ld *localDispatcher) lookupRelation(_ context.Context, ns *core.NamespaceDefinition, relationName string) (*core.Relation, error) {
+ var relation *core.Relation
+ for _, candidate := range ns.Relation {
+ if candidate.Name == relationName {
+ relation = candidate
+ break
+ }
+ }
+
+ if relation == nil {
+ return nil, NewRelationNotFoundErr(ns.Name, relationName)
+ }
+
+ return relation, nil
+}
+
+// DispatchCheck implements dispatch.Check interface
+func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
+ resourceType := tuple.StringCoreRR(req.ResourceRelation)
+ spanName := "DispatchCheck → " + resourceType + "@" + req.Subject.Namespace + "#" + req.Subject.Relation
+
+ nodeID, err := nodeid.FromContext(ctx)
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(
+ attribute.String("resource-type", resourceType),
+ attribute.StringSlice("resource-ids", req.ResourceIds),
+ attribute.String("subject", tuple.StringCoreONR(req.Subject)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ if req.Debug != v1.DispatchCheckRequest_ENABLE_BASIC_DEBUGGING {
+ return &v1.DispatchCheckResponse{
+ Metadata: &v1.ResponseMeta{
+ DispatchCount: 0,
+ },
+ }, rewriteError(ctx, err)
+ }
+
+ // NOTE: we return debug information here to ensure tooling can see the cycle.
+ nodeID, nerr := nodeid.FromContext(ctx)
+ if nerr != nil {
+ log.Err(nerr).Msg("failed to get nodeID from context")
+ }
+
+ return &v1.DispatchCheckResponse{
+ Metadata: &v1.ResponseMeta{
+ DispatchCount: 0,
+ DebugInfo: &v1.DebugInformation{
+ Check: &v1.CheckDebugTrace{
+ Request: req,
+ SourceId: nodeID,
+ },
+ },
+ },
+ }, rewriteError(ctx, err)
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ ns, err := ld.loadNamespace(ctx, req.ResourceRelation.Namespace, revision)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ relation, err := ld.lookupRelation(ctx, ns, req.ResourceRelation.Relation)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ // If the relation is aliasing another one and the subject does not have the same type as
+ // resource, load the aliased relation and dispatch to it. We cannot use the alias if the
+ // resource and subject types are the same because a check on the *exact same* resource and
+ // subject must pass, and we don't know how many intermediate steps may hit that case.
+ if relation.AliasingRelation != "" && req.ResourceRelation.Namespace != req.Subject.Namespace {
+ relation, err := ld.lookupRelation(ctx, ns, relation.AliasingRelation)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ // Rewrite the request over the aliased relation.
+ validatedReq := graph.ValidatedCheckRequest{
+ DispatchCheckRequest: &v1.DispatchCheckRequest{
+ ResourceRelation: &core.RelationReference{
+ Namespace: req.ResourceRelation.Namespace,
+ Relation: relation.Name,
+ },
+ ResourceIds: req.ResourceIds,
+ Subject: req.Subject,
+ Metadata: req.Metadata,
+ Debug: req.Debug,
+ CheckHints: req.CheckHints,
+ },
+ Revision: revision,
+ OriginalRelationName: req.ResourceRelation.Relation,
+ }
+
+ resp, err := ld.checker.Check(ctx, validatedReq, relation)
+ return resp, rewriteError(ctx, err)
+ }
+
+ resp, err := ld.checker.Check(ctx, graph.ValidatedCheckRequest{
+ DispatchCheckRequest: req,
+ Revision: revision,
+ }, relation)
+ return resp, rewriteError(ctx, err)
+}
+
+// DispatchExpand implements dispatch.Expand interface
+func (ld *localDispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) {
+ nodeID, err := nodeid.FromContext(ctx)
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(ctx, "DispatchExpand", trace.WithAttributes(
+ attribute.String("start", tuple.StringCoreONR(req.ResourceAndRelation)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ ns, err := ld.loadNamespace(ctx, req.ResourceAndRelation.Namespace, revision)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ relation, err := ld.lookupRelation(ctx, ns, req.ResourceAndRelation.Relation)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ return ld.expander.Expand(ctx, graph.ValidatedExpandRequest{
+ DispatchExpandRequest: req,
+ Revision: revision,
+ }, relation)
+}
+
+func (ld *localDispatcher) DispatchLookupResources2(
+ req *v1.DispatchLookupResources2Request,
+ stream dispatch.LookupResources2Stream,
+) error {
+ nodeID, err := nodeid.FromContext(stream.Context())
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(stream.Context(), "DispatchLookupResources2", trace.WithAttributes(
+ attribute.String("resource-type", tuple.StringCoreRR(req.ResourceRelation)),
+ attribute.String("subject-type", tuple.StringCoreRR(req.SubjectRelation)),
+ attribute.StringSlice("subject-ids", req.SubjectIds),
+ attribute.String("terminal-subject", tuple.StringCoreONR(req.TerminalSubject)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return err
+ }
+
+ return ld.lookupResourcesHandler2.LookupResources2(
+ graph.ValidatedLookupResources2Request{
+ DispatchLookupResources2Request: req,
+ Revision: revision,
+ },
+ dispatch.StreamWithContext(ctx, stream),
+ )
+}
+
+// DispatchLookupSubjects implements dispatch.LookupSubjects interface
+func (ld *localDispatcher) DispatchLookupSubjects(
+ req *v1.DispatchLookupSubjectsRequest,
+ stream dispatch.LookupSubjectsStream,
+) error {
+ nodeID, err := nodeid.FromContext(stream.Context())
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ resourceType := tuple.StringCoreRR(req.ResourceRelation)
+ subjectRelation := tuple.StringCoreRR(req.SubjectRelation)
+ spanName := "DispatchLookupSubjects → " + resourceType + "@" + subjectRelation
+
+ ctx, span := tracer.Start(stream.Context(), spanName, trace.WithAttributes(
+ attribute.String("resource-type", resourceType),
+ attribute.String("subject-type", subjectRelation),
+ attribute.StringSlice("resource-ids", req.ResourceIds),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return err
+ }
+
+ return ld.lookupSubjectsHandler.LookupSubjects(
+ graph.ValidatedLookupSubjectsRequest{
+ DispatchLookupSubjectsRequest: req,
+ Revision: revision,
+ },
+ dispatch.StreamWithContext(ctx, stream),
+ )
+}
+
+func (ld *localDispatcher) Close() error {
+ return nil
+}
+
+func (ld *localDispatcher) ReadyState() dispatch.ReadyState {
+ return dispatch.ReadyState{IsReady: true}
+}
+
+func rewriteNamespaceError(original error) error {
+ nsNotFound := datastore.NamespaceNotFoundError{}
+
+ switch {
+ case errors.As(original, &nsNotFound):
+ return NewNamespaceNotFoundErr(nsNotFound.NotFoundNamespaceName())
+ case errors.As(original, &NamespaceNotFoundError{}):
+ fallthrough
+ case errors.As(original, &RelationNotFoundError{}):
+ return original
+ default:
+ return fmt.Errorf(errDispatch, original)
+ }
+}
+
+// rewriteError transforms graph errors into a gRPC Status
+func rewriteError(ctx context.Context, err error) error {
+ if err == nil {
+ return nil
+ }
+
+ // Check if the error can be directly used.
+ if st, ok := status.FromError(err); ok {
+ return st.Err()
+ }
+
+ switch {
+ case errors.Is(err, context.DeadlineExceeded):
+ return status.Errorf(codes.DeadlineExceeded, "%s", err)
+ case errors.Is(err, context.Canceled):
+ err := context.Cause(ctx)
+ if err != nil {
+ if _, ok := status.FromError(err); ok {
+ return err
+ }
+ }
+
+ return status.Errorf(codes.Canceled, "%s", err)
+ default:
+ log.Ctx(ctx).Err(err).Msg("received unexpected graph error")
+ return err
+ }
+}
+
+var emptyMetadata = &v1.ResponseMeta{
+ DispatchCount: 0,
+}
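A small in-package sketch (hypothetical test) of the limit-defaulting behaviour above: WithOverallDefaultLimit only fills in limits left at zero, mirroring what NewLocalOnlyDispatcherWithLimits and NewDispatcher do via limitsOrDefaults:

package graph

import (
	"fmt"
	"testing"
)

func TestConcurrencyLimitDefaultsSketch(t *testing.T) {
	// Only Check is set explicitly; the remaining limits are zero.
	limits := ConcurrencyLimits{Check: 16}
	limits = limits.WithOverallDefaultLimit(50)

	// Check keeps its explicit value; the unset limits fall back to the overall default.
	fmt.Println(limits.Check, limits.LookupResources, limits.LookupSubjects, limits.ReachableResources) // 16 50 50 50
}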
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go
new file mode 100644
index 0000000..9a0a7fc
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/zz_generated.options.go
@@ -0,0 +1,92 @@
+// Code generated by github.com/ecordell/optgen. DO NOT EDIT.
+package graph
+
+import (
+ defaults "github.com/creasty/defaults"
+ helpers "github.com/ecordell/optgen/helpers"
+)
+
+type ConcurrencyLimitsOption func(c *ConcurrencyLimits)
+
+// NewConcurrencyLimitsWithOptions creates a new ConcurrencyLimits with the passed in options set
+func NewConcurrencyLimitsWithOptions(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits {
+ c := &ConcurrencyLimits{}
+ for _, o := range opts {
+ o(c)
+ }
+ return c
+}
+
+// NewConcurrencyLimitsWithOptionsAndDefaults creates a new ConcurrencyLimits with the passed in options set starting from the defaults
+func NewConcurrencyLimitsWithOptionsAndDefaults(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits {
+ c := &ConcurrencyLimits{}
+ defaults.MustSet(c)
+ for _, o := range opts {
+ o(c)
+ }
+ return c
+}
+
+// ToOption returns a new ConcurrencyLimitsOption that sets the values from the passed in ConcurrencyLimits
+func (c *ConcurrencyLimits) ToOption() ConcurrencyLimitsOption {
+ return func(to *ConcurrencyLimits) {
+ to.Check = c.Check
+ to.ReachableResources = c.ReachableResources
+ to.LookupResources = c.LookupResources
+ to.LookupSubjects = c.LookupSubjects
+ }
+}
+
+// DebugMap returns a map form of ConcurrencyLimits for debugging
+func (c ConcurrencyLimits) DebugMap() map[string]any {
+ debugMap := map[string]any{}
+ debugMap["Check"] = helpers.DebugValue(c.Check, false)
+ debugMap["ReachableResources"] = helpers.DebugValue(c.ReachableResources, false)
+ debugMap["LookupResources"] = helpers.DebugValue(c.LookupResources, false)
+ debugMap["LookupSubjects"] = helpers.DebugValue(c.LookupSubjects, false)
+ return debugMap
+}
+
+// ConcurrencyLimitsWithOptions configures an existing ConcurrencyLimits with the passed in options set
+func ConcurrencyLimitsWithOptions(c *ConcurrencyLimits, opts ...ConcurrencyLimitsOption) *ConcurrencyLimits {
+ for _, o := range opts {
+ o(c)
+ }
+ return c
+}
+
+// WithOptions configures the receiver ConcurrencyLimits with the passed in options set
+func (c *ConcurrencyLimits) WithOptions(opts ...ConcurrencyLimitsOption) *ConcurrencyLimits {
+ for _, o := range opts {
+ o(c)
+ }
+ return c
+}
+
+// WithCheck returns an option that can set Check on a ConcurrencyLimits
+func WithCheck(check uint16) ConcurrencyLimitsOption {
+ return func(c *ConcurrencyLimits) {
+ c.Check = check
+ }
+}
+
+// WithReachableResources returns an option that can set ReachableResources on a ConcurrencyLimits
+func WithReachableResources(reachableResources uint16) ConcurrencyLimitsOption {
+ return func(c *ConcurrencyLimits) {
+ c.ReachableResources = reachableResources
+ }
+}
+
+// WithLookupResources returns an option that can set LookupResources on a ConcurrencyLimits
+func WithLookupResources(lookupResources uint16) ConcurrencyLimitsOption {
+ return func(c *ConcurrencyLimits) {
+ c.LookupResources = lookupResources
+ }
+}
+
+// WithLookupSubjects returns an option that can set LookupSubjects on a ConcurrencyLimits
+func WithLookupSubjects(lookupSubjects uint16) ConcurrencyLimitsOption {
+ return func(c *ConcurrencyLimits) {
+ c.LookupSubjects = lookupSubjects
+ }
+}
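For illustration, a hypothetical in-package test using the generated functional options; fields not set by an option stay at their zero value until the defaulting in graph.go applies:

package graph

import (
	"fmt"
	"testing"
)

func TestConcurrencyLimitsOptionsSketch(t *testing.T) {
	limits := NewConcurrencyLimitsWithOptions(
		WithCheck(32),
		WithLookupSubjects(8),
	)

	// Unspecified fields remain zero.
	fmt.Println(limits.Check, limits.LookupSubjects, limits.LookupResources) // 32 8 0
}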
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go b/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go
new file mode 100644
index 0000000..1d6636c
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/stream.go
@@ -0,0 +1,187 @@
+package dispatch
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ grpc "google.golang.org/grpc"
+)
+
+// Stream defines the interface generically matching a streaming dispatch response.
+type Stream[T any] interface {
+ // Publish publishes the result to the stream.
+ Publish(T) error
+
+ // Context returns the context for the stream.
+ Context() context.Context
+}
+
+type grpcStream[T any] interface {
+ grpc.ServerStream
+ Send(T) error
+}
+
+// WrapGRPCStream wraps a gRPC result stream with a concurrent-safe dispatch stream. This is
+// necessary because gRPC response streams are *not concurrent safe*.
+// See: https://groups.google.com/g/grpc-io/c/aI6L6M4fzQ0?pli=1
+func WrapGRPCStream[R any, S grpcStream[R]](grpcStream S) Stream[R] {
+ return &concurrentSafeStream[R]{
+ grpcStream: grpcStream,
+ mu: sync.Mutex{},
+ }
+}
+
+type concurrentSafeStream[T any] struct {
+ grpcStream grpcStream[T] // GUARDED_BY(mu)
+ mu sync.Mutex
+}
+
+func (s *concurrentSafeStream[T]) Context() context.Context {
+ return s.grpcStream.Context()
+}
+
+func (s *concurrentSafeStream[T]) Publish(result T) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.grpcStream.Send(result)
+}
+
+// NewCollectingDispatchStream creates a new CollectingDispatchStream.
+func NewCollectingDispatchStream[T any](ctx context.Context) *CollectingDispatchStream[T] {
+ return &CollectingDispatchStream[T]{
+ ctx: ctx,
+ results: nil,
+ mu: sync.Mutex{},
+ }
+}
+
+// CollectingDispatchStream is a dispatch stream that collects results in memory.
+type CollectingDispatchStream[T any] struct {
+ ctx context.Context
+ results []T // GUARDED_BY(mu)
+ mu sync.Mutex
+}
+
+func (s *CollectingDispatchStream[T]) Context() context.Context {
+ return s.ctx
+}
+
+func (s *CollectingDispatchStream[T]) Results() []T {
+ return s.results
+}
+
+func (s *CollectingDispatchStream[T]) Publish(result T) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.results = append(s.results, result)
+ return nil
+}
+
+// WrappedDispatchStream is a dispatch stream that wraps another dispatch stream, and performs
+// an operation on each result before publishing it back up to the parent stream.
+type WrappedDispatchStream[T any] struct {
+ Stream Stream[T]
+ Ctx context.Context
+ Processor func(result T) (T, bool, error)
+}
+
+func (s *WrappedDispatchStream[T]) Publish(result T) error {
+ if s.Processor == nil {
+ return s.Stream.Publish(result)
+ }
+
+ processed, ok, err := s.Processor(result)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return nil
+ }
+
+ return s.Stream.Publish(processed)
+}
+
+func (s *WrappedDispatchStream[T]) Context() context.Context {
+ return s.Ctx
+}
+
+// StreamWithContext returns the given dispatch stream, wrapped to return the given context.
+func StreamWithContext[T any](context context.Context, stream Stream[T]) Stream[T] {
+ return &WrappedDispatchStream[T]{
+ Stream: stream,
+ Ctx: context,
+ Processor: nil,
+ }
+}
+
+// HandlingDispatchStream is a dispatch stream that executes a handler for each item published.
+// It uses an internal mutex to ensure it is thread safe.
+type HandlingDispatchStream[T any] struct {
+ ctx context.Context
+ processor func(result T) error // GUARDED_BY(mu)
+ mu sync.Mutex
+}
+
+// NewHandlingDispatchStream returns a new handling dispatch stream.
+func NewHandlingDispatchStream[T any](ctx context.Context, processor func(result T) error) Stream[T] {
+ return &HandlingDispatchStream[T]{
+ ctx: ctx,
+ processor: processor,
+ mu: sync.Mutex{},
+ }
+}
+
+func (s *HandlingDispatchStream[T]) Publish(result T) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.processor == nil {
+ return nil
+ }
+
+ return s.processor(result)
+}
+
+func (s *HandlingDispatchStream[T]) Context() context.Context {
+ return s.ctx
+}
+
+// CountingDispatchStream is a dispatch stream that counts the number of items published.
+// It uses an internal atomic int to ensure it is thread safe.
+type CountingDispatchStream[T any] struct {
+ Stream Stream[T]
+ count *atomic.Uint64
+}
+
+func NewCountingDispatchStream[T any](wrapped Stream[T]) *CountingDispatchStream[T] {
+ return &CountingDispatchStream[T]{
+ Stream: wrapped,
+ count: &atomic.Uint64{},
+ }
+}
+
+func (s *CountingDispatchStream[T]) PublishedCount() uint64 {
+ return s.count.Load()
+}
+
+func (s *CountingDispatchStream[T]) Publish(result T) error {
+ err := s.Stream.Publish(result)
+ if err != nil {
+ return err
+ }
+
+ s.count.Add(1)
+ return nil
+}
+
+func (s *CountingDispatchStream[T]) Context() context.Context {
+ return s.Stream.Context()
+}
+
+// Ensure the streams implement the interface.
+var (
+ _ Stream[any] = &CollectingDispatchStream[any]{}
+ _ Stream[any] = &WrappedDispatchStream[any]{}
+ _ Stream[any] = &CountingDispatchStream[any]{}
+)
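Finally, a minimal sketch (hypothetical test in this package) of how the stream wrappers compose: a WrappedDispatchStream filters results before they reach a CountingDispatchStream, which forwards to a CollectingDispatchStream:

package dispatch

import (
	"context"
	"fmt"
	"testing"
)

func TestStreamCompositionSketch(t *testing.T) {
	ctx := context.Background()

	// Innermost: collect results in memory. Middle: count what gets through.
	collector := NewCollectingDispatchStream[string](ctx)
	counter := NewCountingDispatchStream[string](collector)

	// Outermost: drop empty results before they reach the counter/collector.
	filtered := &WrappedDispatchStream[string]{
		Stream: counter,
		Ctx:    ctx,
		Processor: func(result string) (string, bool, error) {
			return result, result != "", nil
		},
	}

	_ = filtered.Publish("document:firstdoc")
	_ = filtered.Publish("") // filtered out by the processor

	fmt.Println(counter.PublishedCount(), collector.Results()) // 1 [document:firstdoc]
}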