summaryrefslogtreecommitdiff
path: root/vendor/github.com/authzed/spicedb/internal/datastore/revisions/remoteclock.go
blob: ef793c86d366a6a0474efa9cc0d0afaad31eeaaa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package revisions

import (
	"context"
	"time"

	log "github.com/authzed/spicedb/internal/logging"
	"github.com/authzed/spicedb/pkg/datastore"
	"github.com/authzed/spicedb/pkg/spiceerrors"
)

// RemoteNowFunction queries the datastore to get a current revision.
type RemoteNowFunction func(context.Context) (datastore.Revision, error)

// RemoteClockRevisions handles revision calculation for datastores that provide
// their own clocks.
type RemoteClockRevisions struct {
	*CachedOptimizedRevisions

	gcWindowNanos          int64
	nowFunc                RemoteNowFunction
	followerReadDelayNanos int64
	quantizationNanos      int64
}

// NewRemoteClockRevisions returns a RemoteClockRevisions for the given configuration
func NewRemoteClockRevisions(gcWindow, maxRevisionStaleness, followerReadDelay, quantization time.Duration) *RemoteClockRevisions {
	// Ensure the max revision staleness never exceeds the GC window.
	if maxRevisionStaleness > gcWindow {
		log.Warn().
			Dur("maxRevisionStaleness", maxRevisionStaleness).
			Dur("gcWindow", gcWindow).
			Msg("the configured maximum revision staleness exceeds the configured gc window, so capping to gcWindow")
		maxRevisionStaleness = gcWindow - 1
	}

	revisions := &RemoteClockRevisions{
		CachedOptimizedRevisions: NewCachedOptimizedRevisions(
			maxRevisionStaleness,
		),
		gcWindowNanos:          gcWindow.Nanoseconds(),
		followerReadDelayNanos: followerReadDelay.Nanoseconds(),
		quantizationNanos:      quantization.Nanoseconds(),
	}

	revisions.SetOptimizedRevisionFunc(revisions.optimizedRevisionFunc)

	return revisions
}

func (rcr *RemoteClockRevisions) optimizedRevisionFunc(ctx context.Context) (datastore.Revision, time.Duration, error) {
	nowRev, err := rcr.nowFunc(ctx)
	if err != nil {
		return datastore.NoRevision, 0, err
	}

	if nowRev == datastore.NoRevision {
		return datastore.NoRevision, 0, datastore.NewInvalidRevisionErr(nowRev, datastore.CouldNotDetermineRevision)
	}

	nowTS, ok := nowRev.(WithTimestampRevision)
	if !ok {
		return datastore.NoRevision, 0, spiceerrors.MustBugf("expected with-timestamp revision, got %T", nowRev)
	}

	delayedNow := nowTS.TimestampNanoSec() - rcr.followerReadDelayNanos
	quantized := delayedNow
	validForNanos := int64(0)
	if rcr.quantizationNanos > 0 {
		afterLastQuantization := delayedNow % rcr.quantizationNanos
		quantized -= afterLastQuantization
		validForNanos = rcr.quantizationNanos - afterLastQuantization
	}
	log.Ctx(ctx).Debug().
		Time("quantized", time.Unix(0, quantized)).
		Int64("readSkew", rcr.followerReadDelayNanos).
		Int64("totalSkew", nowTS.TimestampNanoSec()-quantized).
		Msg("revision skews")

	return nowTS.ConstructForTimestamp(quantized), time.Duration(validForNanos) * time.Nanosecond, nil
}

// SetNowFunc sets the function used to determine the head revision
func (rcr *RemoteClockRevisions) SetNowFunc(nowFunc RemoteNowFunction) {
	rcr.nowFunc = nowFunc
}

func (rcr *RemoteClockRevisions) CheckRevision(ctx context.Context, dsRevision datastore.Revision) error {
	if dsRevision == datastore.NoRevision {
		return datastore.NewInvalidRevisionErr(dsRevision, datastore.CouldNotDetermineRevision)
	}

	revision := dsRevision.(WithTimestampRevision)

	ctx, span := tracer.Start(ctx, "CheckRevision")
	defer span.End()

	// Make sure the system time indicated is within the software GC window
	now, err := rcr.nowFunc(ctx)
	if err != nil {
		return err
	}

	nowTS, ok := now.(WithTimestampRevision)
	if !ok {
		return spiceerrors.MustBugf("expected HLC revision, got %T", now)
	}

	nowNanos := nowTS.TimestampNanoSec()
	revisionNanos := revision.TimestampNanoSec()

	isStale := revisionNanos < (nowNanos - rcr.gcWindowNanos)
	if isStale {
		log.Ctx(ctx).Debug().Stringer("now", now).Stringer("revision", revision).Msg("stale revision")
		return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale)
	}

	isUnknown := revisionNanos > nowNanos
	if isUnknown {
		log.Ctx(ctx).Debug().Stringer("now", now).Stringer("revision", revision).Msg("unknown revision")
		return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision)
	}

	return nil
}