summaryrefslogtreecommitdiff
path: root/vendor/github.com/hamba/avro/v2/codec.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hamba/avro/v2/codec.go')
-rw-r--r--vendor/github.com/hamba/avro/v2/codec.go254
1 files changed, 254 insertions, 0 deletions
diff --git a/vendor/github.com/hamba/avro/v2/codec.go b/vendor/github.com/hamba/avro/v2/codec.go
new file mode 100644
index 0000000..c868f51
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec.go
@@ -0,0 +1,254 @@
+package avro
+
+import (
+ "fmt"
+ "math/big"
+ "reflect"
+ "time"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+var (
+ timeType = reflect.TypeOf(time.Time{})
+ timeDurationType = reflect.TypeOf(time.Duration(0))
+ ratType = reflect.TypeOf(big.Rat{})
+ durType = reflect.TypeOf(LogicalDuration{})
+)
+
+type null struct{}
+
+// ValDecoder represents an internal value decoder.
+//
+// You should never use ValDecoder directly.
+type ValDecoder interface {
+ Decode(ptr unsafe.Pointer, r *Reader)
+}
+
+// ValEncoder represents an internal value encoder.
+//
+// You should never use ValEncoder directly.
+type ValEncoder interface {
+ Encode(ptr unsafe.Pointer, w *Writer)
+}
+
+// ReadVal parses Avro value and stores the result in the value pointed to by obj.
+func (r *Reader) ReadVal(schema Schema, obj any) {
+ decoder := r.cfg.getDecoderFromCache(schema.CacheFingerprint(), reflect2.RTypeOf(obj))
+ if decoder == nil {
+ typ := reflect2.TypeOf(obj)
+ if typ.Kind() != reflect.Ptr {
+ r.ReportError("ReadVal", "can only unmarshal into pointer")
+ return
+ }
+ decoder = r.cfg.DecoderOf(schema, typ)
+ }
+
+ ptr := reflect2.PtrOf(obj)
+ if ptr == nil {
+ r.ReportError("ReadVal", "can not read into nil pointer")
+ return
+ }
+
+ decoder.Decode(ptr, r)
+}
+
+// WriteVal writes the Avro encoding of obj.
+func (w *Writer) WriteVal(schema Schema, val any) {
+ encoder := w.cfg.getEncoderFromCache(schema.Fingerprint(), reflect2.RTypeOf(val))
+ if encoder == nil {
+ typ := reflect2.TypeOf(val)
+ encoder = w.cfg.EncoderOf(schema, typ)
+ }
+ encoder.Encode(reflect2.PtrOf(val), w)
+}
+
+func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder {
+ rtype := typ.RType()
+ decoder := c.getDecoderFromCache(schema.CacheFingerprint(), rtype)
+ if decoder != nil {
+ return decoder
+ }
+
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ decoder = decoderOfType(newDecoderContext(c), schema, ptrType.Elem())
+ c.addDecoderToCache(schema.CacheFingerprint(), rtype, decoder)
+ return decoder
+}
+
+type deferDecoder struct {
+ decoder ValDecoder
+}
+
+func (d *deferDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ d.decoder.Decode(ptr, r)
+}
+
+type deferEncoder struct {
+ encoder ValEncoder
+}
+
+func (d *deferEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ d.encoder.Encode(ptr, w)
+}
+
+type decoderContext struct {
+ cfg *frozenConfig
+ decoders map[cacheKey]ValDecoder
+}
+
+func newDecoderContext(cfg *frozenConfig) *decoderContext {
+ return &decoderContext{
+ cfg: cfg,
+ decoders: make(map[cacheKey]ValDecoder),
+ }
+}
+
+type encoderContext struct {
+ cfg *frozenConfig
+ encoders map[cacheKey]ValEncoder
+}
+
+func newEncoderContext(cfg *frozenConfig) *encoderContext {
+ return &encoderContext{
+ cfg: cfg,
+ encoders: make(map[cacheKey]ValEncoder),
+ }
+}
+
+//nolint:dupl
+func decoderOfType(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ if dec := createDecoderOfMarshaler(schema, typ); dec != nil {
+ return dec
+ }
+
+ // Handle eface (empty interface) case when it isn't a union
+ if typ.Kind() == reflect.Interface && schema.Type() != Union {
+ if _, ok := typ.(*reflect2.UnsafeIFaceType); !ok {
+ return newEfaceDecoder(d, schema)
+ }
+ }
+
+ switch schema.Type() {
+ case Null:
+ return &nullCodec{}
+ case String, Bytes, Int, Long, Float, Double, Boolean:
+ return createDecoderOfNative(schema.(*PrimitiveSchema), typ)
+ case Record:
+ key := cacheKey{fingerprint: schema.CacheFingerprint(), rtype: typ.RType()}
+ defDec := &deferDecoder{}
+ d.decoders[key] = defDec
+ defDec.decoder = createDecoderOfRecord(d, schema.(*RecordSchema), typ)
+ return defDec.decoder
+ case Ref:
+ key := cacheKey{fingerprint: schema.(*RefSchema).Schema().CacheFingerprint(), rtype: typ.RType()}
+ if dec, f := d.decoders[key]; f {
+ return dec
+ }
+ return decoderOfType(d, schema.(*RefSchema).Schema(), typ)
+ case Enum:
+ return createDecoderOfEnum(schema.(*EnumSchema), typ)
+ case Array:
+ return createDecoderOfArray(d, schema.(*ArraySchema), typ)
+ case Map:
+ return createDecoderOfMap(d, schema.(*MapSchema), typ)
+ case Union:
+ return createDecoderOfUnion(d, schema.(*UnionSchema), typ)
+ case Fixed:
+ return createDecoderOfFixed(schema.(*FixedSchema), typ)
+ default:
+ // It is impossible to get here with a valid schema
+ return &errorDecoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())}
+ }
+}
+
+func (c *frozenConfig) EncoderOf(schema Schema, typ reflect2.Type) ValEncoder {
+ if typ == nil {
+ typ = reflect2.TypeOf((*null)(nil))
+ }
+
+ rtype := typ.RType()
+ encoder := c.getEncoderFromCache(schema.Fingerprint(), rtype)
+ if encoder != nil {
+ return encoder
+ }
+
+ encoder = encoderOfType(newEncoderContext(c), schema, typ)
+ if typ.LikePtr() {
+ encoder = &onePtrEncoder{encoder}
+ }
+ c.addEncoderToCache(schema.Fingerprint(), rtype, encoder)
+ return encoder
+}
+
+type onePtrEncoder struct {
+ enc ValEncoder
+}
+
+func (e *onePtrEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ e.enc.Encode(noescape(unsafe.Pointer(&ptr)), w)
+}
+
+//nolint:dupl
+func encoderOfType(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
+ if enc := createEncoderOfMarshaler(schema, typ); enc != nil {
+ return enc
+ }
+
+ if typ.Kind() == reflect.Interface {
+ return &interfaceEncoder{schema: schema, typ: typ}
+ }
+
+ switch schema.Type() {
+ case Null:
+ return &nullCodec{}
+ case String, Bytes, Int, Long, Float, Double, Boolean:
+ return createEncoderOfNative(schema.(*PrimitiveSchema), typ)
+ case Record:
+ key := cacheKey{fingerprint: schema.Fingerprint(), rtype: typ.RType()}
+ defEnc := &deferEncoder{}
+ e.encoders[key] = defEnc
+ defEnc.encoder = createEncoderOfRecord(e, schema.(*RecordSchema), typ)
+ return defEnc.encoder
+ case Ref:
+ key := cacheKey{fingerprint: schema.(*RefSchema).Schema().Fingerprint(), rtype: typ.RType()}
+ if enc, f := e.encoders[key]; f {
+ return enc
+ }
+ return encoderOfType(e, schema.(*RefSchema).Schema(), typ)
+ case Enum:
+ return createEncoderOfEnum(schema.(*EnumSchema), typ)
+ case Array:
+ return createEncoderOfArray(e, schema.(*ArraySchema), typ)
+ case Map:
+ return createEncoderOfMap(e, schema.(*MapSchema), typ)
+ case Union:
+ return createEncoderOfUnion(e, schema.(*UnionSchema), typ)
+ case Fixed:
+ return createEncoderOfFixed(schema.(*FixedSchema), typ)
+ default:
+ // It is impossible to get here with a valid schema
+ return &errorEncoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())}
+ }
+}
+
+type errorDecoder struct {
+ err error
+}
+
+func (d *errorDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ if r.Error == nil {
+ r.Error = d.err
+ }
+}
+
+type errorEncoder struct {
+ err error
+}
+
+func (e *errorEncoder) Encode(_ unsafe.Pointer, w *Writer) {
+ if w.Error == nil {
+ w.Error = e.err
+ }
+}