diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-24 17:58:01 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-24 17:58:01 -0600 |
| commit | 72296119fc9755774719f8f625ad03e0e0ec457a (patch) | |
| tree | ed236ddee12a20fb55b7cfecf13f62d3a000dcb5 /vendor/github.com/hamba | |
| parent | a920a8cfe415858bb2777371a77018599ffed23f (diff) | |
| parent | eaa1bd3b8e12934aed06413d75e7482ac58d805a (diff) | |
Merge branch 'the-spice-must-flow' into 'main'
Add SpiceDB Authorization
See merge request gitlab-org/software-supply-chain-security/authorization/sparkled!19
Diffstat (limited to 'vendor/github.com/hamba')
45 files changed, 9174 insertions, 0 deletions
diff --git a/vendor/github.com/hamba/avro/v2/.gitignore b/vendor/github.com/hamba/avro/v2/.gitignore new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/.gitignore diff --git a/vendor/github.com/hamba/avro/v2/.golangci.yml b/vendor/github.com/hamba/avro/v2/.golangci.yml new file mode 100644 index 0000000..fa86e9a --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/.golangci.yml @@ -0,0 +1,49 @@ +run: + tests: false + deadline: 5m + +linters-settings: + gofumpt: + extra-rules: true + gosec: + excludes: + - G115 + +linters: + enable-all: true + disable: + - cyclop # duplicate of gocyclo + - exportloopref # deprecated + - depguard + - err113 + - exhaustive + - exhaustruct + - forcetypeassert + - funlen + - gochecknoglobals + - gochecknoinits + - gocognit + - goconst + - gocyclo + - gosmopolitan + - inamedparam + - interfacebloat + - ireturn + - mnd + - nestif + - nlreturn + - nonamedreturns + - tagliatelle + - varnamelen + - wrapcheck + - wsl + +issues: + exclude-use-default: false + exclude: + - 'package-comments: should have a package comment' + - 'G103: Use of unsafe calls should be audited' + exclude-rules: + - path: (schema|protocol)\.go + linters: + - gosec
\ No newline at end of file diff --git a/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md b/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..f3e9129 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md @@ -0,0 +1,76 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, sex characteristics, gender identity and expression, +level of experience, education, socio-economic status, nationality, personal +appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. 
+ +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at nick@wiersma.co.za. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. 
+ +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/vendor/github.com/hamba/avro/v2/LICENCE b/vendor/github.com/hamba/avro/v2/LICENCE new file mode 100644 index 0000000..fac73a4 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/LICENCE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Nicholas Wiersma + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/vendor/github.com/hamba/avro/v2/Makefile b/vendor/github.com/hamba/avro/v2/Makefile new file mode 100644 index 0000000..3de65fa --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/Makefile @@ -0,0 +1,27 @@ +# Format all files +fmt: + @echo "==> Formatting source" + @gofmt -s -w $(shell find . -type f -name '*.go' -not -path "./vendor/*") + @echo "==> Done" +.PHONY: fmt + +# Tidy the go.mod file +tidy: + @echo "==> Cleaning go.mod" + @go mod tidy + @echo "==> Done" +.PHONY: tidy + +# Run all tests +test: + @go test -cover -race ./... +.PHONY: test + +# Lint the project +lint: + @golangci-lint run ./... +.PHONY: lint + +# Run CI tasks +ci: lint test +.PHONY: ci diff --git a/vendor/github.com/hamba/avro/v2/README.md b/vendor/github.com/hamba/avro/v2/README.md new file mode 100644 index 0000000..7ccd9b4 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/README.md @@ -0,0 +1,257 @@ +<picture> + <source media="(prefers-color-scheme: dark)" srcset="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec&mode=dark"> + <source media="(prefers-color-scheme: light)" srcset="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec"> + <img alt="Logo" src="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec"> +</picture> + +[](https://goreportcard.com/report/github.com/hamba/avro/v2) +[](https://github.com/hamba/avro/actions) +[](https://coveralls.io/github/hamba/avro?branch=main) +[](https://pkg.go.dev/github.com/hamba/avro/v2) +[](https://github.com/hamba/avro/releases) +[](https://raw.githubusercontent.com/hamba/avro/master/LICENSE) + +A fast Go avro codec + +## Overview + +Install with: + +```shell +go get github.com/hamba/avro/v2 +``` + +**Note:** This project has renamed the default branch from `master` to `main`. You will need to update your local environment. 
+ +## Usage + +```go +type SimpleRecord struct { + A int64 `avro:"a"` + B string `avro:"b"` +} + +schema, err := avro.Parse(`{ + "type": "record", + "name": "simple", + "namespace": "org.hamba.avro", + "fields" : [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "string"} + ] +}`) +if err != nil { + log.Fatal(err) +} + +in := SimpleRecord{A: 27, B: "foo"} + +data, err := avro.Marshal(schema, in) +if err != nil { + log.Fatal(err) +} + +fmt.Println(data) +// Outputs: [54 6 102 111 111] + +out := SimpleRecord{} +err = avro.Unmarshal(schema, data, &out) +if err != nil { + log.Fatal(err) +} + +fmt.Println(out) +// Outputs: {27 foo} +``` + +More examples in the [godoc](https://pkg.go.dev/github.com/hamba/avro/v2). + +#### Types Conversions + +| Avro | Go Struct | Go Interface | +|-------------------------------|------------------------------------------------------------|--------------------------| +| `null` | `nil` | `nil` | +| `boolean` | `bool` | `bool` | +| `bytes` | `[]byte` | `[]byte` | +| `float` | `float32` | `float32` | +| `double` | `float64` | `float64` | +| `long` | `int`\*, `int64`, `uint32`\** | `int`, `int64`, `uint32` | +| `int` | `int`\*, `int32`, `int16`, `int8`, `uint8`\**, `uint16`\** | `int`, `uint8`, `uint16` | +| `fixed` | `uint64` | `uint64` | +| `string` | `string` | `string` | +| `array` | `[]T` | `[]any` | +| `enum` | `string` | `string` | +| `fixed` | `[n]byte` | `[n]byte` | +| `map` | `map[string]T{}` | `map[string]any` | +| `record` | `struct` | `map[string]any` | +| `union` | *see below* | *see below* | +| `int.date` | `time.Time` | `time.Time` | +| `int.time-millis` | `time.Duration` | `time.Duration` | +| `long.time-micros` | `time.Duration` | `time.Duration` | +| `long.timestamp-millis` | `time.Time` | `time.Time` | +| `long.timestamp-micros` | `time.Time` | `time.Time` | +| `long.local-timestamp-millis` | `time.Time` | `time.Time` | +| `long.local-timestamp-micros` | `time.Time` | `time.Time` | +| `bytes.decimal` | `*big.Rat` | 
`*big.Rat` | +| `fixed.decimal` | `*big.Rat` | `*big.Rat` | +| `string.uuid` | `string` | `string` | + +\* Please note that the size of the Go type `int` is platform dependent. Decoding an Avro `long` into a Go `int` is +only allowed on 64-bit platforms and will result in an error on 32-bit platforms. Similarly, be careful when encoding a +Go `int` using Avro `int` on a 64-bit platform, as that can result in an integer overflow causing misinterpretation of +the data. + +\** Please note that when the Go type is an unsigned integer care must be taken to ensure that information is not lost +when converting between the Avro type and Go type. For example, storing a *negative* number in Avro of `int = -100` +would be interpreted as `uint16 = 65,436` in Go. Another example would be storing numbers in Avro `int = 256` that +are larger than the Go type `uint8 = 0`. + +##### Unions + +The following union types are accepted: `map[string]any`, `*T` and `any`. + +* **map[string]any:** If the union value is `nil`, a `nil` map will be en/decoded. +When a non-`nil` union value is encountered, a single key is en/decoded. The key is the avro +type name, or scheam full name in the case of a named schema (enum, fixed or record). +* ***T:** This is allowed in a "nullable" union. A nullable union is defined as a two schema union, +with one of the types being `null` (ie. `["null", "string"]` or `["string", "null"]`), in this case +a `*T` is allowed, with `T` matching the conversion table above. In the case of a slice, the slice can be used +directly. +* **any:** An `interface` can be provided and the type or name resolved. Primitive types +are pre-registered, but named types, maps and slices will need to be registered with the `Register` function. +In the case of arrays and maps the enclosed schema type or name is postfix to the type with a `:` separator, +e.g `"map:string"`. 
Behavior when a type cannot be resolved will depend on your chosen configuration options: + * !Config.UnionResolutionError && !Config.PartialUnionTypeResolution: the map type above is used + * Config.UnionResolutionError && !Config.PartialUnionTypeResolution: an error is returned + * !Config.UnionResolutionError && Config.PartialUnionTypeResolution: any registered type will get resolved while any unregistered type will fall back to the map type above. + * Config.UnionResolutionError && Config.PartialUnionTypeResolution: any registered type will get resolved while any unregistered type will return an error. + +##### TextMarshaler and TextUnmarshaler + +The interfaces `TextMarshaler` and `TextUnmarshaler` are supported for a `string` schema type. The object will +be tested first for implementation of these interfaces, in the case of a `string` schema, before trying regular +encoding and decoding. + +Enums may also implement `TextMarshaler` and `TextUnmarshaler`, and must resolve to valid symbols in the given enum schema. + +##### Identical Underlying Types + +One type can be [ConvertibleTo](https://go.dev/ref/spec#Conversions) another type if they have identical underlying types. +A non-native type is allowed to be used if it is convertible to *time.Time*, *big.Rat* or *avro.LogicalDuration* for the particular *LogicalTypes*. + +Ex.: `type Timestamp time.Time` + +##### Untrusted Input With Bytes and Strings + +For security reasons, the configuration `Config.MaxByteSliceSize` restricts the maximum size of `bytes` and `string` types created +by the `Reader`. The default maximum size is `1MiB` and is configurable. This is required to stop untrusted input from consuming all memory and +crashing the application. Should this not be needed, setting a negative number will disable the behaviour. 
+ +## Benchmark + +Benchmark source code can be found at: [https://github.com/nrwiersma/avro-benchmarks](https://github.com/nrwiersma/avro-benchmarks) + +``` +BenchmarkGoAvroDecode-8 788455 1505 ns/op 418 B/op 27 allocs/op +BenchmarkGoAvroEncode-8 624343 1908 ns/op 806 B/op 63 allocs/op +BenchmarkGoGenAvroDecode-8 1360375 876.4 ns/op 320 B/op 11 allocs/op +BenchmarkGoGenAvroEncode-8 2801583 425.9 ns/op 240 B/op 3 allocs/op +BenchmarkHambaDecode-8 5046832 238.7 ns/op 47 B/op 0 allocs/op +BenchmarkHambaEncode-8 6017635 196.2 ns/op 112 B/op 1 allocs/op +BenchmarkLinkedinDecode-8 1000000 1003 ns/op 1688 B/op 35 allocs/op +BenchmarkLinkedinEncode-8 3170553 381.5 ns/op 248 B/op 5 allocs/op +``` + +Always benchmark with your own workload. The result depends heavily on the data input. + +## Go structs generation + +Go structs can be generated for you from the schema. The types generated follow the same logic in [types conversions](#types-conversions) + +Install the struct generator with: + +```shell +go install github.com/hamba/avro/v2/cmd/avrogen@<version> +``` + +Example usage assuming there's a valid schema in `in.avsc`: + +```shell +avrogen -pkg avro -o bla.go -tags json:snake,yaml:upper-camel in.avsc +``` + +**Tip:** Omit `-o FILE` to dump the generated Go structs to stdout instead of a file. + +Check the options and usage with `-h`: + +```shell +avrogen -h +``` + +Or use it as a lib in internal commands, it's the `gen` package + +## Avro schema validation + +### avrosv + +A small Avro schema validation command-line utility is also available. This simple tool leverages the +schema parsing functionality of the library, showing validation errors or optionally dumping parsed +schemas to the console. It can be used in CI/CD pipelines to validate schema changes in a repository. 
Install the Avro schema validator with: + +```shell +go install github.com/hamba/avro/v2/cmd/avrosv@<version> +``` + +Example usage assuming there's a valid schema in `in.avsc` (exit status code is `0`): + +```shell +avrosv in.avsc +``` + +An invalid schema will result in a diagnostic output and a non-zero exit status code: + +```shell +avrosv bad-default-schema.avsc; echo $? +Error: avro: invalid default for field someString. <nil> not a string +2 +``` + +Schemas referencing other schemas can also be validated by providing all of them (schemas are parsed in order): + +```shell +avrosv base-schema.avsc schema-withref.avsc +``` + +Check the options and usage with `-h`: + +```shell +avrosv -h +``` + +### Name Validation + +Avro names are validated according to the +[Avro specification](https://avro.apache.org/docs/1.11.1/specification/#names). + +However, the official Java library does not validate said names accordingly, resulting in some files out in the wild +having invalid names. Thus, this library has a configuration option to allow for these invalid names to be parsed. + +```go +avro.SkipNameValidation = true +``` + +Note that this variable is global, so ideally you'd need to unset it after you're done with the invalid schema. + +## Go Version Support + +This library supports the last two versions of Go. While the minimum Go version is +not guaranteed to increase alongside Go, it may jump from time to time to support +additional features. This will not be considered a breaking change. + +## Who uses hamba/avro? 
+ +- [Apache Arrow for Go](https://github.com/apache/arrow-go) +- [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go) +- [pulsar-client-go](https://github.com/apache/pulsar-client-go) diff --git a/vendor/github.com/hamba/avro/v2/codec.go b/vendor/github.com/hamba/avro/v2/codec.go new file mode 100644 index 0000000..c868f51 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec.go @@ -0,0 +1,254 @@ +package avro + +import ( + "fmt" + "math/big" + "reflect" + "time" + "unsafe" + + "github.com/modern-go/reflect2" +) + +var ( + timeType = reflect.TypeOf(time.Time{}) + timeDurationType = reflect.TypeOf(time.Duration(0)) + ratType = reflect.TypeOf(big.Rat{}) + durType = reflect.TypeOf(LogicalDuration{}) +) + +type null struct{} + +// ValDecoder represents an internal value decoder. +// +// You should never use ValDecoder directly. +type ValDecoder interface { + Decode(ptr unsafe.Pointer, r *Reader) +} + +// ValEncoder represents an internal value encoder. +// +// You should never use ValEncoder directly. +type ValEncoder interface { + Encode(ptr unsafe.Pointer, w *Writer) +} + +// ReadVal parses Avro value and stores the result in the value pointed to by obj. +func (r *Reader) ReadVal(schema Schema, obj any) { + decoder := r.cfg.getDecoderFromCache(schema.CacheFingerprint(), reflect2.RTypeOf(obj)) + if decoder == nil { + typ := reflect2.TypeOf(obj) + if typ.Kind() != reflect.Ptr { + r.ReportError("ReadVal", "can only unmarshal into pointer") + return + } + decoder = r.cfg.DecoderOf(schema, typ) + } + + ptr := reflect2.PtrOf(obj) + if ptr == nil { + r.ReportError("ReadVal", "can not read into nil pointer") + return + } + + decoder.Decode(ptr, r) +} + +// WriteVal writes the Avro encoding of obj. 
+func (w *Writer) WriteVal(schema Schema, val any) { + encoder := w.cfg.getEncoderFromCache(schema.Fingerprint(), reflect2.RTypeOf(val)) + if encoder == nil { + typ := reflect2.TypeOf(val) + encoder = w.cfg.EncoderOf(schema, typ) + } + encoder.Encode(reflect2.PtrOf(val), w) +} + +func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder { + rtype := typ.RType() + decoder := c.getDecoderFromCache(schema.CacheFingerprint(), rtype) + if decoder != nil { + return decoder + } + + ptrType := typ.(*reflect2.UnsafePtrType) + decoder = decoderOfType(newDecoderContext(c), schema, ptrType.Elem()) + c.addDecoderToCache(schema.CacheFingerprint(), rtype, decoder) + return decoder +} + +type deferDecoder struct { + decoder ValDecoder +} + +func (d *deferDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + d.decoder.Decode(ptr, r) +} + +type deferEncoder struct { + encoder ValEncoder +} + +func (d *deferEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + d.encoder.Encode(ptr, w) +} + +type decoderContext struct { + cfg *frozenConfig + decoders map[cacheKey]ValDecoder +} + +func newDecoderContext(cfg *frozenConfig) *decoderContext { + return &decoderContext{ + cfg: cfg, + decoders: make(map[cacheKey]ValDecoder), + } +} + +type encoderContext struct { + cfg *frozenConfig + encoders map[cacheKey]ValEncoder +} + +func newEncoderContext(cfg *frozenConfig) *encoderContext { + return &encoderContext{ + cfg: cfg, + encoders: make(map[cacheKey]ValEncoder), + } +} + +//nolint:dupl +func decoderOfType(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder { + if dec := createDecoderOfMarshaler(schema, typ); dec != nil { + return dec + } + + // Handle eface (empty interface) case when it isn't a union + if typ.Kind() == reflect.Interface && schema.Type() != Union { + if _, ok := typ.(*reflect2.UnsafeIFaceType); !ok { + return newEfaceDecoder(d, schema) + } + } + + switch schema.Type() { + case Null: + return &nullCodec{} + case String, Bytes, Int, Long, Float, 
Double, Boolean: + return createDecoderOfNative(schema.(*PrimitiveSchema), typ) + case Record: + key := cacheKey{fingerprint: schema.CacheFingerprint(), rtype: typ.RType()} + defDec := &deferDecoder{} + d.decoders[key] = defDec + defDec.decoder = createDecoderOfRecord(d, schema.(*RecordSchema), typ) + return defDec.decoder + case Ref: + key := cacheKey{fingerprint: schema.(*RefSchema).Schema().CacheFingerprint(), rtype: typ.RType()} + if dec, f := d.decoders[key]; f { + return dec + } + return decoderOfType(d, schema.(*RefSchema).Schema(), typ) + case Enum: + return createDecoderOfEnum(schema.(*EnumSchema), typ) + case Array: + return createDecoderOfArray(d, schema.(*ArraySchema), typ) + case Map: + return createDecoderOfMap(d, schema.(*MapSchema), typ) + case Union: + return createDecoderOfUnion(d, schema.(*UnionSchema), typ) + case Fixed: + return createDecoderOfFixed(schema.(*FixedSchema), typ) + default: + // It is impossible to get here with a valid schema + return &errorDecoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())} + } +} + +func (c *frozenConfig) EncoderOf(schema Schema, typ reflect2.Type) ValEncoder { + if typ == nil { + typ = reflect2.TypeOf((*null)(nil)) + } + + rtype := typ.RType() + encoder := c.getEncoderFromCache(schema.Fingerprint(), rtype) + if encoder != nil { + return encoder + } + + encoder = encoderOfType(newEncoderContext(c), schema, typ) + if typ.LikePtr() { + encoder = &onePtrEncoder{encoder} + } + c.addEncoderToCache(schema.Fingerprint(), rtype, encoder) + return encoder +} + +type onePtrEncoder struct { + enc ValEncoder +} + +func (e *onePtrEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + e.enc.Encode(noescape(unsafe.Pointer(&ptr)), w) +} + +//nolint:dupl +func encoderOfType(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder { + if enc := createEncoderOfMarshaler(schema, typ); enc != nil { + return enc + } + + if typ.Kind() == reflect.Interface { + return &interfaceEncoder{schema: schema, typ: 
typ} + } + + switch schema.Type() { + case Null: + return &nullCodec{} + case String, Bytes, Int, Long, Float, Double, Boolean: + return createEncoderOfNative(schema.(*PrimitiveSchema), typ) + case Record: + key := cacheKey{fingerprint: schema.Fingerprint(), rtype: typ.RType()} + defEnc := &deferEncoder{} + e.encoders[key] = defEnc + defEnc.encoder = createEncoderOfRecord(e, schema.(*RecordSchema), typ) + return defEnc.encoder + case Ref: + key := cacheKey{fingerprint: schema.(*RefSchema).Schema().Fingerprint(), rtype: typ.RType()} + if enc, f := e.encoders[key]; f { + return enc + } + return encoderOfType(e, schema.(*RefSchema).Schema(), typ) + case Enum: + return createEncoderOfEnum(schema.(*EnumSchema), typ) + case Array: + return createEncoderOfArray(e, schema.(*ArraySchema), typ) + case Map: + return createEncoderOfMap(e, schema.(*MapSchema), typ) + case Union: + return createEncoderOfUnion(e, schema.(*UnionSchema), typ) + case Fixed: + return createEncoderOfFixed(schema.(*FixedSchema), typ) + default: + // It is impossible to get here with a valid schema + return &errorEncoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())} + } +} + +type errorDecoder struct { + err error +} + +func (d *errorDecoder) Decode(_ unsafe.Pointer, r *Reader) { + if r.Error == nil { + r.Error = d.err + } +} + +type errorEncoder struct { + err error +} + +func (e *errorEncoder) Encode(_ unsafe.Pointer, w *Writer) { + if w.Error == nil { + w.Error = e.err + } +} diff --git a/vendor/github.com/hamba/avro/v2/codec_array.go b/vendor/github.com/hamba/avro/v2/codec_array.go new file mode 100644 index 0000000..0b412d9 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_array.go @@ -0,0 +1,119 @@ +package avro + +import ( + "errors" + "fmt" + "io" + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfArray(d *decoderContext, schema *ArraySchema, typ reflect2.Type) ValDecoder { + if typ.Kind() == reflect.Slice { + return 
decoderOfArray(d, schema, typ) + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func createEncoderOfArray(e *encoderContext, schema *ArraySchema, typ reflect2.Type) ValEncoder { + if typ.Kind() == reflect.Slice { + return encoderOfArray(e, schema, typ) + } + + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func decoderOfArray(d *decoderContext, arr *ArraySchema, typ reflect2.Type) ValDecoder { + sliceType := typ.(*reflect2.UnsafeSliceType) + decoder := decoderOfType(d, arr.Items(), sliceType.Elem()) + + return &arrayDecoder{typ: sliceType, decoder: decoder} +} + +type arrayDecoder struct { + typ *reflect2.UnsafeSliceType + decoder ValDecoder +} + +func (d *arrayDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + var size int + sliceType := d.typ + + for { + l, _ := r.ReadBlockHeader() + if l == 0 { + break + } + + start := size + size += int(l) + + if size > r.cfg.getMaxSliceAllocSize() { + r.ReportError("decode array", "size is greater than `Config.MaxSliceAllocSize`") + return + } + + sliceType.UnsafeGrow(ptr, size) + + for i := start; i < size; i++ { + elemPtr := sliceType.UnsafeGetIndex(ptr, i) + d.decoder.Decode(elemPtr, r) + if r.Error != nil { + r.Error = fmt.Errorf("reading %s: %w", d.typ.String(), r.Error) + return + } + } + } + + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + r.Error = fmt.Errorf("%v: %w", d.typ, r.Error) + } +} + +func encoderOfArray(e *encoderContext, arr *ArraySchema, typ reflect2.Type) ValEncoder { + sliceType := typ.(*reflect2.UnsafeSliceType) + encoder := encoderOfType(e, arr.Items(), sliceType.Elem()) + + return &arrayEncoder{ + blockLength: e.cfg.getBlockLength(), + typ: sliceType, + encoder: encoder, + } +} + +type arrayEncoder struct { + blockLength int + typ *reflect2.UnsafeSliceType + encoder ValEncoder +} + +func (e *arrayEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + blockLength := 
e.blockLength + length := e.typ.UnsafeLengthOf(ptr) + + for i := 0; i < length; i += blockLength { + w.WriteBlockCB(func(w *Writer) int64 { + count := int64(0) + for j := i; j < i+blockLength && j < length; j++ { + elemPtr := e.typ.UnsafeGetIndex(ptr, j) + e.encoder.Encode(elemPtr, w) + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + w.Error = fmt.Errorf("%s: %w", e.typ.String(), w.Error) + return count + } + count++ + } + + return count + }) + } + + w.WriteBlockHeader(0, 0) + + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + w.Error = fmt.Errorf("%v: %w", e.typ, w.Error) + } +} diff --git a/vendor/github.com/hamba/avro/v2/codec_default.go b/vendor/github.com/hamba/avro/v2/codec_default.go new file mode 100644 index 0000000..c42bdc3 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_default.go @@ -0,0 +1,58 @@ +package avro + +import ( + "fmt" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDefaultDecoder(d *decoderContext, field *Field, typ reflect2.Type) ValDecoder { + cfg := d.cfg + fn := func(def any) ([]byte, error) { + defaultType := reflect2.TypeOf(def) + if defaultType == nil { + defaultType = reflect2.TypeOf((*null)(nil)) + } + defaultEncoder := encoderOfType(newEncoderContext(cfg), field.Type(), defaultType) + if defaultType.LikePtr() { + defaultEncoder = &onePtrEncoder{defaultEncoder} + } + w := cfg.borrowWriter() + defer cfg.returnWriter(w) + + defaultEncoder.Encode(reflect2.PtrOf(def), w) + if w.Error != nil { + return nil, w.Error + } + b := w.Buffer() + data := make([]byte, len(b)) + copy(data, b) + + return data, nil + } + + b, err := field.encodeDefault(fn) + if err != nil { + return &errorDecoder{err: fmt.Errorf("decode default: %w", err)} + } + return &defaultDecoder{ + data: b, + decoder: decoderOfType(d, field.Type(), typ), + } +} + +type defaultDecoder struct { + data []byte + decoder ValDecoder +} + +// Decode implements ValDecoder. 
+func (d *defaultDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + rr := r.cfg.borrowReader(d.data) + defer r.cfg.returnReader(rr) + + d.decoder.Decode(ptr, rr) +} + +var _ ValDecoder = &defaultDecoder{} diff --git a/vendor/github.com/hamba/avro/v2/codec_dynamic.go b/vendor/github.com/hamba/avro/v2/codec_dynamic.go new file mode 100644 index 0000000..f14a04e --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_dynamic.go @@ -0,0 +1,59 @@ +package avro + +import ( + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +type efaceDecoder struct { + schema Schema + typ reflect2.Type + dec ValDecoder +} + +func newEfaceDecoder(d *decoderContext, schema Schema) *efaceDecoder { + typ, _ := genericReceiver(schema) + dec := decoderOfType(d, schema, typ) + + return &efaceDecoder{ + schema: schema, + typ: typ, + dec: dec, + } +} + +func (d *efaceDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + pObj := (*any)(ptr) + if *pObj == nil { + *pObj = genericDecode(d.typ, d.dec, r) + return + } + + typ := reflect2.TypeOf(*pObj) + if typ.Kind() != reflect.Ptr { + *pObj = genericDecode(d.typ, d.dec, r) + return + } + + ptrType := typ.(*reflect2.UnsafePtrType) + ptrElemType := ptrType.Elem() + if reflect2.IsNil(*pObj) { + obj := ptrElemType.New() + r.ReadVal(d.schema, obj) + *pObj = obj + return + } + r.ReadVal(d.schema, *pObj) +} + +type interfaceEncoder struct { + schema Schema + typ reflect2.Type +} + +func (e *interfaceEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + obj := e.typ.UnsafeIndirect(ptr) + w.WriteVal(e.schema, obj) +} diff --git a/vendor/github.com/hamba/avro/v2/codec_enum.go b/vendor/github.com/hamba/avro/v2/codec_enum.go new file mode 100644 index 0000000..65ab453 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_enum.go @@ -0,0 +1,131 @@ +package avro + +import ( + "encoding" + "errors" + "fmt" + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfEnum(schema *EnumSchema, typ reflect2.Type) ValDecoder { + 
switch { + case typ.Kind() == reflect.String: + return &enumCodec{enum: schema} + case typ.Implements(textUnmarshalerType): + return &enumTextMarshalerCodec{typ: typ, enum: schema} + case reflect2.PtrTo(typ).Implements(textUnmarshalerType): + return &enumTextMarshalerCodec{typ: typ, enum: schema, ptr: true} + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func createEncoderOfEnum(schema *EnumSchema, typ reflect2.Type) ValEncoder { + switch { + case typ.Kind() == reflect.String: + return &enumCodec{enum: schema} + case typ.Implements(textMarshalerType): + return &enumTextMarshalerCodec{typ: typ, enum: schema} + case reflect2.PtrTo(typ).Implements(textMarshalerType): + return &enumTextMarshalerCodec{typ: typ, enum: schema, ptr: true} + } + + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +type enumCodec struct { + enum *EnumSchema +} + +func (c *enumCodec) Decode(ptr unsafe.Pointer, r *Reader) { + i := int(r.ReadInt()) + + symbol, ok := c.enum.Symbol(i) + if !ok { + r.ReportError("decode enum symbol", "unknown enum symbol") + return + } + + *((*string)(ptr)) = symbol +} + +func (c *enumCodec) Encode(ptr unsafe.Pointer, w *Writer) { + str := *((*string)(ptr)) + for i, sym := range c.enum.symbols { + if str != sym { + continue + } + + w.WriteInt(int32(i)) + return + } + + w.Error = fmt.Errorf("avro: unknown enum symbol: %s", str) +} + +type enumTextMarshalerCodec struct { + typ reflect2.Type + enum *EnumSchema + ptr bool +} + +func (c *enumTextMarshalerCodec) Decode(ptr unsafe.Pointer, r *Reader) { + i := int(r.ReadInt()) + + symbol, ok := c.enum.Symbol(i) + if !ok { + r.ReportError("decode enum symbol", "unknown enum symbol") + return + } + + var obj any + if c.ptr { + obj = c.typ.PackEFace(ptr) + } else { + obj = c.typ.UnsafeIndirect(ptr) + } + if reflect2.IsNil(obj) { + ptrType := c.typ.(*reflect2.UnsafePtrType) + newPtr := 
ptrType.Elem().UnsafeNew() + *((*unsafe.Pointer)(ptr)) = newPtr + obj = c.typ.UnsafeIndirect(ptr) + } + unmarshaler := (obj).(encoding.TextUnmarshaler) + if err := unmarshaler.UnmarshalText([]byte(symbol)); err != nil { + r.ReportError("decode enum text unmarshaler", err.Error()) + } +} + +func (c *enumTextMarshalerCodec) Encode(ptr unsafe.Pointer, w *Writer) { + var obj any + if c.ptr { + obj = c.typ.PackEFace(ptr) + } else { + obj = c.typ.UnsafeIndirect(ptr) + } + if c.typ.IsNullable() && reflect2.IsNil(obj) { + w.Error = errors.New("encoding nil enum text marshaler") + return + } + marshaler := (obj).(encoding.TextMarshaler) + b, err := marshaler.MarshalText() + if err != nil { + w.Error = err + return + } + + str := string(b) + for i, sym := range c.enum.symbols { + if str != sym { + continue + } + + w.WriteInt(int32(i)) + return + } + + w.Error = fmt.Errorf("avro: unknown enum symbol: %s", str) +} diff --git a/vendor/github.com/hamba/avro/v2/codec_fixed.go b/vendor/github.com/hamba/avro/v2/codec_fixed.go new file mode 100644 index 0000000..d6a642b --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_fixed.go @@ -0,0 +1,192 @@ +package avro + +import ( + "encoding/binary" + "fmt" + "math/big" + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfFixed(fixed *FixedSchema, typ reflect2.Type) ValDecoder { + switch typ.Kind() { + case reflect.Array: + arrayType := typ.(reflect2.ArrayType) + if arrayType.Elem().Kind() != reflect.Uint8 || arrayType.Len() != fixed.Size() { + break + } + return &fixedCodec{arrayType: typ.(*reflect2.UnsafeArrayType)} + case reflect.Uint64: + if fixed.Size() != 8 { + break + } + + return &fixedUint64Codec{} + case reflect.Ptr: + ptrType := typ.(*reflect2.UnsafePtrType) + elemType := ptrType.Elem() + + ls := fixed.Logical() + tpy1 := elemType.Type1() + if elemType.Kind() != reflect.Struct || !tpy1.ConvertibleTo(ratType) || ls == nil || + ls.Type() != Decimal { + break + } + dec := 
ls.(*DecimalLogicalSchema) + return &fixedDecimalCodec{prec: dec.Precision(), scale: dec.Scale(), size: fixed.Size()} + case reflect.Struct: + ls := fixed.Logical() + if ls == nil { + break + } + typ1 := typ.Type1() + if !typ1.ConvertibleTo(durType) || ls.Type() != Duration { + break + } + return &fixedDurationCodec{} + } + + return &errorDecoder{ + err: fmt.Errorf("avro: %s is unsupported for Avro %s, size=%d", typ.String(), fixed.Type(), fixed.Size()), + } +} + +func createEncoderOfFixed(fixed *FixedSchema, typ reflect2.Type) ValEncoder { + switch typ.Kind() { + case reflect.Array: + arrayType := typ.(reflect2.ArrayType) + if arrayType.Elem().Kind() != reflect.Uint8 || arrayType.Len() != fixed.Size() { + break + } + return &fixedCodec{arrayType: typ.(*reflect2.UnsafeArrayType)} + case reflect.Uint64: + if fixed.Size() != 8 { + break + } + + return &fixedUint64Codec{} + case reflect.Ptr: + ptrType := typ.(*reflect2.UnsafePtrType) + elemType := ptrType.Elem() + + ls := fixed.Logical() + tpy1 := elemType.Type1() + if elemType.Kind() != reflect.Struct || !tpy1.ConvertibleTo(ratType) || ls == nil || + ls.Type() != Decimal { + break + } + dec := ls.(*DecimalLogicalSchema) + return &fixedDecimalCodec{prec: dec.Precision(), scale: dec.Scale(), size: fixed.Size()} + + case reflect.Struct: + ls := fixed.Logical() + if ls == nil { + break + } + typ1 := typ.Type1() + if typ1.ConvertibleTo(durType) && ls.Type() == Duration { + return &fixedDurationCodec{} + } + } + + return &errorEncoder{ + err: fmt.Errorf("avro: %s is unsupported for Avro %s, size=%d", typ.String(), fixed.Type(), fixed.Size()), + } +} + +type fixedUint64Codec [8]byte + +func (c *fixedUint64Codec) Decode(ptr unsafe.Pointer, r *Reader) { + buffer := c[:] + r.Read(buffer) + *(*uint64)(ptr) = binary.BigEndian.Uint64(buffer) +} + +func (c *fixedUint64Codec) Encode(ptr unsafe.Pointer, w *Writer) { + buffer := c[:] + binary.BigEndian.PutUint64(buffer, *(*uint64)(ptr)) + _, _ = w.Write(buffer) +} + +type fixedCodec 
struct { + arrayType *reflect2.UnsafeArrayType +} + +func (c *fixedCodec) Decode(ptr unsafe.Pointer, r *Reader) { + for i := range c.arrayType.Len() { + c.arrayType.UnsafeSetIndex(ptr, i, reflect2.PtrOf(r.readByte())) + } +} + +func (c *fixedCodec) Encode(ptr unsafe.Pointer, w *Writer) { + for i := range c.arrayType.Len() { + bytePtr := c.arrayType.UnsafeGetIndex(ptr, i) + w.writeByte(*((*byte)(bytePtr))) + } +} + +type fixedDecimalCodec struct { + prec int + scale int + size int +} + +func (c *fixedDecimalCodec) Decode(ptr unsafe.Pointer, r *Reader) { + b := make([]byte, c.size) + r.Read(b) + *((**big.Rat)(ptr)) = ratFromBytes(b, c.scale) +} + +func (c *fixedDecimalCodec) Encode(ptr unsafe.Pointer, w *Writer) { + r := *((**big.Rat)(ptr)) + scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil) + i := (&big.Int{}).Mul(r.Num(), scale) + i = i.Div(i, r.Denom()) + + var b []byte + switch i.Sign() { + case 0: + b = make([]byte, c.size) + + case 1: + b = i.Bytes() + if b[0]&0x80 > 0 { + b = append([]byte{0}, b...) 
+ } + if len(b) < c.size { + padded := make([]byte, c.size) + copy(padded[c.size-len(b):], b) + b = padded + } + + case -1: + b = i.Add(i, (&big.Int{}).Lsh(one, uint(c.size*8))).Bytes() + } + + _, _ = w.Write(b) +} + +type fixedDurationCodec struct{} + +func (*fixedDurationCodec) Decode(ptr unsafe.Pointer, r *Reader) { + b := make([]byte, 12) + r.Read(b) + var duration LogicalDuration + duration.Months = binary.LittleEndian.Uint32(b[0:4]) + duration.Days = binary.LittleEndian.Uint32(b[4:8]) + duration.Milliseconds = binary.LittleEndian.Uint32(b[8:12]) + *((*LogicalDuration)(ptr)) = duration +} + +func (*fixedDurationCodec) Encode(ptr unsafe.Pointer, w *Writer) { + duration := (*LogicalDuration)(ptr) + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, duration.Months) + _, _ = w.Write(b) + binary.LittleEndian.PutUint32(b, duration.Days) + _, _ = w.Write(b) + binary.LittleEndian.PutUint32(b, duration.Milliseconds) + _, _ = w.Write(b) +} diff --git a/vendor/github.com/hamba/avro/v2/codec_generic.go b/vendor/github.com/hamba/avro/v2/codec_generic.go new file mode 100644 index 0000000..00ef1e2 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_generic.go @@ -0,0 +1,133 @@ +package avro + +import ( + "errors" + "math/big" + "time" + + "github.com/modern-go/reflect2" +) + +func genericDecode(typ reflect2.Type, dec ValDecoder, r *Reader) any { + ptr := typ.UnsafeNew() + dec.Decode(ptr, r) + if r.Error != nil { + return nil + } + + obj := typ.UnsafeIndirect(ptr) + if reflect2.IsNil(obj) { + return nil + } + return obj +} + +func genericReceiver(schema Schema) (reflect2.Type, error) { + if schema.Type() == Ref { + schema = schema.(*RefSchema).Schema() + } + + var ls LogicalSchema + lts, ok := schema.(LogicalTypeSchema) + if ok { + ls = lts.Logical() + } + + schemaName := string(schema.Type()) + if ls != nil { + schemaName += "." 
+ string(ls.Type()) + } + + switch schema.Type() { + case Null: + return reflect2.TypeOf((*null)(nil)), nil + case Boolean: + var v bool + return reflect2.TypeOf(v), nil + case Int: + if ls != nil { + switch ls.Type() { + case Date: + var v time.Time + return reflect2.TypeOf(v), nil + + case TimeMillis: + var v time.Duration + return reflect2.TypeOf(v), nil + } + } + var v int + return reflect2.TypeOf(v), nil + case Long: + if ls != nil { + switch ls.Type() { + case TimeMicros: + var v time.Duration + return reflect2.TypeOf(v), nil + case TimestampMillis: + var v time.Time + return reflect2.TypeOf(v), nil + case TimestampMicros: + var v time.Time + return reflect2.TypeOf(v), nil + case LocalTimestampMillis: + var v time.Time + return reflect2.TypeOf(v), nil + case LocalTimestampMicros: + var v time.Time + return reflect2.TypeOf(v), nil + } + } + var v int64 + return reflect2.TypeOf(v), nil + case Float: + var v float32 + return reflect2.TypeOf(v), nil + case Double: + var v float64 + return reflect2.TypeOf(v), nil + case String: + var v string + return reflect2.TypeOf(v), nil + case Bytes: + if ls != nil && ls.Type() == Decimal { + var v *big.Rat + return reflect2.TypeOf(v), nil + } + var v []byte + return reflect2.TypeOf(v), nil + case Record: + var v map[string]any + return reflect2.TypeOf(v), nil + case Enum: + var v string + return reflect2.TypeOf(v), nil + case Array: + v := make([]any, 0) + return reflect2.TypeOf(v), nil + case Map: + var v map[string]any + return reflect2.TypeOf(v), nil + case Union: + var v map[string]any + return reflect2.TypeOf(v), nil + case Fixed: + fixed := schema.(*FixedSchema) + ls := fixed.Logical() + if ls != nil { + switch ls.Type() { + case Duration: + var v LogicalDuration + return reflect2.TypeOf(v), nil + case Decimal: + var v *big.Rat + return reflect2.TypeOf(v), nil + } + } + v := byteSliceToArray(make([]byte, fixed.Size()), fixed.Size()) + return reflect2.TypeOf(v), nil + default: + // This should not be possible. 
+ return nil, errors.New("dynamic receiver not found for schema " + schemaName) + } +} diff --git a/vendor/github.com/hamba/avro/v2/codec_map.go b/vendor/github.com/hamba/avro/v2/codec_map.go new file mode 100644 index 0000000..18c7de1 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_map.go @@ -0,0 +1,246 @@ +package avro + +import ( + "encoding" + "errors" + "fmt" + "io" + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfMap(d *decoderContext, schema *MapSchema, typ reflect2.Type) ValDecoder { + if typ.Kind() == reflect.Map { + keyType := typ.(reflect2.MapType).Key() + switch { + case keyType.Kind() == reflect.String: + return decoderOfMap(d, schema, typ) + case keyType.Implements(textUnmarshalerType): + return decoderOfMapUnmarshaler(d, schema, typ) + } + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func createEncoderOfMap(e *encoderContext, schema *MapSchema, typ reflect2.Type) ValEncoder { + if typ.Kind() == reflect.Map { + keyType := typ.(reflect2.MapType).Key() + switch { + case keyType.Kind() == reflect.String: + return encoderOfMap(e, schema, typ) + case keyType.Implements(textMarshalerType): + return encoderOfMapMarshaler(e, schema, typ) + } + } + + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func decoderOfMap(d *decoderContext, m *MapSchema, typ reflect2.Type) ValDecoder { + mapType := typ.(*reflect2.UnsafeMapType) + decoder := decoderOfType(d, m.Values(), mapType.Elem()) + + return &mapDecoder{ + mapType: mapType, + elemType: mapType.Elem(), + decoder: decoder, + } +} + +type mapDecoder struct { + mapType *reflect2.UnsafeMapType + elemType reflect2.Type + decoder ValDecoder +} + +func (d *mapDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + if d.mapType.UnsafeIsNil(ptr) { + d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(0)) + } + + for { + l, _ := r.ReadBlockHeader() + 
if l == 0 { + break + } + + for range l { + keyPtr := reflect2.PtrOf(r.ReadString()) + elemPtr := d.elemType.UnsafeNew() + d.decoder.Decode(elemPtr, r) + if r.Error != nil { + r.Error = fmt.Errorf("reading map[string]%s: %w", d.elemType.String(), r.Error) + return + } + + d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr) + } + } + + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error) + } +} + +func decoderOfMapUnmarshaler(d *decoderContext, m *MapSchema, typ reflect2.Type) ValDecoder { + mapType := typ.(*reflect2.UnsafeMapType) + decoder := decoderOfType(d, m.Values(), mapType.Elem()) + + return &mapDecoderUnmarshaler{ + mapType: mapType, + keyType: mapType.Key(), + elemType: mapType.Elem(), + decoder: decoder, + } +} + +type mapDecoderUnmarshaler struct { + mapType *reflect2.UnsafeMapType + keyType reflect2.Type + elemType reflect2.Type + decoder ValDecoder +} + +func (d *mapDecoderUnmarshaler) Decode(ptr unsafe.Pointer, r *Reader) { + if d.mapType.UnsafeIsNil(ptr) { + d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(0)) + } + + for { + l, _ := r.ReadBlockHeader() + if l == 0 { + break + } + + for range l { + keyPtr := d.keyType.UnsafeNew() + keyObj := d.keyType.UnsafeIndirect(keyPtr) + if reflect2.IsNil(keyObj) { + ptrType := d.keyType.(*reflect2.UnsafePtrType) + newPtr := ptrType.Elem().UnsafeNew() + *((*unsafe.Pointer)(keyPtr)) = newPtr + keyObj = d.keyType.UnsafeIndirect(keyPtr) + } + unmarshaler := keyObj.(encoding.TextUnmarshaler) + err := unmarshaler.UnmarshalText([]byte(r.ReadString())) + if err != nil { + r.ReportError("mapDecoderUnmarshaler", err.Error()) + return + } + + elemPtr := d.elemType.UnsafeNew() + d.decoder.Decode(elemPtr, r) + + d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr) + } + } + + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error) + } +} + +func encoderOfMap(e *encoderContext, m *MapSchema, typ reflect2.Type) ValEncoder { + mapType := 
typ.(*reflect2.UnsafeMapType) + encoder := encoderOfType(e, m.Values(), mapType.Elem()) + + return &mapEncoder{ + blockLength: e.cfg.getBlockLength(), + mapType: mapType, + encoder: encoder, + } +} + +type mapEncoder struct { + blockLength int + mapType *reflect2.UnsafeMapType + encoder ValEncoder +} + +func (e *mapEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + blockLength := e.blockLength + + iter := e.mapType.UnsafeIterate(ptr) + + for { + wrote := w.WriteBlockCB(func(w *Writer) int64 { + var i int + for i = 0; iter.HasNext() && i < blockLength; i++ { + keyPtr, elemPtr := iter.UnsafeNext() + w.WriteString(*((*string)(keyPtr))) + e.encoder.Encode(elemPtr, w) + } + + return int64(i) + }) + + if wrote == 0 { + break + } + } + + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + w.Error = fmt.Errorf("%v: %w", e.mapType, w.Error) + } +} + +func encoderOfMapMarshaler(e *encoderContext, m *MapSchema, typ reflect2.Type) ValEncoder { + mapType := typ.(*reflect2.UnsafeMapType) + encoder := encoderOfType(e, m.Values(), mapType.Elem()) + + return &mapEncoderMarshaller{ + blockLength: e.cfg.getBlockLength(), + mapType: mapType, + keyType: mapType.Key(), + encoder: encoder, + } +} + +type mapEncoderMarshaller struct { + blockLength int + mapType *reflect2.UnsafeMapType + keyType reflect2.Type + encoder ValEncoder +} + +func (e *mapEncoderMarshaller) Encode(ptr unsafe.Pointer, w *Writer) { + blockLength := e.blockLength + + iter := e.mapType.UnsafeIterate(ptr) + + for { + wrote := w.WriteBlockCB(func(w *Writer) int64 { + var i int + for i = 0; iter.HasNext() && i < blockLength; i++ { + keyPtr, elemPtr := iter.UnsafeNext() + + obj := e.keyType.UnsafeIndirect(keyPtr) + if e.keyType.IsNullable() && reflect2.IsNil(obj) { + w.Error = errors.New("avro: mapEncoderMarshaller: encoding nil TextMarshaller") + return int64(0) + } + marshaler := (obj).(encoding.TextMarshaler) + b, err := marshaler.MarshalText() + if err != nil { + w.Error = err + return int64(0) + } + 
w.WriteString(string(b)) + + e.encoder.Encode(elemPtr, w) + } + return int64(i) + }) + + if wrote == 0 { + break + } + } + + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + w.Error = fmt.Errorf("%v: %w", e.mapType, w.Error) + } +} diff --git a/vendor/github.com/hamba/avro/v2/codec_marshaler.go b/vendor/github.com/hamba/avro/v2/codec_marshaler.go new file mode 100644 index 0000000..d783d17 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_marshaler.go @@ -0,0 +1,70 @@ +package avro + +import ( + "encoding" + "unsafe" + + "github.com/modern-go/reflect2" +) + +var ( + textMarshalerType = reflect2.TypeOfPtr((*encoding.TextMarshaler)(nil)).Elem() + textUnmarshalerType = reflect2.TypeOfPtr((*encoding.TextUnmarshaler)(nil)).Elem() +) + +func createDecoderOfMarshaler(schema Schema, typ reflect2.Type) ValDecoder { + if typ.Implements(textUnmarshalerType) && schema.Type() == String { + return &textMarshalerCodec{typ} + } + ptrType := reflect2.PtrTo(typ) + if ptrType.Implements(textUnmarshalerType) && schema.Type() == String { + return &referenceDecoder{ + &textMarshalerCodec{ptrType}, + } + } + return nil +} + +func createEncoderOfMarshaler(schema Schema, typ reflect2.Type) ValEncoder { + if typ.Implements(textMarshalerType) && schema.Type() == String { + return &textMarshalerCodec{ + typ: typ, + } + } + return nil +} + +type textMarshalerCodec struct { + typ reflect2.Type +} + +func (c textMarshalerCodec) Decode(ptr unsafe.Pointer, r *Reader) { + obj := c.typ.UnsafeIndirect(ptr) + if reflect2.IsNil(obj) { + ptrType := c.typ.(*reflect2.UnsafePtrType) + newPtr := ptrType.Elem().UnsafeNew() + *((*unsafe.Pointer)(ptr)) = newPtr + obj = c.typ.UnsafeIndirect(ptr) + } + unmarshaler := (obj).(encoding.TextUnmarshaler) + b := r.ReadBytes() + err := unmarshaler.UnmarshalText(b) + if err != nil { + r.ReportError("textMarshalerCodec", err.Error()) + } +} + +func (c textMarshalerCodec) Encode(ptr unsafe.Pointer, w *Writer) { + obj := c.typ.UnsafeIndirect(ptr) + if 
c.typ.IsNullable() && reflect2.IsNil(obj) { + w.WriteBytes(nil) + return + } + marshaler := (obj).(encoding.TextMarshaler) + b, err := marshaler.MarshalText() + if err != nil { + w.Error = err + return + } + w.WriteBytes(b) +} diff --git a/vendor/github.com/hamba/avro/v2/codec_native.go b/vendor/github.com/hamba/avro/v2/codec_native.go new file mode 100644 index 0000000..85612a6 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_native.go @@ -0,0 +1,666 @@ +package avro + +import ( + "fmt" + "math/big" + "reflect" + "strconv" + "time" + "unsafe" + + "github.com/modern-go/reflect2" +) + +//nolint:maintidx // Splitting this would not make it simpler. +func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecoder { + resolved := schema.encodedType != "" + switch typ.Kind() { + case reflect.Bool: + if schema.Type() != Boolean { + break + } + return &boolCodec{} + + case reflect.Int: + switch schema.Type() { + case Int: + return &intCodec[int]{} + case Long: + if strconv.IntSize == 64 { + // allow decoding into int when it's 64-bit + return &longCodec[int]{} + } + } + + case reflect.Int8: + if schema.Type() != Int { + break + } + return &intCodec[int8]{} + + case reflect.Uint8: + if schema.Type() != Int { + break + } + return &intCodec[uint8]{} + + case reflect.Int16: + if schema.Type() != Int { + break + } + return &intCodec[int16]{} + + case reflect.Uint16: + if schema.Type() != Int { + break + } + return &intCodec[uint16]{} + + case reflect.Int32: + if schema.Type() != Int { + break + } + return &intCodec[int32]{} + + case reflect.Uint32: + if schema.Type() != Long { + break + } + if resolved { + return &longConvCodec[uint32]{convert: createLongConverter(schema.encodedType)} + } + return &longCodec[uint32]{} + + case reflect.Int64: + st := schema.Type() + lt := getLogicalType(schema) + switch { + case st == Int && lt == TimeMillis: // time.Duration + return &timeMillisCodec{} + + case st == Long && lt == TimeMicros: // time.Duration + 
return &timeMicrosCodec{ + convert: createLongConverter(schema.encodedType), + } + + case st == Long: + isTimestamp := (lt == TimestampMillis || lt == TimestampMicros) + if isTimestamp && typ.Type1() == timeDurationType { + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + typ.Type1().String(), schema.Type(), lt)} + } + if resolved { + return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)} + } + return &longCodec[int64]{} + + default: + break + } + + case reflect.Float32: + if schema.Type() != Float { + break + } + if resolved { + return &float32ConvCodec{convert: createFloatConverter(schema.encodedType)} + } + return &float32Codec{} + + case reflect.Float64: + if schema.Type() != Double { + break + } + if resolved { + return &float64ConvCodec{convert: createDoubleConverter(schema.encodedType)} + } + return &float64Codec{} + + case reflect.String: + if schema.Type() != String { + break + } + return &stringCodec{} + + case reflect.Slice: + if typ.(reflect2.SliceType).Elem().Kind() != reflect.Uint8 || schema.Type() != Bytes { + break + } + return &bytesCodec{sliceType: typ.(*reflect2.UnsafeSliceType)} + + case reflect.Struct: + st := schema.Type() + ls := getLogicalSchema(schema) + lt := getLogicalType(schema) + isTime := typ.Type1().ConvertibleTo(timeType) + switch { + case isTime && st == Int && lt == Date: + return &dateCodec{} + case isTime && st == Long && lt == TimestampMillis: + return ×tampMillisCodec{ + convert: createLongConverter(schema.encodedType), + } + case isTime && st == Long && lt == TimestampMicros: + return ×tampMicrosCodec{ + convert: createLongConverter(schema.encodedType), + } + case isTime && st == Long && lt == LocalTimestampMillis: + return ×tampMillisCodec{ + local: true, + convert: createLongConverter(schema.encodedType), + } + case isTime && st == Long && lt == LocalTimestampMicros: + return ×tampMicrosCodec{ + local: true, + convert: 
createLongConverter(schema.encodedType), + } + case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal: + dec := ls.(*DecimalLogicalSchema) + return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()} + + default: + break + } + case reflect.Ptr: + ptrType := typ.(*reflect2.UnsafePtrType) + elemType := ptrType.Elem() + tpy1 := elemType.Type1() + ls := getLogicalSchema(schema) + if ls == nil { + break + } + if !tpy1.ConvertibleTo(ratType) || schema.Type() != Bytes || ls.Type() != Decimal { + break + } + dec := ls.(*DecimalLogicalSchema) + + return &bytesDecimalPtrCodec{prec: dec.Precision(), scale: dec.Scale()} + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +//nolint:maintidx // Splitting this would not make it simpler. +func createEncoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValEncoder { + switch typ.Kind() { + case reflect.Bool: + if schema.Type() != Boolean { + break + } + return &boolCodec{} + + case reflect.Int: + switch schema.Type() { + case Int: + return &intCodec[int]{} + case Long: + return &longCodec[int]{} + } + + case reflect.Int8: + if schema.Type() != Int { + break + } + return &intCodec[int8]{} + + case reflect.Uint8: + if schema.Type() != Int { + break + } + return &intCodec[uint8]{} + + case reflect.Int16: + if schema.Type() != Int { + break + } + return &intCodec[int16]{} + + case reflect.Uint16: + if schema.Type() != Int { + break + } + return &intCodec[uint16]{} + + case reflect.Int32: + switch schema.Type() { + case Long: + return &longCodec[int32]{} + + case Int: + return &intCodec[int32]{} + } + + case reflect.Uint32: + if schema.Type() != Long { + break + } + return &longCodec[uint32]{} + + case reflect.Int64: + st := schema.Type() + lt := getLogicalType(schema) + switch { + case st == Int && lt == TimeMillis: // time.Duration + return &timeMillisCodec{} + + case st == Long && lt == TimeMicros: // time.Duration + return 
&timeMicrosCodec{} + + case st == Long: + isTimestamp := (lt == TimestampMillis || lt == TimestampMicros) + if isTimestamp && typ.Type1() == timeDurationType { + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", + typ.Type1().String(), schema.Type(), lt)} + } + return &longCodec[int64]{} + + default: + break + } + + case reflect.Float32: + switch schema.Type() { + case Double: + return &float32DoubleCodec{} + case Float: + return &float32Codec{} + } + + case reflect.Float64: + if schema.Type() != Double { + break + } + return &float64Codec{} + + case reflect.String: + if schema.Type() != String { + break + } + return &stringCodec{} + + case reflect.Slice: + if typ.(reflect2.SliceType).Elem().Kind() != reflect.Uint8 || schema.Type() != Bytes { + break + } + return &bytesCodec{sliceType: typ.(*reflect2.UnsafeSliceType)} + + case reflect.Struct: + st := schema.Type() + lt := getLogicalType(schema) + isTime := typ.Type1().ConvertibleTo(timeType) + switch { + case isTime && st == Int && lt == Date: + return &dateCodec{} + case isTime && st == Long && lt == TimestampMillis: + return ×tampMillisCodec{} + case isTime && st == Long && lt == TimestampMicros: + return ×tampMicrosCodec{} + case isTime && st == Long && lt == LocalTimestampMillis: + return ×tampMillisCodec{local: true} + case isTime && st == Long && lt == LocalTimestampMicros: + return ×tampMicrosCodec{local: true} + case typ.Type1().ConvertibleTo(ratType) && st != Bytes || lt == Decimal: + ls := getLogicalSchema(schema) + dec := ls.(*DecimalLogicalSchema) + return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()} + default: + break + } + + case reflect.Ptr: + ptrType := typ.(*reflect2.UnsafePtrType) + elemType := ptrType.Elem() + tpy1 := elemType.Type1() + ls := getLogicalSchema(schema) + if ls == nil { + break + } + if !tpy1.ConvertibleTo(ratType) || schema.Type() != Bytes || ls.Type() != Decimal { + break + } + dec := ls.(*DecimalLogicalSchema) + + 
return &bytesDecimalPtrCodec{prec: dec.Precision(), scale: dec.Scale()} + } + + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func getLogicalSchema(schema Schema) LogicalSchema { + lts, ok := schema.(LogicalTypeSchema) + if !ok { + return nil + } + + return lts.Logical() +} + +func getLogicalType(schema Schema) LogicalType { + ls := getLogicalSchema(schema) + if ls == nil { + return "" + } + + return ls.Type() +} + +type nullCodec struct{} + +func (*nullCodec) Decode(unsafe.Pointer, *Reader) {} + +func (*nullCodec) Encode(unsafe.Pointer, *Writer) {} + +type boolCodec struct{} + +func (*boolCodec) Decode(ptr unsafe.Pointer, r *Reader) { + *((*bool)(ptr)) = r.ReadBool() +} + +func (*boolCodec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteBool(*((*bool)(ptr))) +} + +type smallInt interface { + ~int | ~int8 | ~int16 | ~int32 | ~uint | ~uint8 | ~uint16 +} + +type intCodec[T smallInt] struct{} + +func (*intCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) { + *((*T)(ptr)) = T(r.ReadInt()) +} + +func (*intCodec[T]) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteInt(int32(*((*T)(ptr)))) +} + +type largeInt interface { + ~int | ~int32 | ~uint32 | int64 +} + +type longCodec[T largeInt] struct{} + +func (c *longCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) { + *((*T)(ptr)) = T(r.ReadLong()) +} + +func (*longCodec[T]) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteLong(int64(*((*T)(ptr)))) +} + +type longConvCodec[T largeInt] struct { + convert func(*Reader) int64 +} + +func (c *longConvCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) { + *((*T)(ptr)) = T(c.convert(r)) +} + +type float32Codec struct{} + +func (c *float32Codec) Decode(ptr unsafe.Pointer, r *Reader) { + *((*float32)(ptr)) = r.ReadFloat() +} + +func (*float32Codec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteFloat(*((*float32)(ptr))) +} + +type float32ConvCodec struct { + convert func(*Reader) float32 +} + +func (c *float32ConvCodec) 
Decode(ptr unsafe.Pointer, r *Reader) { + *((*float32)(ptr)) = c.convert(r) +} + +type float32DoubleCodec struct{} + +func (*float32DoubleCodec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteDouble(float64(*((*float32)(ptr)))) +} + +type float64Codec struct{} + +func (c *float64Codec) Decode(ptr unsafe.Pointer, r *Reader) { + *((*float64)(ptr)) = r.ReadDouble() +} + +func (*float64Codec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteDouble(*((*float64)(ptr))) +} + +type float64ConvCodec struct { + convert func(*Reader) float64 +} + +func (c *float64ConvCodec) Decode(ptr unsafe.Pointer, r *Reader) { + *((*float64)(ptr)) = c.convert(r) +} + +type stringCodec struct{} + +func (c *stringCodec) Decode(ptr unsafe.Pointer, r *Reader) { + *((*string)(ptr)) = r.ReadString() +} + +func (*stringCodec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteString(*((*string)(ptr))) +} + +type bytesCodec struct { + sliceType *reflect2.UnsafeSliceType +} + +func (c *bytesCodec) Decode(ptr unsafe.Pointer, r *Reader) { + b := r.ReadBytes() + c.sliceType.UnsafeSet(ptr, reflect2.PtrOf(b)) +} + +func (c *bytesCodec) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteBytes(*((*[]byte)(ptr))) +} + +type dateCodec struct{} + +func (c *dateCodec) Decode(ptr unsafe.Pointer, r *Reader) { + i := r.ReadInt() + sec := int64(i) * int64(24*time.Hour/time.Second) + *((*time.Time)(ptr)) = time.Unix(sec, 0).UTC() +} + +func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) { + t := *((*time.Time)(ptr)) + days := t.Unix() / int64(24*time.Hour/time.Second) + w.WriteInt(int32(days)) +} + +type timestampMillisCodec struct { + local bool + convert func(*Reader) int64 +} + +func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) { + var i int64 + if c.convert != nil { + i = c.convert(r) + } else { + i = r.ReadLong() + } + sec := i / 1e3 + nsec := (i - sec*1e3) * 1e6 + t := time.Unix(sec, nsec) + + if c.local { + // When doing unix time, Go will convert the time from UTC to Local, + // 
changing the time by the number of seconds in the zone offset. + // Remove those added seconds. + _, offset := t.Zone() + t = t.Add(time.Duration(-1*offset) * time.Second) + *((*time.Time)(ptr)) = t + return + } + *((*time.Time)(ptr)) = t.UTC() +} + +func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) { + t := *((*time.Time)(ptr)) + if c.local { + t = t.Local() + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } + w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6)) +} + +type timestampMicrosCodec struct { + local bool + convert func(*Reader) int64 +} + +func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) { + var i int64 + if c.convert != nil { + i = c.convert(r) + } else { + i = r.ReadLong() + } + sec := i / 1e6 + nsec := (i - sec*1e6) * 1e3 + t := time.Unix(sec, nsec) + + if c.local { + // When doing unix time, Go will convert the time from UTC to Local, + // changing the time by the number of seconds in the zone offset. + // Remove those added seconds. 
+ _, offset := t.Zone() + t = t.Add(time.Duration(-1*offset) * time.Second) + *((*time.Time)(ptr)) = t + return + } + *((*time.Time)(ptr)) = t.UTC() +} + +func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) { + t := *((*time.Time)(ptr)) + if c.local { + t = t.Local() + t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } + w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3)) +} + +type timeMillisCodec struct{} + +func (c *timeMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) { + i := r.ReadInt() + *((*time.Duration)(ptr)) = time.Duration(i) * time.Millisecond +} + +func (c *timeMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) { + d := *((*time.Duration)(ptr)) + w.WriteInt(int32(d.Nanoseconds() / int64(time.Millisecond))) +} + +type timeMicrosCodec struct { + convert func(*Reader) int64 +} + +func (c *timeMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) { + var i int64 + if c.convert != nil { + i = c.convert(r) + } else { + i = r.ReadLong() + } + *((*time.Duration)(ptr)) = time.Duration(i) * time.Microsecond +} + +func (c *timeMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) { + d := *((*time.Duration)(ptr)) + w.WriteLong(d.Nanoseconds() / int64(time.Microsecond)) +} + +var one = big.NewInt(1) + +type bytesDecimalCodec struct { + prec int + scale int +} + +func (c *bytesDecimalCodec) Decode(ptr unsafe.Pointer, r *Reader) { + b := r.ReadBytes() + if i := (&big.Int{}).SetBytes(b); len(b) > 0 && b[0]&0x80 > 0 { + i.Sub(i, new(big.Int).Lsh(one, uint(len(b))*8)) + } + *((**big.Rat)(ptr)) = ratFromBytes(b, c.scale) +} + +func ratFromBytes(b []byte, scale int) *big.Rat { + num := (&big.Int{}).SetBytes(b) + if len(b) > 0 && b[0]&0x80 > 0 { + num.Sub(num, new(big.Int).Lsh(one, uint(len(b))*8)) + } + denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil) + return new(big.Rat).SetFrac(num, denom) +} + +func (c *bytesDecimalCodec) Encode(ptr unsafe.Pointer, w *Writer) { + r := 
(*big.Rat)(ptr) + scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil) + i := (&big.Int{}).Mul(r.Num(), scale) + i = i.Div(i, r.Denom()) + + var b []byte + switch i.Sign() { + case 0: + b = []byte{0} + + case 1: + b = i.Bytes() + if b[0]&0x80 > 0 { + b = append([]byte{0}, b...) + } + + case -1: + length := uint(i.BitLen()/8+1) * 8 + b = i.Add(i, (&big.Int{}).Lsh(one, length)).Bytes() + } + w.WriteBytes(b) +} + +type bytesDecimalPtrCodec struct { + prec int + scale int +} + +func (c *bytesDecimalPtrCodec) Decode(ptr unsafe.Pointer, r *Reader) { + b := r.ReadBytes() + if i := (&big.Int{}).SetBytes(b); len(b) > 0 && b[0]&0x80 > 0 { + i.Sub(i, new(big.Int).Lsh(one, uint(len(b))*8)) + } + *((**big.Rat)(ptr)) = ratFromBytes(b, c.scale) +} + +func (c *bytesDecimalPtrCodec) Encode(ptr unsafe.Pointer, w *Writer) { + r := *((**big.Rat)(ptr)) + scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil) + i := (&big.Int{}).Mul(r.Num(), scale) + i = i.Div(i, r.Denom()) + + var b []byte + switch i.Sign() { + case 0: + b = []byte{0} + + case 1: + b = i.Bytes() + if b[0]&0x80 > 0 { + b = append([]byte{0}, b...) 
		}

	case -1:
		// Two's-complement encoding of the negative unscaled value.
		length := uint(i.BitLen()/8+1) * 8
		b = i.Add(i, (&big.Int{}).Lsh(one, length)).Bytes()
	}
	w.WriteBytes(b)
}
diff --git a/vendor/github.com/hamba/avro/v2/codec_ptr.go b/vendor/github.com/hamba/avro/v2/codec_ptr.go
new file mode 100644
index 0000000..07b099e
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_ptr.go
@@ -0,0 +1,66 @@
package avro

