summaryrefslogtreecommitdiff
path: root/vendor/github.com/authzed/zed/pkg/backupformat/decoder.go
blob: 8278424b633968ef53b0a81e22afeaecf4f1c23d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package backupformat

import (
	"errors"
	"fmt"
	"io"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/ocf"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"

	v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
)

func init() {
	// This defaults to a 1MiB limit, but large schemas can exceed this size.
	avro.DefaultConfig = avro.Config{
		MaxByteSliceSize: 1024 * 1024 * 100, // 100 MiB
	}.Freeze()
}

func NewDecoder(r io.Reader) (*Decoder, error) {
	dec, err := ocf.NewDecoder(r)
	if err != nil {
		return nil, fmt.Errorf("unable to create ocf decoder: %w", err)
	}

	md := dec.Metadata()
	var zedToken *v1.ZedToken

	if token, ok := md[metadataKeyZT]; ok {
		zedToken = &v1.ZedToken{
			Token: string(token),
		}
	}

	var schemaText string
	if dec.HasNext() {
		var decodedSchema any
		if err := dec.Decode(&decodedSchema); err != nil {
			return nil, fmt.Errorf("unable to decode schema object: %w", err)
		}

		schema, ok := decodedSchema.(SchemaV1)
		if !ok {
			return nil, fmt.Errorf("received schema object of wrong type: %T", decodedSchema)
		}
		schemaText = schema.SchemaText
	} else {
		return nil, errors.New("avro stream contains no schema object")
	}

	return &Decoder{
		dec,
		schemaText,
		zedToken,
	}, nil
}

type Decoder struct {
	dec      *ocf.Decoder
	schema   string
	zedToken *v1.ZedToken
}

func (d *Decoder) Schema() string {
	return d.schema
}

func (d *Decoder) ZedToken() *v1.ZedToken {
	return d.zedToken
}

func (d *Decoder) Close() error {
	return nil
}

func (d *Decoder) Next() (*v1.Relationship, error) {
	if !d.dec.HasNext() {
		return nil, nil
	}

	var nextRelIFace any
	if err := d.dec.Decode(&nextRelIFace); err != nil {
		return nil, fmt.Errorf("unable to decode relationship from avro stream: %w", err)
	}

	flat := nextRelIFace.(RelationshipV1)

	rel := &v1.Relationship{
		Resource: &v1.ObjectReference{
			ObjectType: flat.ObjectType,
			ObjectId:   flat.ObjectID,
		},
		Relation: flat.Relation,
		Subject: &v1.SubjectReference{
			Object: &v1.ObjectReference{
				ObjectType: flat.SubjectObjectType,
				ObjectId:   flat.SubjectObjectID,
			},
			OptionalRelation: flat.SubjectRelation,
		},
	}

	if flat.CaveatName != "" {
		var deserializedCtxt structpb.Struct

		if err := proto.Unmarshal(flat.CaveatContext, &deserializedCtxt); err != nil {
			return nil, fmt.Errorf("unable to deserialize caveat context: %w", err)
		}

		rel.OptionalCaveat = &v1.ContextualizedCaveat{
			CaveatName: flat.CaveatName,
			Context:    &deserializedCtxt,
		}
	}

	return rel, nil
}