summaryrefslogtreecommitdiff
path: root/vendor/github.com/hamba/avro/v2/reader_generic.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hamba/avro/v2/reader_generic.go')
-rw-r--r--vendor/github.com/hamba/avro/v2/reader_generic.go163
1 files changed, 163 insertions, 0 deletions
diff --git a/vendor/github.com/hamba/avro/v2/reader_generic.go b/vendor/github.com/hamba/avro/v2/reader_generic.go
new file mode 100644
index 0000000..32d341c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/reader_generic.go
@@ -0,0 +1,163 @@
+package avro
+
+import (
+ "fmt"
+ "reflect"
+ "time"
+)
+
+// ReadNext reads the next Avro element as a generic interface.
+func (r *Reader) ReadNext(schema Schema) any {
+ var ls LogicalSchema
+ lts, ok := schema.(LogicalTypeSchema)
+ if ok {
+ ls = lts.Logical()
+ }
+
+ switch schema.Type() {
+ case Boolean:
+ return r.ReadBool()
+ case Int:
+ if ls != nil {
+ switch ls.Type() {
+ case Date:
+ i := r.ReadInt()
+ sec := int64(i) * int64(24*time.Hour/time.Second)
+ return time.Unix(sec, 0).UTC()
+
+ case TimeMillis:
+ return time.Duration(r.ReadInt()) * time.Millisecond
+ }
+ }
+ return int(r.ReadInt())
+ case Long:
+ if ls != nil {
+ switch ls.Type() {
+ case TimeMicros:
+ return time.Duration(r.ReadLong()) * time.Microsecond
+
+ case TimestampMillis:
+ i := r.ReadLong()
+ sec := i / 1e3
+ nsec := (i - sec*1e3) * 1e6
+ return time.Unix(sec, nsec).UTC()
+
+ case TimestampMicros:
+ i := r.ReadLong()
+ sec := i / 1e6
+ nsec := (i - sec*1e6) * 1e3
+ return time.Unix(sec, nsec).UTC()
+ }
+ }
+ return r.ReadLong()
+ case Float:
+ return r.ReadFloat()
+ case Double:
+ return r.ReadDouble()
+ case String:
+ return r.ReadString()
+ case Bytes:
+ if ls != nil && ls.Type() == Decimal {
+ dec := ls.(*DecimalLogicalSchema)
+ return ratFromBytes(r.ReadBytes(), dec.Scale())
+ }
+ return r.ReadBytes()
+ case Record:
+ fields := schema.(*RecordSchema).Fields()
+ obj := make(map[string]any, len(fields))
+ for _, field := range fields {
+ obj[field.Name()] = r.ReadNext(field.Type())
+ }
+ return obj
+ case Ref:
+ return r.ReadNext(schema.(*RefSchema).Schema())
+ case Enum:
+ symbols := schema.(*EnumSchema).Symbols()
+ idx := int(r.ReadInt())
+ if idx < 0 || idx >= len(symbols) {
+ r.ReportError("Read", "unknown enum symbol")
+ return nil
+ }
+ return symbols[idx]
+ case Array:
+ arr := []any{}
+ r.ReadArrayCB(func(r *Reader) bool {
+ elem := r.ReadNext(schema.(*ArraySchema).Items())
+ arr = append(arr, elem)
+ return true
+ })
+ return arr
+ case Map:
+ obj := map[string]any{}
+ r.ReadMapCB(func(r *Reader, field string) bool {
+ elem := r.ReadNext(schema.(*MapSchema).Values())
+ obj[field] = elem
+ return true
+ })
+ return obj
+ case Union:
+ types := schema.(*UnionSchema).Types()
+ idx := int(r.ReadLong())
+ if idx < 0 || idx > len(types)-1 {
+ r.ReportError("Read", "unknown union type")
+ return nil
+ }
+ schema = types[idx]
+ if schema.Type() == Null {
+ return nil
+ }
+
+ key := schemaTypeName(schema)
+ obj := map[string]any{}
+ obj[key] = r.ReadNext(types[idx])
+ return obj
+ case Fixed:
+ size := schema.(*FixedSchema).Size()
+ obj := make([]byte, size)
+ r.Read(obj)
+ if ls != nil && ls.Type() == Decimal {
+ dec := ls.(*DecimalLogicalSchema)
+ return ratFromBytes(obj, dec.Scale())
+ }
+ return byteSliceToArray(obj, size)
+ default:
+ r.ReportError("Read", fmt.Sprintf("unexpected schema type: %v", schema.Type()))
+ return nil
+ }
+}
+
+// ReadArrayCB reads an array with a callback per item.
+func (r *Reader) ReadArrayCB(fn func(*Reader) bool) {
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+ for range l {
+ fn(r)
+ }
+ }
+}
+
+// ReadMapCB reads an array with a callback per item.
+func (r *Reader) ReadMapCB(fn func(*Reader, string) bool) {
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ for range l {
+ field := r.ReadString()
+ fn(r, field)
+ }
+ }
+}
+
+var byteType = reflect.TypeOf((*byte)(nil)).Elem()
+
+func byteSliceToArray(b []byte, size int) any {
+ vArr := reflect.New(reflect.ArrayOf(size, byteType)).Elem()
+ reflect.Copy(vArr, reflect.ValueOf(b))
+ return vArr.Interface()
+}