import (
	"errors"
	"unsafe"

	"github.com/modern-go/reflect2"
)

// decoderOfPtr builds a decoder for a pointer type by decoding into the
// pointed-to element type.
func decoderOfPtr(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
	ptrType := typ.(*reflect2.UnsafePtrType)
	elemType := ptrType.Elem()

	decoder := decoderOfType(d, schema, elemType)

	return &dereferenceDecoder{typ: elemType, decoder: decoder}
}

// dereferenceDecoder decodes through a pointer: a nil pointer gets a freshly
// allocated element, a non-nil pointer is decoded into in place.
type dereferenceDecoder struct {
	typ     reflect2.Type
	decoder ValDecoder
}

// Decode reads a value into the element the pointer at ptr refers to,
// allocating it first when the pointer is nil.
func (d *dereferenceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
	if *((*unsafe.Pointer)(ptr)) == nil {
		// Create new instance
		newPtr := d.typ.UnsafeNew()
		d.decoder.Decode(newPtr, r)
		*((*unsafe.Pointer)(ptr)) = newPtr
		return
	}

	// Reuse existing instance
	d.decoder.Decode(*((*unsafe.Pointer)(ptr)), r)
}

// encoderOfPtr builds an encoder for a pointer type by encoding the
// pointed-to element type.
func encoderOfPtr(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
	ptrType := typ.(*reflect2.UnsafePtrType)
	elemType := ptrType.Elem()

	enc := encoderOfType(e, schema, elemType)

	return &dereferenceEncoder{typ: elemType, encoder: enc}
}

