1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
package backupformat
import (
"errors"
"fmt"
"io"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/ocf"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
)
func init() {
// This defaults to a 1MiB limit, but large schemas can exceed this size.
avro.DefaultConfig = avro.Config{
MaxByteSliceSize: 1024 * 1024 * 100, // 100 MiB
}.Freeze()
}
func NewDecoder(r io.Reader) (*Decoder, error) {
dec, err := ocf.NewDecoder(r)
if err != nil {
return nil, fmt.Errorf("unable to create ocf decoder: %w", err)
}
md := dec.Metadata()
var zedToken *v1.ZedToken
if token, ok := md[metadataKeyZT]; ok {
zedToken = &v1.ZedToken{
Token: string(token),
}
}
var schemaText string
if dec.HasNext() {
var decodedSchema any
if err := dec.Decode(&decodedSchema); err != nil {
return nil, fmt.Errorf("unable to decode schema object: %w", err)
}
schema, ok := decodedSchema.(SchemaV1)
if !ok {
return nil, fmt.Errorf("received schema object of wrong type: %T", decodedSchema)
}
schemaText = schema.SchemaText
} else {
return nil, errors.New("avro stream contains no schema object")
}
return &Decoder{
dec,
schemaText,
zedToken,
}, nil
}
type Decoder struct {
dec *ocf.Decoder
schema string
zedToken *v1.ZedToken
}
func (d *Decoder) Schema() string {
return d.schema
}
func (d *Decoder) ZedToken() *v1.ZedToken {
return d.zedToken
}
func (d *Decoder) Close() error {
return nil
}
func (d *Decoder) Next() (*v1.Relationship, error) {
if !d.dec.HasNext() {
return nil, nil
}
var nextRelIFace any
if err := d.dec.Decode(&nextRelIFace); err != nil {
return nil, fmt.Errorf("unable to decode relationship from avro stream: %w", err)
}
flat := nextRelIFace.(RelationshipV1)
rel := &v1.Relationship{
Resource: &v1.ObjectReference{
ObjectType: flat.ObjectType,
ObjectId: flat.ObjectID,
},
Relation: flat.Relation,
Subject: &v1.SubjectReference{
Object: &v1.ObjectReference{
ObjectType: flat.SubjectObjectType,
ObjectId: flat.SubjectObjectID,
},
OptionalRelation: flat.SubjectRelation,
},
}
if flat.CaveatName != "" {
var deserializedCtxt structpb.Struct
if err := proto.Unmarshal(flat.CaveatContext, &deserializedCtxt); err != nil {
return nil, fmt.Errorf("unable to deserialize caveat context: %w", err)
}
rel.OptionalCaveat = &v1.ContextualizedCaveat{
CaveatName: flat.CaveatName,
Context: &deserializedCtxt,
}
}
return rel, nil
}
|