summaryrefslogtreecommitdiff
path: root/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go')
-rw-r--r--vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go437
1 files changed, 437 insertions, 0 deletions
diff --git a/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go
new file mode 100644
index 0000000..232b1e7
--- /dev/null
+++ b/vendor/github.com/authzed/spicedb/internal/dispatch/graph/graph.go
@@ -0,0 +1,437 @@
+package graph
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/rs/zerolog"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/trace"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/authzed/spicedb/internal/dispatch"
+ "github.com/authzed/spicedb/internal/graph"
+ log "github.com/authzed/spicedb/internal/logging"
+ datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
+ caveattypes "github.com/authzed/spicedb/pkg/caveats/types"
+ "github.com/authzed/spicedb/pkg/datastore"
+ "github.com/authzed/spicedb/pkg/middleware/nodeid"
+ core "github.com/authzed/spicedb/pkg/proto/core/v1"
+ v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
+ "github.com/authzed/spicedb/pkg/tuple"
+)
+
+const errDispatch = "error dispatching request: %w"
+
+var tracer = otel.Tracer("spicedb/internal/dispatch/local")
+
+// ConcurrencyLimits defines per-dispatch-type concurrency limits.
+//
+//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . ConcurrencyLimits
+type ConcurrencyLimits struct {
+ Check uint16 `debugmap:"visible"`
+ ReachableResources uint16 `debugmap:"visible"`
+ LookupResources uint16 `debugmap:"visible"`
+ LookupSubjects uint16 `debugmap:"visible"`
+}
+
+const defaultConcurrencyLimit = 50
+
+// WithOverallDefaultLimit sets the overall default limit for any unspecified limits
+// and returns a new struct.
+func (cl ConcurrencyLimits) WithOverallDefaultLimit(overallDefaultLimit uint16) ConcurrencyLimits {
+ return limitsOrDefaults(cl, overallDefaultLimit)
+}
+
+func (cl ConcurrencyLimits) MarshalZerologObject(e *zerolog.Event) {
+ e.Uint16("concurrency-limit-check-permission", cl.Check)
+ e.Uint16("concurrency-limit-lookup-resources", cl.LookupResources)
+ e.Uint16("concurrency-limit-lookup-subjects", cl.LookupSubjects)
+ e.Uint16("concurrency-limit-reachable-resources", cl.ReachableResources)
+}
+
+func limitsOrDefaults(limits ConcurrencyLimits, overallDefaultLimit uint16) ConcurrencyLimits {
+ limits.Check = limitOrDefault(limits.Check, overallDefaultLimit)
+ limits.LookupResources = limitOrDefault(limits.LookupResources, overallDefaultLimit)
+ limits.LookupSubjects = limitOrDefault(limits.LookupSubjects, overallDefaultLimit)
+ limits.ReachableResources = limitOrDefault(limits.ReachableResources, overallDefaultLimit)
+ return limits
+}
+
+func limitOrDefault(limit uint16, defaultLimit uint16) uint16 {
+ if limit <= 0 {
+ return defaultLimit
+ }
+ return limit
+}
+
+// SharedConcurrencyLimits returns a ConcurrencyLimits struct with the limit
+// set to that provided for each operation.
+func SharedConcurrencyLimits(concurrencyLimit uint16) ConcurrencyLimits {
+ return ConcurrencyLimits{
+ Check: concurrencyLimit,
+ ReachableResources: concurrencyLimit,
+ LookupResources: concurrencyLimit,
+ LookupSubjects: concurrencyLimit,
+ }
+}
+
+// NewLocalOnlyDispatcher creates a dispatcher that consults with the graph to formulate a response.
+func NewLocalOnlyDispatcher(typeSet *caveattypes.TypeSet, concurrencyLimit uint16, dispatchChunkSize uint16) dispatch.Dispatcher {
+ return NewLocalOnlyDispatcherWithLimits(typeSet, SharedConcurrencyLimits(concurrencyLimit), dispatchChunkSize)
+}
+
+// NewLocalOnlyDispatcherWithLimits creates a dispatcher thatg consults with the graph to formulate a response
+// and has the defined concurrency limits per dispatch type.
+func NewLocalOnlyDispatcherWithLimits(typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
+ d := &localDispatcher{}
+
+ concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+ chunkSize := dispatchChunkSize
+ if chunkSize == 0 {
+ chunkSize = 100
+ log.Warn().Msgf("LocalOnlyDispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+ }
+
+ d.checker = graph.NewConcurrentChecker(d, concurrencyLimits.Check, chunkSize)
+ d.expander = graph.NewConcurrentExpander(d)
+ d.lookupSubjectsHandler = graph.NewConcurrentLookupSubjects(d, concurrencyLimits.LookupSubjects, chunkSize)
+ d.lookupResourcesHandler2 = graph.NewCursoredLookupResources2(d, d, typeSet, concurrencyLimits.LookupResources, chunkSize)
+
+ return d
+}
+
+// NewDispatcher creates a dispatcher that consults with the graph and redispatches subproblems to
+// the provided redispatcher.
+func NewDispatcher(redispatcher dispatch.Dispatcher, typeSet *caveattypes.TypeSet, concurrencyLimits ConcurrencyLimits, dispatchChunkSize uint16) dispatch.Dispatcher {
+ concurrencyLimits = limitsOrDefaults(concurrencyLimits, defaultConcurrencyLimit)
+ chunkSize := dispatchChunkSize
+ if chunkSize == 0 {
+ chunkSize = 100
+ log.Warn().Msgf("Dispatcher: dispatchChunkSize not set, defaulting to %d", chunkSize)
+ }
+
+ checker := graph.NewConcurrentChecker(redispatcher, concurrencyLimits.Check, chunkSize)
+ expander := graph.NewConcurrentExpander(redispatcher)
+ lookupSubjectsHandler := graph.NewConcurrentLookupSubjects(redispatcher, concurrencyLimits.LookupSubjects, chunkSize)
+ lookupResourcesHandler2 := graph.NewCursoredLookupResources2(redispatcher, redispatcher, typeSet, concurrencyLimits.LookupResources, chunkSize)
+
+ return &localDispatcher{
+ checker: checker,
+ expander: expander,
+ lookupSubjectsHandler: lookupSubjectsHandler,
+ lookupResourcesHandler2: lookupResourcesHandler2,
+ }
+}
+
+type localDispatcher struct {
+ checker *graph.ConcurrentChecker
+ expander *graph.ConcurrentExpander
+ lookupSubjectsHandler *graph.ConcurrentLookupSubjects
+ lookupResourcesHandler2 *graph.CursoredLookupResources2
+}
+
+func (ld *localDispatcher) loadNamespace(ctx context.Context, nsName string, revision datastore.Revision) (*core.NamespaceDefinition, error) {
+ ds := datastoremw.MustFromContext(ctx).SnapshotReader(revision)
+
+ // Load namespace and relation from the datastore
+ ns, _, err := ds.ReadNamespaceByName(ctx, nsName)
+ if err != nil {
+ return nil, rewriteNamespaceError(err)
+ }
+
+ return ns, err
+}
+
+func (ld *localDispatcher) parseRevision(ctx context.Context, s string) (datastore.Revision, error) {
+ ds := datastoremw.MustFromContext(ctx)
+ return ds.RevisionFromString(s)
+}
+
+func (ld *localDispatcher) lookupRelation(_ context.Context, ns *core.NamespaceDefinition, relationName string) (*core.Relation, error) {
+ var relation *core.Relation
+ for _, candidate := range ns.Relation {
+ if candidate.Name == relationName {
+ relation = candidate
+ break
+ }
+ }
+
+ if relation == nil {
+ return nil, NewRelationNotFoundErr(ns.Name, relationName)
+ }
+
+ return relation, nil
+}
+
+// DispatchCheck implements dispatch.Check interface
+func (ld *localDispatcher) DispatchCheck(ctx context.Context, req *v1.DispatchCheckRequest) (*v1.DispatchCheckResponse, error) {
+ resourceType := tuple.StringCoreRR(req.ResourceRelation)
+ spanName := "DispatchCheck → " + resourceType + "@" + req.Subject.Namespace + "#" + req.Subject.Relation
+
+ nodeID, err := nodeid.FromContext(ctx)
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(
+ attribute.String("resource-type", resourceType),
+ attribute.StringSlice("resource-ids", req.ResourceIds),
+ attribute.String("subject", tuple.StringCoreONR(req.Subject)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ if req.Debug != v1.DispatchCheckRequest_ENABLE_BASIC_DEBUGGING {
+ return &v1.DispatchCheckResponse{
+ Metadata: &v1.ResponseMeta{
+ DispatchCount: 0,
+ },
+ }, rewriteError(ctx, err)
+ }
+
+ // NOTE: we return debug information here to ensure tooling can see the cycle.
+ nodeID, nerr := nodeid.FromContext(ctx)
+ if nerr != nil {
+ log.Err(nerr).Msg("failed to get nodeID from context")
+ }
+
+ return &v1.DispatchCheckResponse{
+ Metadata: &v1.ResponseMeta{
+ DispatchCount: 0,
+ DebugInfo: &v1.DebugInformation{
+ Check: &v1.CheckDebugTrace{
+ Request: req,
+ SourceId: nodeID,
+ },
+ },
+ },
+ }, rewriteError(ctx, err)
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ ns, err := ld.loadNamespace(ctx, req.ResourceRelation.Namespace, revision)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ relation, err := ld.lookupRelation(ctx, ns, req.ResourceRelation.Relation)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ // If the relation is aliasing another one and the subject does not have the same type as
+ // resource, load the aliased relation and dispatch to it. We cannot use the alias if the
+ // resource and subject types are the same because a check on the *exact same* resource and
+ // subject must pass, and we don't know how many intermediate steps may hit that case.
+ if relation.AliasingRelation != "" && req.ResourceRelation.Namespace != req.Subject.Namespace {
+ relation, err := ld.lookupRelation(ctx, ns, relation.AliasingRelation)
+ if err != nil {
+ return &v1.DispatchCheckResponse{Metadata: emptyMetadata}, rewriteError(ctx, err)
+ }
+
+ // Rewrite the request over the aliased relation.
+ validatedReq := graph.ValidatedCheckRequest{
+ DispatchCheckRequest: &v1.DispatchCheckRequest{
+ ResourceRelation: &core.RelationReference{
+ Namespace: req.ResourceRelation.Namespace,
+ Relation: relation.Name,
+ },
+ ResourceIds: req.ResourceIds,
+ Subject: req.Subject,
+ Metadata: req.Metadata,
+ Debug: req.Debug,
+ CheckHints: req.CheckHints,
+ },
+ Revision: revision,
+ OriginalRelationName: req.ResourceRelation.Relation,
+ }
+
+ resp, err := ld.checker.Check(ctx, validatedReq, relation)
+ return resp, rewriteError(ctx, err)
+ }
+
+ resp, err := ld.checker.Check(ctx, graph.ValidatedCheckRequest{
+ DispatchCheckRequest: req,
+ Revision: revision,
+ }, relation)
+ return resp, rewriteError(ctx, err)
+}
+
+// DispatchExpand implements dispatch.Expand interface
+func (ld *localDispatcher) DispatchExpand(ctx context.Context, req *v1.DispatchExpandRequest) (*v1.DispatchExpandResponse, error) {
+ nodeID, err := nodeid.FromContext(ctx)
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(ctx, "DispatchExpand", trace.WithAttributes(
+ attribute.String("start", tuple.StringCoreONR(req.ResourceAndRelation)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ ns, err := ld.loadNamespace(ctx, req.ResourceAndRelation.Namespace, revision)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ relation, err := ld.lookupRelation(ctx, ns, req.ResourceAndRelation.Relation)
+ if err != nil {
+ return &v1.DispatchExpandResponse{Metadata: emptyMetadata}, err
+ }
+
+ return ld.expander.Expand(ctx, graph.ValidatedExpandRequest{
+ DispatchExpandRequest: req,
+ Revision: revision,
+ }, relation)
+}
+
+func (ld *localDispatcher) DispatchLookupResources2(
+ req *v1.DispatchLookupResources2Request,
+ stream dispatch.LookupResources2Stream,
+) error {
+ nodeID, err := nodeid.FromContext(stream.Context())
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ ctx, span := tracer.Start(stream.Context(), "DispatchLookupResources2", trace.WithAttributes(
+ attribute.String("resource-type", tuple.StringCoreRR(req.ResourceRelation)),
+ attribute.String("subject-type", tuple.StringCoreRR(req.SubjectRelation)),
+ attribute.StringSlice("subject-ids", req.SubjectIds),
+ attribute.String("terminal-subject", tuple.StringCoreONR(req.TerminalSubject)),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return err
+ }
+
+ return ld.lookupResourcesHandler2.LookupResources2(
+ graph.ValidatedLookupResources2Request{
+ DispatchLookupResources2Request: req,
+ Revision: revision,
+ },
+ dispatch.StreamWithContext(ctx, stream),
+ )
+}
+
+// DispatchLookupSubjects implements dispatch.LookupSubjects interface
+func (ld *localDispatcher) DispatchLookupSubjects(
+ req *v1.DispatchLookupSubjectsRequest,
+ stream dispatch.LookupSubjectsStream,
+) error {
+ nodeID, err := nodeid.FromContext(stream.Context())
+ if err != nil {
+ log.Err(err).Msg("failed to get node ID")
+ }
+
+ resourceType := tuple.StringCoreRR(req.ResourceRelation)
+ subjectRelation := tuple.StringCoreRR(req.SubjectRelation)
+ spanName := "DispatchLookupSubjects → " + resourceType + "@" + subjectRelation
+
+ ctx, span := tracer.Start(stream.Context(), spanName, trace.WithAttributes(
+ attribute.String("resource-type", resourceType),
+ attribute.String("subject-type", subjectRelation),
+ attribute.StringSlice("resource-ids", req.ResourceIds),
+ attribute.String("node-id", nodeID),
+ ))
+ defer span.End()
+
+ if err := dispatch.CheckDepth(ctx, req); err != nil {
+ return err
+ }
+
+ revision, err := ld.parseRevision(ctx, req.Metadata.AtRevision)
+ if err != nil {
+ return err
+ }
+
+ return ld.lookupSubjectsHandler.LookupSubjects(
+ graph.ValidatedLookupSubjectsRequest{
+ DispatchLookupSubjectsRequest: req,
+ Revision: revision,
+ },
+ dispatch.StreamWithContext(ctx, stream),
+ )
+}
+
+func (ld *localDispatcher) Close() error {
+ return nil
+}
+
+func (ld *localDispatcher) ReadyState() dispatch.ReadyState {
+ return dispatch.ReadyState{IsReady: true}
+}
+
+func rewriteNamespaceError(original error) error {
+ nsNotFound := datastore.NamespaceNotFoundError{}
+
+ switch {
+ case errors.As(original, &nsNotFound):
+ return NewNamespaceNotFoundErr(nsNotFound.NotFoundNamespaceName())
+ case errors.As(original, &NamespaceNotFoundError{}):
+ fallthrough
+ case errors.As(original, &RelationNotFoundError{}):
+ return original
+ default:
+ return fmt.Errorf(errDispatch, original)
+ }
+}
+
+// rewriteError transforms graph errors into a gRPC Status
+func rewriteError(ctx context.Context, err error) error {
+ if err == nil {
+ return nil
+ }
+
+ // Check if the error can be directly used.
+ if st, ok := status.FromError(err); ok {
+ return st.Err()
+ }
+
+ switch {
+ case errors.Is(err, context.DeadlineExceeded):
+ return status.Errorf(codes.DeadlineExceeded, "%s", err)
+ case errors.Is(err, context.Canceled):
+ err := context.Cause(ctx)
+ if err != nil {
+ if _, ok := status.FromError(err); ok {
+ return err
+ }
+ }
+
+ return status.Errorf(codes.Canceled, "%s", err)
+ default:
+ log.Ctx(ctx).Err(err).Msg("received unexpected graph error")
+ return err
+ }
+}
+
+var emptyMetadata = &v1.ResponseMeta{
+ DispatchCount: 0,
+}