// dereferenceEncoder encodes the element behind a non-nil pointer; a nil
// pointer is reported as an error on the Writer.
type dereferenceEncoder struct {
	typ     reflect2.Type
	encoder ValEncoder
}

// Encode writes the value the pointer at ptr refers to, failing when the
// pointer is nil.
func (d *dereferenceEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
	if *((*unsafe.Pointer)(ptr)) == nil {
		w.Error = errors.New("avro: cannot encode nil pointer")
		return
	}

	d.encoder.Encode(*((*unsafe.Pointer)(ptr)), w)
}

// referenceDecoder adds one level of indirection, handing the wrapped
// decoder the address of ptr rather than ptr itself.
type referenceDecoder struct {
	decoder ValDecoder
}

func (decoder *referenceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
	decoder.decoder.Decode(unsafe.Pointer(&ptr), r)
}
diff --git a/vendor/github.com/hamba/avro/v2/codec_record.go
b/vendor/github.com/hamba/avro/v2/codec_record.go new file mode 100644 index 0000000..67562b7 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_record.go @@ -0,0 +1,511 @@ +package avro + +import ( + "errors" + "fmt" + "io" + "reflect" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfRecord(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder { + switch typ.Kind() { + case reflect.Struct: + return decoderOfStruct(d, schema, typ) + + case reflect.Map: + if typ.(reflect2.MapType).Key().Kind() != reflect.String || + typ.(reflect2.MapType).Elem().Kind() != reflect.Interface { + break + } + return decoderOfRecord(d, schema, typ) + + case reflect.Ptr: + return decoderOfPtr(d, schema, typ) + + case reflect.Interface: + if ifaceType, ok := typ.(*reflect2.UnsafeIFaceType); ok { + return &recordIfaceDecoder{schema: schema, valType: ifaceType} + } + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for avro %s", typ.String(), schema.Type())} +} + +func createEncoderOfRecord(e *encoderContext, schema *RecordSchema, typ reflect2.Type) ValEncoder { + switch typ.Kind() { + case reflect.Struct: + return encoderOfStruct(e, schema, typ) + + case reflect.Map: + if typ.(reflect2.MapType).Key().Kind() != reflect.String || + typ.(reflect2.MapType).Elem().Kind() != reflect.Interface { + break + } + return encoderOfRecord(e, schema, typ) + + case reflect.Ptr: + return encoderOfPtr(e, schema, typ) + } + + return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for avro %s", typ.String(), schema.Type())} +} + +func decoderOfStruct(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder { + rec := schema.(*RecordSchema) + structDesc := describeStruct(d.cfg.getTagKey(), typ) + + fields := make([]*structFieldDecoder, 0, len(rec.Fields())) + + for _, field := range rec.Fields() { + if field.action == FieldIgnore { + fields = append(fields, &structFieldDecoder{ + decoder: createSkipDecoder(field.Type()), + }) + continue + 
} + + sf := structDesc.Fields.Get(field.Name()) + if sf == nil { + for _, alias := range field.Aliases() { + sf = structDesc.Fields.Get(alias) + if sf != nil { + break + } + } + } + // Skip field if it doesn't exist + if sf == nil { + // If the field value doesn't exist in the binary, ignore it instead of + // appending a 'SkipDecoder'. + // + // Note: 'SkipDecoder' performs a read and moves the cursor, which, + // in this case, will lead to a dirty read. + if field.action == FieldSetDefault { + continue + } + + fields = append(fields, &structFieldDecoder{ + decoder: createSkipDecoder(field.Type()), + }) + continue + } + + if field.action == FieldSetDefault { + if field.hasDef { + fields = append(fields, &structFieldDecoder{ + field: sf.Field, + decoder: createDefaultDecoder(d, field, sf.Field[len(sf.Field)-1].Type()), + }) + + continue + } + } + + dec := decoderOfType(d, field.Type(), sf.Field[len(sf.Field)-1].Type()) + fields = append(fields, &structFieldDecoder{ + field: sf.Field, + decoder: dec, + }) + } + + return &structDecoder{typ: typ, fields: fields} +} + +type structFieldDecoder struct { + field []*reflect2.UnsafeStructField + decoder ValDecoder +} + +type structDecoder struct { + typ reflect2.Type + fields []*structFieldDecoder +} + +func (d *structDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + for _, field := range d.fields { + // Skip case + if field.field == nil { + field.decoder.Decode(nil, r) + continue + } + + fieldPtr := ptr + for i, f := range field.field { + fieldPtr = f.UnsafeGet(fieldPtr) + + if i == len(field.field)-1 { + break + } + + if f.Type().Kind() == reflect.Ptr { + if *((*unsafe.Pointer)(fieldPtr)) == nil { + newPtr := f.Type().(*reflect2.UnsafePtrType).Elem().UnsafeNew() + *((*unsafe.Pointer)(fieldPtr)) = newPtr + } + + fieldPtr = *((*unsafe.Pointer)(fieldPtr)) + } + } + field.decoder.Decode(fieldPtr, r) + + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + for _, f := range field.field { + r.Error = fmt.Errorf("%s: %w", 
f.Name(), r.Error) + return + } + } + } +} + +func encoderOfStruct(e *encoderContext, rec *RecordSchema, typ reflect2.Type) ValEncoder { + structDesc := describeStruct(e.cfg.getTagKey(), typ) + + fields := make([]*structFieldEncoder, 0, len(rec.Fields())) + for _, field := range rec.Fields() { + sf := structDesc.Fields.Get(field.Name()) + if sf != nil { + fields = append(fields, &structFieldEncoder{ + field: sf.Field, + encoder: encoderOfType(e, field.Type(), sf.Field[len(sf.Field)-1].Type()), + }) + continue + } + + if !field.HasDefault() { + // In all other cases, this is a required field + err := fmt.Errorf("avro: record %s is missing required field %q", rec.FullName(), field.Name()) + return &errorEncoder{err: err} + } + + def := field.Default() + if field.Default() == nil { + if field.Type().Type() == Null { + // We write nothing in a Null case, just skip it + continue + } + + if field.Type().Type() == Union && field.Type().(*UnionSchema).Nullable() { + defaultType := reflect2.TypeOf(&def) + fields = append(fields, &structFieldEncoder{ + defaultPtr: reflect2.PtrOf(&def), + encoder: encoderOfNullableUnion(e, field.Type(), defaultType), + }) + continue + } + } + + defaultType := reflect2.TypeOf(def) + defaultEncoder := encoderOfType(e, field.Type(), defaultType) + if defaultType.LikePtr() { + defaultEncoder = &onePtrEncoder{defaultEncoder} + } + fields = append(fields, &structFieldEncoder{ + defaultPtr: reflect2.PtrOf(def), + encoder: defaultEncoder, + }) + } + return &structEncoder{typ: typ, fields: fields} +} + +type structFieldEncoder struct { + field []*reflect2.UnsafeStructField + defaultPtr unsafe.Pointer + encoder ValEncoder +} + +type structEncoder struct { + typ reflect2.Type + fields []*structFieldEncoder +} + +func (e *structEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + for _, field := range e.fields { + // Default case + if field.field == nil { + field.encoder.Encode(field.defaultPtr, w) + continue + } + + fieldPtr := ptr + for i, f := range 
field.field { + fieldPtr = f.UnsafeGet(fieldPtr) + + if i == len(field.field)-1 { + break + } + + if f.Type().Kind() == reflect.Ptr { + if *((*unsafe.Pointer)(fieldPtr)) == nil { + w.Error = fmt.Errorf("embedded field %q is nil", f.Name()) + return + } + + fieldPtr = *((*unsafe.Pointer)(fieldPtr)) + } + } + field.encoder.Encode(fieldPtr, w) + + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + for _, f := range field.field { + w.Error = fmt.Errorf("%s: %w", f.Name(), w.Error) + return + } + } + } +} + +func decoderOfRecord(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder { + rec := schema.(*RecordSchema) + mapType := typ.(*reflect2.UnsafeMapType) + + fields := make([]recordMapDecoderField, len(rec.Fields())) + for i, field := range rec.Fields() { + switch field.action { + case FieldIgnore: + fields[i] = recordMapDecoderField{ + name: field.Name(), + decoder: createSkipDecoder(field.Type()), + skip: true, + } + continue + case FieldSetDefault: + if field.hasDef { + fields[i] = recordMapDecoderField{ + name: field.Name(), + decoder: createDefaultDecoder(d, field, mapType.Elem()), + } + continue + } + } + + fields[i] = recordMapDecoderField{ + name: field.Name(), + decoder: decoderOfType(d, field.Type(), mapType.Elem()), + } + } + + return &recordMapDecoder{ + mapType: mapType, + elemType: mapType.Elem(), + fields: fields, + } +} + +type recordMapDecoderField struct { + name string + decoder ValDecoder + skip bool +} + +type recordMapDecoder struct { + mapType *reflect2.UnsafeMapType + elemType reflect2.Type + fields []recordMapDecoderField +} + +func (d *recordMapDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + if d.mapType.UnsafeIsNil(ptr) { + d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(len(d.fields))) + } + + for _, field := range d.fields { + elemPtr := d.elemType.UnsafeNew() + field.decoder.Decode(elemPtr, r) + if field.skip { + continue + } + + d.mapType.UnsafeSetIndex(ptr, reflect2.PtrOf(field), elemPtr) + } + + if r.Error != nil && 
!errors.Is(r.Error, io.EOF) { + r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error) + } +} + +func encoderOfRecord(e *encoderContext, rec *RecordSchema, typ reflect2.Type) ValEncoder { + mapType := typ.(*reflect2.UnsafeMapType) + + fields := make([]mapEncoderField, len(rec.Fields())) + for i, field := range rec.Fields() { + fields[i] = mapEncoderField{ + name: field.Name(), + hasDef: field.HasDefault(), + def: field.Default(), + encoder: encoderOfType(e, field.Type(), mapType.Elem()), + } + + if field.HasDefault() { + switch { + case field.Type().Type() == Union: + union := field.Type().(*UnionSchema) + fields[i].def = map[string]any{ + string(union.Types()[0].Type()): field.Default(), + } + case field.Default() == nil: + continue + } + + defaultType := reflect2.TypeOf(fields[i].def) + fields[i].defEncoder = encoderOfType(e, field.Type(), defaultType) + if defaultType.LikePtr() { + fields[i].defEncoder = &onePtrEncoder{fields[i].defEncoder} + } + } + } + + return &recordMapEncoder{ + mapType: mapType, + fields: fields, + } +} + +type mapEncoderField struct { + name string + hasDef bool + def any + defEncoder ValEncoder + encoder ValEncoder +} + +type recordMapEncoder struct { + mapType *reflect2.UnsafeMapType + fields []mapEncoderField +} + +func (e *recordMapEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + for _, field := range e.fields { + // The first property of mapEncoderField is the name, so a pointer + // to field is a pointer to the name. 
+ valPtr := e.mapType.UnsafeGetIndex(ptr, reflect2.PtrOf(field)) + if valPtr == nil { + // Missing required field + if !field.hasDef { + w.Error = fmt.Errorf("avro: missing required field %s", field.name) + return + } + + // Null default + if field.def == nil { + continue + } + + defPtr := reflect2.PtrOf(field.def) + field.defEncoder.Encode(defPtr, w) + continue + } + + field.encoder.Encode(valPtr, w) + + if w.Error != nil && !errors.Is(w.Error, io.EOF) { + w.Error = fmt.Errorf("%s: %w", field.name, w.Error) + return + } + } +} + +type recordIfaceDecoder struct { + schema Schema + valType *reflect2.UnsafeIFaceType +} + +func (d *recordIfaceDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + obj := d.valType.UnsafeIndirect(ptr) + if reflect2.IsNil(obj) { + r.ReportError("decode non empty interface", "can not unmarshal into nil") + return + } + + r.ReadVal(d.schema, obj) +} + +type structDescriptor struct { + Type reflect2.Type + Fields structFields +} + +type structFields []*structField + +func (sf structFields) Get(name string) *structField { + for _, f := range sf { + if f.Name == name { + return f + } + } + + return nil +} + +type structField struct { + Name string + Field []*reflect2.UnsafeStructField + + anon *reflect2.UnsafeStructType +} + +func describeStruct(tagKey string, typ reflect2.Type) *structDescriptor { + structType := typ.(*reflect2.UnsafeStructType) + fields := structFields{} + + var curr []structField + next := []structField{{anon: structType}} + + visited := map[uintptr]bool{} + + for len(next) > 0 { + curr, next = next, curr[:0] + + for _, f := range curr { + rtype := f.anon.RType() + if visited[f.anon.RType()] { + continue + } + visited[rtype] = true + + for i := range f.anon.NumField() { + field := f.anon.Field(i).(*reflect2.UnsafeStructField) + isUnexported := field.PkgPath() != "" + + chain := make([]*reflect2.UnsafeStructField, len(f.Field)+1) + copy(chain, f.Field) + chain[len(f.Field)] = field + + if field.Anonymous() { + t := field.Type() 
+ if t.Kind() == reflect.Ptr { + t = t.(*reflect2.UnsafePtrType).Elem() + } + if t.Kind() != reflect.Struct { + continue + } + + next = append(next, structField{Field: chain, anon: t.(*reflect2.UnsafeStructType)}) + continue + } + + // Ignore unexported fields. + if isUnexported { + continue + } + + fieldName := field.Name() + if tag, ok := field.Tag().Lookup(tagKey); ok { + fieldName = tag + } + + fields = append(fields, &structField{ + Name: fieldName, + Field: chain, + }) + } + } + } + + return &structDescriptor{ + Type: structType, + Fields: fields, + } +} diff --git a/vendor/github.com/hamba/avro/v2/codec_skip.go b/vendor/github.com/hamba/avro/v2/codec_skip.go new file mode 100644 index 0000000..3be0dd6 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_skip.go @@ -0,0 +1,225 @@ +package avro + +import ( + "fmt" + "unsafe" +) + +func createSkipDecoder(schema Schema) ValDecoder { + switch schema.Type() { + case Boolean: + return &boolSkipDecoder{} + + case Int: + return &intSkipDecoder{} + + case Long: + return &longSkipDecoder{} + + case Float: + return &floatSkipDecoder{} + + case Double: + return &doubleSkipDecoder{} + + case String: + return &stringSkipDecoder{} + + case Bytes: + return &bytesSkipDecoder{} + + case Record: + return skipDecoderOfRecord(schema) + + case Ref: + return createSkipDecoder(schema.(*RefSchema).Schema()) + + case Enum: + return &enumSkipDecoder{symbols: schema.(*EnumSchema).Symbols()} + + case Array: + return skipDecoderOfArray(schema) + + case Map: + return skipDecoderOfMap(schema) + + case Union: + return skipDecoderOfUnion(schema) + + case Fixed: + return &fixedSkipDecoder{size: schema.(*FixedSchema).Size()} + + default: + return &errorDecoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())} + } +} + +type boolSkipDecoder struct{} + +func (*boolSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) { + r.SkipBool() +} + +type intSkipDecoder struct{} + +func (*intSkipDecoder) Decode(_ unsafe.Pointer, r 
*Reader) {
	r.SkipInt()
}

// longSkipDecoder advances the reader past a long without materializing it.
type longSkipDecoder struct{}

func (*longSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipLong()
}

// floatSkipDecoder advances the reader past a float.
type floatSkipDecoder struct{}

func (*floatSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipFloat()
}

// doubleSkipDecoder advances the reader past a double.
type doubleSkipDecoder struct{}

func (*doubleSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipDouble()
}

// stringSkipDecoder advances the reader past a string.
type stringSkipDecoder struct{}

func (*stringSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipString()
}

// bytesSkipDecoder advances the reader past a bytes value.
type bytesSkipDecoder struct{}

func (c *bytesSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipBytes()
}

// skipDecoderOfRecord builds a skip decoder for a record by composing skip
// decoders for each of its fields in schema order.
func skipDecoderOfRecord(schema Schema) ValDecoder {
	rec := schema.(*RecordSchema)

	decoders := make([]ValDecoder, len(rec.Fields()))
	for i, field := range rec.Fields() {
		decoders[i] = createSkipDecoder(field.Type())
	}

	return &recordSkipDecoder{
		decoders: decoders,
	}
}

// recordSkipDecoder skips every field of a record in order.
type recordSkipDecoder struct {
	decoders []ValDecoder
}

func (d *recordSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	for _, decoder := range d.decoders {
		decoder.Decode(nil, r)
	}
}

// enumSkipDecoder skips an enum, which is encoded as an int index.
// symbols is retained but not needed to advance the reader.
type enumSkipDecoder struct {
	symbols []string
}

func (c *enumSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	r.SkipInt()
}

// skipDecoderOfArray builds a skip decoder for an array from a skip decoder
// for its item schema.
func skipDecoderOfArray(schema Schema) ValDecoder {
	arr := schema.(*ArraySchema)
	decoder := createSkipDecoder(arr.Items())

	return &sliceSkipDecoder{
		decoder: decoder,
	}
}

// sliceSkipDecoder skips an array block by block, using the block byte size
// to jump ahead when the writer provided one.
type sliceSkipDecoder struct {
	decoder ValDecoder
}

func (d *sliceSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
	for {
		l, size := r.ReadBlockHeader()
		// A zero-length block terminates the array.
		if l == 0 {
			break
		}

		// When the block header carries a byte size, skip the whole block
		// in one step instead of item by item.
		if size > 0 {
			r.SkipNBytes(int(size))
			continue
		}

		for range l {
			d.decoder.Decode(nil, r)
		}
	}
}

// skipDecoderOfMap builds a skip decoder for a map from a skip decoder for
// its value schema (keys are always strings).
func skipDecoderOfMap(schema Schema) ValDecoder {
	m := schema.(*MapSchema)
	decoder := createSkipDecoder(m.Values())

	return &mapSkipDecoder{
		decoder: decoder,
	}
}

// mapSkipDecoder skips a map block by block, analogous to sliceSkipDecoder
// but skipping a string key before each value.
type mapSkipDecoder struct {
	decoder ValDecoder
}

func
(d *mapSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) { + for { + l, size := r.ReadBlockHeader() + if l == 0 { + break + } + + if size > 0 { + r.SkipNBytes(int(size)) + continue + } + + for range l { + r.SkipString() + d.decoder.Decode(nil, r) + } + } +} + +func skipDecoderOfUnion(schema Schema) ValDecoder { + union := schema.(*UnionSchema) + + return &unionSkipDecoder{ + schema: union, + } +} + +type unionSkipDecoder struct { + schema *UnionSchema +} + +func (d *unionSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) { + _, resSchema := getUnionSchema(d.schema, r) + if resSchema == nil { + return + } + + // In a null case, just return + if resSchema.Type() == Null { + return + } + + createSkipDecoder(resSchema).Decode(nil, r) +} + +type fixedSkipDecoder struct { + size int +} + +func (d *fixedSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) { + r.SkipNBytes(d.size) +} diff --git a/vendor/github.com/hamba/avro/v2/codec_union.go b/vendor/github.com/hamba/avro/v2/codec_union.go new file mode 100644 index 0000000..7d80b53 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/codec_union.go @@ -0,0 +1,460 @@ +package avro + +import ( + "errors" + "fmt" + "reflect" + "strings" + "unsafe" + + "github.com/modern-go/reflect2" +) + +func createDecoderOfUnion(d *decoderContext, schema *UnionSchema, typ reflect2.Type) ValDecoder { + switch typ.Kind() { + case reflect.Map: + if typ.(reflect2.MapType).Key().Kind() != reflect.String || + typ.(reflect2.MapType).Elem().Kind() != reflect.Interface { + break + } + return decoderOfMapUnion(d, schema, typ) + case reflect.Slice: + if !schema.Nullable() { + break + } + return decoderOfNullableUnion(d, schema, typ) + case reflect.Ptr: + if !schema.Nullable() { + break + } + return decoderOfNullableUnion(d, schema, typ) + case reflect.Interface: + if _, ok := typ.(*reflect2.UnsafeIFaceType); !ok { + dec, err := decoderOfResolvedUnion(d, schema) + if err != nil { + return &errorDecoder{err: fmt.Errorf("avro: problem resolving decoder for 
Avro %s: %w", schema.Type(), err)} + } + return dec + } + } + + return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())} +} + +func createEncoderOfUnion(e *encoderContext, schema *UnionSchema, typ reflect2.Type) ValEncoder { + switch typ.Kind() { + case reflect.Map: + if typ.(reflect2.MapType).Key().Kind() != reflect.String || + typ.(reflect2.MapType).Elem().Kind() != reflect.Interface { + break + } + return encoderOfMapUnion(e, schema, typ) + case reflect.Slice: + if !schema.Nullable() { + break + } + return encoderOfNullableUnion(e, schema, typ) + case reflect.Ptr: + if !schema.Nullable() { + break + } + return encoderOfNullableUnion(e, schema, typ) + } + return encoderOfResolverUnion(e, schema, typ) +} + +func decoderOfMapUnion(d *decoderContext, union *UnionSchema, typ reflect2.Type) ValDecoder { + mapType := typ.(*reflect2.UnsafeMapType) + + typeDecs := make([]ValDecoder, len(union.Types())) + for i, s := range union.Types() { + if s.Type() == Null { + continue + } + typeDecs[i] = newEfaceDecoder(d, s) + } + + return &mapUnionDecoder{ + cfg: d.cfg, + schema: union, + mapType: mapType, + elemType: mapType.Elem(), + typeDecs: typeDecs, + } +} + +type mapUnionDecoder struct { + cfg *frozenConfig + schema *UnionSchema + mapType *reflect2.UnsafeMapType + elemType reflect2.Type + typeDecs []ValDecoder +} + +func (d *mapUnionDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + idx, resSchema := getUnionSchema(d.schema, r) + if resSchema == nil { + return + } + + // In a null case, just return + if resSchema.Type() == Null { + return + } + + if d.mapType.UnsafeIsNil(ptr) { + d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(1)) + } + + key := schemaTypeName(resSchema) + keyPtr := reflect2.PtrOf(key) + + elemPtr := d.elemType.UnsafeNew() + d.typeDecs[idx].Decode(elemPtr, r) + + d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr) +} + +func encoderOfMapUnion(e *encoderContext, union *UnionSchema, _ reflect2.Type) ValEncoder { + 
return &mapUnionEncoder{ + cfg: e.cfg, + schema: union, + } +} + +type mapUnionEncoder struct { + cfg *frozenConfig + schema *UnionSchema +} + +func (e *mapUnionEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + m := *((*map[string]any)(ptr)) + + if len(m) > 1 { + w.Error = errors.New("avro: cannot encode union map with multiple entries") + return + } + + name := "null" + val := any(nil) + for k, v := range m { + name = k + val = v + break + } + + schema, pos := e.schema.Types().Get(name) + if schema == nil { + w.Error = fmt.Errorf("avro: unknown union type %s", name) + return + } + + w.WriteInt(int32(pos)) + + if schema.Type() == Null && val == nil { + return + } + + elemType := reflect2.TypeOf(val) + elemPtr := reflect2.PtrOf(val) + + encoder := encoderOfType(newEncoderContext(e.cfg), schema, elemType) + if elemType.LikePtr() { + encoder = &onePtrEncoder{encoder} + } + encoder.Encode(elemPtr, w) +} + +func decoderOfNullableUnion(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder { + union := schema.(*UnionSchema) + _, typeIdx := union.Indices() + + var ( + baseTyp reflect2.Type + isPtr bool + ) + switch v := typ.(type) { + case *reflect2.UnsafePtrType: + baseTyp = v.Elem() + isPtr = true + case *reflect2.UnsafeSliceType: + baseTyp = v + } + decoder := decoderOfType(d, union.Types()[typeIdx], baseTyp) + + return &unionNullableDecoder{ + schema: union, + typ: baseTyp, + isPtr: isPtr, + decoder: decoder, + } +} + +type unionNullableDecoder struct { + schema *UnionSchema + typ reflect2.Type + isPtr bool + decoder ValDecoder +} + +func (d *unionNullableDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + _, schema := getUnionSchema(d.schema, r) + if schema == nil { + return + } + + if schema.Type() == Null { + *((*unsafe.Pointer)(ptr)) = nil + return + } + + // Handle the non-ptr case separately. + if !d.isPtr { + if d.typ.UnsafeIsNil(ptr) { + // Create a new instance. 
+ newPtr := d.typ.UnsafeNew() + d.decoder.Decode(newPtr, r) + d.typ.UnsafeSet(ptr, newPtr) + return + } + + // Reuse the existing instance. + d.decoder.Decode(ptr, r) + return + } + + if *((*unsafe.Pointer)(ptr)) == nil { + // Create new instance. + newPtr := d.typ.UnsafeNew() + d.decoder.Decode(newPtr, r) + *((*unsafe.Pointer)(ptr)) = newPtr + return + } + + // Reuse existing instance. + d.decoder.Decode(*((*unsafe.Pointer)(ptr)), r) +} + +func encoderOfNullableUnion(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder { + union := schema.(*UnionSchema) + nullIdx, typeIdx := union.Indices() + + var ( + baseTyp reflect2.Type + isPtr bool + ) + switch v := typ.(type) { + case *reflect2.UnsafePtrType: + baseTyp = v.Elem() + isPtr = true + case *reflect2.UnsafeSliceType: + baseTyp = v + } + encoder := encoderOfType(e, union.Types()[typeIdx], baseTyp) + + return &unionNullableEncoder{ + schema: union, + encoder: encoder, + isPtr: isPtr, + nullIdx: int32(nullIdx), + typeIdx: int32(typeIdx), + } +} + +type unionNullableEncoder struct { + schema *UnionSchema + encoder ValEncoder + isPtr bool + nullIdx int32 + typeIdx int32 +} + +func (e *unionNullableEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + if *((*unsafe.Pointer)(ptr)) == nil { + w.WriteInt(e.nullIdx) + return + } + + w.WriteInt(e.typeIdx) + newPtr := ptr + if e.isPtr { + newPtr = *((*unsafe.Pointer)(ptr)) + } + e.encoder.Encode(newPtr, w) +} + +func decoderOfResolvedUnion(d *decoderContext, schema Schema) (ValDecoder, error) { + union := schema.(*UnionSchema) + + types := make([]reflect2.Type, len(union.Types())) + decoders := make([]ValDecoder, len(union.Types())) + for i, schema := range union.Types() { + name := unionResolutionName(schema) + + typ, err := d.cfg.resolver.Type(name) + if err != nil { + if d.cfg.config.UnionResolutionError { + return nil, err + } + + if d.cfg.config.PartialUnionTypeResolution { + decoders[i] = nil + types[i] = nil + continue + } + + decoders = []ValDecoder{} + types 
= []reflect2.Type{} + break + } + + decoder := decoderOfType(d, schema, typ) + decoders[i] = decoder + types[i] = typ + } + + return &unionResolvedDecoder{ + cfg: d.cfg, + schema: union, + types: types, + decoders: decoders, + }, nil +} + +type unionResolvedDecoder struct { + cfg *frozenConfig + schema *UnionSchema + types []reflect2.Type + decoders []ValDecoder +} + +func (d *unionResolvedDecoder) Decode(ptr unsafe.Pointer, r *Reader) { + i, schema := getUnionSchema(d.schema, r) + if schema == nil { + return + } + + pObj := (*any)(ptr) + + if schema.Type() == Null { + *pObj = nil + return + } + + if i >= len(d.decoders) || d.decoders[i] == nil { + if d.cfg.config.UnionResolutionError { + r.ReportError("decode union type", "unknown union type") + return + } + + // We cannot resolve this, set it to the map type + name := schemaTypeName(schema) + obj := map[string]any{} + vTyp, err := genericReceiver(schema) + if err != nil { + r.ReportError("Union", err.Error()) + return + } + obj[name] = genericDecode(vTyp, decoderOfType(newDecoderContext(d.cfg), schema, vTyp), r) + + *pObj = obj + return + } + + typ := d.types[i] + var newPtr unsafe.Pointer + switch typ.Kind() { + case reflect.Map: + mapType := typ.(*reflect2.UnsafeMapType) + newPtr = mapType.UnsafeMakeMap(1) + + case reflect.Slice: + mapType := typ.(*reflect2.UnsafeSliceType) + newPtr = mapType.UnsafeMakeSlice(1, 1) + + case reflect.Ptr: + elemType := typ.(*reflect2.UnsafePtrType).Elem() + newPtr = elemType.UnsafeNew() + + default: + newPtr = typ.UnsafeNew() + } + + d.decoders[i].Decode(newPtr, r) + *pObj = typ.UnsafeIndirect(newPtr) +} + +func unionResolutionName(schema Schema) string { + name := schemaTypeName(schema) + switch schema.Type() { + case Map: + name += ":" + valSchema := schema.(*MapSchema).Values() + valName := schemaTypeName(valSchema) + + name += valName + + case Array: + name += ":" + itemSchema := schema.(*ArraySchema).Items() + itemName := schemaTypeName(itemSchema) + + name += itemName + } + 
+ return name +} + +func encoderOfResolverUnion(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder { + union := schema.(*UnionSchema) + + names, err := e.cfg.resolver.Name(typ) + if err != nil { + return &errorEncoder{err: err} + } + + var pos int + for _, name := range names { + if idx := strings.Index(name, ":"); idx > 0 { + name = name[:idx] + } + + schema, pos = union.Types().Get(name) + if schema != nil { + break + } + } + if schema == nil { + return &errorEncoder{err: fmt.Errorf("avro: unknown union type %s", names[0])} + } + + encoder := encoderOfType(e, schema, typ) + + return &unionResolverEncoder{ + pos: pos, + encoder: encoder, + } +} + +type unionResolverEncoder struct { + pos int + encoder ValEncoder +} + +func (e *unionResolverEncoder) Encode(ptr unsafe.Pointer, w *Writer) { + w.WriteInt(int32(e.pos)) + + e.encoder.Encode(ptr, w) +} + +func getUnionSchema(schema *UnionSchema, r *Reader) (int, Schema) { + types := schema.Types() + + idx := int(r.ReadInt()) + if idx < 0 || idx > len(types)-1 { + r.ReportError("decode union type", "unknown union type") + return 0, nil + } + + return idx, types[idx] +} diff --git a/vendor/github.com/hamba/avro/v2/config.go b/vendor/github.com/hamba/avro/v2/config.go new file mode 100644 index 0000000..57b2769 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/config.go @@ -0,0 +1,282 @@ +package avro + +import ( + "errors" + "io" + "sync" + + "github.com/modern-go/reflect2" +) + +const ( + defaultMaxByteSliceSize = 1_048_576 // 1 MiB +) + +// DefaultConfig is the default API. +var DefaultConfig = Config{}.Freeze() + +// Config customises how the codec should behave. +type Config struct { + // TagKey is the struct tag key used when en/decoding structs. + // This defaults to "avro". + TagKey string + + // BlockLength is the length of blocks for maps and arrays. + // This defaults to 100. + BlockLength int + + // DisableBlockSizeHeader disables encoding of an array/map size in bytes. 
+ // Encoded array/map will be prefixed with only the number of elements in + // contrast with default behavior which prefixes them with the number of elements + // and the total number of bytes in the array/map. Both approaches are valid according to the + // Avro specification, however not all decoders support the latter. + DisableBlockSizeHeader bool + + // UnionResolutionError determines if an error will be returned + // when a type cannot be resolved while decoding a union. + UnionResolutionError bool + + // PartialUnionTypeResolution dictates if the union type resolution + // should be attempted even when not all union types are registered. + // When enabled, the underlying type will get resolved if it is registered + // even if other types of the union are not. If resolution fails, logic + // falls back to default union resolution behavior based on the value of + // UnionResolutionError. + PartialUnionTypeResolution bool + + // Disable caching layer for encoders and decoders, forcing them to get rebuilt on every + // call to Marshal() and Unmarshal() + DisableCaching bool + + // MaxByteSliceSize is the maximum size of `bytes` or `string` types the Reader will create, defaulting to 1MiB. + // If this size is exceeded, the Reader returns an error. This can be disabled by setting a negative number. + MaxByteSliceSize int + + // MaxSliceAllocSize is the maximum size that the decoder will allocate, set to the max heap + // allocation size by default. + // If this size is exceeded, the decoder returns an error. + MaxSliceAllocSize int +} + +// Freeze makes the configuration immutable. 
// Freeze makes the configuration immutable.
// It builds the frozenConfig with per-API Reader/Writer pools so Marshal and
// Unmarshal can reuse scratch objects instead of allocating per call.
func (c Config) Freeze() API {
	api := &frozenConfig{
		config:   c,
		resolver: NewTypeResolver(),
	}

	// Pooled Readers/Writers keep a back-reference to this API so they pick
	// up its caches and limits; their buffers are (re)set on borrow.
	api.readerPool = &sync.Pool{
		New: func() any {
			return &Reader{
				cfg:    api,
				reader: nil,
				buf:    nil,
				head:   0,
				tail:   0,
			}
		},
	}
	api.writerPool = &sync.Pool{
		New: func() any {
			return &Writer{
				cfg:   api,
				out:   nil,
				buf:   make([]byte, 0, 512),
				Error: nil,
			}
		},
	}

	return api
}

// API represents a frozen Config.
type API interface {
	// Marshal returns the Avro encoding of v.
	Marshal(schema Schema, v any) ([]byte, error)

	// Unmarshal parses the Avro encoded data and stores the result in the value pointed to by v.
	// If v is nil or not a pointer, Unmarshal returns an error.
	Unmarshal(schema Schema, data []byte, v any) error

	// NewEncoder returns a new encoder that writes to w using schema.
	NewEncoder(schema Schema, w io.Writer) *Encoder

	// NewDecoder returns a new decoder that reads from reader r using schema.
	NewDecoder(schema Schema, r io.Reader) *Decoder

	// DecoderOf returns the value decoder for a given schema and type.
	DecoderOf(schema Schema, typ reflect2.Type) ValDecoder

	// EncoderOf returns the value encoder for a given schema and type.
	// NOTE(review): the parameter name "tpy" is a typo for "typ"; harmless in
	// an interface declaration, but worth fixing upstream.
	EncoderOf(schema Schema, tpy reflect2.Type) ValEncoder

	// Register registers names to their types for resolution. All primitive types are pre-registered.
	Register(name string, obj any)
}

// frozenConfig is the concrete API implementation: an immutable Config plus
// codec caches, object pools, and the union type resolver.
type frozenConfig struct {
	config Config

	decoderCache sync.Map // map[cacheKey]ValDecoder
	encoderCache sync.Map // map[cacheKey]ValEncoder

	readerPool *sync.Pool
	writerPool *sync.Pool

	resolver *TypeResolver
}

// Marshal returns the Avro encoding of v.
func (c *frozenConfig) Marshal(schema Schema, v any) ([]byte, error) {
	writer := c.borrowWriter()
	defer c.returnWriter(writer)

	writer.WriteVal(schema, v)
	if err := writer.Error; err != nil {
		return nil, err
	}

	// Copy the encoded bytes out of the pooled writer's buffer so the result
	// stays valid after the writer is returned to the pool.
	result := writer.Buffer()
	copied := make([]byte, len(result))
	copy(copied, result)

	return copied, nil
}

// borrowWriter gets a Writer from the pool, reset for in-memory encoding.
func (c *frozenConfig) borrowWriter() *Writer {
	writer := c.writerPool.Get().(*Writer)
	writer.Reset(nil)
	return writer
}

// returnWriter clears per-use state before putting the Writer back in the pool.
func (c *frozenConfig) returnWriter(writer *Writer) {
	writer.out = nil
	writer.Error = nil

	c.writerPool.Put(writer)
}

// Unmarshal parses the Avro encoded data and stores the result in the value pointed to by v.
func (c *frozenConfig) Unmarshal(schema Schema, data []byte, v any) error {
	reader := c.borrowReader(data)
	defer c.returnReader(reader)

	reader.ReadVal(schema, v)
	err := reader.Error

	// Reaching EOF after a complete read is success, not an error.
	if errors.Is(err, io.EOF) {
		return nil
	}

	return err
}

// borrowReader gets a Reader from the pool, reset to read from data.
func (c *frozenConfig) borrowReader(data []byte) *Reader {
	reader := c.readerPool.Get().(*Reader)
	reader.Reset(data)
	return reader
}

// returnReader clears the error state before putting the Reader back in the pool.
func (c *frozenConfig) returnReader(reader *Reader) {
	reader.Error = nil
	c.readerPool.Put(reader)
}

// NewEncoder returns a new encoder that writes to w using schema.
func (c *frozenConfig) NewEncoder(schema Schema, w io.Writer) *Encoder {
	// If the caller already hands us a *Writer, reuse it rather than wrapping.
	writer, ok := w.(*Writer)
	if !ok {
		writer = NewWriter(w, 512, WithWriterConfig(c))
	}
	return &Encoder{
		s: schema,
		w: writer,
	}
}

// NewDecoder returns a new decoder that reads from r using schema.
func (c *frozenConfig) NewDecoder(schema Schema, r io.Reader) *Decoder {
	reader := NewReader(r, 512, WithReaderConfig(c))
	return &Decoder{
		s: schema,
		r: reader,
	}
}

// Register registers names to their types for resolution.
func (c *frozenConfig) Register(name string, obj any) {
	c.resolver.Register(name, obj)
}

// cacheKey identifies a compiled codec by schema fingerprint and Go type.
type cacheKey struct {
	fingerprint [32]byte
	rtype       uintptr
}

// addDecoderToCache stores a compiled decoder, unless caching is disabled.
func (c *frozenConfig) addDecoderToCache(fingerprint [32]byte, rtype uintptr, dec ValDecoder) {
	if c.config.DisableCaching {
		return
	}
	key := cacheKey{fingerprint: fingerprint, rtype: rtype}
	c.decoderCache.Store(key, dec)
}

// getDecoderFromCache returns a cached decoder, or nil on miss or when
// caching is disabled.
func (c *frozenConfig) getDecoderFromCache(fingerprint [32]byte, rtype uintptr) ValDecoder {
	if c.config.DisableCaching {
		return nil
	}
	key := cacheKey{fingerprint: fingerprint, rtype: rtype}
	if dec, ok := c.decoderCache.Load(key); ok {
		return dec.(ValDecoder)
	}

	return nil
}

// addEncoderToCache stores a compiled encoder, unless caching is disabled.
func (c *frozenConfig) addEncoderToCache(fingerprint [32]byte, rtype uintptr, enc ValEncoder) {
	if c.config.DisableCaching {
		return
	}
	key := cacheKey{fingerprint: fingerprint, rtype: rtype}
	c.encoderCache.Store(key, enc)
}

// getEncoderFromCache returns a cached encoder, or nil on miss or when
// caching is disabled.
func (c *frozenConfig) getEncoderFromCache(fingerprint [32]byte, rtype uintptr) ValEncoder {
	if c.config.DisableCaching {
		return nil
	}
	key := cacheKey{fingerprint: fingerprint, rtype: rtype}
	if enc, ok := c.encoderCache.Load(key); ok {
		return enc.(ValEncoder)
	}

	return nil
}

// getTagKey returns the struct tag key to use, defaulting to "avro".
func (c *frozenConfig) getTagKey() string {
	tagKey := c.config.TagKey
	if tagKey == "" {
		return "avro"
	}
	return tagKey
}

// getBlockLength returns the configured block length, defaulting to 100 when
// unset or non-positive.
func (c *frozenConfig) getBlockLength() int {
	blockSize := c.config.BlockLength
	if blockSize <= 0 {
		return 100
	}
	return blockSize
}

// getMaxByteSliceSize returns the configured byte-slice limit; zero means the
// package default, and negative values disable the limit at the call sites.
func (c *frozenConfig) getMaxByteSliceSize() int {
	size := c.config.MaxByteSliceSize
	if size == 0 {
		return defaultMaxByteSliceSize
	}
	return size
}

// getMaxSliceAllocSize clamps the configured slice allocation limit to the
// platform's maximum heap allocation size.
func (c *frozenConfig) getMaxSliceAllocSize() int {
	size := c.config.MaxSliceAllocSize
	if size > maxAllocSize || size <= 0 {
		return maxAllocSize
	}
	return size
}
diff --git a/vendor/github.com/hamba/avro/v2/config_386.go b/vendor/github.com/hamba/avro/v2/config_386.go
new file mode 100644
index 0000000..a168fd7
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_386.go
@@ -0,0 +1,8 @@
package avro

import "math"

// Max allocation size for an array due to the limit in number of bits in a heap address:
// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L190
// 32-bit systems accept the full 32bit address space
const maxAllocSize = math.MaxInt
diff --git a/vendor/github.com/hamba/avro/v2/config_arm.go b/vendor/github.com/hamba/avro/v2/config_arm.go
new file mode 100644
index 0000000..a168fd7
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_arm.go
@@ -0,0 +1,8 @@
package avro

import "math"

// Max allocation size for an array due to the limit in number of bits in a heap address:
// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L190
// 32-bit systems accept the full 32bit address space
const maxAllocSize = math.MaxInt
diff --git a/vendor/github.com/hamba/avro/v2/config_x64.go b/vendor/github.com/hamba/avro/v2/config_x64.go
new file mode 100644
index 0000000..cecafd6
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_x64.go
@@ -0,0 +1,7 @@
//go:build !386 && !arm

package avro

// Max allocation size for an array due to the limit in number of bits in a heap address:
// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L183
const maxAllocSize = int(1 << 48)
diff --git a/vendor/github.com/hamba/avro/v2/converter.go b/vendor/github.com/hamba/avro/v2/converter.go
new file mode 100644
index 0000000..cc1f17c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/converter.go
@@ -0,0 +1,34 @@
package avro

// createLongConverter returns a reader that promotes the given writer type to
// int64, or nil when no promotion applies.
func createLongConverter(typ Type) func(*Reader) int64 {
	switch typ {
	case Int:
		return func(r *Reader) int64 { return int64(r.ReadInt()) }
	default:
		return nil
	}
}

// createFloatConverter returns a reader that promotes the given writer type to
// float32, or nil when no promotion applies.
func createFloatConverter(typ Type) func(*Reader) float32 {
	switch typ {
	case Int:
		return func(r *Reader) float32 { return float32(r.ReadInt()) }
	case Long:
		return func(r *Reader) float32 { return float32(r.ReadLong()) }
	default:
		return nil
	}
}

// createDoubleConverter returns a reader that promotes the given writer type
// to float64, or nil when no promotion applies.
func createDoubleConverter(typ Type) func(*Reader) float64 {
	switch typ {
	case Int:
		return func(r *Reader) float64 { return float64(r.ReadInt()) }
	case Long:
		return func(r *Reader) float64 { return float64(r.ReadLong()) }
	case Float:
		return func(r *Reader) float64 { return float64(r.ReadFloat()) }
	default:
		return nil
	}
}
diff --git a/vendor/github.com/hamba/avro/v2/decoder.go b/vendor/github.com/hamba/avro/v2/decoder.go
new file mode 100644
index 0000000..0e0ab9d
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/decoder.go
@@ -0,0 +1,49 @@
package avro

import (
	"io"
)

// Decoder reads and decodes Avro values from an input stream.
type Decoder struct {
	s Schema
	r *Reader
}

// NewDecoder returns a new decoder that reads from reader r using schema s.
func NewDecoder(s string, r io.Reader) (*Decoder, error) {
	sch, err := Parse(s)
	if err != nil {
		return nil, err
	}

	return NewDecoderForSchema(sch, r), nil
}

// NewDecoderForSchema returns a new decoder that reads from r using schema.
func NewDecoderForSchema(schema Schema, reader io.Reader) *Decoder {
	return DefaultConfig.NewDecoder(schema, reader)
}

// Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.
func (d *Decoder) Decode(v any) error {
	// When the buffer is exhausted and a backing reader exists, pull more
	// bytes before decoding so an empty stream reports io.EOF cleanly.
	if d.r.head == d.r.tail && d.r.reader != nil {
		if !d.r.loadMore() {
			return io.EOF
		}
	}

	d.r.ReadVal(d.s, v)

	//nolint:errorlint // Only direct EOF errors should be discarded.
	if d.r.Error == io.EOF {
		return nil
	}
	return d.r.Error
}

// Unmarshal parses the Avro encoded data and stores the result in the value pointed to by v.
// If v is nil or not a pointer, Unmarshal returns an error.
func Unmarshal(schema Schema, data []byte, v any) error {
	return DefaultConfig.Unmarshal(schema, data, v)
}
diff --git a/vendor/github.com/hamba/avro/v2/doc.go b/vendor/github.com/hamba/avro/v2/doc.go
new file mode 100644
index 0000000..5445573
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/doc.go
@@ -0,0 +1,4 @@
// Package avro implements encoding and decoding of Avro as defined by the Avro specification.
//
// See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/
package avro
diff --git a/vendor/github.com/hamba/avro/v2/encoder.go b/vendor/github.com/hamba/avro/v2/encoder.go
new file mode 100644
index 0000000..faa285e
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/encoder.go
@@ -0,0 +1,37 @@
package avro

import (
	"io"
)

// Encoder writes Avro values to an output stream.
type Encoder struct {
	s Schema
	w *Writer
}

// NewEncoder returns a new encoder that writes to w using schema s.
func NewEncoder(s string, w io.Writer) (*Encoder, error) {
	sch, err := Parse(s)
	if err != nil {
		return nil, err
	}
	return NewEncoderForSchema(sch, w), nil
}

// NewEncoderForSchema returns a new encoder that writes to w using schema.
func NewEncoderForSchema(schema Schema, w io.Writer) *Encoder {
	return DefaultConfig.NewEncoder(schema, w)
}

// Encode writes the Avro encoding of v to the stream.
func (e *Encoder) Encode(v any) error {
	e.w.WriteVal(e.s, v)
	// Flush errors surface through w.Error, which is returned below.
	_ = e.w.Flush()
	return e.w.Error
}

// Marshal returns the Avro encoding of v.
func Marshal(schema Schema, v any) ([]byte, error) {
	return DefaultConfig.Marshal(schema, v)
}
diff --git a/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go b/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go
new file mode 100644
index 0000000..0e2485c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go
@@ -0,0 +1,38 @@
// Package bytesx implements bytes extensions.
package bytesx

import "io"

// ResetReader implements the io.Reader reading from a resettable byte slice.
type ResetReader struct {
	buf  []byte
	head int // next byte to read
	tail int // one past the last valid byte (len(buf))
}

// NewResetReader returns a new ResetReader reading from b.
func NewResetReader(b []byte) *ResetReader {
	r := &ResetReader{}
	r.Reset(b)

	return r
}

// Read reads bytes into p.
// It returns io.EOF once the slice is exhausted.
func (r *ResetReader) Read(p []byte) (int, error) {
	if r.head == r.tail {
		return 0, io.EOF
	}

	n := copy(p, r.buf[r.head:])
	r.head += n

	return n, nil
}

// Reset resets the byte slice being read from.
// The slice is not copied; the caller must not mutate b while reading.
func (r *ResetReader) Reset(b []byte) {
	r.buf = b
	r.head = 0
	r.tail = len(b)
}
diff --git a/vendor/github.com/hamba/avro/v2/noescape.go b/vendor/github.com/hamba/avro/v2/noescape.go
new file mode 100644
index 0000000..8907846
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/noescape.go
@@ -0,0 +1,21 @@
package avro

import (
	"unsafe"
)

// noescape hides a pointer from escape analysis. noescape is
// the identity function but escape analysis doesn't think the
// output depends on the input. noescape is inlined and currently
// compiles down to zero instructions.
// USE CAREFULLY!
//
// This function is taken from Go std lib:
// https://github.com/golang/go/blob/master/src/runtime/stubs.go#L178
//
//nolint:govet,staticcheck
//go:nosplit
func noescape(p unsafe.Pointer) unsafe.Pointer {
	// The xor with 0 is a no-op that breaks the data-flow link the escape
	// analyzer would otherwise see between input and output.
	x := uintptr(p)
	return unsafe.Pointer(x ^ 0)
}
diff --git a/vendor/github.com/hamba/avro/v2/noescape.s b/vendor/github.com/hamba/avro/v2/noescape.s
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/noescape.s
diff --git a/vendor/github.com/hamba/avro/v2/ocf/codec.go b/vendor/github.com/hamba/avro/v2/ocf/codec.go
new file mode 100644
index 0000000..8d0cab2
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/ocf/codec.go
@@ -0,0 +1,164 @@
package ocf

import (
	"bytes"
	"compress/flate"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"io"

	"github.com/golang/snappy"
	"github.com/klauspost/compress/zstd"
)

// CodecName represents a compression codec name.
type CodecName string

// Supported compression codecs.
const (
	Null      CodecName = "null"
	Deflate   CodecName = "deflate"
	Snappy    CodecName = "snappy"
	ZStandard CodecName = "zstandard"
)

// codecOptions carries per-codec tuning knobs.
type codecOptions struct {
	DeflateCompressionLevel int
	ZStandardOptions        zstdOptions
}

type zstdOptions struct {
	EOptions []zstd.EOption
	DOptions []zstd.DOption
}

// resolveCodec maps a codec name from the file header to a Codec
// implementation. An empty name is treated as the null codec.
func resolveCodec(name CodecName, codecOpts codecOptions) (Codec, error) {
	switch name {
	case Null, "":
		return &NullCodec{}, nil

	case Deflate:
		return &DeflateCodec{compLvl: codecOpts.DeflateCompressionLevel}, nil

	case Snappy:
		return &SnappyCodec{}, nil

	case ZStandard:
		return newZStandardCodec(codecOpts.ZStandardOptions), nil

	default:
		return nil, fmt.Errorf("unknown codec %s", name)
	}
}

// Codec represents a compression codec.
type Codec interface {
	// Decode decodes the given bytes.
	Decode([]byte) ([]byte, error)
	// Encode encodes the given bytes.
	Encode([]byte) []byte
}

// NullCodec is a no op codec.
type NullCodec struct{}

// Decode decodes the given bytes.
func (*NullCodec) Decode(b []byte) ([]byte, error) {
	return b, nil
}

// Encode encodes the given bytes.
func (*NullCodec) Encode(b []byte) []byte {
	return b
}

// DeflateCodec is a flate compression codec.
type DeflateCodec struct {
	compLvl int
}

// Decode decodes the given bytes.
func (c *DeflateCodec) Decode(b []byte) ([]byte, error) {
	r := flate.NewReader(bytes.NewBuffer(b))
	data, err := io.ReadAll(r)
	if err != nil {
		_ = r.Close()
		return nil, err
	}
	_ = r.Close()

	return data, nil
}

// Encode encodes the given bytes.
func (c *DeflateCodec) Encode(b []byte) []byte {
	data := bytes.NewBuffer(make([]byte, 0, len(b)))

	// Writing to a bytes.Buffer cannot fail, so errors are ignored here.
	w, _ := flate.NewWriter(data, c.compLvl)
	_, _ = w.Write(b)
	_ = w.Close()

	return data.Bytes()
}

// SnappyCodec is a snappy compression codec.
type SnappyCodec struct{}

// Decode decodes the given bytes.
// Per the Avro spec, the last 4 bytes of a snappy block are a big-endian
// CRC32 of the uncompressed data; it is verified after decompression.
func (*SnappyCodec) Decode(b []byte) ([]byte, error) {
	l := len(b)
	if l < 5 {
		return nil, errors.New("block does not contain snappy checksum")
	}

	dst, err := snappy.Decode(nil, b[:l-4])
	if err != nil {
		return nil, err
	}

	crc := binary.BigEndian.Uint32(b[l-4:])
	if crc32.ChecksumIEEE(dst) != crc {
		return nil, errors.New("snappy checksum mismatch")
	}

	return dst, nil
}

// Encode encodes the given bytes.
// The compressed payload is followed by a 4-byte big-endian CRC32 of the
// uncompressed input, matching the layout Decode expects.
func (*SnappyCodec) Encode(b []byte) []byte {
	dst := snappy.Encode(nil, b)

	dst = append(dst, 0, 0, 0, 0)
	binary.BigEndian.PutUint32(dst[len(dst)-4:], crc32.ChecksumIEEE(b))

	return dst
}

// ZStandardCodec is a zstandard compression codec.
type ZStandardCodec struct {
	decoder *zstd.Decoder
	encoder *zstd.Encoder
}

func newZStandardCodec(opts zstdOptions) *ZStandardCodec {
	decoder, _ := zstd.NewReader(nil, opts.DOptions...)
	encoder, _ := zstd.NewWriter(nil, opts.EOptions...)
	return &ZStandardCodec{
		decoder: decoder,
		encoder: encoder,
	}
}

// Decode decodes the given bytes.
func (zstdCodec *ZStandardCodec) Decode(b []byte) ([]byte, error) {
	defer func() { _ = zstdCodec.decoder.Reset(nil) }()
	return zstdCodec.decoder.DecodeAll(b, nil)
}

// Encode encodes the given bytes.
func (zstdCodec *ZStandardCodec) Encode(b []byte) []byte {
	defer zstdCodec.encoder.Reset(nil)
	return zstdCodec.encoder.EncodeAll(b, nil)
}
diff --git a/vendor/github.com/hamba/avro/v2/ocf/ocf.go b/vendor/github.com/hamba/avro/v2/ocf/ocf.go
new file mode 100644
index 0000000..b3d7408
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/ocf/ocf.go
@@ -0,0 +1,554 @@
// Package ocf implements encoding and decoding of Avro Object Container Files as defined by the Avro specification.
//
// See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/
package ocf

import (
	"bytes"
	"compress/flate"
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/hamba/avro/v2"
	"github.com/hamba/avro/v2/internal/bytesx"
	"github.com/klauspost/compress/zstd"
)

// Well-known metadata keys from the OCF specification.
const (
	schemaKey = "avro.schema"
	codecKey  = "avro.codec"
)

var (
	magicBytes = [4]byte{'O', 'b', 'j', 1}

	// HeaderSchema is the Avro schema of a container file header.
	HeaderSchema = avro.MustParse(`{
	"type": "record",
	"name": "org.apache.avro.file.Header",
	"fields": [
		{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
		{"name": "meta", "type": {"type": "map", "values": "bytes"}},
		{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
	]
}`)

	// DefaultSchemaMarshaler calls the schema's String() method, to produce
	// a "canonical" schema.
	DefaultSchemaMarshaler = defaultMarshalSchema
	// FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
	// a schema with all details preserved. The "canonical" schema returned by
	// the default marshaler does not preserve a type's extra properties.
	FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
	Magic [4]byte           `avro:"magic"`
	Meta  map[string][]byte `avro:"meta"`
	Sync  [16]byte          `avro:"sync"`
}

// decoderConfig collects the options applied by DecoderFunc values.
type decoderConfig struct {
	DecoderConfig avro.API
	SchemaCache   *avro.SchemaCache
	CodecOptions  codecOptions
}

// DecoderFunc represents a configuration function for Decoder.
type DecoderFunc func(cfg *decoderConfig)

// WithDecoderConfig sets the value decoder config on the OCF decoder.
func WithDecoderConfig(wCfg avro.API) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.DecoderConfig = wCfg
	}
}

// WithDecoderSchemaCache sets the schema cache for the decoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithZStandardDecoderOptions sets the options for the ZStandard decoder.
func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc {
	return func(cfg *decoderConfig) {
		cfg.CodecOptions.ZStandardOptions.DOptions = append(cfg.CodecOptions.ZStandardOptions.DOptions, opts...)
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
	reader      *avro.Reader
	resetReader *bytesx.ResetReader
	decoder     *avro.Decoder
	meta        map[string][]byte
	sync        [16]byte
	schema      avro.Schema

	codec Codec

	// count is the number of values remaining in the current block.
	count int64
}

// NewDecoder returns a new decoder that reads from reader r.
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
	cfg := decoderConfig{
		DecoderConfig: avro.DefaultConfig,
		SchemaCache:   avro.DefaultSchemaCache,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
	}
	for _, opt := range opts {
		opt(&cfg)
	}

	reader := avro.NewReader(r, 1024)

	h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
	if err != nil {
		return nil, fmt.Errorf("decoder: %w", err)
	}

	// Block payloads are decompressed into this resettable reader, which the
	// value decoder then consumes.
	decReader := bytesx.NewResetReader([]byte{})

	return &Decoder{
		reader:      reader,
		resetReader: decReader,
		decoder:     cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
		meta:        h.Meta,
		sync:        h.Sync,
		codec:       h.Codec,
		schema:      h.Schema,
	}, nil
}

// Metadata returns the header metadata.
func (d *Decoder) Metadata() map[string][]byte {
	return d.meta
}

// Schema returns the schema that was parsed from the file's metadata
// and that is used to interpret the file's contents.
func (d *Decoder) Schema() avro.Schema {
	return d.schema
}

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
	// When the current block is drained, pull in the next one.
	if d.count <= 0 {
		count := d.readBlock()
		d.count = count
	}

	if d.reader.Error != nil {
		return false
	}

	return d.count > 0
}

// Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.
// HasNext must have been called (and returned true) to load the current block.
func (d *Decoder) Decode(v any) error {
	if d.count <= 0 {
		return errors.New("decoder: no data found, call HasNext first")
	}

	d.count--

	return d.decoder.Decode(v)
}

// Error returns the last reader error.
func (d *Decoder) Error() error {
	// EOF marks the normal end of the container file, not a failure.
	if errors.Is(d.reader.Error, io.EOF) {
		return nil
	}

	return d.reader.Error
}

// readBlock loads the next data block into the reset reader and returns the
// number of values it contains; 0 means end of file. Failures are reported
// through d.reader.Error.
func (d *Decoder) readBlock() int64 {
	_ = d.reader.Peek()
	if errors.Is(d.reader.Error, io.EOF) {
		// There is no next block
		return 0
	}

	count := d.reader.ReadLong()
	size := d.reader.ReadLong()

	// Read the blocks data
	switch {
	case count > 0:
		data := make([]byte, size)
		d.reader.Read(data)

		data, err := d.codec.Decode(data)
		if err != nil {
			d.reader.Error = err
		}

		d.resetReader.Reset(data)

	case size > 0:
		// Skip the block data when count is 0
		data := make([]byte, size)
		d.reader.Read(data)
	}

	// Read the sync.
	var sync [16]byte
	d.reader.Read(sync[:])
	if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
		d.reader.Error = errors.New("decoder: invalid block")
	}

	return count
}

// encoderConfig collects the options applied by EncoderFunc values.
type encoderConfig struct {
	BlockLength     int
	CodecName       CodecName
	CodecOptions    codecOptions
	Metadata        map[string][]byte
	Sync            [16]byte
	EncodingConfig  avro.API
	SchemaCache     *avro.SchemaCache
	SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
type EncoderFunc func(cfg *encoderConfig)

// WithBlockLength sets the block length on the encoder.
func WithBlockLength(length int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.BlockLength = length
	}
}

// WithCodec sets the compression codec on the encoder.
func WithCodec(codec CodecName) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = codec
	}
}

// WithCompressionLevel sets the compression codec to deflate and
// the compression level on the encoder.
func WithCompressionLevel(compLvl int) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecName = Deflate
		cfg.CodecOptions.DeflateCompressionLevel = compLvl
	}
}

// WithZStandardEncoderOptions sets the options for the ZStandard encoder.
func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.CodecOptions.ZStandardOptions.EOptions = append(cfg.CodecOptions.ZStandardOptions.EOptions, opts...)
	}
}

// WithMetadata sets the metadata on the encoder header.
func WithMetadata(meta map[string][]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Metadata = meta
	}
}

// WithEncoderSchemaCache sets the schema cache for the encoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaCache = cache
	}
}

// WithSchemaMarshaler sets the schema marshaler for the encoder.
// If not specified, defaults to DefaultSchemaMarshaler.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.SchemaMarshaler = m
	}
}

// WithSyncBlock sets the sync block.
func WithSyncBlock(sync [16]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.Sync = sync
	}
}

// WithEncodingConfig sets the value encoder config on the OCF encoder.
func WithEncodingConfig(wCfg avro.API) EncoderFunc {
	return func(cfg *encoderConfig) {
		cfg.EncodingConfig = wCfg
	}
}

// Encoder writes Avro container file to an output stream.
type Encoder struct {
	writer  *avro.Writer
	buf     *bytes.Buffer // pending block data, flushed every blockLength values
	encoder *avro.Encoder
	sync    [16]byte

	codec Codec

	blockLength int
	count int // values buffered in the current block
}

// NewEncoder returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	cfg := computeEncoderConfig(opts)
	schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
	if err != nil {
		return nil, err
	}
	return newEncoder(schema, w, cfg)
}

// NewEncoderWithSchema returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	return newEncoder(schema, w, computeEncoderConfig(opts))
}

// newEncoder builds an Encoder. For a non-empty *os.File it appends: the
// existing header supplies the schema, codec, and sync marker. Otherwise it
// writes a fresh header using the provided schema and config.
func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
	switch file := w.(type) {
	case nil:
		return nil, errors.New("writer cannot be nil")
	case *os.File:
		info, err := file.Stat()
		if err != nil {
			return nil, err
		}

		if info.Size() > 0 {
			// Append mode: reuse the file's own header and seek past the
			// existing blocks so new blocks land at the end.
			reader := avro.NewReader(file, 1024)
			h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
			if err != nil {
				return nil, err
			}
			if err = skipToEnd(reader, h.Sync); err != nil {
				return nil, err
			}

			writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
			buf := &bytes.Buffer{}
			e := &Encoder{
				writer:      writer,
				buf:         buf,
				encoder:     cfg.EncodingConfig.NewEncoder(h.Schema, buf),
				sync:        h.Sync,
				codec:       h.Codec,
				blockLength: cfg.BlockLength,
			}
			return e, nil
		}
	}

	schemaJSON, err := cfg.SchemaMarshaler(schema)
	if err != nil {
		return nil, err
	}

	cfg.Metadata[schemaKey] = schemaJSON
	cfg.Metadata[codecKey] = []byte(cfg.CodecName)
	header := Header{
		Magic: magicBytes,
		Meta:  cfg.Metadata,
	}
	header.Sync = cfg.Sync
	// Generate a random sync marker when the caller did not supply one.
	if header.Sync == [16]byte{} {
		_, _ = rand.Read(header.Sync[:])
	}

	codec, err := resolveCodec(cfg.CodecName, cfg.CodecOptions)
	if err != nil {
		return nil, err
	}

	writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
	writer.WriteVal(HeaderSchema, header)
	if err = writer.Flush(); err != nil {
		return nil, err
	}

	buf := &bytes.Buffer{}
	e := &Encoder{
		writer:      writer,
		buf:         buf,
		encoder:     cfg.EncodingConfig.NewEncoder(schema, buf),
		sync:        header.Sync,
		codec:       codec,
		blockLength: cfg.BlockLength,
	}
	return e, nil
}

// computeEncoderConfig applies opts over the package defaults.
func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
	cfg := encoderConfig{
		BlockLength: 100,
		CodecName:   Null,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
		Metadata:        map[string][]byte{},
		EncodingConfig:  avro.DefaultConfig,
		SchemaCache:     avro.DefaultSchemaCache,
		SchemaMarshaler: DefaultSchemaMarshaler,
	}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// Write v to the internal buffer. This method skips the internal encoder and
// therefore the caller is responsible for encoding the bytes. No error will be
// thrown if the bytes does not conform to the schema given to NewEncoder, but
// the final ocf data will be corrupted.
func (e *Encoder) Write(p []byte) (n int, err error) {
	n, err = e.buf.Write(p)
	if err != nil {
		return n, err
	}

	e.count++
	if e.count >= e.blockLength {
		if err = e.writerBlock(); err != nil {
			return n, err
		}
	}

	return n, e.writer.Error
}

// Encode writes the Avro encoding of v to the stream.
// A block is flushed automatically once blockLength values are buffered.
func (e *Encoder) Encode(v any) error {
	if err := e.encoder.Encode(v); err != nil {
		return err
	}

	e.count++
	if e.count >= e.blockLength {
		if err := e.writerBlock(); err != nil {
			return err
		}
	}

	return e.writer.Error
}

// Flush flushes the underlying writer.
func (e *Encoder) Flush() error {
	// Nothing buffered: avoid emitting an empty block.
	if e.count == 0 {
		return nil
	}

	if err := e.writerBlock(); err != nil {
		return err
	}

	return e.writer.Error
}

// Close closes the encoder, flushing the writer.
func (e *Encoder) Close() error {
	return e.Flush()
}

// writerBlock emits one OCF block: value count, compressed byte length, the
// compressed payload, and the sync marker, then resets the pending buffer.
func (e *Encoder) writerBlock() error {
	e.writer.WriteLong(int64(e.count))

	b := e.codec.Encode(e.buf.Bytes())

	e.writer.WriteLong(int64(len(b)))
	_, _ = e.writer.Write(b)

	_, _ = e.writer.Write(e.sync[:])

	e.count = 0
	e.buf.Reset()
	return e.writer.Flush()
}

// ocfHeader is the parsed, resolved form of a container file header.
type ocfHeader struct {
	Schema avro.Schema
	Codec  Codec
	Meta   map[string][]byte
	Sync   [16]byte
}

// readHeader reads and validates the file header, parsing its schema and
// resolving its codec.
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
	var h Header
	reader.ReadVal(HeaderSchema, &h)
	if reader.Error != nil {
		return nil, fmt.Errorf("unexpected error: %w", reader.Error)
	}

	if h.Magic != magicBytes {
		return nil, errors.New("invalid avro file")
	}
	schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
	if err != nil {
		return nil, err
	}

	codec, err := resolveCodec(CodecName(h.Meta[codecKey]), codecOpts)
	if err != nil {
		return nil, err
	}

	return &ocfHeader{
		Schema: schema,
		Codec:  codec,
		Meta:   h.Meta,
		Sync:   h.Sync,
	}, nil
}

// skipToEnd advances the reader over every existing block (validating each
// sync marker) so an appending encoder starts writing at end of file.
func skipToEnd(reader *avro.Reader, sync [16]byte) error {
	for {
		_ = reader.ReadLong()
		if errors.Is(reader.Error, io.EOF) {
			return nil
		}
		size := reader.ReadLong()
		reader.SkipNBytes(int(size))
		if reader.Error != nil {
			return reader.Error
		}

		var synMark [16]byte
		reader.Read(synMark[:])
		if sync != synMark && !errors.Is(reader.Error, io.EOF) {
			reader.Error = errors.New("invalid block")
		}
	}
}

// defaultMarshalSchema renders the canonical schema string.
func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
	return []byte(schema.String()), nil
}

// fullMarshalSchema renders the schema as JSON, preserving extra properties.
func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
	return json.Marshal(schema)
}
diff --git a/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go b/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go
new file mode 100644
index 0000000..6ca8205
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go
@@ -0,0 +1,154 @@
// Package crc64 implements the Avro CRC-64 checksum.
// See https://avro.apache.org/docs/current/spec.html#schema_fingerprints for information.
package crc64

import (
	"hash"
)

func init() {
	buildTable()
}

// Size is the size of a CRC-64 checksum in bytes.
const Size = 8

// ByteOrder denotes how integers are encoded into bytes. The ByteOrder
// interface in encoding/binary cancels some optimizations, so use a more
// direct implementation.
type ByteOrder int

// ByteOrder constants.
const (
	LittleEndian ByteOrder = iota
	BigEndian
)

// Empty is the empty checksum.
// It doubles as the (bit-reversed) polynomial fed into the table below, as
// specified for Avro schema fingerprints.
const Empty = 0xc15d213aa4d7a795

// Table is a 256-word table representing the polynomial for efficient processing.
type Table [256]uint64

// makeTable precomputes one CRC step per possible input byte.
func makeTable() *Table {
	t := new(Table)
	for i := range 256 {
		fp := uint64(i)
		for range 8 {
			// -(fp & 1) is an all-ones mask when the low bit is set, so the
			// polynomial is xored in only on odd intermediate values.
			fp = (fp >> 1) ^ (Empty & -(fp & 1))
		}
		t[i] = fp
	}
	return t
}

var crc64Table *Table

func buildTable() {
	crc64Table = makeTable()
}

// digest carries the running checksum state.
type digest struct {
	crc       uint64
	tab       *Table
	byteOrder ByteOrder
}

// New creates a new hash.Hash64 computing the Avro CRC-64 checksum.
// Its Sum method will lay the value out in big-endian byte order.
func New() hash.Hash64 {
	return newDigest(BigEndian)
}

// NewWithByteOrder creates a new hash.Hash64 computing the Avro CRC-64
// checksum. Its Sum method will lay the value out in specified byte order.
func NewWithByteOrder(byteOrder ByteOrder) hash.Hash64 {
	return newDigest(byteOrder)
}

func newDigest(byteOrder ByteOrder) *digest {
	return &digest{
		crc:       Empty,
		tab:       crc64Table,
		byteOrder: byteOrder,
	}
}

// Size returns the bytes size of the checksum.
func (d *digest) Size() int {
	return Size
}

// BlockSize returns the block size of the checksum.
func (d *digest) BlockSize() int {
	return 1
}

// Reset resets the hash instance.
func (d *digest) Reset() {
	d.crc = Empty
}

// Write accumulatively adds the given data to the checksum.
// It never fails; the error return only satisfies io.Writer.
func (d *digest) Write(p []byte) (n int, err error) {
	for i := range p {
		d.crc = (d.crc >> 8) ^ d.tab[(int)(byte(d.crc)^p[i])&0xff]
	}

	return len(p), nil
}

// Sum64 returns the checksum as a uint64.
func (d *digest) Sum64() uint64 {
	return d.crc
}

// Sum returns the checksum as a byte slice, using the given byte slice.
func (d *digest) Sum(in []byte) []byte {
	b := d.sumBytes()
	return append(in, b[:]...)
}

// sumBytes returns the checksum as a byte array in digest byte order.
func (d *digest) sumBytes() [Size]byte {
	s := d.Sum64()

	switch d.byteOrder {
	case LittleEndian:
		return [Size]byte{
			byte(s),
			byte(s >> 8),
			byte(s >> 16),
			byte(s >> 24),
			byte(s >> 32),
			byte(s >> 40),
			byte(s >> 48),
			byte(s >> 56),
		}
	case BigEndian:
		return [Size]byte{
			byte(s >> 56),
			byte(s >> 48),
			byte(s >> 40),
			byte(s >> 32),
			byte(s >> 24),
			byte(s >> 16),
			byte(s >> 8),
			byte(s),
		}
	}
	// Unreachable for the two declared ByteOrder values.
	panic("unknown byte order")
}

// Sum returns the CRC64 checksum of the data, in big-endian byte order.
func Sum(data []byte) [Size]byte {
	return SumWithByteOrder(data, BigEndian)
}

// SumWithByteOrder returns the CRC64 checksum of the data, in specified byte
// order.
+func SumWithByteOrder(data []byte, byteOrder ByteOrder) [Size]byte { + d := newDigest(byteOrder) + _, _ = d.Write(data) + return d.sumBytes() +} diff --git a/vendor/github.com/hamba/avro/v2/protocol.go b/vendor/github.com/hamba/avro/v2/protocol.go new file mode 100644 index 0000000..3a62baa --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/protocol.go @@ -0,0 +1,377 @@ +package avro + +import ( + "crypto/md5" + "encoding/hex" + "errors" + "fmt" + "os" + + jsoniter "github.com/json-iterator/go" + "github.com/mitchellh/mapstructure" +) + +var ( + protocolReserved = []string{"doc", "types", "messages", "protocol", "namespace"} + messageReserved = []string{"doc", "response", "request", "errors", "one-way"} +) + +type protocolConfig struct { + doc string + props map[string]any +} + +// ProtocolOption is a function that sets a protocol option. +type ProtocolOption func(*protocolConfig) + +// WithProtoDoc sets the doc on a protocol. +func WithProtoDoc(doc string) ProtocolOption { + return func(opts *protocolConfig) { + opts.doc = doc + } +} + +// WithProtoProps sets the properties on a protocol. +func WithProtoProps(props map[string]any) ProtocolOption { + return func(opts *protocolConfig) { + opts.props = props + } +} + +// Protocol is an Avro protocol. +type Protocol struct { + name + properties + + types []NamedSchema + messages map[string]*Message + + doc string + + hash string +} + +// NewProtocol creates a protocol instance. 
+func NewProtocol( + name, namepsace string, + types []NamedSchema, + messages map[string]*Message, + opts ...ProtocolOption, +) (*Protocol, error) { + var cfg protocolConfig + for _, opt := range opts { + opt(&cfg) + } + + n, err := newName(name, namepsace, nil) + if err != nil { + return nil, err + } + + p := &Protocol{ + name: n, + properties: newProperties(cfg.props, protocolReserved), + types: types, + messages: messages, + doc: cfg.doc, + } + + b := md5.Sum([]byte(p.String())) + p.hash = hex.EncodeToString(b[:]) + + return p, nil +} + +// Message returns a message with the given name or nil. +func (p *Protocol) Message(name string) *Message { + return p.messages[name] +} + +// Doc returns the protocol doc. +func (p *Protocol) Doc() string { + return p.doc +} + +// Hash returns the MD5 hash of the protocol. +func (p *Protocol) Hash() string { + return p.hash +} + +// Types returns the types of the protocol. +func (p *Protocol) Types() []NamedSchema { + return p.types +} + +// String returns the canonical form of the protocol. +func (p *Protocol) String() string { + types := "" + for _, f := range p.types { + types += f.String() + "," + } + if len(types) > 0 { + types = types[:len(types)-1] + } + + messages := "" + for k, m := range p.messages { + messages += `"` + k + `":` + m.String() + "," + } + if len(messages) > 0 { + messages = messages[:len(messages)-1] + } + + return `{"protocol":"` + p.Name() + + `","namespace":"` + p.Namespace() + + `","types":[` + types + `],"messages":{` + messages + `}}` +} + +// Message is an Avro protocol message. +type Message struct { + properties + + req *RecordSchema + resp Schema + errs *UnionSchema + oneWay bool + + doc string +} + +// NewMessage creates a protocol message instance. 
+func NewMessage(req *RecordSchema, resp Schema, errors *UnionSchema, oneWay bool, opts ...ProtocolOption) *Message { + var cfg protocolConfig + for _, opt := range opts { + opt(&cfg) + } + + return &Message{ + properties: newProperties(cfg.props, messageReserved), + req: req, + resp: resp, + errs: errors, + oneWay: oneWay, + doc: cfg.doc, + } +} + +// Request returns the message request schema. +func (m *Message) Request() *RecordSchema { + return m.req +} + +// Response returns the message response schema. +func (m *Message) Response() Schema { + return m.resp +} + +// Errors returns the message errors union schema. +func (m *Message) Errors() *UnionSchema { + return m.errs +} + +// OneWay determines of the message is a one way message. +func (m *Message) OneWay() bool { + return m.oneWay +} + +// Doc returns the message doc. +func (m *Message) Doc() string { + return m.doc +} + +// String returns the canonical form of the message. +func (m *Message) String() string { + fields := "" + for _, f := range m.req.fields { + fields += f.String() + "," + } + if len(fields) > 0 { + fields = fields[:len(fields)-1] + } + + str := `{"request":[` + fields + `]` + if m.resp != nil { + str += `,"response":` + m.resp.String() + } + if m.errs != nil && len(m.errs.Types()) > 1 { + errs, _ := NewUnionSchema(m.errs.Types()[1:]) + str += `,"errors":` + errs.String() + } + str += "}" + return str +} + +// ParseProtocolFile parses an Avro protocol from a file. +func ParseProtocolFile(path string) (*Protocol, error) { + s, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + return ParseProtocol(string(s)) +} + +// MustParseProtocol parses an Avro protocol, panicing if there is an error. +func MustParseProtocol(protocol string) *Protocol { + parsed, err := ParseProtocol(protocol) + if err != nil { + panic(err) + } + + return parsed +} + +// ParseProtocol parses an Avro protocol. 
+func ParseProtocol(protocol string) (*Protocol, error) { + cache := &SchemaCache{} + + var m map[string]any + if err := jsoniter.Unmarshal([]byte(protocol), &m); err != nil { + return nil, err + } + + seen := seenCache{} + return parseProtocol(m, seen, cache) +} + +type protocol struct { + Protocol string `mapstructure:"protocol"` + Namespace string `mapstructure:"namespace"` + Doc string `mapstructure:"doc"` + Types []any `mapstructure:"types"` + Messages map[string]map[string]any `mapstructure:"messages"` + Props map[string]any `mapstructure:",remain"` +} + +func parseProtocol(m map[string]any, seen seenCache, cache *SchemaCache) (*Protocol, error) { + var ( + p protocol + meta mapstructure.Metadata + ) + if err := decodeMap(m, &p, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding protocol: %w", err) + } + + if err := checkParsedName(p.Protocol); err != nil { + return nil, err + } + + var ( + types []NamedSchema + err error + ) + if len(p.Types) > 0 { + types, err = parseProtocolTypes(p.Namespace, p.Types, seen, cache) + if err != nil { + return nil, err + } + } + + messages := map[string]*Message{} + if len(p.Messages) > 0 { + for k, msg := range p.Messages { + message, err := parseMessage(p.Namespace, msg, seen, cache) + if err != nil { + return nil, err + } + + messages[k] = message + } + } + + return NewProtocol(p.Protocol, p.Namespace, types, messages, WithProtoDoc(p.Doc), WithProtoProps(p.Props)) +} + +func parseProtocolTypes(namespace string, types []any, seen seenCache, cache *SchemaCache) ([]NamedSchema, error) { + ts := make([]NamedSchema, len(types)) + for i, typ := range types { + schema, err := parseType(namespace, typ, seen, cache) + if err != nil { + return nil, err + } + + namedSchema, ok := schema.(NamedSchema) + if !ok { + return nil, errors.New("avro: protocol types must be named schemas") + } + + ts[i] = namedSchema + } + + return ts, nil +} + +type message struct { + Doc string `mapstructure:"doc"` + Request 
[]map[string]any `mapstructure:"request"` + Response any `mapstructure:"response"` + Errors []any `mapstructure:"errors"` + OneWay bool `mapstructure:"one-way"` + Props map[string]any `mapstructure:",remain"` +} + +func parseMessage(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Message, error) { + var ( + msg message + meta mapstructure.Metadata + ) + if err := decodeMap(m, &msg, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding message: %w", err) + } + + fields := make([]*Field, len(msg.Request)) + for i, f := range msg.Request { + field, err := parseField(namespace, f, seen, cache) + if err != nil { + return nil, err + } + fields[i] = field + } + request := &RecordSchema{ + name: name{}, + properties: properties{}, + fields: fields, + } + + var response Schema + if msg.Response != nil { + schema, err := parseType(namespace, msg.Response, seen, cache) + if err != nil { + return nil, err + } + + if schema.Type() != Null { + response = schema + } + } + + types := []Schema{NewPrimitiveSchema(String, nil)} + if len(msg.Errors) > 0 { + for _, e := range msg.Errors { + schema, err := parseType(namespace, e, seen, cache) + if err != nil { + return nil, err + } + + if rec, ok := schema.(*RecordSchema); ok && !rec.IsError() { + return nil, errors.New("avro: errors record schema must be of type error") + } + + types = append(types, schema) + } + } + errs, err := NewUnionSchema(types) + if err != nil { + return nil, err + } + + oneWay := msg.OneWay + if hasKey(meta.Keys, "one-way") && oneWay && (len(errs.Types()) > 1 || response != nil) { + return nil, errors.New("avro: one-way messages cannot not have a response or errors") + } + if !oneWay && len(errs.Types()) <= 1 && response == nil { + oneWay = true + } + + return NewMessage(request, response, errs, oneWay, WithProtoDoc(msg.Doc), WithProtoProps(msg.Props)), nil +} diff --git a/vendor/github.com/hamba/avro/v2/reader.go b/vendor/github.com/hamba/avro/v2/reader.go new file mode 
100644 index 0000000..3c11b18 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/reader.go @@ -0,0 +1,324 @@ +package avro + +import ( + "errors" + "fmt" + "io" + "strings" + "unsafe" +) + +const ( + maxIntBufSize = 5 + maxLongBufSize = 10 +) + +// ReaderFunc is a function used to customize the Reader. +type ReaderFunc func(r *Reader) + +// WithReaderConfig specifies the configuration to use with a reader. +func WithReaderConfig(cfg API) ReaderFunc { + return func(r *Reader) { + r.cfg = cfg.(*frozenConfig) + } +} + +// Reader is an Avro specific io.Reader. +type Reader struct { + cfg *frozenConfig + reader io.Reader + slab []byte + buf []byte + head int + tail int + Error error +} + +// NewReader creates a new Reader. +func NewReader(r io.Reader, bufSize int, opts ...ReaderFunc) *Reader { + reader := &Reader{ + cfg: DefaultConfig.(*frozenConfig), + reader: r, + buf: make([]byte, bufSize), + head: 0, + tail: 0, + } + + for _, opt := range opts { + opt(reader) + } + + return reader +} + +// Reset resets a Reader with a new byte array attached. +func (r *Reader) Reset(b []byte) *Reader { + r.reader = nil + r.buf = b + r.head = 0 + r.tail = len(b) + return r +} + +// ReportError record an error in iterator instance with current position. 
+func (r *Reader) ReportError(operation, msg string) { + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + return + } + + r.Error = fmt.Errorf("avro: %s: %s", operation, msg) +} + +func (r *Reader) loadMore() bool { + if r.reader == nil { + if r.Error == nil { + r.head = r.tail + r.Error = io.EOF + } + return false + } + + for { + n, err := r.reader.Read(r.buf) + if n == 0 { + if err != nil { + if r.Error == nil { + r.Error = err + } + return false + } + continue + } + + r.head = 0 + r.tail = n + return true + } +} + +func (r *Reader) readByte() byte { + if r.head == r.tail { + if !r.loadMore() { + r.Error = io.ErrUnexpectedEOF + return 0 + } + } + + b := r.buf[r.head] + r.head++ + + return b +} + +// Peek returns the next byte in the buffer. +// The Reader Error will be io.EOF if no next byte exists. +func (r *Reader) Peek() byte { + if r.head == r.tail { + if !r.loadMore() { + return 0 + } + } + return r.buf[r.head] +} + +// Read reads data into the given bytes. +func (r *Reader) Read(b []byte) { + size := len(b) + read := 0 + + for read < size { + if r.head == r.tail { + if !r.loadMore() { + r.Error = io.ErrUnexpectedEOF + return + } + } + + n := copy(b[read:], r.buf[r.head:r.tail]) + r.head += n + read += n + } +} + +// ReadBool reads a Bool from the Reader. +func (r *Reader) ReadBool() bool { + b := r.readByte() + + if b != 0 && b != 1 { + r.ReportError("ReadBool", "invalid bool") + } + return b == 1 +} + +// ReadInt reads an Int from the Reader. +// +//nolint:dupl +func (r *Reader) ReadInt() int32 { + if r.Error != nil { + return 0 + } + + var ( + n int + v uint32 + s uint8 + ) + + for { + tail := r.tail + if r.tail-r.head+n > maxIntBufSize { + tail = r.head + maxIntBufSize - n + } + + // Consume what it is in the buffer. 
+ var i int + for _, b := range r.buf[r.head:tail] { + v |= uint32(b&0x7f) << s + if b&0x80 == 0 { + r.head += i + 1 + return int32((v >> 1) ^ -(v & 1)) + } + s += 7 + i++ + } + if n >= maxIntBufSize { + r.ReportError("ReadInt", "int overflow") + return 0 + } + r.head += i + n += i + + // We ran out of buffer and are not at the end of the int, + // Read more into the buffer. + if !r.loadMore() { + r.Error = fmt.Errorf("reading int: %w", r.Error) + return 0 + } + } +} + +// ReadLong reads a Long from the Reader. +// +//nolint:dupl +func (r *Reader) ReadLong() int64 { + if r.Error != nil { + return 0 + } + + var ( + n int + v uint64 + s uint8 + ) + + for { + tail := r.tail + if r.tail-r.head+n > maxLongBufSize { + tail = r.head + maxLongBufSize - n + } + + // Consume what it is in the buffer. + var i int + for _, b := range r.buf[r.head:tail] { + v |= uint64(b&0x7f) << s + if b&0x80 == 0 { + r.head += i + 1 + return int64((v >> 1) ^ -(v & 1)) + } + s += 7 + i++ + } + if n >= maxLongBufSize { + r.ReportError("ReadLong", "int overflow") + return 0 + } + r.head += i + n += i + + // We ran out of buffer and are not at the end of the long, + // Read more into the buffer. + if !r.loadMore() { + r.Error = fmt.Errorf("reading long: %w", r.Error) + return 0 + } + } +} + +// ReadFloat reads a Float from the Reader. +func (r *Reader) ReadFloat() float32 { + var buf [4]byte + r.Read(buf[:]) + + float := *(*float32)(unsafe.Pointer(&buf[0])) + return float +} + +// ReadDouble reads a Double from the Reader. +func (r *Reader) ReadDouble() float64 { + var buf [8]byte + r.Read(buf[:]) + + float := *(*float64)(unsafe.Pointer(&buf[0])) + return float +} + +// ReadBytes reads Bytes from the Reader. +func (r *Reader) ReadBytes() []byte { + return r.readBytes("bytes") +} + +// ReadString reads a String from the Reader. 
+func (r *Reader) ReadString() string { + b := r.readBytes("string") + if len(b) == 0 { + return "" + } + + return *(*string)(unsafe.Pointer(&b)) +} + +func (r *Reader) readBytes(op string) []byte { + size := int(r.ReadLong()) + if size < 0 { + fnName := "Read" + strings.ToTitle(op) + r.ReportError(fnName, "invalid "+op+" length") + return nil + } + if size == 0 { + return []byte{} + } + if maxSize := r.cfg.getMaxByteSliceSize(); maxSize > 0 && size > maxSize { + fnName := "Read" + strings.ToTitle(op) + r.ReportError(fnName, "size is greater than `Config.MaxByteSliceSize`") + return nil + } + + // The bytes are entirely in the buffer and of a reasonable size. + // Use the byte slab. + if r.head+size <= r.tail && size <= 1024 { + if cap(r.slab) < size { + r.slab = make([]byte, 1024) + } + dst := r.slab[:size] + r.slab = r.slab[size:] + copy(dst, r.buf[r.head:r.head+size]) + r.head += size + return dst + } + + buf := make([]byte, size) + r.Read(buf) + return buf +} + +// ReadBlockHeader reads a Block Header from the Reader. +func (r *Reader) ReadBlockHeader() (int64, int64) { + length := r.ReadLong() + if length < 0 { + size := r.ReadLong() + + return -length, size + } + + return length, 0 +} diff --git a/vendor/github.com/hamba/avro/v2/reader_generic.go b/vendor/github.com/hamba/avro/v2/reader_generic.go new file mode 100644 index 0000000..32d341c --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/reader_generic.go @@ -0,0 +1,163 @@ +package avro + +import ( + "fmt" + "reflect" + "time" +) + +// ReadNext reads the next Avro element as a generic interface. 
func (r *Reader) ReadNext(schema Schema) any {
	// Resolve an attached logical type, if any, so it can refine decoding below.
	var ls LogicalSchema
	lts, ok := schema.(LogicalTypeSchema)
	if ok {
		ls = lts.Logical()
	}

	switch schema.Type() {
	case Boolean:
		return r.ReadBool()
	case Int:
		if ls != nil {
			switch ls.Type() {
			case Date:
				// Days since the Unix epoch.
				i := r.ReadInt()
				sec := int64(i) * int64(24*time.Hour/time.Second)
				return time.Unix(sec, 0).UTC()

			case TimeMillis:
				return time.Duration(r.ReadInt()) * time.Millisecond
			}
		}
		return int(r.ReadInt())
	case Long:
		if ls != nil {
			switch ls.Type() {
			case TimeMicros:
				return time.Duration(r.ReadLong()) * time.Microsecond

			case TimestampMillis:
				i := r.ReadLong()
				sec := i / 1e3
				nsec := (i - sec*1e3) * 1e6
				return time.Unix(sec, nsec).UTC()

			case TimestampMicros:
				i := r.ReadLong()
				sec := i / 1e6
				nsec := (i - sec*1e6) * 1e3
				return time.Unix(sec, nsec).UTC()
			}
		}
		return r.ReadLong()
	case Float:
		return r.ReadFloat()
	case Double:
		return r.ReadDouble()
	case String:
		return r.ReadString()
	case Bytes:
		if ls != nil && ls.Type() == Decimal {
			dec := ls.(*DecimalLogicalSchema)
			return ratFromBytes(r.ReadBytes(), dec.Scale())
		}
		return r.ReadBytes()
	case Record:
		fields := schema.(*RecordSchema).Fields()
		obj := make(map[string]any, len(fields))
		for _, field := range fields {
			obj[field.Name()] = r.ReadNext(field.Type())
		}
		return obj
	case Ref:
		// Follow the reference to its resolved schema.
		return r.ReadNext(schema.(*RefSchema).Schema())
	case Enum:
		symbols := schema.(*EnumSchema).Symbols()
		idx := int(r.ReadInt())
		if idx < 0 || idx >= len(symbols) {
			r.ReportError("Read", "unknown enum symbol")
			return nil
		}
		return symbols[idx]
	case Array:
		arr := []any{}
		r.ReadArrayCB(func(r *Reader) bool {
			elem := r.ReadNext(schema.(*ArraySchema).Items())
			arr = append(arr, elem)
			return true
		})
		return arr
	case Map:
		obj := map[string]any{}
		r.ReadMapCB(func(r *Reader, field string) bool {
			elem := r.ReadNext(schema.(*MapSchema).Values())
			obj[field] = elem

			return true
		})
		return obj
	case Union:
		types := schema.(*UnionSchema).Types()
		idx := int(r.ReadLong())
		if idx < 0 || idx > len(types)-1 {
			r.ReportError("Read", "unknown union type")
			return nil
		}
		schema = types[idx]
		if schema.Type() == Null {
			return nil
		}

		// Non-null union values are wrapped in a single-entry map keyed by
		// the branch's type name.
		key := schemaTypeName(schema)
		obj := map[string]any{}
		obj[key] = r.ReadNext(types[idx])
		return obj
	case Fixed:
		size := schema.(*FixedSchema).Size()
		obj := make([]byte, size)
		r.Read(obj)
		if ls != nil && ls.Type() == Decimal {
			dec := ls.(*DecimalLogicalSchema)
			return ratFromBytes(obj, dec.Scale())
		}
		return byteSliceToArray(obj, size)
	default:
		r.ReportError("Read", fmt.Sprintf("unexpected schema type: %v", schema.Type()))
		return nil
	}
}

// ReadArrayCB reads an array with a callback per item.
func (r *Reader) ReadArrayCB(fn func(*Reader) bool) {
	for {
		l, _ := r.ReadBlockHeader()
		if l == 0 {
			break
		}
		for range l {
			fn(r)
		}
	}
}

// ReadMapCB reads a map with a callback per item.
func (r *Reader) ReadMapCB(fn func(*Reader, string) bool) {
	for {
		l, _ := r.ReadBlockHeader()
		if l == 0 {
			break
		}

		for range l {
			field := r.ReadString()
			fn(r, field)
		}
	}
}

var byteType = reflect.TypeOf((*byte)(nil)).Elem()

// byteSliceToArray copies b into a fixed-size [size]byte array value.
func byteSliceToArray(b []byte, size int) any {
	vArr := reflect.New(reflect.ArrayOf(size, byteType)).Elem()
	reflect.Copy(vArr, reflect.ValueOf(b))
	return vArr.Interface()
}
diff --git a/vendor/github.com/hamba/avro/v2/reader_skip.go b/vendor/github.com/hamba/avro/v2/reader_skip.go
new file mode 100644
index 0000000..94288c8
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/reader_skip.go
@@ -0,0 +1,79 @@
package avro

// SkipNBytes skips the given number of bytes in the reader.
+func (r *Reader) SkipNBytes(n int) { + read := 0 + for read < n { + if r.head == r.tail { + if !r.loadMore() { + return + } + } + + if read+r.tail-r.head < n { + read += r.tail - r.head + r.head = r.tail + continue + } + + r.head += n - read + read += n - read + } +} + +// SkipBool skips a Bool in the reader. +func (r *Reader) SkipBool() { + _ = r.readByte() +} + +// SkipInt skips an Int in the reader. +func (r *Reader) SkipInt() { + var n int + for r.Error == nil && n < maxIntBufSize { + b := r.readByte() + if b&0x80 == 0 { + break + } + n++ + } +} + +// SkipLong skips a Long in the reader. +func (r *Reader) SkipLong() { + var n int + for r.Error == nil && n < maxLongBufSize { + b := r.readByte() + if b&0x80 == 0 { + break + } + n++ + } +} + +// SkipFloat skips a Float in the reader. +func (r *Reader) SkipFloat() { + r.SkipNBytes(4) +} + +// SkipDouble skips a Double in the reader. +func (r *Reader) SkipDouble() { + r.SkipNBytes(8) +} + +// SkipString skips a String in the reader. +func (r *Reader) SkipString() { + size := r.ReadLong() + if size <= 0 { + return + } + r.SkipNBytes(int(size)) +} + +// SkipBytes skips Bytes in the reader. +func (r *Reader) SkipBytes() { + size := r.ReadLong() + if size <= 0 { + return + } + r.SkipNBytes(int(size)) +} diff --git a/vendor/github.com/hamba/avro/v2/resolver.go b/vendor/github.com/hamba/avro/v2/resolver.go new file mode 100644 index 0000000..c1b6ab6 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/resolver.go @@ -0,0 +1,90 @@ +package avro + +import ( + "fmt" + "math/big" + "sync" + "time" + + "github.com/modern-go/reflect2" +) + +// TypeResolver resolves types by name. +type TypeResolver struct { + names sync.Map // map[string]reflect2.Type + types sync.Map // map[int][]string +} + +// NewTypeResolver creates a new type resolver with all primitive types +// registered. 
+func NewTypeResolver() *TypeResolver { + r := &TypeResolver{} + + // Register basic types + r.Register(string(Null), &null{}) + r.Register(string(Int), int8(0)) + r.Register(string(Int), int16(0)) + r.Register(string(Int), int32(0)) + r.Register(string(Int), int(0)) + r.Register(string(Long), int(0)) + r.Register(string(Long), int64(0)) + r.Register(string(Float), float32(0)) + r.Register(string(Double), float64(0)) + r.Register(string(String), "") + r.Register(string(Bytes), []byte{}) + r.Register(string(Boolean), true) + + // Register logical types + r.Register(string(Int)+"."+string(Date), time.Time{}) + r.Register(string(Int)+"."+string(TimeMillis), time.Duration(0)) + r.Register(string(Long)+"."+string(TimestampMillis), time.Time{}) + r.Register(string(Long)+"."+string(TimestampMicros), time.Time{}) + r.Register(string(Long)+"."+string(TimeMicros), time.Duration(0)) + r.Register(string(Bytes)+"."+string(Decimal), big.NewRat(1, 1)) + r.Register(string(String)+"."+string(UUID), "") + + return r +} + +// Register registers names to their types for resolution. +func (r *TypeResolver) Register(name string, obj any) { + typ := reflect2.TypeOf(obj) + rtype := typ.RType() + + r.names.Store(name, typ) + + raw, ok := r.types.LoadOrStore(rtype, []string{name}) + if !ok { + return + } + names := raw.([]string) + names = append(names, name) + r.types.Store(rtype, names) +} + +// Name gets the name for a type, or an error. +func (r *TypeResolver) Name(typ reflect2.Type) ([]string, error) { + rtype := typ.RType() + + names, ok := r.types.Load(rtype) + if !ok { + return nil, fmt.Errorf("avro: unable to resolve type %s", typ.String()) + } + + return names.([]string), nil +} + +// Type gets the type for a name, or an error. 
func (r *TypeResolver) Type(name string) (reflect2.Type, error) {
	typ, ok := r.names.Load(name)
	if !ok {
		return nil, fmt.Errorf("avro: unable to resolve type with name %s", name)
	}

	return typ.(reflect2.Type), nil
}

// Register registers names to their types for resolution. All primitive types are pre-registered.
func Register(name string, obj any) {
	DefaultConfig.Register(name, obj)
}
diff --git a/vendor/github.com/hamba/avro/v2/schema.go b/vendor/github.com/hamba/avro/v2/schema.go
new file mode 100644
index 0000000..84a9fd2
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/schema.go
@@ -0,0 +1,1807 @@
package avro

import (
	"bytes"
	"crypto/md5"
	"crypto/sha256"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/hamba/avro/v2/pkg/crc64"
	jsoniter "github.com/json-iterator/go"
)

// nullDefaultType marks an explicit JSON "null" default, distinguishing it
// from an absent default.
type nullDefaultType struct{}

func (nullDefaultType) MarshalJSON() ([]byte, error) {
	return []byte("null"), nil
}

var nullDefault nullDefaultType = struct{}{}

var (
	// Note: order matches the order of properties as they are named in the spec.
	// https://avro.apache.org/docs/1.12.0/specification
	recordReserved                   = []string{"type", "name", "namespace", "doc", "aliases", "fields"}
	fieldReserved                    = []string{"name", "doc", "type", "order", "aliases", "default"}
	enumReserved                     = []string{"type", "name", "namespace", "aliases", "doc", "symbols", "default"}
	arrayReserved                    = []string{"type", "items"}
	mapReserved                      = []string{"type", "values"}
	fixedReserved                    = []string{"type", "name", "namespace", "aliases", "size"}
	fixedWithLogicalTypeReserved     = []string{"type", "name", "namespace", "aliases", "size", "logicalType"}
	fixedWithDecimalTypeReserved     = []string{
		"type", "name", "namespace", "aliases", "size", "logicalType", "precision", "scale",
	}
	primitiveReserved                = []string{"type"}
	primitiveWithLogicalTypeReserved = []string{"type", "logicalType"}
	primitiveWithDecimalTypeReserved = []string{"type", "logicalType", "precision", "scale"}
)

// Type is a schema type.
type Type string

// Schema type constants.
const (
	Record  Type = "record"
	Error   Type = "error"
	Ref     Type = "<ref>"
	Enum    Type = "enum"
	Array   Type = "array"
	Map     Type = "map"
	Union   Type = "union"
	Fixed   Type = "fixed"
	String  Type = "string"
	Bytes   Type = "bytes"
	Int     Type = "int"
	Long    Type = "long"
	Float   Type = "float"
	Double  Type = "double"
	Boolean Type = "boolean"
	Null    Type = "null"
)

// Order is a field order.
type Order string

// Field orders.
const (
	Asc    Order = "ascending"
	Desc   Order = "descending"
	Ignore Order = "ignore"
)

// LogicalType is a schema logical type.
type LogicalType string

// Schema logical type constants.
const (
	Decimal              LogicalType = "decimal"
	UUID                 LogicalType = "uuid"
	Date                 LogicalType = "date"
	TimeMillis           LogicalType = "time-millis"
	TimeMicros           LogicalType = "time-micros"
	TimestampMillis      LogicalType = "timestamp-millis"
	TimestampMicros      LogicalType = "timestamp-micros"
	LocalTimestampMillis LogicalType = "local-timestamp-millis"
	LocalTimestampMicros LogicalType = "local-timestamp-micros"
	Duration             LogicalType = "duration"
)

// Action is a field action used during decoding process.
type Action string

// Action type constants.
const (
	FieldIgnore     Action = "ignore"
	FieldSetDefault Action = "set_default"
)

// FingerprintType is a fingerprinting algorithm.
type FingerprintType string

// Fingerprint type constants.
const (
	CRC64Avro   FingerprintType = "CRC64-AVRO"
	CRC64AvroLE FingerprintType = "CRC64-AVRO-LE"
	MD5         FingerprintType = "MD5"
	SHA256      FingerprintType = "SHA256"
)

// SchemaCache is a cache of schemas.
type SchemaCache struct {
	cache sync.Map // map[string]Schema
}

// Add adds a schema to the cache with the given name.
func (c *SchemaCache) Add(name string, schema Schema) {
	c.cache.Store(name, schema)
}

// Get returns the Schema if it exists.
func (c *SchemaCache) Get(name string) Schema {
	if v, ok := c.cache.Load(name); ok {
		return v.(Schema)
	}

	return nil
}

// AddAll adds all schemas from the given cache to the current cache.
func (c *SchemaCache) AddAll(cache *SchemaCache) {
	if cache == nil {
		return
	}
	cache.cache.Range(func(key, value interface{}) bool {
		c.cache.Store(key, value)
		return true
	})
}

// Schemas is a slice of Schemas.
type Schemas []Schema

// Get gets a schema and position by type or name if it is a named schema.
// Returns (nil, -1) when no schema matches.
func (s Schemas) Get(name string) (Schema, int) {
	for i, schema := range s {
		if schemaTypeName(schema) == name {
			return schema, i
		}
	}

	return nil, -1
}

// Schema represents an Avro schema.
+type Schema interface { + // Type returns the type of the schema. + Type() Type + + // String returns the canonical form of the schema. + String() string + + // Fingerprint returns the SHA256 fingerprint of the schema. + Fingerprint() [32]byte + + // FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. + FingerprintUsing(FingerprintType) ([]byte, error) + + // CacheFingerprint returns the unique identity of the schema. + // This returns a unique identity for schemas resolved from a writer schema, otherwise it returns + // the schemas Fingerprint. + CacheFingerprint() [32]byte +} + +// LogicalSchema represents an Avro schema with a logical type. +type LogicalSchema interface { + // Type returns the type of the logical schema. + Type() LogicalType + + // String returns the canonical form of the logical schema. + String() string +} + +// PropertySchema represents a schema with properties. +type PropertySchema interface { + // Prop gets a property from the schema. + Prop(string) any +} + +// NamedSchema represents a schema with a name. +type NamedSchema interface { + Schema + PropertySchema + + // Name returns the name of the schema. + Name() string + + // Namespace returns the namespace of a schema. + Namespace() string + + // FullName returns the full qualified name of a schema. + FullName() string + + // Aliases returns the full qualified aliases of a schema. + Aliases() []string +} + +// LogicalTypeSchema represents a schema that can contain a logical type. +type LogicalTypeSchema interface { + // Logical returns the logical schema or nil. + Logical() LogicalSchema +} + +type name struct { + name string + namespace string + full string + aliases []string +} + +func newName(n, ns string, aliases []string) (name, error) { + if idx := strings.LastIndexByte(n, '.'); idx > -1 { + ns = n[:idx] + n = n[idx+1:] + } + + full := n + if ns != "" { + full = ns + "." 
+ n + } + + for _, part := range strings.Split(full, ".") { + if err := validateName(part); err != nil { + return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", full, part, err) + } + } + + a := make([]string, 0, len(aliases)) + for _, alias := range aliases { + if !strings.Contains(alias, ".") { + if err := validateName(alias); err != nil { + return name{}, fmt.Errorf("avro: invalid name %q: %w", alias, err) + } + if ns == "" { + a = append(a, alias) + continue + } + a = append(a, ns+"."+alias) + continue + } + + for _, part := range strings.Split(alias, ".") { + if err := validateName(part); err != nil { + return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", full, part, err) + } + } + a = append(a, alias) + } + + return name{ + name: n, + namespace: ns, + full: full, + aliases: a, + }, nil +} + +// Name returns the name of a schema. +func (n name) Name() string { + return n.name +} + +// Namespace returns the namespace of a schema. +func (n name) Namespace() string { + return n.namespace +} + +// FullName returns the fully qualified name of a schema. +func (n name) FullName() string { + return n.full +} + +// Aliases returns the fully qualified aliases of a schema. +func (n name) Aliases() []string { + return n.aliases +} + +type fingerprinter struct { + fingerprint atomic.Value // [32]byte + cache sync.Map // map[FingerprintType][]byte +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (f *fingerprinter) Fingerprint(stringer fmt.Stringer) [32]byte { + if v := f.fingerprint.Load(); v != nil { + return v.([32]byte) + } + + fingerprint := sha256.Sum256([]byte(stringer.String())) + f.fingerprint.Store(fingerprint) + return fingerprint +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. 
+func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Stringer) ([]byte, error) { + if v, ok := f.cache.Load(typ); ok { + return v.([]byte), nil + } + + data := []byte(stringer.String()) + + var fingerprint []byte + switch typ { + case CRC64Avro: + h := crc64.Sum(data) + fingerprint = h[:] + case CRC64AvroLE: + h := crc64.SumWithByteOrder(data, crc64.LittleEndian) + fingerprint = h[:] + case MD5: + h := md5.Sum(data) + fingerprint = h[:] + case SHA256: + h := sha256.Sum256(data) + fingerprint = h[:] + default: + return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ) + } + + f.cache.Store(typ, fingerprint) + return fingerprint, nil +} + +type cacheFingerprinter struct { + writerFingerprint *[32]byte + + cache atomic.Value // [32]byte +} + +// CacheFingerprint returns the SHA256 identity of the schema. +func (i *cacheFingerprinter) CacheFingerprint(schema Schema, fn func() []byte) [32]byte { + if v := i.cache.Load(); v != nil { + return v.([32]byte) + } + + if i.writerFingerprint == nil { + fp := schema.Fingerprint() + i.cache.Store(fp) + return fp + } + + fp := schema.Fingerprint() + d := append([]byte{}, fp[:]...) + d = append(d, (*i.writerFingerprint)[:]...) + if fn != nil { + d = append(d, fn()...) + } + ident := sha256.Sum256(d) + i.cache.Store(ident) + return ident +} + +type properties struct { + props map[string]any +} + +func newProperties(props map[string]any, res []string) properties { + p := properties{props: map[string]any{}} + for k, v := range props { + if isReserved(res, k) { + continue + } + p.props[k] = v + } + return p +} + +func isReserved(res []string, k string) bool { + for _, r := range res { + if k == r { + return true + } + } + return false +} + +// Prop gets a property from the schema. +func (p properties) Prop(name string) any { + if p.props == nil { + return nil + } + + return p.props[name] +} + +// Props returns a map that contains all schema custom properties. 
+// Any accidental change to the returned map will directly modify the schema custom properties. +func (p properties) Props() map[string]any { + return p.props +} + +func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error { + sortedPropertyKeys := make([]string, 0, len(p.props)) + for k := range p.props { + sortedPropertyKeys = append(sortedPropertyKeys, k) + } + sort.Strings(sortedPropertyKeys) + for _, k := range sortedPropertyKeys { + vv, err := jsoniter.Marshal(p.props[k]) + if err != nil { + return err + } + kk, err := jsoniter.Marshal(k) + if err != nil { + return err + } + buf.WriteString(`,`) + buf.Write(kk) + buf.WriteString(`:`) + buf.Write(vv) + } + return nil +} + +type schemaConfig struct { + aliases []string + doc string + def any + order Order + props map[string]any + wfp *[32]byte +} + +// SchemaOption is a function that sets a schema option. +type SchemaOption func(*schemaConfig) + +// WithAliases sets the aliases on a schema. +func WithAliases(aliases []string) SchemaOption { + return func(opts *schemaConfig) { + opts.aliases = aliases + } +} + +// WithDoc sets the doc on a schema. +func WithDoc(doc string) SchemaOption { + return func(opts *schemaConfig) { + opts.doc = doc + } +} + +// WithDefault sets the default on a schema. +func WithDefault(def any) SchemaOption { + return func(opts *schemaConfig) { + opts.def = def + } +} + +// WithOrder sets the order on a schema. +func WithOrder(order Order) SchemaOption { + return func(opts *schemaConfig) { + opts.order = order + } +} + +// WithProps sets the properties on a schema. 
+func WithProps(props map[string]any) SchemaOption { + return func(opts *schemaConfig) { + opts.props = props + } +} + +func withWriterFingerprint(fp [32]byte) SchemaOption { + return func(opts *schemaConfig) { + opts.wfp = &fp + } +} + +func withWriterFingerprintIfResolved(fp [32]byte, resolved bool) SchemaOption { + return func(opts *schemaConfig) { + if resolved { + opts.wfp = &fp + } + } +} + +// PrimitiveSchema is an Avro primitive type schema. +type PrimitiveSchema struct { + properties + fingerprinter + cacheFingerprinter + + typ Type + logical LogicalSchema + + // encodedType is the type of the encoded value, if it is different from the typ. + // It's only used in the context of write-read schema resolution. + encodedType Type +} + +// NewPrimitiveSchema creates a new PrimitiveSchema. +func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *PrimitiveSchema { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + reservedProps := primitiveReserved + if l != nil { + if l.Type() == Decimal { + reservedProps = primitiveWithDecimalTypeReserved + } else { + reservedProps = primitiveWithLogicalTypeReserved + } + } + return &PrimitiveSchema{ + properties: newProperties(cfg.props, reservedProps), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + typ: t, + logical: l, + } +} + +// Type returns the type of the schema. +func (s *PrimitiveSchema) Type() Type { + return s.typ +} + +// Logical returns the logical schema or nil. +func (s *PrimitiveSchema) Logical() LogicalSchema { + return s.logical +} + +// String returns the canonical form of the schema. +func (s *PrimitiveSchema) String() string { + if s.logical == nil { + return `"` + string(s.typ) + `"` + } + + return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}` +} + +// MarshalJSON marshals the schema to json. 
+func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) { + if s.logical == nil && len(s.props) == 0 { + return jsoniter.Marshal(s.typ) + } + + buf := new(bytes.Buffer) + buf.WriteString(`{"type":"` + string(s.typ) + `"`) + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + if d, ok := s.logical.(*DecimalLogicalSchema); ok { + buf.WriteString(`,"precision":` + strconv.Itoa(d.prec)) + if d.scale > 0 { + buf.WriteString(`,"scale":` + strconv.Itoa(d.scale)) + } + } + } + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *PrimitiveSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *PrimitiveSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *PrimitiveSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + +// RecordSchema is an Avro record type schema. +type RecordSchema struct { + name + properties + fingerprinter + cacheFingerprinter + isError bool + fields []*Field + doc string +} + +// NewRecordSchema creates a new record schema instance. 
+func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + n, err := newName(name, namespace, cfg.aliases) + if err != nil { + return nil, err + } + + return &RecordSchema{ + name: n, + properties: newProperties(cfg.props, recordReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + fields: fields, + doc: cfg.doc, + }, nil +} + +// NewErrorRecordSchema creates a new error record schema instance. +func NewErrorRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) { + rec, err := NewRecordSchema(name, namespace, fields, opts...) + if err != nil { + return nil, err + } + + rec.isError = true + + return rec, nil +} + +// Type returns the type of the schema. +func (s *RecordSchema) Type() Type { + return Record +} + +// Doc returns the documentation of a record. +func (s *RecordSchema) Doc() string { + return s.doc +} + +// IsError determines is this is an error record. +func (s *RecordSchema) IsError() bool { + return s.isError +} + +// Fields returns the fields of a record. +func (s *RecordSchema) Fields() []*Field { + return s.fields +} + +// String returns the canonical form of the schema. +func (s *RecordSchema) String() string { + typ := "record" + if s.isError { + typ = "error" + } + + fields := "" + for _, f := range s.fields { + fields += f.String() + "," + } + if len(fields) > 0 { + fields = fields[:len(fields)-1] + } + + return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` +} + +// MarshalJSON marshals the schema to json. 
+func (s *RecordSchema) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"name":"` + s.full + `"`) + if len(s.aliases) > 0 { + aliasesJSON, err := jsoniter.Marshal(s.aliases) + if err != nil { + return nil, err + } + buf.WriteString(`,"aliases":`) + buf.Write(aliasesJSON) + } + if s.doc != "" { + docJSON, err := jsoniter.Marshal(s.doc) + if err != nil { + return nil, err + } + buf.WriteString(`,"doc":`) + buf.Write(docJSON) + } + if s.isError { + buf.WriteString(`,"type":"error"`) + } else { + buf.WriteString(`,"type":"record"`) + } + fieldsJSON, err := jsoniter.Marshal(s.fields) + if err != nil { + return nil, err + } + buf.WriteString(`,"fields":`) + buf.Write(fieldsJSON) + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *RecordSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *RecordSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *RecordSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, func() []byte { + var defs []any + for _, field := range s.fields { + if !field.HasDefault() { + continue + } + defs = append(defs, field.Default()) + } + b, _ := jsoniter.Marshal(defs) + return b + }) +} + +// Field is an Avro record type field. +type Field struct { + properties + + name string + aliases []string + doc string + typ Schema + hasDef bool + def any + order Order + + // action mainly used when decoding data that lack the field for schema evolution purposes. + action Action + // encodedDef mainly used when decoding data that lack the field for schema evolution purposes. 
+ // Its value remains empty unless the field's encodeDefault function is called. + encodedDef atomic.Value +} + +type noDef struct{} + +// NoDefault is used when no default exists for a field. +var NoDefault = noDef{} + +// NewField creates a new field instance. +func NewField(name string, typ Schema, opts ...SchemaOption) (*Field, error) { + cfg := schemaConfig{def: NoDefault} + for _, opt := range opts { + opt(&cfg) + } + + if err := validateName(name); err != nil { + return nil, err + } + for _, a := range cfg.aliases { + if err := validateName(a); err != nil { + return nil, err + } + } + + switch cfg.order { + case "": + cfg.order = Asc + case Asc, Desc, Ignore: + default: + return nil, fmt.Errorf("avro: field %q order %q is invalid", name, cfg.order) + } + + f := &Field{ + properties: newProperties(cfg.props, fieldReserved), + name: name, + aliases: cfg.aliases, + doc: cfg.doc, + typ: typ, + order: cfg.order, + } + + if cfg.def != NoDefault { + def, err := validateDefault(name, typ, cfg.def) + if err != nil { + return nil, err + } + f.def = def + f.hasDef = true + } + + return f, nil +} + +// Name returns the name of a field. +func (f *Field) Name() string { + return f.name +} + +// Aliases return the field aliases. +func (f *Field) Aliases() []string { + return f.aliases +} + +// Type returns the schema of a field. +func (f *Field) Type() Schema { + return f.typ +} + +// HasDefault determines if the field has a default value. +func (f *Field) HasDefault() bool { + return f.hasDef +} + +// Default returns the default of a field or nil. +// +// The only time a nil default is valid is for a Null Type. 
+func (f *Field) Default() any { + if f.def == nullDefault { + return nil + } + + return f.def +} + +func (f *Field) encodeDefault(encode func(any) ([]byte, error)) ([]byte, error) { + if v := f.encodedDef.Load(); v != nil { + return v.([]byte), nil + } + if !f.hasDef { + return nil, fmt.Errorf("avro: '%s' field must have a non-empty default value", f.name) + } + if encode == nil { + return nil, fmt.Errorf("avro: failed to encode '%s' default value", f.name) + } + b, err := encode(f.Default()) + if err != nil { + return nil, err + } + f.encodedDef.Store(b) + + return b, nil +} + +// Doc returns the documentation of a field. +func (f *Field) Doc() string { + return f.doc +} + +// Order returns the field order. +func (f *Field) Order() Order { + return f.order +} + +// String returns the canonical form of a field. +func (f *Field) String() string { + return `{"name":"` + f.name + `","type":` + f.typ.String() + `}` +} + +// MarshalJSON marshals the schema to json. +func (f *Field) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"name":"` + f.name + `"`) + if len(f.aliases) > 0 { + aliasesJSON, err := jsoniter.Marshal(f.aliases) + if err != nil { + return nil, err + } + buf.WriteString(`,"aliases":`) + buf.Write(aliasesJSON) + } + if f.doc != "" { + docJSON, err := jsoniter.Marshal(f.doc) + if err != nil { + return nil, err + } + buf.WriteString(`,"doc":`) + buf.Write(docJSON) + } + typeJSON, err := jsoniter.Marshal(f.typ) + if err != nil { + return nil, err + } + buf.WriteString(`,"type":`) + buf.Write(typeJSON) + if f.hasDef { + defaultValueJSON, err := jsoniter.Marshal(f.Default()) + if err != nil { + return nil, err + } + buf.WriteString(`,"default":`) + buf.Write(defaultValueJSON) + } + if f.order != "" && f.order != Asc { + buf.WriteString(`,"order":"` + string(f.order) + `"`) + } + if err := f.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// EnumSchema is an 
Avro enum type schema. +type EnumSchema struct { + name + properties + fingerprinter + cacheFingerprinter + + symbols []string + def string + doc string + + // encodedSymbols is the symbols of the encoded value. + // It's only used in the context of write-read schema resolution. + encodedSymbols []string +} + +// NewEnumSchema creates a new enum schema instance. +func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOption) (*EnumSchema, error) { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + n, err := newName(name, namespace, cfg.aliases) + if err != nil { + return nil, err + } + + if len(symbols) == 0 { + return nil, errors.New("avro: enum must have a non-empty array of symbols") + } + for _, sym := range symbols { + if err = validateName(sym); err != nil { + return nil, fmt.Errorf("avro: invalid symbol %q", sym) + } + } + + var def string + if d, ok := cfg.def.(string); ok && d != "" { + if !hasSymbol(symbols, d) { + return nil, fmt.Errorf("avro: symbol default %q must be a symbol", d) + } + def = d + } + + return &EnumSchema{ + name: n, + properties: newProperties(cfg.props, enumReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + symbols: symbols, + def: def, + doc: cfg.doc, + }, nil +} + +func hasSymbol(symbols []string, sym string) bool { + for _, s := range symbols { + if s == sym { + return true + } + } + return false +} + +// Type returns the type of the schema. +func (s *EnumSchema) Type() Type { + return Enum +} + +// Doc returns the schema doc. +func (s *EnumSchema) Doc() string { + return s.doc +} + +// Symbols returns the symbols of an enum. +func (s *EnumSchema) Symbols() []string { + return s.symbols +} + +// Symbol returns the symbol for the given index. +// It might return the default value in the context of write-read schema resolution. 
+func (s *EnumSchema) Symbol(i int) (string, bool) { + resolv := len(s.encodedSymbols) > 0 + symbols := s.symbols + if resolv { + // A different set of symbols is encoded. + symbols = s.encodedSymbols + } + + if i < 0 || i >= len(symbols) { + return "", false + } + + symbol := symbols[i] + if resolv && !hasSymbol(s.symbols, symbol) { + if !s.HasDefault() { + return "", false + } + return s.Default(), true + } + return symbol, true +} + +// Default returns the default of an enum or an empty string. +func (s *EnumSchema) Default() string { + return s.def +} + +// HasDefault determines if the schema has a default value. +func (s *EnumSchema) HasDefault() bool { + return s.def != "" +} + +// String returns the canonical form of the schema. +func (s *EnumSchema) String() string { + symbols := "" + for _, sym := range s.symbols { + symbols += `"` + sym + `",` + } + if len(symbols) > 0 { + symbols = symbols[:len(symbols)-1] + } + + return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}` +} + +// MarshalJSON marshals the schema to json. +func (s *EnumSchema) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"name":"` + s.full + `"`) + if len(s.aliases) > 0 { + aliasesJSON, err := jsoniter.Marshal(s.aliases) + if err != nil { + return nil, err + } + buf.WriteString(`,"aliases":`) + buf.Write(aliasesJSON) + } + if s.doc != "" { + docJSON, err := jsoniter.Marshal(s.doc) + if err != nil { + return nil, err + } + buf.WriteString(`,"doc":`) + buf.Write(docJSON) + } + buf.WriteString(`,"type":"enum"`) + symbolsJSON, err := jsoniter.Marshal(s.symbols) + if err != nil { + return nil, err + } + buf.WriteString(`,"symbols":`) + buf.Write(symbolsJSON) + if s.def != "" { + buf.WriteString(`,"default":"` + s.def + `"`) + } + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. 
+func (s *EnumSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *EnumSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *EnumSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, func() []byte { + if !s.HasDefault() { + return []byte{} + } + return []byte(s.Default()) + }) +} + +// ArraySchema is an Avro array type schema. +type ArraySchema struct { + properties + fingerprinter + cacheFingerprinter + + items Schema +} + +// NewArraySchema creates an array schema instance. +func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + return &ArraySchema{ + properties: newProperties(cfg.props, arrayReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + items: items, + } +} + +// Type returns the type of the schema. +func (s *ArraySchema) Type() Type { + return Array +} + +// Items returns the items schema of an array. +func (s *ArraySchema) Items() Schema { + return s.items +} + +// String returns the canonical form of the schema. +func (s *ArraySchema) String() string { + return `{"type":"array","items":` + s.items.String() + `}` +} + +// MarshalJSON marshals the schema to json. +func (s *ArraySchema) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"type":"array"`) + itemsJSON, err := jsoniter.Marshal(s.items) + if err != nil { + return nil, err + } + buf.WriteString(`,"items":`) + buf.Write(itemsJSON) + if err = s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. 
+func (s *ArraySchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *ArraySchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *ArraySchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + +// MapSchema is an Avro map type schema. +type MapSchema struct { + properties + fingerprinter + cacheFingerprinter + + values Schema +} + +// NewMapSchema creates a map schema instance. +func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + return &MapSchema{ + properties: newProperties(cfg.props, mapReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + values: values, + } +} + +// Type returns the type of the schema. +func (s *MapSchema) Type() Type { + return Map +} + +// Values returns the values schema of a map. +func (s *MapSchema) Values() Schema { + return s.values +} + +// String returns the canonical form of the schema. +func (s *MapSchema) String() string { + return `{"type":"map","values":` + s.values.String() + `}` +} + +// MarshalJSON marshals the schema to json. +func (s *MapSchema) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"type":"map"`) + valuesJSON, err := jsoniter.Marshal(s.values) + if err != nil { + return nil, err + } + buf.WriteString(`,"values":`) + buf.Write(valuesJSON) + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. 
+func (s *MapSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *MapSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *MapSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + +// UnionSchema is an Avro union type schema. +type UnionSchema struct { + fingerprinter + cacheFingerprinter + + types Schemas +} + +// NewUnionSchema creates a union schema instance. +func NewUnionSchema(types []Schema, opts ...SchemaOption) (*UnionSchema, error) { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + seen := map[string]bool{} + for _, schema := range types { + if schema.Type() == Union { + return nil, errors.New("avro: union type cannot be a union") + } + + strType := schemaTypeName(schema) + + if seen[strType] { + return nil, errors.New("avro: union type must be unique") + } + seen[strType] = true + } + + return &UnionSchema{ + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + types: types, + }, nil +} + +// Type returns the type of the schema. +func (s *UnionSchema) Type() Type { + return Union +} + +// Types returns the types of a union. +func (s *UnionSchema) Types() Schemas { + return s.types +} + +// Nullable returns the Schema if the union is nullable, otherwise nil. +func (s *UnionSchema) Nullable() bool { + if len(s.types) != 2 || s.types[0].Type() != Null && s.types[1].Type() != Null { + return false + } + + return true +} + +// Indices returns the index of the null and type schemas for a +// nullable schema. For non-nullable schemas 0 is returned for +// both. 
+func (s *UnionSchema) Indices() (null, typ int) { + if !s.Nullable() { + return 0, 0 + } + if s.types[0].Type() == Null { + return 0, 1 + } + return 1, 0 +} + +// String returns the canonical form of the schema. +func (s *UnionSchema) String() string { + types := "" + for _, typ := range s.types { + types += typ.String() + "," + } + if len(types) > 0 { + types = types[:len(types)-1] + } + + return `[` + types + `]` +} + +// MarshalJSON marshals the schema to json. +func (s *UnionSchema) MarshalJSON() ([]byte, error) { + return jsoniter.Marshal(s.types) +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *UnionSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *UnionSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *UnionSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + +// FixedSchema is an Avro fixed type schema. +type FixedSchema struct { + name + properties + fingerprinter + cacheFingerprinter + + size int + logical LogicalSchema +} + +// NewFixedSchema creates a new fixed schema instance. 
+func NewFixedSchema( + name, namespace string, + size int, + logical LogicalSchema, + opts ...SchemaOption, +) (*FixedSchema, error) { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + n, err := newName(name, namespace, cfg.aliases) + if err != nil { + return nil, err + } + + reservedProps := fixedReserved + if logical != nil { + if logical.Type() == Decimal { + reservedProps = fixedWithDecimalTypeReserved + } else { + reservedProps = fixedWithLogicalTypeReserved + } + } + return &FixedSchema{ + name: n, + properties: newProperties(cfg.props, reservedProps), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + size: size, + logical: logical, + }, nil +} + +// Type returns the type of the schema. +func (s *FixedSchema) Type() Type { + return Fixed +} + +// Size returns the number of bytes of the fixed schema. +func (s *FixedSchema) Size() int { + return s.size +} + +// Logical returns the logical schema or nil. +func (s *FixedSchema) Logical() LogicalSchema { + return s.logical +} + +// String returns the canonical form of the schema. +func (s *FixedSchema) String() string { + size := strconv.Itoa(s.size) + + var logical string + if s.logical != nil { + logical = "," + s.logical.String() + } + + return `{"name":"` + s.FullName() + `","type":"fixed","size":` + size + logical + `}` +} + +// MarshalJSON marshals the schema to json. 
+func (s *FixedSchema) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + buf.WriteString(`{"name":"` + s.full + `"`) + if len(s.aliases) > 0 { + aliasesJSON, err := jsoniter.Marshal(s.aliases) + if err != nil { + return nil, err + } + buf.WriteString(`,"aliases":`) + buf.Write(aliasesJSON) + } + buf.WriteString(`,"type":"fixed"`) + buf.WriteString(`,"size":` + strconv.Itoa(s.size)) + if s.logical != nil { + buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`) + if d, ok := s.logical.(*DecimalLogicalSchema); ok { + buf.WriteString(`,"precision":` + strconv.Itoa(d.prec)) + if d.scale > 0 { + buf.WriteString(`,"scale":` + strconv.Itoa(d.scale)) + } + } + } + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *FixedSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *FixedSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *FixedSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + +// NullSchema is an Avro null type schema. +type NullSchema struct { + properties + fingerprinter +} + +// NewNullSchema creates a new NullSchema. +func NewNullSchema(opts ...SchemaOption) *NullSchema { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + + return &NullSchema{ + properties: newProperties(cfg.props, primitiveReserved), + } +} + +// Type returns the type of the schema. +func (s *NullSchema) Type() Type { + return Null +} + +// String returns the canonical form of the schema. 
+func (s *NullSchema) String() string { + return `"null"` +} + +// MarshalJSON marshals the schema to json. +func (s *NullSchema) MarshalJSON() ([]byte, error) { + if len(s.props) == 0 { + return []byte(`"null"`), nil + } + buf := new(bytes.Buffer) + buf.WriteString(`{"type":"null"`) + if err := s.marshalPropertiesToJSON(buf); err != nil { + return nil, err + } + buf.WriteString("}") + return buf.Bytes(), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *NullSchema) Fingerprint() [32]byte { + return s.fingerprinter.Fingerprint(s) +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. +func (s *NullSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.fingerprinter.FingerprintUsing(typ, s) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *NullSchema) CacheFingerprint() [32]byte { + return s.Fingerprint() +} + +// RefSchema is a reference to a named Avro schema. +type RefSchema struct { + actual NamedSchema +} + +// NewRefSchema creates a ref schema instance. +func NewRefSchema(schema NamedSchema) *RefSchema { + return &RefSchema{ + actual: schema, + } +} + +// Type returns the type of the schema. +func (s *RefSchema) Type() Type { + return Ref +} + +// Schema returns the schema being referenced. +func (s *RefSchema) Schema() NamedSchema { + return s.actual +} + +// String returns the canonical form of the schema. +func (s *RefSchema) String() string { + return `"` + s.actual.FullName() + `"` +} + +// MarshalJSON marshals the schema to json. +func (s *RefSchema) MarshalJSON() ([]byte, error) { + return []byte(`"` + s.actual.FullName() + `"`), nil +} + +// Fingerprint returns the SHA256 fingerprint of the schema. +func (s *RefSchema) Fingerprint() [32]byte { + return s.actual.Fingerprint() +} + +// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. 
+func (s *RefSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { + return s.actual.FingerprintUsing(typ) +} + +// CacheFingerprint returns unique identity of the schema. +func (s *RefSchema) CacheFingerprint() [32]byte { + return s.actual.CacheFingerprint() +} + +// PrimitiveLogicalSchema is a logical type with no properties. +type PrimitiveLogicalSchema struct { + typ LogicalType +} + +// NewPrimitiveLogicalSchema creates a new primitive logical schema instance. +func NewPrimitiveLogicalSchema(typ LogicalType) *PrimitiveLogicalSchema { + return &PrimitiveLogicalSchema{ + typ: typ, + } +} + +// Type returns the type of the logical schema. +func (s *PrimitiveLogicalSchema) Type() LogicalType { + return s.typ +} + +// String returns the canonical form of the logical schema. +func (s *PrimitiveLogicalSchema) String() string { + return `"logicalType":"` + string(s.typ) + `"` +} + +// DecimalLogicalSchema is a decimal logical type. +type DecimalLogicalSchema struct { + prec int + scale int +} + +// NewDecimalLogicalSchema creates a new decimal logical schema instance. +func NewDecimalLogicalSchema(prec, scale int) *DecimalLogicalSchema { + return &DecimalLogicalSchema{ + prec: prec, + scale: scale, + } +} + +// Type returns the type of the logical schema. +func (s *DecimalLogicalSchema) Type() LogicalType { + return Decimal +} + +// Precision returns the precision of the decimal logical schema. +func (s *DecimalLogicalSchema) Precision() int { + return s.prec +} + +// Scale returns the scale of the decimal logical schema. +func (s *DecimalLogicalSchema) Scale() int { + return s.scale +} + +// String returns the canonical form of the logical schema. 
+func (s *DecimalLogicalSchema) String() string { + var scale string + if s.scale > 0 { + scale = `,"scale":` + strconv.Itoa(s.scale) + } + precision := strconv.Itoa(s.prec) + + return `"logicalType":"` + string(Decimal) + `","precision":` + precision + scale +} + +func invalidNameFirstChar(r rune) bool { + return (r < 'A' || r > 'Z') && (r < 'a' || r > 'z') && r != '_' +} + +func invalidNameOtherChar(r rune) bool { + return invalidNameFirstChar(r) && (r < '0' || r > '9') +} + +func validateName(name string) error { + if name == "" { + return errors.New("name must be a non-empty") + } + + if SkipNameValidation { + return nil + } + + if strings.IndexFunc(name[:1], invalidNameFirstChar) > -1 { + return fmt.Errorf("invalid name %s", name) + } + if strings.IndexFunc(name[1:], invalidNameOtherChar) > -1 { + return fmt.Errorf("invalid name %s", name) + } + + return nil +} + +func validateDefault(name string, schema Schema, def any) (any, error) { + def, ok := isValidDefault(schema, def) + if !ok { + return nil, fmt.Errorf("avro: invalid default for field %s. %+v not a %s", name, def, schema.Type()) + } + return def, nil +} + +func isValidDefault(schema Schema, def any) (any, bool) { + switch schema.Type() { + case Ref: + ref := schema.(*RefSchema) + return isValidDefault(ref.Schema(), def) + case Null: + return nullDefault, def == nil + case Enum: + v, ok := def.(string) + if !ok || len(v) == 0 { + return def, false + } + + var found bool + for _, sym := range schema.(*EnumSchema).symbols { + if def == sym { + found = true + break + } + } + return def, found + case String: + if _, ok := def.(string); ok { + return def, true + } + case Bytes, Fixed: + // Spec: Default values for bytes and fixed fields are JSON strings, + // where Unicode code points 0-255 are mapped to unsigned 8-bit byte values 0-255. 
+ if d, ok := def.(string); ok { + if b, ok := isValidDefaultBytes(d); ok { + if schema.Type() == Fixed { + return byteSliceToArray(b, schema.(*FixedSchema).Size()), true + } + return b, true + } + } + case Boolean: + if _, ok := def.(bool); ok { + return def, true + } + case Int: + if i, ok := def.(int8); ok { + return int(i), true + } + if i, ok := def.(int16); ok { + return int(i), true + } + if i, ok := def.(int32); ok { + return int(i), true + } + if _, ok := def.(int); ok { + return def, true + } + if f, ok := def.(float64); ok { + return int(f), true + } + case Long: + if _, ok := def.(int64); ok { + return def, true + } + if f, ok := def.(float64); ok { + return int64(f), true + } + case Float: + if _, ok := def.(float32); ok { + return def, true + } + if f, ok := def.(float64); ok { + return float32(f), true + } + case Double: + if _, ok := def.(float64); ok { + return def, true + } + case Array: + arr, ok := def.([]any) + if !ok { + return nil, false + } + + as := schema.(*ArraySchema) + for i, v := range arr { + v, ok := isValidDefault(as.Items(), v) + if !ok { + return nil, false + } + arr[i] = v + } + return arr, true + case Map: + m, ok := def.(map[string]any) + if !ok { + return nil, false + } + + ms := schema.(*MapSchema) + for k, v := range m { + v, ok := isValidDefault(ms.Values(), v) + if !ok { + return nil, false + } + + m[k] = v + } + return m, true + case Union: + unionSchema := schema.(*UnionSchema) + return isValidDefault(unionSchema.Types()[0], def) + case Record: + m, ok := def.(map[string]any) + if !ok { + return nil, false + } + + for _, field := range schema.(*RecordSchema).Fields() { + fieldDef := field.Default() + if newDef, ok := m[field.Name()]; ok { + fieldDef = newDef + } + + v, ok := isValidDefault(field.Type(), fieldDef) + if !ok { + return nil, false + } + + m[field.Name()] = v + } + return m, true + } + return nil, false +} + +func schemaTypeName(schema Schema) string { + if schema.Type() == Ref { + schema = 
schema.(*RefSchema).Schema() + } + + if n, ok := schema.(NamedSchema); ok { + return n.FullName() + } + + sname := string(schema.Type()) + if lt := getLogicalType(schema); lt != "" { + sname += "." + string(lt) + } + return sname +} + +func isValidDefaultBytes(def string) ([]byte, bool) { + runes := []rune(def) + l := len(runes) + b := make([]byte, l) + for i := range l { + if runes[i] < 0 || runes[i] > 255 { + return nil, false + } + b[i] = byte(runes[i]) + } + return b, true +} diff --git a/vendor/github.com/hamba/avro/v2/schema_compatibility.go b/vendor/github.com/hamba/avro/v2/schema_compatibility.go new file mode 100644 index 0000000..0b1d9ac --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/schema_compatibility.go @@ -0,0 +1,487 @@ +package avro + +import ( + "errors" + "fmt" + "sync" +) + +type recursionError struct{} + +func (e recursionError) Error() string { + return "" +} + +type compatKey struct { + reader [32]byte + writer [32]byte +} + +// SchemaCompatibility determines the compatibility of schemas. +type SchemaCompatibility struct { + cache sync.Map // map[compatKey]error +} + +// NewSchemaCompatibility creates a new schema compatibility instance. +func NewSchemaCompatibility() *SchemaCompatibility { + return &SchemaCompatibility{} +} + +// Compatible determines the compatibility if the reader and writer schemas. +func (c *SchemaCompatibility) Compatible(reader, writer Schema) error { + return c.compatible(reader, writer) +} + +func (c *SchemaCompatibility) compatible(reader, writer Schema) error { + key := compatKey{reader: reader.Fingerprint(), writer: writer.Fingerprint()} + if err, ok := c.cache.Load(key); ok { + if _, ok := err.(recursionError); ok { + // Break the recursion here. 
+ return nil + } + + if err == nil { + return nil + } + + return err.(error) + } + + c.cache.Store(key, recursionError{}) + err := c.match(reader, writer) + if err != nil { + // We dont want to pay the cost of fmt.Errorf every time + err = errors.New(err.Error()) + } + c.cache.Store(key, err) + return err +} + +func (c *SchemaCompatibility) match(reader, writer Schema) error { + // If the schema is a reference, get the actual schema + if reader.Type() == Ref { + reader = reader.(*RefSchema).Schema() + } + if writer.Type() == Ref { + writer = writer.(*RefSchema).Schema() + } + + if reader.Type() != writer.Type() { + if writer.Type() == Union { + // Reader must be compatible with all types in writer + for _, schema := range writer.(*UnionSchema).Types() { + if err := c.compatible(reader, schema); err != nil { + return err + } + } + + return nil + } + + if reader.Type() == Union { + // Writer must be compatible with at least one reader schema + var err error + for _, schema := range reader.(*UnionSchema).Types() { + err = c.compatible(schema, writer) + if err == nil { + return nil + } + } + + return fmt.Errorf("reader union lacking writer schema %s", writer.Type()) + } + + switch writer.Type() { + case Int: + if reader.Type() == Long || reader.Type() == Float || reader.Type() == Double { + return nil + } + + case Long: + if reader.Type() == Float || reader.Type() == Double { + return nil + } + + case Float: + if reader.Type() == Double { + return nil + } + + case String: + if reader.Type() == Bytes { + return nil + } + + case Bytes: + if reader.Type() == String { + return nil + } + } + + return fmt.Errorf("reader schema %s not compatible with writer schema %s", reader.Type(), writer.Type()) + } + + switch reader.Type() { + case Array: + return c.compatible(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items()) + + case Map: + return c.compatible(reader.(*MapSchema).Values(), writer.(*MapSchema).Values()) + + case Fixed: + r := reader.(*FixedSchema) + w := 
writer.(*FixedSchema) + + if err := c.checkSchemaName(r, w); err != nil { + return err + } + + if err := c.checkFixedSize(r, w); err != nil { + return err + } + + case Enum: + r := reader.(*EnumSchema) + w := writer.(*EnumSchema) + + if err := c.checkSchemaName(r, w); err != nil { + return err + } + + if err := c.checkEnumSymbols(r, w); err != nil { + if r.HasDefault() { + return nil + } + return err + } + + case Record: + r := reader.(*RecordSchema) + w := writer.(*RecordSchema) + + if err := c.checkSchemaName(r, w); err != nil { + return err + } + + if err := c.checkRecordFields(r, w); err != nil { + return err + } + + case Union: + for _, schema := range writer.(*UnionSchema).Types() { + if err := c.compatible(reader, schema); err != nil { + return err + } + } + } + + return nil +} + +func (c *SchemaCompatibility) checkSchemaName(reader, writer NamedSchema) error { + if reader.Name() != writer.Name() { + if c.contains(reader.Aliases(), writer.FullName()) { + return nil + } + return fmt.Errorf("reader schema %s and writer schema %s names do not match", reader.FullName(), writer.FullName()) + } + + return nil +} + +func (c *SchemaCompatibility) checkFixedSize(reader, writer *FixedSchema) error { + if reader.Size() != writer.Size() { + return fmt.Errorf("%s reader and writer fixed sizes do not match", reader.FullName()) + } + + return nil +} + +func (c *SchemaCompatibility) checkEnumSymbols(reader, writer *EnumSchema) error { + for _, symbol := range writer.Symbols() { + if !c.contains(reader.Symbols(), symbol) { + return fmt.Errorf("reader %s is missing symbol %s", reader.FullName(), symbol) + } + } + + return nil +} + +func (c *SchemaCompatibility) checkRecordFields(reader, writer *RecordSchema) error { + for _, field := range reader.Fields() { + f, ok := c.getField(writer.Fields(), field, func(gfo *getFieldOptions) { + gfo.fieldAlias = true + }) + if !ok { + if field.HasDefault() { + continue + } + + return fmt.Errorf("reader field %s is missing in writer schema 
and has no default", field.Name()) + } + + if err := c.compatible(field.Type(), f.Type()); err != nil { + return err + } + } + + return nil +} + +func (c *SchemaCompatibility) contains(a []string, s string) bool { + for _, str := range a { + if str == s { + return true + } + } + + return false +} + +type getFieldOptions struct { + fieldAlias bool + elemAlias bool +} + +func (c *SchemaCompatibility) getField(a []*Field, f *Field, optFns ...func(*getFieldOptions)) (*Field, bool) { + opt := getFieldOptions{} + for _, fn := range optFns { + fn(&opt) + } + for _, field := range a { + if field.Name() == f.Name() { + return field, true + } + if opt.fieldAlias { + if c.contains(f.Aliases(), field.Name()) { + return field, true + } + } + if opt.elemAlias { + if c.contains(field.Aliases(), f.Name()) { + return field, true + } + } + } + + return nil, false +} + +// Resolve returns a composite schema that allows decoding data written by the writer schema, +// and makes necessary adjustments to support the reader schema. +// +// It fails if the writer and reader schemas are not compatible. +func (c *SchemaCompatibility) Resolve(reader, writer Schema) (Schema, error) { + if err := c.compatible(reader, writer); err != nil { + return nil, err + } + + schema, _, err := c.resolve(reader, writer) + return schema, err +} + +// resolve requires the reader's schema to be already compatible with the writer's. +func (c *SchemaCompatibility) resolve(reader, writer Schema) (schema Schema, resolved bool, err error) { + if reader.Type() == Ref { + reader = reader.(*RefSchema).Schema() + } + if writer.Type() == Ref { + writer = writer.(*RefSchema).Schema() + } + + if writer.Type() != reader.Type() { + if reader.Type() == Union { + for _, schema := range reader.(*UnionSchema).Types() { + // Compatibility is not guaranteed for every Union reader schema. + // Therefore, we need to check compatibility in every iteration. 
+ if err := c.compatible(schema, writer); err != nil { + continue + } + sch, _, err := c.resolve(schema, writer) + if err != nil { + continue + } + return sch, true, nil + } + + return nil, false, fmt.Errorf("reader union lacking writer schema %s", writer.Type()) + } + + if writer.Type() == Union { + schemas := make([]Schema, 0) + for _, schema := range writer.(*UnionSchema).Types() { + sch, _, err := c.resolve(reader, schema) + if err != nil { + return nil, false, err + } + schemas = append(schemas, sch) + } + s, err := NewUnionSchema(schemas, withWriterFingerprint(writer.Fingerprint())) + return s, true, err + } + + if isPromotable(writer.Type(), reader.Type()) { + r := NewPrimitiveSchema(reader.Type(), reader.(*PrimitiveSchema).Logical(), + withWriterFingerprint(writer.Fingerprint()), + ) + r.encodedType = writer.Type() + return r, true, nil + } + + return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) + } + + if isNative(writer.Type()) { + return reader, false, nil + } + + if writer.Type() == Enum { + r := reader.(*EnumSchema) + w := writer.(*EnumSchema) + if err = c.checkEnumSymbols(r, w); err != nil { + if r.HasDefault() { + enum, _ := NewEnumSchema(r.Name(), r.Namespace(), r.Symbols(), + WithAliases(r.Aliases()), + WithDefault(r.Default()), + withWriterFingerprint(w.Fingerprint()), + ) + enum.encodedSymbols = w.Symbols() + return enum, true, nil + } + + return nil, false, err + } + return reader, false, nil + } + + if writer.Type() == Fixed { + return reader, false, nil + } + + if writer.Type() == Union { + schemas := make([]Schema, 0) + for _, s := range writer.(*UnionSchema).Types() { + sch, resolv, err := c.resolve(reader, s) + if err != nil { + return nil, false, err + } + schemas = append(schemas, sch) + resolved = resolv || resolved + } + s, err := NewUnionSchema(schemas, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)) + if err != nil { + return nil, false, err + } + return s, 
resolved, nil + } + + if writer.Type() == Array { + schema, resolved, err = c.resolve(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items()) + if err != nil { + return nil, false, err + } + return NewArraySchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil + } + + if writer.Type() == Map { + schema, resolved, err = c.resolve(reader.(*MapSchema).Values(), writer.(*MapSchema).Values()) + if err != nil { + return nil, false, err + } + return NewMapSchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil + } + + if writer.Type() == Record { + return c.resolveRecord(reader, writer) + } + + return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) +} + +func (c *SchemaCompatibility) resolveRecord(reader, writer Schema) (Schema, bool, error) { + w := writer.(*RecordSchema) + r := reader.(*RecordSchema) + + fields := make([]*Field, 0) + seen := make(map[string]struct{}) + + var resolved bool + for _, wf := range w.Fields() { + rf, ok := c.getField(r.Fields(), wf, func(gfo *getFieldOptions) { + gfo.elemAlias = true + }) + if !ok { + // The field was not found in the reader schema, it should be ignored. + f, _ := NewField(wf.Name(), wf.Type(), WithAliases(wf.aliases), WithOrder(wf.order)) + f.def = wf.def + f.hasDef = wf.hasDef + f.action = FieldIgnore + fields = append(fields, f) + + resolved = true + continue + } + + ft, resolv, err := c.resolve(rf.Type(), wf.Type()) + if err != nil { + return nil, false, err + } + f, _ := NewField(rf.Name(), ft, WithAliases(rf.aliases), WithOrder(rf.order)) + f.def = rf.def + f.hasDef = rf.hasDef + fields = append(fields, f) + resolved = resolv || resolved + + seen[rf.Name()] = struct{}{} + } + + for _, rf := range r.Fields() { + if _, ok := seen[rf.Name()]; ok { + // This field has already been seen. 
+ continue + } + + // The schemas are already known to be compatible, so there must be a default on + // the field in the writer. Use the default. + + f, _ := NewField(rf.Name(), rf.Type(), WithAliases(rf.aliases), WithOrder(rf.order)) + f.def = rf.def + f.hasDef = rf.hasDef + f.action = FieldSetDefault + fields = append(fields, f) + + resolved = true + } + + schema, err := NewRecordSchema(r.Name(), r.Namespace(), fields, + WithAliases(r.Aliases()), + withWriterFingerprintIfResolved(writer.Fingerprint(), resolved), + ) + return schema, resolved, err +} + +func isNative(typ Type) bool { + switch typ { + case Null, Boolean, Int, Long, Float, Double, Bytes, String: + return true + default: + return false + } +} + +func isPromotable(writerTyp, readerType Type) bool { + switch writerTyp { + case Int: + return readerType == Long || readerType == Float || readerType == Double + case Long: + return readerType == Float || readerType == Double + case Float: + return readerType == Double + case String: + return readerType == Bytes + case Bytes: + return readerType == String + default: + return false + } +} diff --git a/vendor/github.com/hamba/avro/v2/schema_parse.go b/vendor/github.com/hamba/avro/v2/schema_parse.go new file mode 100644 index 0000000..630f202 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/schema_parse.go @@ -0,0 +1,643 @@ +package avro + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "strings" + + jsoniter "github.com/json-iterator/go" + "github.com/mitchellh/mapstructure" +) + +// DefaultSchemaCache is the default cache for schemas. +var DefaultSchemaCache = &SchemaCache{} + +// SkipNameValidation sets whether to skip name validation. 
+// Avro spec incurs a strict naming convention for names and aliases, however official Avro tools do not follow that +// More info: +// https://lists.apache.org/thread/39v98os6wdpyr6w31xdkz0yzol51fsrr +// https://github.com/apache/avro/pull/1995 +var SkipNameValidation = false + +// Parse parses a schema string. +func Parse(schema string) (Schema, error) { + return ParseBytes([]byte(schema)) +} + +// ParseWithCache parses a schema string using the given namespace and schema cache. +func ParseWithCache(schema, namespace string, cache *SchemaCache) (Schema, error) { + return ParseBytesWithCache([]byte(schema), namespace, cache) +} + +// MustParse parses a schema string, panicking if there is an error. +func MustParse(schema string) Schema { + parsed, err := Parse(schema) + if err != nil { + panic(err) + } + + return parsed +} + +// ParseFiles parses the schemas in the files, in the order they appear, returning the last schema. +// +// This is useful when your schemas rely on other schemas. +func ParseFiles(paths ...string) (Schema, error) { + var schema Schema + for _, path := range paths { + s, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return nil, err + } + + schema, err = Parse(string(s)) + if err != nil { + return nil, err + } + } + + return schema, nil +} + +// ParseBytes parses a schema byte slice. +func ParseBytes(schema []byte) (Schema, error) { + return ParseBytesWithCache(schema, "", DefaultSchemaCache) +} + +// ParseBytesWithCache parses a schema byte slice using the given namespace and schema cache. 
+func ParseBytesWithCache(schema []byte, namespace string, cache *SchemaCache) (Schema, error) { + var json any + if err := jsoniter.Unmarshal(schema, &json); err != nil { + json = string(schema) + } + + internalCache := &SchemaCache{} + internalCache.AddAll(cache) + + seen := seenCache{} + s, err := parseType(namespace, json, seen, internalCache) + if err != nil { + return nil, err + } + + cache.AddAll(internalCache) + + return derefSchema(s), nil +} + +func parseType(namespace string, v any, seen seenCache, cache *SchemaCache) (Schema, error) { + switch val := v.(type) { + case nil: + return &NullSchema{}, nil + + case string: + return parsePrimitiveType(namespace, val, cache) + + case map[string]any: + return parseComplexType(namespace, val, seen, cache) + + case []any: + return parseUnion(namespace, val, seen, cache) + } + + return nil, fmt.Errorf("avro: unknown type: %v", v) +} + +func parsePrimitiveType(namespace, s string, cache *SchemaCache) (Schema, error) { + typ := Type(s) + switch typ { + case Null: + return &NullSchema{}, nil + + case String, Bytes, Int, Long, Float, Double, Boolean: + return parsePrimitive(typ, nil) + + default: + schema := cache.Get(fullName(namespace, s)) + if schema != nil { + return schema, nil + } + + return nil, fmt.Errorf("avro: unknown type: %s", s) + } +} + +func parseComplexType(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + if val, ok := m["type"].([]any); ok { + // Note: According to the spec, this is not allowed: + // https://avro.apache.org/docs/1.12.0/specification/#schema-declaration + // The "type" property in an object must be a string. A union type will be a slice, + // but NOT an object with a "type" property that is a slice. + // Might be advisable to remove this call (tradeoff between better conformance + // with the spec vs. possible backwards-compatibility issue). 
+ return parseUnion(namespace, val, seen, cache) + } + + str, ok := m["type"].(string) + if !ok { + return nil, fmt.Errorf("avro: unknown type: %+v", m) + } + typ := Type(str) + + switch typ { + case String, Bytes, Int, Long, Float, Double, Boolean, Null: + return parsePrimitive(typ, m) + + case Record, Error: + return parseRecord(typ, namespace, m, seen, cache) + + case Enum: + return parseEnum(namespace, m, seen, cache) + + case Array: + return parseArray(namespace, m, seen, cache) + + case Map: + return parseMap(namespace, m, seen, cache) + + case Fixed: + return parseFixed(namespace, m, seen, cache) + + default: + return parseType(namespace, string(typ), seen, cache) + } +} + +type primitiveSchema struct { + Type string `mapstructure:"type"` + Props map[string]any `mapstructure:",remain"` +} + +func parsePrimitive(typ Type, m map[string]any) (Schema, error) { + if len(m) == 0 { + if typ == Null { + return &NullSchema{}, nil + } + return NewPrimitiveSchema(typ, nil), nil + } + + var ( + p primitiveSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &p, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding primitive: %w", err) + } + + var logical LogicalSchema + if logicalType := logicalTypeProperty(p.Props); logicalType != "" { + logical = parsePrimitiveLogicalType(typ, logicalType, p.Props) + if logical != nil { + delete(p.Props, "logicalType") + } + } + + if typ == Null { + return NewNullSchema(WithProps(p.Props)), nil + } + return NewPrimitiveSchema(typ, logical, WithProps(p.Props)), nil +} + +func parsePrimitiveLogicalType(typ Type, lt string, props map[string]any) LogicalSchema { + ltyp := LogicalType(lt) + if (typ == String && ltyp == UUID) || + (typ == Int && ltyp == Date) || + (typ == Int && ltyp == TimeMillis) || + (typ == Long && ltyp == TimeMicros) || + (typ == Long && ltyp == TimestampMillis) || + (typ == Long && ltyp == TimestampMicros) || + (typ == Long && ltyp == LocalTimestampMillis) || + (typ == Long && ltyp == 
LocalTimestampMicros) { + return NewPrimitiveLogicalSchema(ltyp) + } + + if typ == Bytes && ltyp == Decimal { + return parseDecimalLogicalType(-1, props) + } + + return nil // otherwise, not a recognized logical type +} + +type recordSchema struct { + Type string `mapstructure:"type"` + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Doc string `mapstructure:"doc"` + Fields []map[string]any `mapstructure:"fields"` + Props map[string]any `mapstructure:",remain"` +} + +func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + r recordSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &r, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding record: %w", err) + } + + if err := checkParsedName(r.Name); err != nil { + return nil, err + } + if r.Namespace == "" { + r.Namespace = namespace + } + + if !hasKey(meta.Keys, "fields") { + return nil, errors.New("avro: record must have an array of fields") + } + fields := make([]*Field, len(r.Fields)) + + var ( + rec *RecordSchema + err error + ) + switch typ { + case Record: + rec, err = NewRecordSchema(r.Name, r.Namespace, fields, + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + ) + case Error: + rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields, + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + ) + } + if err != nil { + return nil, err + } + + if err = seen.Add(rec.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(rec) + cache.Add(rec.FullName(), ref) + for _, alias := range rec.Aliases() { + cache.Add(alias, ref) + } + + for i, f := range r.Fields { + field, err := parseField(rec.namespace, f, seen, cache) + if err != nil { + return nil, err + } + fields[i] = field + } + + return rec, nil +} + +type fieldSchema struct { + Name string `mapstructure:"name"` + Aliases []string 
`mapstructure:"aliases"` + Type any `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Default any `mapstructure:"default"` + Order Order `mapstructure:"order"` + Props map[string]any `mapstructure:",remain"` +} + +func parseField(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Field, error) { + var ( + f fieldSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &f, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding field: %w", err) + } + + if err := checkParsedName(f.Name); err != nil { + return nil, err + } + + if !hasKey(meta.Keys, "type") { + return nil, errors.New("avro: field requires a type") + } + typ, err := parseType(namespace, f.Type, seen, cache) + if err != nil { + return nil, err + } + + if !hasKey(meta.Keys, "default") { + f.Default = NoDefault + } + + field, err := NewField(f.Name, typ, + WithDefault(f.Default), WithAliases(f.Aliases), WithDoc(f.Doc), WithOrder(f.Order), WithProps(f.Props), + ) + if err != nil { + return nil, err + } + + return field, nil +} + +type enumSchema struct { + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Type string `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Symbols []string `mapstructure:"symbols"` + Default string `mapstructure:"default"` + Props map[string]any `mapstructure:",remain"` +} + +func parseEnum(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + e enumSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &e, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding enum: %w", err) + } + + if err := checkParsedName(e.Name); err != nil { + return nil, err + } + if e.Namespace == "" { + e.Namespace = namespace + } + + enum, err := NewEnumSchema(e.Name, e.Namespace, e.Symbols, + WithDefault(e.Default), WithAliases(e.Aliases), WithDoc(e.Doc), WithProps(e.Props), + ) + if err != nil { + 
return nil, err + } + + if err = seen.Add(enum.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(enum) + cache.Add(enum.FullName(), ref) + for _, alias := range enum.Aliases() { + cache.Add(alias, enum) + } + + return enum, nil +} + +type arraySchema struct { + Type string `mapstructure:"type"` + Items any `mapstructure:"items"` + Props map[string]any `mapstructure:",remain"` +} + +func parseArray(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + a arraySchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &a, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding array: %w", err) + } + + if !hasKey(meta.Keys, "items") { + return nil, errors.New("avro: array must have an items key") + } + schema, err := parseType(namespace, a.Items, seen, cache) + if err != nil { + return nil, err + } + + return NewArraySchema(schema, WithProps(a.Props)), nil +} + +type mapSchema struct { + Type string `mapstructure:"type"` + Values any `mapstructure:"values"` + Props map[string]any `mapstructure:",remain"` +} + +func parseMap(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + ms mapSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &ms, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding map: %w", err) + } + + if !hasKey(meta.Keys, "values") { + return nil, errors.New("avro: map must have an values key") + } + schema, err := parseType(namespace, ms.Values, seen, cache) + if err != nil { + return nil, err + } + + return NewMapSchema(schema, WithProps(ms.Props)), nil +} + +func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (Schema, error) { + var err error + types := make([]Schema, len(v)) + for i := range v { + types[i], err = parseType(namespace, v[i], seen, cache) + if err != nil { + return nil, err + } + } + + return NewUnionSchema(types) +} + +type fixedSchema struct { + Name 
string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Type string `mapstructure:"type"` + Size int `mapstructure:"size"` + Props map[string]any `mapstructure:",remain"` +} + +func parseFixed(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + f fixedSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &f, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding fixed: %w", err) + } + + if err := checkParsedName(f.Name); err != nil { + return nil, err + } + if f.Namespace == "" { + f.Namespace = namespace + } + + if !hasKey(meta.Keys, "size") { + return nil, errors.New("avro: fixed must have a size") + } + + var logical LogicalSchema + if logicalType := logicalTypeProperty(f.Props); logicalType != "" { + logical = parseFixedLogicalType(f.Size, logicalType, f.Props) + if logical != nil { + delete(f.Props, "logicalType") + } + } + + fixed, err := NewFixedSchema(f.Name, f.Namespace, f.Size, logical, WithAliases(f.Aliases), WithProps(f.Props)) + if err != nil { + return nil, err + } + + if err = seen.Add(fixed.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(fixed) + cache.Add(fixed.FullName(), ref) + for _, alias := range fixed.Aliases() { + cache.Add(alias, fixed) + } + + return fixed, nil +} + +func parseFixedLogicalType(size int, lt string, props map[string]any) LogicalSchema { + ltyp := LogicalType(lt) + switch { + case ltyp == Duration && size == 12: + return NewPrimitiveLogicalSchema(Duration) + case ltyp == Decimal: + return parseDecimalLogicalType(size, props) + } + + return nil +} + +type decimalSchema struct { + Precision int `mapstructure:"precision"` + Scale int `mapstructure:"scale"` +} + +func parseDecimalLogicalType(size int, props map[string]any) LogicalSchema { + var ( + d decimalSchema + meta mapstructure.Metadata + ) + if err := decodeMap(props, &d, &meta); err != nil { + return nil + } + 
decType := newDecimalLogicalType(size, d.Precision, d.Scale) + if decType != nil { + // Remove the properties that we consumed + delete(props, "precision") + delete(props, "scale") + } + return decType +} + +func newDecimalLogicalType(size, prec, scale int) LogicalSchema { + if prec <= 0 { + return nil + } + + if size > 0 { + maxPrecision := int(math.Round(math.Floor(math.Log10(2) * (8*float64(size) - 1)))) + if prec > maxPrecision { + return nil + } + } + + if scale < 0 { + return nil + } + + // Scale may not be bigger than precision + if scale > prec { + return nil + } + + return NewDecimalLogicalSchema(prec, scale) +} + +func fullName(namespace, name string) string { + if len(namespace) == 0 || strings.ContainsRune(name, '.') { + return name + } + + return namespace + "." + name +} + +func checkParsedName(name string) error { + if name == "" { + return errors.New("avro: non-empty name key required") + } + return nil +} + +func hasKey(keys []string, k string) bool { + for _, key := range keys { + if key == k { + return true + } + } + return false +} + +func decodeMap(in, v any, meta *mapstructure.Metadata) error { + cfg := &mapstructure.DecoderConfig{ + ZeroFields: true, + Metadata: meta, + Result: v, + } + + decoder, _ := mapstructure.NewDecoder(cfg) + return decoder.Decode(in) +} + +func derefSchema(schema Schema) Schema { + seen := map[string]struct{}{} + + return walkSchema(schema, func(schema Schema) Schema { + if ns, ok := schema.(NamedSchema); ok { + if _, hasSeen := seen[ns.FullName()]; hasSeen { + // This NamedSchema has been seen in this run, it needs + // to be turned into a reference. It is possible it was + // dereferenced in a previous run. 
+ return NewRefSchema(ns) + } + + seen[ns.FullName()] = struct{}{} + return schema + } + + ref, isRef := schema.(*RefSchema) + if !isRef { + return schema + } + + if _, haveSeen := seen[ref.Schema().FullName()]; !haveSeen { + seen[ref.Schema().FullName()] = struct{}{} + return ref.Schema() + } + return schema + }) +} + +type seenCache map[string]struct{} + +func (c seenCache) Add(name string) error { + if _, ok := c[name]; ok { + return fmt.Errorf("duplicate name %q", name) + } + c[name] = struct{}{} + return nil +} + +func logicalTypeProperty(props map[string]any) string { + if lt, ok := props["logicalType"].(string); ok { + return lt + } + return "" +} diff --git a/vendor/github.com/hamba/avro/v2/schema_walk.go b/vendor/github.com/hamba/avro/v2/schema_walk.go new file mode 100644 index 0000000..253740c --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/schema_walk.go @@ -0,0 +1,21 @@ +package avro + +func walkSchema(schema Schema, fn func(Schema) Schema) Schema { + schema = fn(schema) + + switch s := schema.(type) { + case *RecordSchema: + for _, f := range s.Fields() { + f.typ = walkSchema(f.typ, fn) + } + case *ArraySchema: + s.items = walkSchema(s.items, fn) + case *MapSchema: + s.values = walkSchema(s.values, fn) + case *UnionSchema: + for i, st := range s.types { + s.types[i] = walkSchema(st, fn) + } + } + return schema +} diff --git a/vendor/github.com/hamba/avro/v2/types.go b/vendor/github.com/hamba/avro/v2/types.go new file mode 100644 index 0000000..931ea99 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/types.go @@ -0,0 +1,9 @@ +package avro + +// LogicalDuration represents the `duration` logical type, as defined in +// https://avro.apache.org/docs/1.11.1/specification/#duration +type LogicalDuration struct { + Months uint32 + Days uint32 + Milliseconds uint32 +} diff --git a/vendor/github.com/hamba/avro/v2/writer.go b/vendor/github.com/hamba/avro/v2/writer.go new file mode 100644 index 0000000..a12d6d0 --- /dev/null +++ 
b/vendor/github.com/hamba/avro/v2/writer.go @@ -0,0 +1,194 @@ +package avro + +import ( + "encoding/binary" + "io" + "math" +) + +// WriterFunc is a function used to customize the Writer. +type WriterFunc func(w *Writer) + +// WithWriterConfig specifies the configuration to use with a writer. +func WithWriterConfig(cfg API) WriterFunc { + return func(w *Writer) { + w.cfg = cfg.(*frozenConfig) + } +} + +// Writer is an Avro specific io.Writer. +type Writer struct { + cfg *frozenConfig + out io.Writer + buf []byte + Error error +} + +// NewWriter creates a new Writer. +func NewWriter(out io.Writer, bufSize int, opts ...WriterFunc) *Writer { + writer := &Writer{ + cfg: DefaultConfig.(*frozenConfig), + out: out, + buf: make([]byte, 0, bufSize), + Error: nil, + } + + for _, opt := range opts { + opt(writer) + } + + return writer +} + +// Reset resets the Writer with a new io.Writer attached. +func (w *Writer) Reset(out io.Writer) { + w.out = out + w.buf = w.buf[:0] +} + +// Buffered returns the number of buffered bytes. +func (w *Writer) Buffered() int { + return len(w.buf) +} + +// Buffer gets the Writer buffer. +func (w *Writer) Buffer() []byte { + return w.buf +} + +// Flush writes any buffered data to the underlying io.Writer. +func (w *Writer) Flush() error { + if w.out == nil { + return nil + } + if w.Error != nil { + return w.Error + } + + n, err := w.out.Write(w.buf) + if n < len(w.buf) && err == nil { + err = io.ErrShortWrite + } + if err != nil { + if w.Error == nil { + w.Error = err + } + return err + } + + w.buf = w.buf[:0] + + return nil +} + +func (w *Writer) writeByte(b byte) { + w.buf = append(w.buf, b) +} + +// Write writes raw bytes to the Writer. +func (w *Writer) Write(b []byte) (int, error) { + w.buf = append(w.buf, b...) + return len(b), nil +} + +// WriteBool writes a Bool to the Writer. +func (w *Writer) WriteBool(b bool) { + if b { + w.writeByte(0x01) + return + } + w.writeByte(0x00) +} + +// WriteInt writes an Int to the Writer. 
func (w *Writer) WriteInt(i int32) {
	// Zig-zag encode: (i << 1) ^ (i >> 31) maps values of small magnitude
	// (positive or negative) to small unsigned values before varint encoding.
	e := uint64((uint32(i) << 1) ^ uint32(i>>31))
	w.encodeInt(e)
}

// WriteLong writes a Long to the Writer.
func (w *Writer) WriteLong(i int64) {
	// Zig-zag encode, as in WriteInt but over 64 bits.
	e := (uint64(i) << 1) ^ uint64(i>>63)
	w.encodeInt(e)
}

// encodeInt appends i as a variable-length integer: 7 bits per byte,
// least-significant group first, with the high bit set on every byte except
// the last.
func (w *Writer) encodeInt(i uint64) {
	if i == 0 {
		w.writeByte(0)
		return
	}

	for i > 0 {
		b := byte(i) & 0x7F
		i >>= 7

		if i != 0 {
			b |= 0x80
		}
		w.writeByte(b)
	}
}

// WriteFloat writes a Float to the Writer.
func (w *Writer) WriteFloat(f float32) {
	// IEEE 754 bits, little-endian.
	b := make([]byte, 4)
	binary.LittleEndian.PutUint32(b, math.Float32bits(f))

	w.buf = append(w.buf, b...)
}

// WriteDouble writes a Double to the Writer.
func (w *Writer) WriteDouble(f float64) {
	// IEEE 754 bits, little-endian.
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, math.Float64bits(f))

	w.buf = append(w.buf, b...)
}

// WriteBytes writes Bytes to the Writer.
func (w *Writer) WriteBytes(b []byte) {
	// Length-prefixed (long), then the raw bytes.
	w.WriteLong(int64(len(b)))
	w.buf = append(w.buf, b...)
}

// WriteString writes a String to the Writer.
func (w *Writer) WriteString(s string) {
	// len(s) is the UTF-8 byte length, which is the length Avro expects.
	w.WriteLong(int64(len(s)))
	w.buf = append(w.buf, s...)
}

// WriteBlockHeader writes a Block Header to the Writer.
func (w *Writer) WriteBlockHeader(l, s int64) {
	// When the byte size is known (and size headers are enabled), emit the
	// negative-count form: -count followed by byte size. Otherwise just the
	// count.
	if s > 0 && !w.cfg.config.DisableBlockSizeHeader {
		w.WriteLong(-l)
		w.WriteLong(s)
		return
	}
	w.WriteLong(l)
}

// WriteBlockCB writes a block using the callback.
func (w *Writer) WriteBlockCB(callback func(w *Writer) int64) int64 {
	// Reserve space for the header up front; it is rewritten once the block's
	// item count and byte size are known.
	var dummyHeader [18]byte
	headerStart := len(w.buf)

	// Write dummy header
	_, _ = w.Write(dummyHeader[:])

	// Write block data
	capturedAt := len(w.buf)
	length := callback(w)
	size := int64(len(w.buf) - capturedAt)

	// Take a reference to the block data
	// (this aliases w.buf's backing array rather than copying).
	captured := w.buf[capturedAt:len(w.buf)]

	// Rewrite the header
	// NOTE(review): two max-width varints can occupy up to 20 bytes, two more
	// than the 18-byte placeholder; in that (astronomically large block)
	// case the header would overwrite the start of the aliased block data
	// before it is re-appended — confirm intended limits upstream.
	w.buf = w.buf[:headerStart]
	w.WriteBlockHeader(length, size)

	// Copy the block data back to its position
	w.buf = append(w.buf, captured...)

	return length
}
