Diffstat (limited to 'vendor/github.com/hamba')
-rw-r--r--  vendor/github.com/hamba/avro/v2/.gitignore  0
-rw-r--r--  vendor/github.com/hamba/avro/v2/.golangci.yml  49
-rw-r--r--  vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md  76
-rw-r--r--  vendor/github.com/hamba/avro/v2/LICENCE  21
-rw-r--r--  vendor/github.com/hamba/avro/v2/Makefile  27
-rw-r--r--  vendor/github.com/hamba/avro/v2/README.md  257
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec.go  254
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_array.go  119
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_default.go  58
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_dynamic.go  59
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_enum.go  131
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_fixed.go  192
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_generic.go  133
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_map.go  246
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_marshaler.go  70
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_native.go  666
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_ptr.go  66
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_record.go  511
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_skip.go  225
-rw-r--r--  vendor/github.com/hamba/avro/v2/codec_union.go  460
-rw-r--r--  vendor/github.com/hamba/avro/v2/config.go  282
-rw-r--r--  vendor/github.com/hamba/avro/v2/config_386.go  8
-rw-r--r--  vendor/github.com/hamba/avro/v2/config_arm.go  8
-rw-r--r--  vendor/github.com/hamba/avro/v2/config_x64.go  7
-rw-r--r--  vendor/github.com/hamba/avro/v2/converter.go  34
-rw-r--r--  vendor/github.com/hamba/avro/v2/decoder.go  49
-rw-r--r--  vendor/github.com/hamba/avro/v2/doc.go  4
-rw-r--r--  vendor/github.com/hamba/avro/v2/encoder.go  37
-rw-r--r--  vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go  38
-rw-r--r--  vendor/github.com/hamba/avro/v2/noescape.go  21
-rw-r--r--  vendor/github.com/hamba/avro/v2/noescape.s  0
-rw-r--r--  vendor/github.com/hamba/avro/v2/ocf/codec.go  164
-rw-r--r--  vendor/github.com/hamba/avro/v2/ocf/ocf.go  554
-rw-r--r--  vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go  154
-rw-r--r--  vendor/github.com/hamba/avro/v2/protocol.go  377
-rw-r--r--  vendor/github.com/hamba/avro/v2/reader.go  324
-rw-r--r--  vendor/github.com/hamba/avro/v2/reader_generic.go  163
-rw-r--r--  vendor/github.com/hamba/avro/v2/reader_skip.go  79
-rw-r--r--  vendor/github.com/hamba/avro/v2/resolver.go  90
-rw-r--r--  vendor/github.com/hamba/avro/v2/schema.go  1807
-rw-r--r--  vendor/github.com/hamba/avro/v2/schema_compatibility.go  487
-rw-r--r--  vendor/github.com/hamba/avro/v2/schema_parse.go  643
-rw-r--r--  vendor/github.com/hamba/avro/v2/schema_walk.go  21
-rw-r--r--  vendor/github.com/hamba/avro/v2/types.go  9
-rw-r--r--  vendor/github.com/hamba/avro/v2/writer.go  194
45 files changed, 9174 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/hamba/avro/v2/.gitignore b/vendor/github.com/hamba/avro/v2/.gitignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/.gitignore
diff --git a/vendor/github.com/hamba/avro/v2/.golangci.yml b/vendor/github.com/hamba/avro/v2/.golangci.yml
new file mode 100644
index 0000000..fa86e9a
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/.golangci.yml
@@ -0,0 +1,49 @@
+run:
+ tests: false
+ deadline: 5m
+
+linters-settings:
+ gofumpt:
+ extra-rules: true
+ gosec:
+ excludes:
+ - G115
+
+linters:
+ enable-all: true
+ disable:
+ - cyclop # duplicate of gocyclo
+ - exportloopref # deprecated
+ - depguard
+ - err113
+ - exhaustive
+ - exhaustruct
+ - forcetypeassert
+ - funlen
+ - gochecknoglobals
+ - gochecknoinits
+ - gocognit
+ - goconst
+ - gocyclo
+ - gosmopolitan
+ - inamedparam
+ - interfacebloat
+ - ireturn
+ - mnd
+ - nestif
+ - nlreturn
+ - nonamedreturns
+ - tagliatelle
+ - varnamelen
+ - wrapcheck
+ - wsl
+
+issues:
+ exclude-use-default: false
+ exclude:
+ - 'package-comments: should have a package comment'
+ - 'G103: Use of unsafe calls should be audited'
+ exclude-rules:
+ - path: (schema|protocol)\.go
+ linters:
+ - gosec
\ No newline at end of file
diff --git a/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md b/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..f3e9129
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/CODE_OF_CONDUCT.md
@@ -0,0 +1,76 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+In the interest of fostering an open and welcoming environment, we as
+contributors and maintainers pledge to making participation in our project and
+our community a harassment-free experience for everyone, regardless of age, body
+size, disability, ethnicity, sex characteristics, gender identity and expression,
+level of experience, education, socio-economic status, nationality, personal
+appearance, race, religion, or sexual identity and orientation.
+
+## Our Standards
+
+Examples of behavior that contributes to creating a positive environment
+include:
+
+* Using welcoming and inclusive language
+* Being respectful of differing viewpoints and experiences
+* Gracefully accepting constructive criticism
+* Focusing on what is best for the community
+* Showing empathy towards other community members
+
+Examples of unacceptable behavior by participants include:
+
+* The use of sexualized language or imagery and unwelcome sexual attention or
+ advances
+* Trolling, insulting/derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or electronic
+ address, without explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+
+## Our Responsibilities
+
+Project maintainers are responsible for clarifying the standards of acceptable
+behavior and are expected to take appropriate and fair corrective action in
+response to any instances of unacceptable behavior.
+
+Project maintainers have the right and responsibility to remove, edit, or
+reject comments, commits, code, wiki edits, issues, and other contributions
+that are not aligned to this Code of Conduct, or to ban temporarily or
+permanently any contributor for other behaviors that they deem inappropriate,
+threatening, offensive, or harmful.
+
+## Scope
+
+This Code of Conduct applies both within project spaces and in public spaces
+when an individual is representing the project or its community. Examples of
+representing a project or community include using an official project e-mail
+address, posting via an official social media account, or acting as an appointed
+representative at an online or offline event. Representation of a project may be
+further defined and clarified by project maintainers.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported by contacting the project team at nick@wiersma.co.za. All
+complaints will be reviewed and investigated and will result in a response that
+is deemed necessary and appropriate to the circumstances. The project team is
+obligated to maintain confidentiality with regard to the reporter of an incident.
+Further details of specific enforcement policies may be posted separately.
+
+Project maintainers who do not follow or enforce the Code of Conduct in good
+faith may face temporary or permanent repercussions as determined by other
+members of the project's leadership.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
+available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
+
+[homepage]: https://www.contributor-covenant.org
+
+For answers to common questions about this code of conduct, see
+https://www.contributor-covenant.org/faq
diff --git a/vendor/github.com/hamba/avro/v2/LICENCE b/vendor/github.com/hamba/avro/v2/LICENCE
new file mode 100644
index 0000000..fac73a4
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/LICENCE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2025 Nicholas Wiersma
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/vendor/github.com/hamba/avro/v2/Makefile b/vendor/github.com/hamba/avro/v2/Makefile
new file mode 100644
index 0000000..3de65fa
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/Makefile
@@ -0,0 +1,27 @@
+# Format all files
+fmt:
+ @echo "==> Formatting source"
+ @gofmt -s -w $(shell find . -type f -name '*.go' -not -path "./vendor/*")
+ @echo "==> Done"
+.PHONY: fmt
+
+# Tidy the go.mod file
+tidy:
+ @echo "==> Cleaning go.mod"
+ @go mod tidy
+ @echo "==> Done"
+.PHONY: tidy
+
+# Run all tests
+test:
+ @go test -cover -race ./...
+.PHONY: test
+
+# Lint the project
+lint:
+ @golangci-lint run ./...
+.PHONY: lint
+
+# Run CI tasks
+ci: lint test
+.PHONY: ci
diff --git a/vendor/github.com/hamba/avro/v2/README.md b/vendor/github.com/hamba/avro/v2/README.md
new file mode 100644
index 0000000..7ccd9b4
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/README.md
@@ -0,0 +1,257 @@
+<picture>
+ <source media="(prefers-color-scheme: dark)" srcset="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec&mode=dark">
+ <source media="(prefers-color-scheme: light)" srcset="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec">
+ <img alt="Logo" src="http://svg.wiersma.co.za/hamba/project?title=avro&tag=A%20fast%20Go%20avro%20codec">
+</picture>
+
+[![Go Report Card](https://goreportcard.com/badge/github.com/hamba/avro/v2)](https://goreportcard.com/report/github.com/hamba/avro/v2)
+[![Build Status](https://github.com/hamba/avro/actions/workflows/test.yml/badge.svg)](https://github.com/hamba/avro/actions)
+[![Coverage Status](https://coveralls.io/repos/github/hamba/avro/badge.svg?branch=main)](https://coveralls.io/github/hamba/avro?branch=main)
+[![Go Reference](https://pkg.go.dev/badge/github.com/hamba/avro/v2.svg)](https://pkg.go.dev/github.com/hamba/avro/v2)
+[![GitHub release](https://img.shields.io/github/release/hamba/avro.svg)](https://github.com/hamba/avro/releases)
+[![GitHub license](https://img.shields.io/badge/license-MIT-blue.svg)](https://raw.githubusercontent.com/hamba/avro/master/LICENSE)
+
+A fast Go avro codec
+
+## Overview
+
+Install with:
+
+```shell
+go get github.com/hamba/avro/v2
+```
+
+**Note:** This project has renamed the default branch from `master` to `main`. You will need to update your local environment.
+
+## Usage
+
+```go
+type SimpleRecord struct {
+ A int64 `avro:"a"`
+ B string `avro:"b"`
+}
+
+schema, err := avro.Parse(`{
+ "type": "record",
+ "name": "simple",
+ "namespace": "org.hamba.avro",
+ "fields" : [
+ {"name": "a", "type": "long"},
+ {"name": "b", "type": "string"}
+ ]
+}`)
+if err != nil {
+ log.Fatal(err)
+}
+
+in := SimpleRecord{A: 27, B: "foo"}
+
+data, err := avro.Marshal(schema, in)
+if err != nil {
+ log.Fatal(err)
+}
+
+fmt.Println(data)
+// Outputs: [54 6 102 111 111]
+
+out := SimpleRecord{}
+err = avro.Unmarshal(schema, data, &out)
+if err != nil {
+ log.Fatal(err)
+}
+
+fmt.Println(out)
+// Outputs: {27 foo}
+```
+
+More examples in the [godoc](https://pkg.go.dev/github.com/hamba/avro/v2).
+
+#### Types Conversions
+
+| Avro | Go Struct | Go Interface |
+|-------------------------------|------------------------------------------------------------|--------------------------|
+| `null` | `nil` | `nil` |
+| `boolean` | `bool` | `bool` |
+| `bytes` | `[]byte` | `[]byte` |
+| `float` | `float32` | `float32` |
+| `double` | `float64` | `float64` |
+| `long` | `int`\*, `int64`, `uint32`\** | `int`, `int64`, `uint32` |
+| `int` | `int`\*, `int32`, `int16`, `int8`, `uint8`\**, `uint16`\** | `int`, `uint8`, `uint16` |
+| `fixed` | `uint64` | `uint64` |
+| `string` | `string` | `string` |
+| `array` | `[]T` | `[]any` |
+| `enum` | `string` | `string` |
+| `fixed` | `[n]byte` | `[n]byte` |
+| `map` | `map[string]T{}` | `map[string]any` |
+| `record` | `struct` | `map[string]any` |
+| `union` | *see below* | *see below* |
+| `int.date` | `time.Time` | `time.Time` |
+| `int.time-millis` | `time.Duration` | `time.Duration` |
+| `long.time-micros` | `time.Duration` | `time.Duration` |
+| `long.timestamp-millis` | `time.Time` | `time.Time` |
+| `long.timestamp-micros` | `time.Time` | `time.Time` |
+| `long.local-timestamp-millis` | `time.Time` | `time.Time` |
+| `long.local-timestamp-micros` | `time.Time` | `time.Time` |
+| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
+| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
+| `string.uuid` | `string` | `string` |
+
+\* Please note that the size of the Go type `int` is platform dependent. Decoding an Avro `long` into a Go `int` is
+only allowed on 64-bit platforms and will result in an error on 32-bit platforms. Similarly, be careful when encoding a
+Go `int` using Avro `int` on a 64-bit platform, as that can result in an integer overflow causing misinterpretation of
+the data.
+
+\** Please note that when the Go type is an unsigned integer, care must be taken to ensure that information is not
+lost when converting between the Avro type and the Go type. For example, storing a *negative* number in Avro of
+`int = -100` would be interpreted as `uint16 = 65,436` in Go. Another example is an Avro value larger than the Go
+type can hold: Avro `int = 256` wraps around to `uint8 = 0`.
+
+##### Unions
+
+The following union types are accepted: `map[string]any`, `*T` and `any`; a short example follows the list.
+
+* **map[string]any:** If the union value is `nil`, a `nil` map will be en/decoded.
+When a non-`nil` union value is encountered, a single key is en/decoded. The key is the avro
+type name, or the schema's full name in the case of a named schema (enum, fixed or record).
+* ***T:** This is allowed in a "nullable" union. A nullable union is defined as a two-schema union,
+with one of the types being `null` (i.e. `["null", "string"]` or `["string", "null"]`); in this case
+a `*T` is allowed, with `T` matching the conversion table above. In the case of a slice, the slice can be used
+directly.
+* **any:** An `interface` can be provided and the type or name resolved. Primitive types
+are pre-registered, but named types, maps and slices will need to be registered with the `Register` function.
+In the case of arrays and maps the enclosed schema type or name is appended to the type with a `:` separator,
+e.g. `"map:string"`. Behavior when a type cannot be resolved depends on your chosen configuration options:
+    * !Config.UnionResolutionError && !Config.PartialUnionTypeResolution: the map type above is used
+    * Config.UnionResolutionError && !Config.PartialUnionTypeResolution: an error is returned
+    * !Config.UnionResolutionError && Config.PartialUnionTypeResolution: any registered type will get resolved while any unregistered type will fall back to the map type above.
+    * Config.UnionResolutionError && Config.PartialUnionTypeResolution: any registered type will get resolved while any unregistered type will return an error.
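+
+For example, a minimal sketch of decoding both flavours, assuming `data` holds
+Avro-encoded bytes matching the schema in use (the schemas here are illustrative):
+
+```go
+// Nullable union: decode into a pointer.
+nullableSchema := avro.MustParse(`["null", "string"]`)
+
+var s *string
+if err := avro.Unmarshal(nullableSchema, data, &s); err != nil {
+ log.Fatal(err)
+}
+// s is nil for the null branch, otherwise it points at the decoded string.
+
+// Non-nullable union: decode into a single-key map, keyed by the Avro type name.
+unionSchema := avro.MustParse(`["int", "string"]`)
+
+var v map[string]any
+if err := avro.Unmarshal(unionSchema, data, &v); err != nil {
+ log.Fatal(err)
+}
+```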
+
+##### TextMarshaler and TextUnmarshaler
+
+The interfaces `TextMarshaler` and `TextUnmarshaler` are supported for a `string` schema type. In the case of
+a `string` schema, the object is first tested for implementation of these interfaces before regular encoding
+and decoding is attempted.
+
+Enums may also implement `TextMarshaler` and `TextUnmarshaler`, and must resolve to valid symbols in the given enum schema.
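+
+A minimal sketch for an enum (the `Level` type and its symbols are illustrative):
+
+```go
+type Level int
+
+func (l Level) MarshalText() ([]byte, error) {
+ if l == 1 {
+  return []byte("INFO"), nil
+ }
+ return []byte("UNKNOWN"), nil
+}
+
+func (l *Level) UnmarshalText(b []byte) error {
+ if string(b) == "INFO" {
+  *l = 1
+ } else {
+  *l = 0
+ }
+ return nil
+}
+
+schema := avro.MustParse(`{"type": "enum", "name": "level", "symbols": ["UNKNOWN", "INFO"]}`)
+
+data, err := avro.Marshal(schema, Level(1)) // encodes the symbol "INFO"
+```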
+
+##### Identical Underlying Types
+
+One type can be [ConvertibleTo](https://go.dev/ref/spec#Conversions) another type if they have identical underlying types.
+A non-native type may be used if it is convertible to *time.Time*, *big.Rat* or *avro.LogicalDuration* for the corresponding *LogicalTypes*.
+
+Ex.: `type Timestamp time.Time`
+
+##### Untrusted Input With Bytes and Strings
+
+For security reasons, the configuration `Config.MaxByteSliceSize` restricts the maximum size of `bytes` and `string` types created
+by the `Reader`. The default maximum size is `1MiB` and is configurable. This is required to stop untrusted input from consuming all memory and
+crashing the application. Should this not be needed, setting a negative number will disable the behaviour.
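+
+As a sketch, the limit can be adjusted on a custom `Config` and the frozen API used in
+place of the package-level functions (the `2 MiB` value is illustrative; `schema`,
+`data` and `SimpleRecord` are from the usage example above):
+
+```go
+cfg := avro.Config{
+ MaxByteSliceSize: 2 * 1024 * 1024, // 2 MiB
+}.Freeze()
+
+var out SimpleRecord
+if err := cfg.Unmarshal(schema, data, &out); err != nil {
+ log.Fatal(err)
+}
+```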
+
+## Benchmark
+
+Benchmark source code can be found at: [https://github.com/nrwiersma/avro-benchmarks](https://github.com/nrwiersma/avro-benchmarks)
+
+```
+BenchmarkGoAvroDecode-8 788455 1505 ns/op 418 B/op 27 allocs/op
+BenchmarkGoAvroEncode-8 624343 1908 ns/op 806 B/op 63 allocs/op
+BenchmarkGoGenAvroDecode-8 1360375 876.4 ns/op 320 B/op 11 allocs/op
+BenchmarkGoGenAvroEncode-8 2801583 425.9 ns/op 240 B/op 3 allocs/op
+BenchmarkHambaDecode-8 5046832 238.7 ns/op 47 B/op 0 allocs/op
+BenchmarkHambaEncode-8 6017635 196.2 ns/op 112 B/op 1 allocs/op
+BenchmarkLinkedinDecode-8 1000000 1003 ns/op 1688 B/op 35 allocs/op
+BenchmarkLinkedinEncode-8 3170553 381.5 ns/op 248 B/op 5 allocs/op
+```
+
+Always benchmark with your own workload. The result depends heavily on the data input.
+
+## Go structs generation
+
+Go structs can be generated for you from the schema. The types generated follow the same logic as in [types conversions](#types-conversions).
+
+Install the struct generator with:
+
+```shell
+go install github.com/hamba/avro/v2/cmd/avrogen@<version>
+```
+
+Example usage assuming there's a valid schema in `in.avsc`:
+
+```shell
+avrogen -pkg avro -o bla.go -tags json:snake,yaml:upper-camel in.avsc
+```
+
+**Tip:** Omit `-o FILE` to dump the generated Go structs to stdout instead of a file.
+
+Check the options and usage with `-h`:
+
+```shell
+avrogen -h
+```
+
+Alternatively, use it as a library in internal commands via the `gen` package.
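+
+For example, a common pattern (a sketch, not specific to this project) is to invoke the
+generator from `go generate`, assuming the module is a dependency or the tool is pinned
+with `@<version>`:
+
+```go
+//go:generate go run github.com/hamba/avro/v2/cmd/avrogen -pkg avro -o bla.go -tags json:snake in.avsc
+package avro
+```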
+
+## Avro schema validation
+
+### avrosv
+
+A small Avro schema validation command-line utility is also available. This simple tool leverages the
+schema parsing functionality of the library, showing validation errors or optionally dumping parsed
+schemas to the console. It can be used in CI/CD pipelines to validate schema changes in a repository.
+
+Install the Avro schema validator with:
+
+```shell
+go install github.com/hamba/avro/v2/cmd/avrosv@<version>
+```
+
+Example usage assuming there's a valid schema in `in.avsc` (exit status code is `0`):
+
+```shell
+avrosv in.avsc
+```
+
+An invalid schema will result in a diagnostic output and a non-zero exit status code:
+
+```shell
+avrosv bad-default-schema.avsc; echo $?
+Error: avro: invalid default for field someString. <nil> not a string
+2
+```
+
+Schemas referencing other schemas can also be validated by providing all of them (schemas are parsed in order):
+
+```shell
+avrosv base-schema.avsc schema-withref.avsc
+```
+
+Check the options and usage with `-h`:
+
+```shell
+avrosv -h
+```
+
+### Name Validation
+
+Avro names are validated according to the
+[Avro specification](https://avro.apache.org/docs/1.11.1/specification/#names).
+
+However, the official Java library does not validate said names accordingly, resulting in some files out in the wild
+having invalid names. Thus, this library has a configuration option that allows these invalid names to be parsed.
+
+```go
+avro.SkipNameValidation = true
+```
+
+Note that this variable is global, so you should unset it once you are done with the invalid schema.
+
+## Go Version Support
+
+This library supports the last two versions of Go. While the minimum Go version is
+not guaranteed to increase alongside Go, it may jump from time to time to support
+additional features. This will not be considered a breaking change.
+
+## Who uses hamba/avro?
+
+- [Apache Arrow for Go](https://github.com/apache/arrow-go)
+- [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
+- [pulsar-client-go](https://github.com/apache/pulsar-client-go)
diff --git a/vendor/github.com/hamba/avro/v2/codec.go b/vendor/github.com/hamba/avro/v2/codec.go
new file mode 100644
index 0000000..c868f51
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec.go
@@ -0,0 +1,254 @@
+package avro
+
+import (
+ "fmt"
+ "math/big"
+ "reflect"
+ "time"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+var (
+ timeType = reflect.TypeOf(time.Time{})
+ timeDurationType = reflect.TypeOf(time.Duration(0))
+ ratType = reflect.TypeOf(big.Rat{})
+ durType = reflect.TypeOf(LogicalDuration{})
+)
+
+type null struct{}
+
+// ValDecoder represents an internal value decoder.
+//
+// You should never use ValDecoder directly.
+type ValDecoder interface {
+ Decode(ptr unsafe.Pointer, r *Reader)
+}
+
+// ValEncoder represents an internal value encoder.
+//
+// You should never use ValEncoder directly.
+type ValEncoder interface {
+ Encode(ptr unsafe.Pointer, w *Writer)
+}
+
+// ReadVal parses an Avro value and stores the result in the value pointed to by obj.
+func (r *Reader) ReadVal(schema Schema, obj any) {
+ decoder := r.cfg.getDecoderFromCache(schema.CacheFingerprint(), reflect2.RTypeOf(obj))
+ if decoder == nil {
+ typ := reflect2.TypeOf(obj)
+ if typ.Kind() != reflect.Ptr {
+ r.ReportError("ReadVal", "can only unmarshal into pointer")
+ return
+ }
+ decoder = r.cfg.DecoderOf(schema, typ)
+ }
+
+ ptr := reflect2.PtrOf(obj)
+ if ptr == nil {
+ r.ReportError("ReadVal", "can not read into nil pointer")
+ return
+ }
+
+ decoder.Decode(ptr, r)
+}
+
+// WriteVal writes the Avro encoding of obj.
+func (w *Writer) WriteVal(schema Schema, val any) {
+ encoder := w.cfg.getEncoderFromCache(schema.Fingerprint(), reflect2.RTypeOf(val))
+ if encoder == nil {
+ typ := reflect2.TypeOf(val)
+ encoder = w.cfg.EncoderOf(schema, typ)
+ }
+ encoder.Encode(reflect2.PtrOf(val), w)
+}
+
+func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder {
+ rtype := typ.RType()
+ decoder := c.getDecoderFromCache(schema.CacheFingerprint(), rtype)
+ if decoder != nil {
+ return decoder
+ }
+
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ decoder = decoderOfType(newDecoderContext(c), schema, ptrType.Elem())
+ c.addDecoderToCache(schema.CacheFingerprint(), rtype, decoder)
+ return decoder
+}
+
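+// deferDecoder breaks construction cycles for recursive record schemas: it is
+// registered in the in-flight decoder map before its inner decoder is built, so
+// self-references resolve to it. deferEncoder below is the encoding analogue.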
+type deferDecoder struct {
+ decoder ValDecoder
+}
+
+func (d *deferDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ d.decoder.Decode(ptr, r)
+}
+
+type deferEncoder struct {
+ encoder ValEncoder
+}
+
+func (d *deferEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ d.encoder.Encode(ptr, w)
+}
+
+type decoderContext struct {
+ cfg *frozenConfig
+ decoders map[cacheKey]ValDecoder
+}
+
+func newDecoderContext(cfg *frozenConfig) *decoderContext {
+ return &decoderContext{
+ cfg: cfg,
+ decoders: make(map[cacheKey]ValDecoder),
+ }
+}
+
+type encoderContext struct {
+ cfg *frozenConfig
+ encoders map[cacheKey]ValEncoder
+}
+
+func newEncoderContext(cfg *frozenConfig) *encoderContext {
+ return &encoderContext{
+ cfg: cfg,
+ encoders: make(map[cacheKey]ValEncoder),
+ }
+}
+
+//nolint:dupl
+func decoderOfType(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ if dec := createDecoderOfMarshaler(schema, typ); dec != nil {
+ return dec
+ }
+
+ // Handle eface (empty interface) case when it isn't a union
+ if typ.Kind() == reflect.Interface && schema.Type() != Union {
+ if _, ok := typ.(*reflect2.UnsafeIFaceType); !ok {
+ return newEfaceDecoder(d, schema)
+ }
+ }
+
+ switch schema.Type() {
+ case Null:
+ return &nullCodec{}
+ case String, Bytes, Int, Long, Float, Double, Boolean:
+ return createDecoderOfNative(schema.(*PrimitiveSchema), typ)
+ case Record:
+ key := cacheKey{fingerprint: schema.CacheFingerprint(), rtype: typ.RType()}
+ defDec := &deferDecoder{}
+ d.decoders[key] = defDec
+ defDec.decoder = createDecoderOfRecord(d, schema.(*RecordSchema), typ)
+ return defDec.decoder
+ case Ref:
+ key := cacheKey{fingerprint: schema.(*RefSchema).Schema().CacheFingerprint(), rtype: typ.RType()}
+ if dec, f := d.decoders[key]; f {
+ return dec
+ }
+ return decoderOfType(d, schema.(*RefSchema).Schema(), typ)
+ case Enum:
+ return createDecoderOfEnum(schema.(*EnumSchema), typ)
+ case Array:
+ return createDecoderOfArray(d, schema.(*ArraySchema), typ)
+ case Map:
+ return createDecoderOfMap(d, schema.(*MapSchema), typ)
+ case Union:
+ return createDecoderOfUnion(d, schema.(*UnionSchema), typ)
+ case Fixed:
+ return createDecoderOfFixed(schema.(*FixedSchema), typ)
+ default:
+ // It is impossible to get here with a valid schema
+ return &errorDecoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())}
+ }
+}
+
+func (c *frozenConfig) EncoderOf(schema Schema, typ reflect2.Type) ValEncoder {
+ if typ == nil {
+ typ = reflect2.TypeOf((*null)(nil))
+ }
+
+ rtype := typ.RType()
+ encoder := c.getEncoderFromCache(schema.Fingerprint(), rtype)
+ if encoder != nil {
+ return encoder
+ }
+
+ encoder = encoderOfType(newEncoderContext(c), schema, typ)
+ if typ.LikePtr() {
+ encoder = &onePtrEncoder{encoder}
+ }
+ c.addEncoderToCache(schema.Fingerprint(), rtype, encoder)
+ return encoder
+}
+
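+// onePtrEncoder adapts an encoder for pointer-like types (reflect2's LikePtr):
+// Encode receives the value's pointer itself, so it passes the address of that
+// pointer to give the wrapped encoder a pointer to the value.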
+type onePtrEncoder struct {
+ enc ValEncoder
+}
+
+func (e *onePtrEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ e.enc.Encode(noescape(unsafe.Pointer(&ptr)), w)
+}
+
+//nolint:dupl
+func encoderOfType(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
+ if enc := createEncoderOfMarshaler(schema, typ); enc != nil {
+ return enc
+ }
+
+ if typ.Kind() == reflect.Interface {
+ return &interfaceEncoder{schema: schema, typ: typ}
+ }
+
+ switch schema.Type() {
+ case Null:
+ return &nullCodec{}
+ case String, Bytes, Int, Long, Float, Double, Boolean:
+ return createEncoderOfNative(schema.(*PrimitiveSchema), typ)
+ case Record:
+ key := cacheKey{fingerprint: schema.Fingerprint(), rtype: typ.RType()}
+ defEnc := &deferEncoder{}
+ e.encoders[key] = defEnc
+ defEnc.encoder = createEncoderOfRecord(e, schema.(*RecordSchema), typ)
+ return defEnc.encoder
+ case Ref:
+ key := cacheKey{fingerprint: schema.(*RefSchema).Schema().Fingerprint(), rtype: typ.RType()}
+ if enc, f := e.encoders[key]; f {
+ return enc
+ }
+ return encoderOfType(e, schema.(*RefSchema).Schema(), typ)
+ case Enum:
+ return createEncoderOfEnum(schema.(*EnumSchema), typ)
+ case Array:
+ return createEncoderOfArray(e, schema.(*ArraySchema), typ)
+ case Map:
+ return createEncoderOfMap(e, schema.(*MapSchema), typ)
+ case Union:
+ return createEncoderOfUnion(e, schema.(*UnionSchema), typ)
+ case Fixed:
+ return createEncoderOfFixed(schema.(*FixedSchema), typ)
+ default:
+ // It is impossible to get here with a valid schema
+ return &errorEncoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())}
+ }
+}
+
+type errorDecoder struct {
+ err error
+}
+
+func (d *errorDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ if r.Error == nil {
+ r.Error = d.err
+ }
+}
+
+type errorEncoder struct {
+ err error
+}
+
+func (e *errorEncoder) Encode(_ unsafe.Pointer, w *Writer) {
+ if w.Error == nil {
+ w.Error = e.err
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_array.go b/vendor/github.com/hamba/avro/v2/codec_array.go
new file mode 100644
index 0000000..0b412d9
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_array.go
@@ -0,0 +1,119 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfArray(d *decoderContext, schema *ArraySchema, typ reflect2.Type) ValDecoder {
+ if typ.Kind() == reflect.Slice {
+ return decoderOfArray(d, schema, typ)
+ }
+
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func createEncoderOfArray(e *encoderContext, schema *ArraySchema, typ reflect2.Type) ValEncoder {
+ if typ.Kind() == reflect.Slice {
+ return encoderOfArray(e, schema, typ)
+ }
+
+ return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func decoderOfArray(d *decoderContext, arr *ArraySchema, typ reflect2.Type) ValDecoder {
+ sliceType := typ.(*reflect2.UnsafeSliceType)
+ decoder := decoderOfType(d, arr.Items(), sliceType.Elem())
+
+ return &arrayDecoder{typ: sliceType, decoder: decoder}
+}
+
+type arrayDecoder struct {
+ typ *reflect2.UnsafeSliceType
+ decoder ValDecoder
+}
+
+func (d *arrayDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ var size int
+ sliceType := d.typ
+
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ start := size
+ size += int(l)
+
+ if size > r.cfg.getMaxSliceAllocSize() {
+ r.ReportError("decode array", "size is greater than `Config.MaxSliceAllocSize`")
+ return
+ }
+
+ sliceType.UnsafeGrow(ptr, size)
+
+ for i := start; i < size; i++ {
+ elemPtr := sliceType.UnsafeGetIndex(ptr, i)
+ d.decoder.Decode(elemPtr, r)
+ if r.Error != nil {
+ r.Error = fmt.Errorf("reading %s: %w", d.typ.String(), r.Error)
+ return
+ }
+ }
+ }
+
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ r.Error = fmt.Errorf("%v: %w", d.typ, r.Error)
+ }
+}
+
+func encoderOfArray(e *encoderContext, arr *ArraySchema, typ reflect2.Type) ValEncoder {
+ sliceType := typ.(*reflect2.UnsafeSliceType)
+ encoder := encoderOfType(e, arr.Items(), sliceType.Elem())
+
+ return &arrayEncoder{
+ blockLength: e.cfg.getBlockLength(),
+ typ: sliceType,
+ encoder: encoder,
+ }
+}
+
+type arrayEncoder struct {
+ blockLength int
+ typ *reflect2.UnsafeSliceType
+ encoder ValEncoder
+}
+
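+// Encode writes the slice as a sequence of Avro blocks of at most blockLength
+// items each, terminated by an empty block (count 0).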
+func (e *arrayEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ blockLength := e.blockLength
+ length := e.typ.UnsafeLengthOf(ptr)
+
+ for i := 0; i < length; i += blockLength {
+ w.WriteBlockCB(func(w *Writer) int64 {
+ count := int64(0)
+ for j := i; j < i+blockLength && j < length; j++ {
+ elemPtr := e.typ.UnsafeGetIndex(ptr, j)
+ e.encoder.Encode(elemPtr, w)
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ w.Error = fmt.Errorf("%s: %w", e.typ.String(), w.Error)
+ return count
+ }
+ count++
+ }
+
+ return count
+ })
+ }
+
+ w.WriteBlockHeader(0, 0)
+
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ w.Error = fmt.Errorf("%v: %w", e.typ, w.Error)
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_default.go b/vendor/github.com/hamba/avro/v2/codec_default.go
new file mode 100644
index 0000000..c42bdc3
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_default.go
@@ -0,0 +1,58 @@
+package avro
+
+import (
+ "fmt"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDefaultDecoder(d *decoderContext, field *Field, typ reflect2.Type) ValDecoder {
+ cfg := d.cfg
+ fn := func(def any) ([]byte, error) {
+ defaultType := reflect2.TypeOf(def)
+ if defaultType == nil {
+ defaultType = reflect2.TypeOf((*null)(nil))
+ }
+ defaultEncoder := encoderOfType(newEncoderContext(cfg), field.Type(), defaultType)
+ if defaultType.LikePtr() {
+ defaultEncoder = &onePtrEncoder{defaultEncoder}
+ }
+ w := cfg.borrowWriter()
+ defer cfg.returnWriter(w)
+
+ defaultEncoder.Encode(reflect2.PtrOf(def), w)
+ if w.Error != nil {
+ return nil, w.Error
+ }
+ b := w.Buffer()
+ data := make([]byte, len(b))
+ copy(data, b)
+
+ return data, nil
+ }
+
+ b, err := field.encodeDefault(fn)
+ if err != nil {
+ return &errorDecoder{err: fmt.Errorf("decode default: %w", err)}
+ }
+ return &defaultDecoder{
+ data: b,
+ decoder: decoderOfType(d, field.Type(), typ),
+ }
+}
+
+type defaultDecoder struct {
+ data []byte
+ decoder ValDecoder
+}
+
+// Decode implements ValDecoder.
+func (d *defaultDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ rr := r.cfg.borrowReader(d.data)
+ defer r.cfg.returnReader(rr)
+
+ d.decoder.Decode(ptr, rr)
+}
+
+var _ ValDecoder = &defaultDecoder{}
diff --git a/vendor/github.com/hamba/avro/v2/codec_dynamic.go b/vendor/github.com/hamba/avro/v2/codec_dynamic.go
new file mode 100644
index 0000000..f14a04e
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_dynamic.go
@@ -0,0 +1,59 @@
+package avro
+
+import (
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
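+// efaceDecoder decodes into an empty interface. If the target interface already
+// holds a non-nil pointer, it decodes in place; otherwise it decodes into a
+// freshly allocated value of the schema's generic receiver type.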
+type efaceDecoder struct {
+ schema Schema
+ typ reflect2.Type
+ dec ValDecoder
+}
+
+func newEfaceDecoder(d *decoderContext, schema Schema) *efaceDecoder {
+ typ, _ := genericReceiver(schema)
+ dec := decoderOfType(d, schema, typ)
+
+ return &efaceDecoder{
+ schema: schema,
+ typ: typ,
+ dec: dec,
+ }
+}
+
+func (d *efaceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ pObj := (*any)(ptr)
+ if *pObj == nil {
+ *pObj = genericDecode(d.typ, d.dec, r)
+ return
+ }
+
+ typ := reflect2.TypeOf(*pObj)
+ if typ.Kind() != reflect.Ptr {
+ *pObj = genericDecode(d.typ, d.dec, r)
+ return
+ }
+
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ ptrElemType := ptrType.Elem()
+ if reflect2.IsNil(*pObj) {
+ obj := ptrElemType.New()
+ r.ReadVal(d.schema, obj)
+ *pObj = obj
+ return
+ }
+ r.ReadVal(d.schema, *pObj)
+}
+
+type interfaceEncoder struct {
+ schema Schema
+ typ reflect2.Type
+}
+
+func (e *interfaceEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ obj := e.typ.UnsafeIndirect(ptr)
+ w.WriteVal(e.schema, obj)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_enum.go b/vendor/github.com/hamba/avro/v2/codec_enum.go
new file mode 100644
index 0000000..65ab453
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_enum.go
@@ -0,0 +1,131 @@
+package avro
+
+import (
+ "encoding"
+ "errors"
+ "fmt"
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfEnum(schema *EnumSchema, typ reflect2.Type) ValDecoder {
+ switch {
+ case typ.Kind() == reflect.String:
+ return &enumCodec{enum: schema}
+ case typ.Implements(textUnmarshalerType):
+ return &enumTextMarshalerCodec{typ: typ, enum: schema}
+ case reflect2.PtrTo(typ).Implements(textUnmarshalerType):
+ return &enumTextMarshalerCodec{typ: typ, enum: schema, ptr: true}
+ }
+
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func createEncoderOfEnum(schema *EnumSchema, typ reflect2.Type) ValEncoder {
+ switch {
+ case typ.Kind() == reflect.String:
+ return &enumCodec{enum: schema}
+ case typ.Implements(textMarshalerType):
+ return &enumTextMarshalerCodec{typ: typ, enum: schema}
+ case reflect2.PtrTo(typ).Implements(textMarshalerType):
+ return &enumTextMarshalerCodec{typ: typ, enum: schema, ptr: true}
+ }
+
+ return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+type enumCodec struct {
+ enum *EnumSchema
+}
+
+func (c *enumCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ i := int(r.ReadInt())
+
+ symbol, ok := c.enum.Symbol(i)
+ if !ok {
+ r.ReportError("decode enum symbol", "unknown enum symbol")
+ return
+ }
+
+ *((*string)(ptr)) = symbol
+}
+
+func (c *enumCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ str := *((*string)(ptr))
+ for i, sym := range c.enum.symbols {
+ if str != sym {
+ continue
+ }
+
+ w.WriteInt(int32(i))
+ return
+ }
+
+ w.Error = fmt.Errorf("avro: unknown enum symbol: %s", str)
+}
+
+type enumTextMarshalerCodec struct {
+ typ reflect2.Type
+ enum *EnumSchema
+ ptr bool
+}
+
+func (c *enumTextMarshalerCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ i := int(r.ReadInt())
+
+ symbol, ok := c.enum.Symbol(i)
+ if !ok {
+ r.ReportError("decode enum symbol", "unknown enum symbol")
+ return
+ }
+
+ var obj any
+ if c.ptr {
+ obj = c.typ.PackEFace(ptr)
+ } else {
+ obj = c.typ.UnsafeIndirect(ptr)
+ }
+ if reflect2.IsNil(obj) {
+ ptrType := c.typ.(*reflect2.UnsafePtrType)
+ newPtr := ptrType.Elem().UnsafeNew()
+ *((*unsafe.Pointer)(ptr)) = newPtr
+ obj = c.typ.UnsafeIndirect(ptr)
+ }
+ unmarshaler := (obj).(encoding.TextUnmarshaler)
+ if err := unmarshaler.UnmarshalText([]byte(symbol)); err != nil {
+ r.ReportError("decode enum text unmarshaler", err.Error())
+ }
+}
+
+func (c *enumTextMarshalerCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ var obj any
+ if c.ptr {
+ obj = c.typ.PackEFace(ptr)
+ } else {
+ obj = c.typ.UnsafeIndirect(ptr)
+ }
+ if c.typ.IsNullable() && reflect2.IsNil(obj) {
+ w.Error = errors.New("encoding nil enum text marshaler")
+ return
+ }
+ marshaler := (obj).(encoding.TextMarshaler)
+ b, err := marshaler.MarshalText()
+ if err != nil {
+ w.Error = err
+ return
+ }
+
+ str := string(b)
+ for i, sym := range c.enum.symbols {
+ if str != sym {
+ continue
+ }
+
+ w.WriteInt(int32(i))
+ return
+ }
+
+ w.Error = fmt.Errorf("avro: unknown enum symbol: %s", str)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_fixed.go b/vendor/github.com/hamba/avro/v2/codec_fixed.go
new file mode 100644
index 0000000..d6a642b
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_fixed.go
@@ -0,0 +1,192 @@
+package avro
+
+import (
+ "encoding/binary"
+ "fmt"
+ "math/big"
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfFixed(fixed *FixedSchema, typ reflect2.Type) ValDecoder {
+ switch typ.Kind() {
+ case reflect.Array:
+ arrayType := typ.(reflect2.ArrayType)
+ if arrayType.Elem().Kind() != reflect.Uint8 || arrayType.Len() != fixed.Size() {
+ break
+ }
+ return &fixedCodec{arrayType: typ.(*reflect2.UnsafeArrayType)}
+ case reflect.Uint64:
+ if fixed.Size() != 8 {
+ break
+ }
+
+ return &fixedUint64Codec{}
+ case reflect.Ptr:
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+
+ ls := fixed.Logical()
+ tpy1 := elemType.Type1()
+ if elemType.Kind() != reflect.Struct || !tpy1.ConvertibleTo(ratType) || ls == nil ||
+ ls.Type() != Decimal {
+ break
+ }
+ dec := ls.(*DecimalLogicalSchema)
+ return &fixedDecimalCodec{prec: dec.Precision(), scale: dec.Scale(), size: fixed.Size()}
+ case reflect.Struct:
+ ls := fixed.Logical()
+ if ls == nil {
+ break
+ }
+ typ1 := typ.Type1()
+ if !typ1.ConvertibleTo(durType) || ls.Type() != Duration {
+ break
+ }
+ return &fixedDurationCodec{}
+ }
+
+ return &errorDecoder{
+ err: fmt.Errorf("avro: %s is unsupported for Avro %s, size=%d", typ.String(), fixed.Type(), fixed.Size()),
+ }
+}
+
+func createEncoderOfFixed(fixed *FixedSchema, typ reflect2.Type) ValEncoder {
+ switch typ.Kind() {
+ case reflect.Array:
+ arrayType := typ.(reflect2.ArrayType)
+ if arrayType.Elem().Kind() != reflect.Uint8 || arrayType.Len() != fixed.Size() {
+ break
+ }
+ return &fixedCodec{arrayType: typ.(*reflect2.UnsafeArrayType)}
+ case reflect.Uint64:
+ if fixed.Size() != 8 {
+ break
+ }
+
+ return &fixedUint64Codec{}
+ case reflect.Ptr:
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+
+ ls := fixed.Logical()
+ tpy1 := elemType.Type1()
+ if elemType.Kind() != reflect.Struct || !tpy1.ConvertibleTo(ratType) || ls == nil ||
+ ls.Type() != Decimal {
+ break
+ }
+ dec := ls.(*DecimalLogicalSchema)
+ return &fixedDecimalCodec{prec: dec.Precision(), scale: dec.Scale(), size: fixed.Size()}
+
+ case reflect.Struct:
+ ls := fixed.Logical()
+ if ls == nil {
+ break
+ }
+ typ1 := typ.Type1()
+ if typ1.ConvertibleTo(durType) && ls.Type() == Duration {
+ return &fixedDurationCodec{}
+ }
+ }
+
+ return &errorEncoder{
+ err: fmt.Errorf("avro: %s is unsupported for Avro %s, size=%d", typ.String(), fixed.Type(), fixed.Size()),
+ }
+}
+
+type fixedUint64Codec [8]byte
+
+func (c *fixedUint64Codec) Decode(ptr unsafe.Pointer, r *Reader) {
+ buffer := c[:]
+ r.Read(buffer)
+ *(*uint64)(ptr) = binary.BigEndian.Uint64(buffer)
+}
+
+func (c *fixedUint64Codec) Encode(ptr unsafe.Pointer, w *Writer) {
+ buffer := c[:]
+ binary.BigEndian.PutUint64(buffer, *(*uint64)(ptr))
+ _, _ = w.Write(buffer)
+}
+
+type fixedCodec struct {
+ arrayType *reflect2.UnsafeArrayType
+}
+
+func (c *fixedCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ for i := range c.arrayType.Len() {
+ c.arrayType.UnsafeSetIndex(ptr, i, reflect2.PtrOf(r.readByte()))
+ }
+}
+
+func (c *fixedCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ for i := range c.arrayType.Len() {
+ bytePtr := c.arrayType.UnsafeGetIndex(ptr, i)
+ w.writeByte(*((*byte)(bytePtr)))
+ }
+}
+
+type fixedDecimalCodec struct {
+ prec int
+ scale int
+ size int
+}
+
+func (c *fixedDecimalCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ b := make([]byte, c.size)
+ r.Read(b)
+ *((**big.Rat)(ptr)) = ratFromBytes(b, c.scale)
+}
+
+func (c *fixedDecimalCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ r := *((**big.Rat)(ptr))
+ scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil)
+ i := (&big.Int{}).Mul(r.Num(), scale)
+ i = i.Div(i, r.Denom())
+
+ var b []byte
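+ // Serialize the scaled integer as a big-endian two's-complement value of
+ // exactly c.size bytes: positive values are left-padded (with a leading 0x00
+ // if the high bit is set), negative values are offset by 2^(size*8).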
+ switch i.Sign() {
+ case 0:
+ b = make([]byte, c.size)
+
+ case 1:
+ b = i.Bytes()
+ if b[0]&0x80 > 0 {
+ b = append([]byte{0}, b...)
+ }
+ if len(b) < c.size {
+ padded := make([]byte, c.size)
+ copy(padded[c.size-len(b):], b)
+ b = padded
+ }
+
+ case -1:
+ b = i.Add(i, (&big.Int{}).Lsh(one, uint(c.size*8))).Bytes()
+ }
+
+ _, _ = w.Write(b)
+}
+
+type fixedDurationCodec struct{}
+
+func (*fixedDurationCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ b := make([]byte, 12)
+ r.Read(b)
+ var duration LogicalDuration
+ duration.Months = binary.LittleEndian.Uint32(b[0:4])
+ duration.Days = binary.LittleEndian.Uint32(b[4:8])
+ duration.Milliseconds = binary.LittleEndian.Uint32(b[8:12])
+ *((*LogicalDuration)(ptr)) = duration
+}
+
+func (*fixedDurationCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ duration := (*LogicalDuration)(ptr)
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, duration.Months)
+ _, _ = w.Write(b)
+ binary.LittleEndian.PutUint32(b, duration.Days)
+ _, _ = w.Write(b)
+ binary.LittleEndian.PutUint32(b, duration.Milliseconds)
+ _, _ = w.Write(b)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_generic.go b/vendor/github.com/hamba/avro/v2/codec_generic.go
new file mode 100644
index 0000000..00ef1e2
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_generic.go
@@ -0,0 +1,133 @@
+package avro
+
+import (
+ "errors"
+ "math/big"
+ "time"
+
+ "github.com/modern-go/reflect2"
+)
+
+func genericDecode(typ reflect2.Type, dec ValDecoder, r *Reader) any {
+ ptr := typ.UnsafeNew()
+ dec.Decode(ptr, r)
+ if r.Error != nil {
+ return nil
+ }
+
+ obj := typ.UnsafeIndirect(ptr)
+ if reflect2.IsNil(obj) {
+ return nil
+ }
+ return obj
+}
+
+func genericReceiver(schema Schema) (reflect2.Type, error) {
+ if schema.Type() == Ref {
+ schema = schema.(*RefSchema).Schema()
+ }
+
+ var ls LogicalSchema
+ lts, ok := schema.(LogicalTypeSchema)
+ if ok {
+ ls = lts.Logical()
+ }
+
+ schemaName := string(schema.Type())
+ if ls != nil {
+ schemaName += "." + string(ls.Type())
+ }
+
+ switch schema.Type() {
+ case Null:
+ return reflect2.TypeOf((*null)(nil)), nil
+ case Boolean:
+ var v bool
+ return reflect2.TypeOf(v), nil
+ case Int:
+ if ls != nil {
+ switch ls.Type() {
+ case Date:
+ var v time.Time
+ return reflect2.TypeOf(v), nil
+
+ case TimeMillis:
+ var v time.Duration
+ return reflect2.TypeOf(v), nil
+ }
+ }
+ var v int
+ return reflect2.TypeOf(v), nil
+ case Long:
+ if ls != nil {
+ switch ls.Type() {
+ case TimeMicros:
+ var v time.Duration
+ return reflect2.TypeOf(v), nil
+ case TimestampMillis:
+ var v time.Time
+ return reflect2.TypeOf(v), nil
+ case TimestampMicros:
+ var v time.Time
+ return reflect2.TypeOf(v), nil
+ case LocalTimestampMillis:
+ var v time.Time
+ return reflect2.TypeOf(v), nil
+ case LocalTimestampMicros:
+ var v time.Time
+ return reflect2.TypeOf(v), nil
+ }
+ }
+ var v int64
+ return reflect2.TypeOf(v), nil
+ case Float:
+ var v float32
+ return reflect2.TypeOf(v), nil
+ case Double:
+ var v float64
+ return reflect2.TypeOf(v), nil
+ case String:
+ var v string
+ return reflect2.TypeOf(v), nil
+ case Bytes:
+ if ls != nil && ls.Type() == Decimal {
+ var v *big.Rat
+ return reflect2.TypeOf(v), nil
+ }
+ var v []byte
+ return reflect2.TypeOf(v), nil
+ case Record:
+ var v map[string]any
+ return reflect2.TypeOf(v), nil
+ case Enum:
+ var v string
+ return reflect2.TypeOf(v), nil
+ case Array:
+ v := make([]any, 0)
+ return reflect2.TypeOf(v), nil
+ case Map:
+ var v map[string]any
+ return reflect2.TypeOf(v), nil
+ case Union:
+ var v map[string]any
+ return reflect2.TypeOf(v), nil
+ case Fixed:
+ fixed := schema.(*FixedSchema)
+ ls := fixed.Logical()
+ if ls != nil {
+ switch ls.Type() {
+ case Duration:
+ var v LogicalDuration
+ return reflect2.TypeOf(v), nil
+ case Decimal:
+ var v *big.Rat
+ return reflect2.TypeOf(v), nil
+ }
+ }
+ v := byteSliceToArray(make([]byte, fixed.Size()), fixed.Size())
+ return reflect2.TypeOf(v), nil
+ default:
+ // This should not be possible.
+ return nil, errors.New("dynamic receiver not found for schema " + schemaName)
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_map.go b/vendor/github.com/hamba/avro/v2/codec_map.go
new file mode 100644
index 0000000..18c7de1
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_map.go
@@ -0,0 +1,246 @@
+package avro
+
+import (
+ "encoding"
+ "errors"
+ "fmt"
+ "io"
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfMap(d *decoderContext, schema *MapSchema, typ reflect2.Type) ValDecoder {
+ if typ.Kind() == reflect.Map {
+ keyType := typ.(reflect2.MapType).Key()
+ switch {
+ case keyType.Kind() == reflect.String:
+ return decoderOfMap(d, schema, typ)
+ case keyType.Implements(textUnmarshalerType):
+ return decoderOfMapUnmarshaler(d, schema, typ)
+ }
+ }
+
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func createEncoderOfMap(e *encoderContext, schema *MapSchema, typ reflect2.Type) ValEncoder {
+ if typ.Kind() == reflect.Map {
+ keyType := typ.(reflect2.MapType).Key()
+ switch {
+ case keyType.Kind() == reflect.String:
+ return encoderOfMap(e, schema, typ)
+ case keyType.Implements(textMarshalerType):
+ return encoderOfMapMarshaler(e, schema, typ)
+ }
+ }
+
+ return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func decoderOfMap(d *decoderContext, m *MapSchema, typ reflect2.Type) ValDecoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+ decoder := decoderOfType(d, m.Values(), mapType.Elem())
+
+ return &mapDecoder{
+ mapType: mapType,
+ elemType: mapType.Elem(),
+ decoder: decoder,
+ }
+}
+
+type mapDecoder struct {
+ mapType *reflect2.UnsafeMapType
+ elemType reflect2.Type
+ decoder ValDecoder
+}
+
+func (d *mapDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ if d.mapType.UnsafeIsNil(ptr) {
+ d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(0))
+ }
+
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ for range l {
+ keyPtr := reflect2.PtrOf(r.ReadString())
+ elemPtr := d.elemType.UnsafeNew()
+ d.decoder.Decode(elemPtr, r)
+ if r.Error != nil {
+ r.Error = fmt.Errorf("reading map[string]%s: %w", d.elemType.String(), r.Error)
+ return
+ }
+
+ d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr)
+ }
+ }
+
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error)
+ }
+}
+
+func decoderOfMapUnmarshaler(d *decoderContext, m *MapSchema, typ reflect2.Type) ValDecoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+ decoder := decoderOfType(d, m.Values(), mapType.Elem())
+
+ return &mapDecoderUnmarshaler{
+ mapType: mapType,
+ keyType: mapType.Key(),
+ elemType: mapType.Elem(),
+ decoder: decoder,
+ }
+}
+
+type mapDecoderUnmarshaler struct {
+ mapType *reflect2.UnsafeMapType
+ keyType reflect2.Type
+ elemType reflect2.Type
+ decoder ValDecoder
+}
+
+func (d *mapDecoderUnmarshaler) Decode(ptr unsafe.Pointer, r *Reader) {
+ if d.mapType.UnsafeIsNil(ptr) {
+ d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(0))
+ }
+
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ for range l {
+ keyPtr := d.keyType.UnsafeNew()
+ keyObj := d.keyType.UnsafeIndirect(keyPtr)
+ if reflect2.IsNil(keyObj) {
+ ptrType := d.keyType.(*reflect2.UnsafePtrType)
+ newPtr := ptrType.Elem().UnsafeNew()
+ *((*unsafe.Pointer)(keyPtr)) = newPtr
+ keyObj = d.keyType.UnsafeIndirect(keyPtr)
+ }
+ unmarshaler := keyObj.(encoding.TextUnmarshaler)
+ err := unmarshaler.UnmarshalText([]byte(r.ReadString()))
+ if err != nil {
+ r.ReportError("mapDecoderUnmarshaler", err.Error())
+ return
+ }
+
+ elemPtr := d.elemType.UnsafeNew()
+ d.decoder.Decode(elemPtr, r)
+
+ d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr)
+ }
+ }
+
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error)
+ }
+}
+
+func encoderOfMap(e *encoderContext, m *MapSchema, typ reflect2.Type) ValEncoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+ encoder := encoderOfType(e, m.Values(), mapType.Elem())
+
+ return &mapEncoder{
+ blockLength: e.cfg.getBlockLength(),
+ mapType: mapType,
+ encoder: encoder,
+ }
+}
+
+type mapEncoder struct {
+ blockLength int
+ mapType *reflect2.UnsafeMapType
+ encoder ValEncoder
+}
+
+func (e *mapEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ blockLength := e.blockLength
+
+ iter := e.mapType.UnsafeIterate(ptr)
+
+ for {
+ wrote := w.WriteBlockCB(func(w *Writer) int64 {
+ var i int
+ for i = 0; iter.HasNext() && i < blockLength; i++ {
+ keyPtr, elemPtr := iter.UnsafeNext()
+ w.WriteString(*((*string)(keyPtr)))
+ e.encoder.Encode(elemPtr, w)
+ }
+
+ return int64(i)
+ })
+
+ if wrote == 0 {
+ break
+ }
+ }
+
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ w.Error = fmt.Errorf("%v: %w", e.mapType, w.Error)
+ }
+}
+
+func encoderOfMapMarshaler(e *encoderContext, m *MapSchema, typ reflect2.Type) ValEncoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+ encoder := encoderOfType(e, m.Values(), mapType.Elem())
+
+ return &mapEncoderMarshaller{
+ blockLength: e.cfg.getBlockLength(),
+ mapType: mapType,
+ keyType: mapType.Key(),
+ encoder: encoder,
+ }
+}
+
+type mapEncoderMarshaller struct {
+ blockLength int
+ mapType *reflect2.UnsafeMapType
+ keyType reflect2.Type
+ encoder ValEncoder
+}
+
+func (e *mapEncoderMarshaller) Encode(ptr unsafe.Pointer, w *Writer) {
+ blockLength := e.blockLength
+
+ iter := e.mapType.UnsafeIterate(ptr)
+
+ for {
+ wrote := w.WriteBlockCB(func(w *Writer) int64 {
+ var i int
+ for i = 0; iter.HasNext() && i < blockLength; i++ {
+ keyPtr, elemPtr := iter.UnsafeNext()
+
+ obj := e.keyType.UnsafeIndirect(keyPtr)
+ if e.keyType.IsNullable() && reflect2.IsNil(obj) {
+ w.Error = errors.New("avro: mapEncoderMarshaller: encoding nil TextMarshaller")
+ return int64(0)
+ }
+ marshaler := (obj).(encoding.TextMarshaler)
+ b, err := marshaler.MarshalText()
+ if err != nil {
+ w.Error = err
+ return int64(0)
+ }
+ w.WriteString(string(b))
+
+ e.encoder.Encode(elemPtr, w)
+ }
+ return int64(i)
+ })
+
+ if wrote == 0 {
+ break
+ }
+ }
+
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ w.Error = fmt.Errorf("%v: %w", e.mapType, w.Error)
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_marshaler.go b/vendor/github.com/hamba/avro/v2/codec_marshaler.go
new file mode 100644
index 0000000..d783d17
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_marshaler.go
@@ -0,0 +1,70 @@
+package avro
+
+import (
+ "encoding"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+var (
+ textMarshalerType = reflect2.TypeOfPtr((*encoding.TextMarshaler)(nil)).Elem()
+ textUnmarshalerType = reflect2.TypeOfPtr((*encoding.TextUnmarshaler)(nil)).Elem()
+)
+
+func createDecoderOfMarshaler(schema Schema, typ reflect2.Type) ValDecoder {
+ if typ.Implements(textUnmarshalerType) && schema.Type() == String {
+ return &textMarshalerCodec{typ}
+ }
+ ptrType := reflect2.PtrTo(typ)
+ if ptrType.Implements(textUnmarshalerType) && schema.Type() == String {
+ return &referenceDecoder{
+ &textMarshalerCodec{ptrType},
+ }
+ }
+ return nil
+}
+
+func createEncoderOfMarshaler(schema Schema, typ reflect2.Type) ValEncoder {
+ if typ.Implements(textMarshalerType) && schema.Type() == String {
+ return &textMarshalerCodec{
+ typ: typ,
+ }
+ }
+ return nil
+}
+
+type textMarshalerCodec struct {
+ typ reflect2.Type
+}
+
+func (c textMarshalerCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ obj := c.typ.UnsafeIndirect(ptr)
+ if reflect2.IsNil(obj) {
+ ptrType := c.typ.(*reflect2.UnsafePtrType)
+ newPtr := ptrType.Elem().UnsafeNew()
+ *((*unsafe.Pointer)(ptr)) = newPtr
+ obj = c.typ.UnsafeIndirect(ptr)
+ }
+ unmarshaler := (obj).(encoding.TextUnmarshaler)
+ b := r.ReadBytes()
+ err := unmarshaler.UnmarshalText(b)
+ if err != nil {
+ r.ReportError("textMarshalerCodec", err.Error())
+ }
+}
+
+func (c textMarshalerCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ obj := c.typ.UnsafeIndirect(ptr)
+ if c.typ.IsNullable() && reflect2.IsNil(obj) {
+ w.WriteBytes(nil)
+ return
+ }
+ marshaler := (obj).(encoding.TextMarshaler)
+ b, err := marshaler.MarshalText()
+ if err != nil {
+ w.Error = err
+ return
+ }
+ w.WriteBytes(b)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_native.go b/vendor/github.com/hamba/avro/v2/codec_native.go
new file mode 100644
index 0000000..85612a6
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_native.go
@@ -0,0 +1,666 @@
+package avro
+
+import (
+ "fmt"
+ "math/big"
+ "reflect"
+ "strconv"
+ "time"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+//nolint:maintidx // Splitting this would not make it simpler.
+func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecoder {
+ resolved := schema.encodedType != ""
+ switch typ.Kind() {
+ case reflect.Bool:
+ if schema.Type() != Boolean {
+ break
+ }
+ return &boolCodec{}
+
+ case reflect.Int:
+ switch schema.Type() {
+ case Int:
+ return &intCodec[int]{}
+ case Long:
+ if strconv.IntSize == 64 {
+ // allow decoding into int when it's 64-bit
+ return &longCodec[int]{}
+ }
+ }
+
+ case reflect.Int8:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[int8]{}
+
+ case reflect.Uint8:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[uint8]{}
+
+ case reflect.Int16:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[int16]{}
+
+ case reflect.Uint16:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[uint16]{}
+
+ case reflect.Int32:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[int32]{}
+
+ case reflect.Uint32:
+ if schema.Type() != Long {
+ break
+ }
+ if resolved {
+ return &longConvCodec[uint32]{convert: createLongConverter(schema.encodedType)}
+ }
+ return &longCodec[uint32]{}
+
+ case reflect.Int64:
+ st := schema.Type()
+ lt := getLogicalType(schema)
+ switch {
+ case st == Int && lt == TimeMillis: // time.Duration
+ return &timeMillisCodec{}
+
+ case st == Long && lt == TimeMicros: // time.Duration
+ return &timeMicrosCodec{
+ convert: createLongConverter(schema.encodedType),
+ }
+
+ case st == Long:
+ isTimestamp := (lt == TimestampMillis || lt == TimestampMicros)
+ if isTimestamp && typ.Type1() == timeDurationType {
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
+ typ.Type1().String(), schema.Type(), lt)}
+ }
+ if resolved {
+ return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
+ }
+ return &longCodec[int64]{}
+
+ default:
+ break
+ }
+
+ case reflect.Float32:
+ if schema.Type() != Float {
+ break
+ }
+ if resolved {
+ return &float32ConvCodec{convert: createFloatConverter(schema.encodedType)}
+ }
+ return &float32Codec{}
+
+ case reflect.Float64:
+ if schema.Type() != Double {
+ break
+ }
+ if resolved {
+ return &float64ConvCodec{convert: createDoubleConverter(schema.encodedType)}
+ }
+ return &float64Codec{}
+
+ case reflect.String:
+ if schema.Type() != String {
+ break
+ }
+ return &stringCodec{}
+
+ case reflect.Slice:
+ if typ.(reflect2.SliceType).Elem().Kind() != reflect.Uint8 || schema.Type() != Bytes {
+ break
+ }
+ return &bytesCodec{sliceType: typ.(*reflect2.UnsafeSliceType)}
+
+ case reflect.Struct:
+ st := schema.Type()
+ ls := getLogicalSchema(schema)
+ lt := getLogicalType(schema)
+ isTime := typ.Type1().ConvertibleTo(timeType)
+ switch {
+ case isTime && st == Int && lt == Date:
+ return &dateCodec{}
+ case isTime && st == Long && lt == TimestampMillis:
+ return &timestampMillisCodec{
+ convert: createLongConverter(schema.encodedType),
+ }
+ case isTime && st == Long && lt == TimestampMicros:
+ return &timestampMicrosCodec{
+ convert: createLongConverter(schema.encodedType),
+ }
+ case isTime && st == Long && lt == LocalTimestampMillis:
+ return &timestampMillisCodec{
+ local: true,
+ convert: createLongConverter(schema.encodedType),
+ }
+ case isTime && st == Long && lt == LocalTimestampMicros:
+ return &timestampMicrosCodec{
+ local: true,
+ convert: createLongConverter(schema.encodedType),
+ }
+ case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
+ dec := ls.(*DecimalLogicalSchema)
+ return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}
+
+ default:
+ break
+ }
+ case reflect.Ptr:
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+		typ1 := elemType.Type1()
+		ls := getLogicalSchema(schema)
+		if ls == nil {
+			break
+		}
+		if !typ1.ConvertibleTo(ratType) || schema.Type() != Bytes || ls.Type() != Decimal {
+ break
+ }
+ dec := ls.(*DecimalLogicalSchema)
+
+ return &bytesDecimalPtrCodec{prec: dec.Precision(), scale: dec.Scale()}
+ }
+
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
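+// createEncoderOfNative maps a primitive (or logical) schema to an encoder for
+// the given Go type, falling back to an error encoder when no mapping exists.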
+//nolint:maintidx // Splitting this would not make it simpler.
+func createEncoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValEncoder {
+ switch typ.Kind() {
+ case reflect.Bool:
+ if schema.Type() != Boolean {
+ break
+ }
+ return &boolCodec{}
+
+ case reflect.Int:
+ switch schema.Type() {
+ case Int:
+ return &intCodec[int]{}
+ case Long:
+ return &longCodec[int]{}
+ }
+
+ case reflect.Int8:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[int8]{}
+
+ case reflect.Uint8:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[uint8]{}
+
+ case reflect.Int16:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[int16]{}
+
+ case reflect.Uint16:
+ if schema.Type() != Int {
+ break
+ }
+ return &intCodec[uint16]{}
+
+ case reflect.Int32:
+ switch schema.Type() {
+ case Long:
+ return &longCodec[int32]{}
+
+ case Int:
+ return &intCodec[int32]{}
+ }
+
+ case reflect.Uint32:
+ if schema.Type() != Long {
+ break
+ }
+ return &longCodec[uint32]{}
+
+ case reflect.Int64:
+ st := schema.Type()
+ lt := getLogicalType(schema)
+ switch {
+ case st == Int && lt == TimeMillis: // time.Duration
+ return &timeMillisCodec{}
+
+ case st == Long && lt == TimeMicros: // time.Duration
+ return &timeMicrosCodec{}
+
+ case st == Long:
+ isTimestamp := (lt == TimestampMillis || lt == TimestampMicros)
+ if isTimestamp && typ.Type1() == timeDurationType {
+ return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
+ typ.Type1().String(), schema.Type(), lt)}
+ }
+ return &longCodec[int64]{}
+
+ default:
+ break
+ }
+
+ case reflect.Float32:
+ switch schema.Type() {
+ case Double:
+ return &float32DoubleCodec{}
+ case Float:
+ return &float32Codec{}
+ }
+
+ case reflect.Float64:
+ if schema.Type() != Double {
+ break
+ }
+ return &float64Codec{}
+
+ case reflect.String:
+ if schema.Type() != String {
+ break
+ }
+ return &stringCodec{}
+
+ case reflect.Slice:
+ if typ.(reflect2.SliceType).Elem().Kind() != reflect.Uint8 || schema.Type() != Bytes {
+ break
+ }
+ return &bytesCodec{sliceType: typ.(*reflect2.UnsafeSliceType)}
+
+ case reflect.Struct:
+ st := schema.Type()
+ lt := getLogicalType(schema)
+ isTime := typ.Type1().ConvertibleTo(timeType)
+ switch {
+ case isTime && st == Int && lt == Date:
+ return &dateCodec{}
+ case isTime && st == Long && lt == TimestampMillis:
+ return &timestampMillisCodec{}
+ case isTime && st == Long && lt == TimestampMicros:
+ return &timestampMicrosCodec{}
+ case isTime && st == Long && lt == LocalTimestampMillis:
+ return &timestampMillisCodec{local: true}
+ case isTime && st == Long && lt == LocalTimestampMicros:
+ return &timestampMicrosCodec{local: true}
+		case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
+			ls := getLogicalSchema(schema)
+			dec := ls.(*DecimalLogicalSchema)
+			return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}
+ default:
+ break
+ }
+
+ case reflect.Ptr:
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+		typ1 := elemType.Type1()
+		ls := getLogicalSchema(schema)
+		if ls == nil {
+			break
+		}
+		if !typ1.ConvertibleTo(ratType) || schema.Type() != Bytes || ls.Type() != Decimal {
+ break
+ }
+ dec := ls.(*DecimalLogicalSchema)
+
+ return &bytesDecimalPtrCodec{prec: dec.Precision(), scale: dec.Scale()}
+ }
+
+ return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func getLogicalSchema(schema Schema) LogicalSchema {
+ lts, ok := schema.(LogicalTypeSchema)
+ if !ok {
+ return nil
+ }
+
+ return lts.Logical()
+}
+
+func getLogicalType(schema Schema) LogicalType {
+ ls := getLogicalSchema(schema)
+ if ls == nil {
+ return ""
+ }
+
+ return ls.Type()
+}
+
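+// nullCodec reads and writes nothing: the Avro null type has a zero-byte
+// encoding.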
+type nullCodec struct{}
+
+func (*nullCodec) Decode(unsafe.Pointer, *Reader) {}
+
+func (*nullCodec) Encode(unsafe.Pointer, *Writer) {}
+
+type boolCodec struct{}
+
+func (*boolCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*bool)(ptr)) = r.ReadBool()
+}
+
+func (*boolCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteBool(*((*bool)(ptr)))
+}
+
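+// smallInt is the set of Go integer types an Avro int can be en/decoded as.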
+type smallInt interface {
+ ~int | ~int8 | ~int16 | ~int32 | ~uint | ~uint8 | ~uint16
+}
+
+type intCodec[T smallInt] struct{}
+
+func (*intCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*T)(ptr)) = T(r.ReadInt())
+}
+
+func (*intCodec[T]) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteInt(int32(*((*T)(ptr))))
+}
+
+type largeInt interface {
+	~int | ~int32 | ~uint32 | ~int64
+}
+
+type longCodec[T largeInt] struct{}
+
+func (c *longCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*T)(ptr)) = T(r.ReadLong())
+}
+
+func (*longCodec[T]) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteLong(int64(*((*T)(ptr))))
+}
+
+type longConvCodec[T largeInt] struct {
+ convert func(*Reader) int64
+}
+
+func (c *longConvCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*T)(ptr)) = T(c.convert(r))
+}
+
+type float32Codec struct{}
+
+func (c *float32Codec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*float32)(ptr)) = r.ReadFloat()
+}
+
+func (*float32Codec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteFloat(*((*float32)(ptr)))
+}
+
+type float32ConvCodec struct {
+ convert func(*Reader) float32
+}
+
+func (c *float32ConvCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*float32)(ptr)) = c.convert(r)
+}
+
+type float32DoubleCodec struct{}
+
+func (*float32DoubleCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteDouble(float64(*((*float32)(ptr))))
+}
+
+type float64Codec struct{}
+
+func (c *float64Codec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*float64)(ptr)) = r.ReadDouble()
+}
+
+func (*float64Codec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteDouble(*((*float64)(ptr)))
+}
+
+type float64ConvCodec struct {
+ convert func(*Reader) float64
+}
+
+func (c *float64ConvCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*float64)(ptr)) = c.convert(r)
+}
+
+type stringCodec struct{}
+
+func (c *stringCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ *((*string)(ptr)) = r.ReadString()
+}
+
+func (*stringCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteString(*((*string)(ptr)))
+}
+
+type bytesCodec struct {
+ sliceType *reflect2.UnsafeSliceType
+}
+
+func (c *bytesCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ b := r.ReadBytes()
+ c.sliceType.UnsafeSet(ptr, reflect2.PtrOf(b))
+}
+
+func (c *bytesCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteBytes(*((*[]byte)(ptr)))
+}
+
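+// dateCodec en/decodes the date logical type: the number of days since the
+// Unix epoch, stored as an int.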
+type dateCodec struct{}
+
+func (c *dateCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ i := r.ReadInt()
+ sec := int64(i) * int64(24*time.Hour/time.Second)
+ *((*time.Time)(ptr)) = time.Unix(sec, 0).UTC()
+}
+
+func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ t := *((*time.Time)(ptr))
+ days := t.Unix() / int64(24*time.Hour/time.Second)
+ w.WriteInt(int32(days))
+}
+
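+// timestampMillisCodec en/decodes timestamp-millis (and local-timestamp-millis
+// when local is true): milliseconds since the Unix epoch, stored as a long.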
+type timestampMillisCodec struct {
+ local bool
+ convert func(*Reader) int64
+}
+
+func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ var i int64
+ if c.convert != nil {
+ i = c.convert(r)
+ } else {
+ i = r.ReadLong()
+ }
+ sec := i / 1e3
+ nsec := (i - sec*1e3) * 1e6
+ t := time.Unix(sec, nsec)
+
+ if c.local {
+		// time.Unix returns the instant in the local time zone, which shifts
+		// the wall clock by the zone offset. Remove those added seconds so the
+		// wall clock matches the encoded local timestamp.
+ _, offset := t.Zone()
+ t = t.Add(time.Duration(-1*offset) * time.Second)
+ *((*time.Time)(ptr)) = t
+ return
+ }
+ *((*time.Time)(ptr)) = t.UTC()
+}
+
+func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ t := *((*time.Time)(ptr))
+ if c.local {
+ t = t.Local()
+ t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ }
+ w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6))
+}
+
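+// timestampMicrosCodec en/decodes timestamp-micros (and local-timestamp-micros
+// when local is true): microseconds since the Unix epoch, stored as a long.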
+type timestampMicrosCodec struct {
+ local bool
+ convert func(*Reader) int64
+}
+
+func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ var i int64
+ if c.convert != nil {
+ i = c.convert(r)
+ } else {
+ i = r.ReadLong()
+ }
+ sec := i / 1e6
+ nsec := (i - sec*1e6) * 1e3
+ t := time.Unix(sec, nsec)
+
+ if c.local {
+		// time.Unix returns the instant in the local time zone, which shifts
+		// the wall clock by the zone offset. Remove those added seconds so the
+		// wall clock matches the encoded local timestamp.
+ _, offset := t.Zone()
+ t = t.Add(time.Duration(-1*offset) * time.Second)
+ *((*time.Time)(ptr)) = t
+ return
+ }
+ *((*time.Time)(ptr)) = t.UTC()
+}
+
+func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ t := *((*time.Time)(ptr))
+ if c.local {
+ t = t.Local()
+ t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ }
+ w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3))
+}
+
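+// timeMillisCodec en/decodes the time-millis logical type, an int number of
+// milliseconds after midnight, as a time.Duration.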
+type timeMillisCodec struct{}
+
+func (c *timeMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ i := r.ReadInt()
+ *((*time.Duration)(ptr)) = time.Duration(i) * time.Millisecond
+}
+
+func (c *timeMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ d := *((*time.Duration)(ptr))
+ w.WriteInt(int32(d.Nanoseconds() / int64(time.Millisecond)))
+}
+
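+// timeMicrosCodec en/decodes the time-micros logical type, a long number of
+// microseconds after midnight, as a time.Duration.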
+type timeMicrosCodec struct {
+ convert func(*Reader) int64
+}
+
+func (c *timeMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+ var i int64
+ if c.convert != nil {
+ i = c.convert(r)
+ } else {
+ i = r.ReadLong()
+ }
+ *((*time.Duration)(ptr)) = time.Duration(i) * time.Microsecond
+}
+
+func (c *timeMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ d := *((*time.Duration)(ptr))
+ w.WriteLong(d.Nanoseconds() / int64(time.Microsecond))
+}
+
+var one = big.NewInt(1)
+
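+// bytesDecimalCodec en/decodes the decimal logical type: a big-endian
+// two's-complement unscaled integer, interpreted as unscaled/10^scale.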
+type bytesDecimalCodec struct {
+ prec int
+ scale int
+}
+
+func (c *bytesDecimalCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+	b := r.ReadBytes()
+	*((*big.Rat)(ptr)) = *ratFromBytes(b, c.scale)
+}
+
+func ratFromBytes(b []byte, scale int) *big.Rat {
+ num := (&big.Int{}).SetBytes(b)
+ if len(b) > 0 && b[0]&0x80 > 0 {
+ num.Sub(num, new(big.Int).Lsh(one, uint(len(b))*8))
+ }
+ denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
+ return new(big.Rat).SetFrac(num, denom)
+}
+
+func (c *bytesDecimalCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ r := (*big.Rat)(ptr)
+ scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil)
+ i := (&big.Int{}).Mul(r.Num(), scale)
+ i = i.Div(i, r.Denom())
+
+ var b []byte
+ switch i.Sign() {
+ case 0:
+ b = []byte{0}
+
+ case 1:
+ b = i.Bytes()
+ if b[0]&0x80 > 0 {
+ b = append([]byte{0}, b...)
+ }
+
+ case -1:
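+		// Two's complement: adding 2^(8*length) to a negative value yields its
+		// unsigned byte representation with the sign bit set.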
+ length := uint(i.BitLen()/8+1) * 8
+ b = i.Add(i, (&big.Int{}).Lsh(one, length)).Bytes()
+ }
+ w.WriteBytes(b)
+}
+
+type bytesDecimalPtrCodec struct {
+ prec int
+ scale int
+}
+
+func (c *bytesDecimalPtrCodec) Decode(ptr unsafe.Pointer, r *Reader) {
+	b := r.ReadBytes()
+	*((**big.Rat)(ptr)) = ratFromBytes(b, c.scale)
+}
+
+func (c *bytesDecimalPtrCodec) Encode(ptr unsafe.Pointer, w *Writer) {
+ r := *((**big.Rat)(ptr))
+ scale := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(c.scale)), nil)
+ i := (&big.Int{}).Mul(r.Num(), scale)
+ i = i.Div(i, r.Denom())
+
+ var b []byte
+ switch i.Sign() {
+ case 0:
+ b = []byte{0}
+
+ case 1:
+ b = i.Bytes()
+ if b[0]&0x80 > 0 {
+ b = append([]byte{0}, b...)
+ }
+
+ case -1:
+ length := uint(i.BitLen()/8+1) * 8
+ b = i.Add(i, (&big.Int{}).Lsh(one, length)).Bytes()
+ }
+ w.WriteBytes(b)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_ptr.go b/vendor/github.com/hamba/avro/v2/codec_ptr.go
new file mode 100644
index 0000000..07b099e
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_ptr.go
@@ -0,0 +1,66 @@
+package avro
+
+import (
+ "errors"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func decoderOfPtr(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+
+ decoder := decoderOfType(d, schema, elemType)
+
+ return &dereferenceDecoder{typ: elemType, decoder: decoder}
+}
+
+type dereferenceDecoder struct {
+ typ reflect2.Type
+ decoder ValDecoder
+}
+
+func (d *dereferenceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ if *((*unsafe.Pointer)(ptr)) == nil {
+ // Create new instance
+ newPtr := d.typ.UnsafeNew()
+ d.decoder.Decode(newPtr, r)
+ *((*unsafe.Pointer)(ptr)) = newPtr
+ return
+ }
+
+ // Reuse existing instance
+ d.decoder.Decode(*((*unsafe.Pointer)(ptr)), r)
+}
+
+func encoderOfPtr(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
+ ptrType := typ.(*reflect2.UnsafePtrType)
+ elemType := ptrType.Elem()
+
+ enc := encoderOfType(e, schema, elemType)
+
+ return &dereferenceEncoder{typ: elemType, encoder: enc}
+}
+
+type dereferenceEncoder struct {
+ typ reflect2.Type
+ encoder ValEncoder
+}
+
+func (d *dereferenceEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ if *((*unsafe.Pointer)(ptr)) == nil {
+ w.Error = errors.New("avro: cannot encode nil pointer")
+ return
+ }
+
+ d.encoder.Encode(*((*unsafe.Pointer)(ptr)), w)
+}
+
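+// referenceDecoder passes the address of ptr to the inner decoder, adapting a
+// decoder that expects an extra level of indirection.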
+type referenceDecoder struct {
+ decoder ValDecoder
+}
+
+func (decoder *referenceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ decoder.decoder.Decode(unsafe.Pointer(&ptr), r)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_record.go b/vendor/github.com/hamba/avro/v2/codec_record.go
new file mode 100644
index 0000000..67562b7
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_record.go
@@ -0,0 +1,511 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "reflect"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfRecord(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ switch typ.Kind() {
+ case reflect.Struct:
+ return decoderOfStruct(d, schema, typ)
+
+ case reflect.Map:
+ if typ.(reflect2.MapType).Key().Kind() != reflect.String ||
+ typ.(reflect2.MapType).Elem().Kind() != reflect.Interface {
+ break
+ }
+ return decoderOfRecord(d, schema, typ)
+
+ case reflect.Ptr:
+ return decoderOfPtr(d, schema, typ)
+
+ case reflect.Interface:
+ if ifaceType, ok := typ.(*reflect2.UnsafeIFaceType); ok {
+ return &recordIfaceDecoder{schema: schema, valType: ifaceType}
+ }
+ }
+
+	return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func createEncoderOfRecord(e *encoderContext, schema *RecordSchema, typ reflect2.Type) ValEncoder {
+ switch typ.Kind() {
+ case reflect.Struct:
+ return encoderOfStruct(e, schema, typ)
+
+ case reflect.Map:
+ if typ.(reflect2.MapType).Key().Kind() != reflect.String ||
+ typ.(reflect2.MapType).Elem().Kind() != reflect.Interface {
+ break
+ }
+ return encoderOfRecord(e, schema, typ)
+
+ case reflect.Ptr:
+ return encoderOfPtr(e, schema, typ)
+ }
+
+	return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func decoderOfStruct(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ rec := schema.(*RecordSchema)
+ structDesc := describeStruct(d.cfg.getTagKey(), typ)
+
+ fields := make([]*structFieldDecoder, 0, len(rec.Fields()))
+
+ for _, field := range rec.Fields() {
+ if field.action == FieldIgnore {
+ fields = append(fields, &structFieldDecoder{
+ decoder: createSkipDecoder(field.Type()),
+ })
+ continue
+ }
+
+ sf := structDesc.Fields.Get(field.Name())
+ if sf == nil {
+ for _, alias := range field.Aliases() {
+ sf = structDesc.Fields.Get(alias)
+ if sf != nil {
+ break
+ }
+ }
+ }
+		// The struct has no field matching this schema field.
+		if sf == nil {
+			// A field with the FieldSetDefault action has no value in the
+			// binary, so ignore it rather than appending a skip decoder.
+			//
+			// Note: a skip decoder performs a read and moves the cursor, which,
+			// in this case, would lead to a dirty read.
+ if field.action == FieldSetDefault {
+ continue
+ }
+
+ fields = append(fields, &structFieldDecoder{
+ decoder: createSkipDecoder(field.Type()),
+ })
+ continue
+ }
+
+ if field.action == FieldSetDefault {
+ if field.hasDef {
+ fields = append(fields, &structFieldDecoder{
+ field: sf.Field,
+ decoder: createDefaultDecoder(d, field, sf.Field[len(sf.Field)-1].Type()),
+ })
+
+ continue
+ }
+ }
+
+ dec := decoderOfType(d, field.Type(), sf.Field[len(sf.Field)-1].Type())
+ fields = append(fields, &structFieldDecoder{
+ field: sf.Field,
+ decoder: dec,
+ })
+ }
+
+ return &structDecoder{typ: typ, fields: fields}
+}
+
+type structFieldDecoder struct {
+ field []*reflect2.UnsafeStructField
+ decoder ValDecoder
+}
+
+type structDecoder struct {
+ typ reflect2.Type
+ fields []*structFieldDecoder
+}
+
+func (d *structDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ for _, field := range d.fields {
+ // Skip case
+ if field.field == nil {
+ field.decoder.Decode(nil, r)
+ continue
+ }
+
+ fieldPtr := ptr
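+		// Walk the chain of (possibly embedded) struct fields, allocating any
+		// nil embedded pointers along the way.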
+ for i, f := range field.field {
+ fieldPtr = f.UnsafeGet(fieldPtr)
+
+ if i == len(field.field)-1 {
+ break
+ }
+
+ if f.Type().Kind() == reflect.Ptr {
+ if *((*unsafe.Pointer)(fieldPtr)) == nil {
+ newPtr := f.Type().(*reflect2.UnsafePtrType).Elem().UnsafeNew()
+ *((*unsafe.Pointer)(fieldPtr)) = newPtr
+ }
+
+ fieldPtr = *((*unsafe.Pointer)(fieldPtr))
+ }
+ }
+ field.decoder.Decode(fieldPtr, r)
+
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ for _, f := range field.field {
+ r.Error = fmt.Errorf("%s: %w", f.Name(), r.Error)
+ return
+ }
+ }
+ }
+}
+
+func encoderOfStruct(e *encoderContext, rec *RecordSchema, typ reflect2.Type) ValEncoder {
+ structDesc := describeStruct(e.cfg.getTagKey(), typ)
+
+ fields := make([]*structFieldEncoder, 0, len(rec.Fields()))
+ for _, field := range rec.Fields() {
+ sf := structDesc.Fields.Get(field.Name())
+ if sf != nil {
+ fields = append(fields, &structFieldEncoder{
+ field: sf.Field,
+ encoder: encoderOfType(e, field.Type(), sf.Field[len(sf.Field)-1].Type()),
+ })
+ continue
+ }
+
+ if !field.HasDefault() {
+ // In all other cases, this is a required field
+ err := fmt.Errorf("avro: record %s is missing required field %q", rec.FullName(), field.Name())
+ return &errorEncoder{err: err}
+ }
+
+ def := field.Default()
+ if field.Default() == nil {
+ if field.Type().Type() == Null {
+ // We write nothing in a Null case, just skip it
+ continue
+ }
+
+ if field.Type().Type() == Union && field.Type().(*UnionSchema).Nullable() {
+ defaultType := reflect2.TypeOf(&def)
+ fields = append(fields, &structFieldEncoder{
+ defaultPtr: reflect2.PtrOf(&def),
+ encoder: encoderOfNullableUnion(e, field.Type(), defaultType),
+ })
+ continue
+ }
+ }
+
+ defaultType := reflect2.TypeOf(def)
+ defaultEncoder := encoderOfType(e, field.Type(), defaultType)
+ if defaultType.LikePtr() {
+ defaultEncoder = &onePtrEncoder{defaultEncoder}
+ }
+ fields = append(fields, &structFieldEncoder{
+ defaultPtr: reflect2.PtrOf(def),
+ encoder: defaultEncoder,
+ })
+ }
+ return &structEncoder{typ: typ, fields: fields}
+}
+
+type structFieldEncoder struct {
+ field []*reflect2.UnsafeStructField
+ defaultPtr unsafe.Pointer
+ encoder ValEncoder
+}
+
+type structEncoder struct {
+ typ reflect2.Type
+ fields []*structFieldEncoder
+}
+
+func (e *structEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ for _, field := range e.fields {
+ // Default case
+ if field.field == nil {
+ field.encoder.Encode(field.defaultPtr, w)
+ continue
+ }
+
+ fieldPtr := ptr
+ for i, f := range field.field {
+ fieldPtr = f.UnsafeGet(fieldPtr)
+
+ if i == len(field.field)-1 {
+ break
+ }
+
+ if f.Type().Kind() == reflect.Ptr {
+ if *((*unsafe.Pointer)(fieldPtr)) == nil {
+ w.Error = fmt.Errorf("embedded field %q is nil", f.Name())
+ return
+ }
+
+ fieldPtr = *((*unsafe.Pointer)(fieldPtr))
+ }
+ }
+ field.encoder.Encode(fieldPtr, w)
+
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ for _, f := range field.field {
+ w.Error = fmt.Errorf("%s: %w", f.Name(), w.Error)
+ return
+ }
+ }
+ }
+}
+
+func decoderOfRecord(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ rec := schema.(*RecordSchema)
+ mapType := typ.(*reflect2.UnsafeMapType)
+
+ fields := make([]recordMapDecoderField, len(rec.Fields()))
+ for i, field := range rec.Fields() {
+ switch field.action {
+ case FieldIgnore:
+ fields[i] = recordMapDecoderField{
+ name: field.Name(),
+ decoder: createSkipDecoder(field.Type()),
+ skip: true,
+ }
+ continue
+ case FieldSetDefault:
+ if field.hasDef {
+ fields[i] = recordMapDecoderField{
+ name: field.Name(),
+ decoder: createDefaultDecoder(d, field, mapType.Elem()),
+ }
+ continue
+ }
+ }
+
+ fields[i] = recordMapDecoderField{
+ name: field.Name(),
+ decoder: decoderOfType(d, field.Type(), mapType.Elem()),
+ }
+ }
+
+ return &recordMapDecoder{
+ mapType: mapType,
+ elemType: mapType.Elem(),
+ fields: fields,
+ }
+}
+
+type recordMapDecoderField struct {
+ name string
+ decoder ValDecoder
+ skip bool
+}
+
+type recordMapDecoder struct {
+ mapType *reflect2.UnsafeMapType
+ elemType reflect2.Type
+ fields []recordMapDecoderField
+}
+
+func (d *recordMapDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ if d.mapType.UnsafeIsNil(ptr) {
+ d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(len(d.fields)))
+ }
+
+ for _, field := range d.fields {
+ elemPtr := d.elemType.UnsafeNew()
+ field.decoder.Decode(elemPtr, r)
+ if field.skip {
+ continue
+ }
+
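+		// name is the first field of recordMapDecoderField, so a pointer to
+		// field is also a pointer to the map key string.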
+ d.mapType.UnsafeSetIndex(ptr, reflect2.PtrOf(field), elemPtr)
+ }
+
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ r.Error = fmt.Errorf("%v: %w", d.mapType, r.Error)
+ }
+}
+
+func encoderOfRecord(e *encoderContext, rec *RecordSchema, typ reflect2.Type) ValEncoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+
+ fields := make([]mapEncoderField, len(rec.Fields()))
+ for i, field := range rec.Fields() {
+ fields[i] = mapEncoderField{
+ name: field.Name(),
+ hasDef: field.HasDefault(),
+ def: field.Default(),
+ encoder: encoderOfType(e, field.Type(), mapType.Elem()),
+ }
+
+ if field.HasDefault() {
+ switch {
+ case field.Type().Type() == Union:
+ union := field.Type().(*UnionSchema)
+ fields[i].def = map[string]any{
+ string(union.Types()[0].Type()): field.Default(),
+ }
+ case field.Default() == nil:
+ continue
+ }
+
+ defaultType := reflect2.TypeOf(fields[i].def)
+ fields[i].defEncoder = encoderOfType(e, field.Type(), defaultType)
+ if defaultType.LikePtr() {
+ fields[i].defEncoder = &onePtrEncoder{fields[i].defEncoder}
+ }
+ }
+ }
+
+ return &recordMapEncoder{
+ mapType: mapType,
+ fields: fields,
+ }
+}
+
+type mapEncoderField struct {
+ name string
+ hasDef bool
+ def any
+ defEncoder ValEncoder
+ encoder ValEncoder
+}
+
+type recordMapEncoder struct {
+ mapType *reflect2.UnsafeMapType
+ fields []mapEncoderField
+}
+
+func (e *recordMapEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ for _, field := range e.fields {
+		// The name is the first field of mapEncoderField, so a pointer to
+		// field is also a pointer to the name, the map key.
+ valPtr := e.mapType.UnsafeGetIndex(ptr, reflect2.PtrOf(field))
+ if valPtr == nil {
+ // Missing required field
+ if !field.hasDef {
+ w.Error = fmt.Errorf("avro: missing required field %s", field.name)
+ return
+ }
+
+ // Null default
+ if field.def == nil {
+ continue
+ }
+
+ defPtr := reflect2.PtrOf(field.def)
+ field.defEncoder.Encode(defPtr, w)
+ continue
+ }
+
+ field.encoder.Encode(valPtr, w)
+
+ if w.Error != nil && !errors.Is(w.Error, io.EOF) {
+ w.Error = fmt.Errorf("%s: %w", field.name, w.Error)
+ return
+ }
+ }
+}
+
+type recordIfaceDecoder struct {
+ schema Schema
+ valType *reflect2.UnsafeIFaceType
+}
+
+func (d *recordIfaceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ obj := d.valType.UnsafeIndirect(ptr)
+ if reflect2.IsNil(obj) {
+		r.ReportError("decode non-empty interface", "cannot unmarshal into nil")
+ return
+ }
+
+ r.ReadVal(d.schema, obj)
+}
+
+type structDescriptor struct {
+ Type reflect2.Type
+ Fields structFields
+}
+
+type structFields []*structField
+
+func (sf structFields) Get(name string) *structField {
+ for _, f := range sf {
+ if f.Name == name {
+ return f
+ }
+ }
+
+ return nil
+}
+
+type structField struct {
+ Name string
+ Field []*reflect2.UnsafeStructField
+
+ anon *reflect2.UnsafeStructType
+}
+
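+// describeStruct walks typ breadth-first, flattening embedded structs into a
+// list of decodable fields keyed by the configured tag key or field name.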
+func describeStruct(tagKey string, typ reflect2.Type) *structDescriptor {
+ structType := typ.(*reflect2.UnsafeStructType)
+ fields := structFields{}
+
+ var curr []structField
+ next := []structField{{anon: structType}}
+
+ visited := map[uintptr]bool{}
+
+ for len(next) > 0 {
+ curr, next = next, curr[:0]
+
+ for _, f := range curr {
+ rtype := f.anon.RType()
+			if visited[rtype] {
+ continue
+ }
+ visited[rtype] = true
+
+ for i := range f.anon.NumField() {
+ field := f.anon.Field(i).(*reflect2.UnsafeStructField)
+ isUnexported := field.PkgPath() != ""
+
+ chain := make([]*reflect2.UnsafeStructField, len(f.Field)+1)
+ copy(chain, f.Field)
+ chain[len(f.Field)] = field
+
+ if field.Anonymous() {
+ t := field.Type()
+ if t.Kind() == reflect.Ptr {
+ t = t.(*reflect2.UnsafePtrType).Elem()
+ }
+ if t.Kind() != reflect.Struct {
+ continue
+ }
+
+ next = append(next, structField{Field: chain, anon: t.(*reflect2.UnsafeStructType)})
+ continue
+ }
+
+ // Ignore unexported fields.
+ if isUnexported {
+ continue
+ }
+
+ fieldName := field.Name()
+ if tag, ok := field.Tag().Lookup(tagKey); ok {
+ fieldName = tag
+ }
+
+ fields = append(fields, &structField{
+ Name: fieldName,
+ Field: chain,
+ })
+ }
+ }
+ }
+
+ return &structDescriptor{
+ Type: structType,
+ Fields: fields,
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_skip.go b/vendor/github.com/hamba/avro/v2/codec_skip.go
new file mode 100644
index 0000000..3be0dd6
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_skip.go
@@ -0,0 +1,225 @@
+package avro
+
+import (
+ "fmt"
+ "unsafe"
+)
+
+func createSkipDecoder(schema Schema) ValDecoder {
+ switch schema.Type() {
+ case Boolean:
+ return &boolSkipDecoder{}
+
+ case Int:
+ return &intSkipDecoder{}
+
+ case Long:
+ return &longSkipDecoder{}
+
+ case Float:
+ return &floatSkipDecoder{}
+
+ case Double:
+ return &doubleSkipDecoder{}
+
+ case String:
+ return &stringSkipDecoder{}
+
+ case Bytes:
+ return &bytesSkipDecoder{}
+
+ case Record:
+ return skipDecoderOfRecord(schema)
+
+ case Ref:
+ return createSkipDecoder(schema.(*RefSchema).Schema())
+
+ case Enum:
+ return &enumSkipDecoder{symbols: schema.(*EnumSchema).Symbols()}
+
+ case Array:
+ return skipDecoderOfArray(schema)
+
+ case Map:
+ return skipDecoderOfMap(schema)
+
+ case Union:
+ return skipDecoderOfUnion(schema)
+
+ case Fixed:
+ return &fixedSkipDecoder{size: schema.(*FixedSchema).Size()}
+
+ default:
+ return &errorDecoder{err: fmt.Errorf("avro: schema type %s is unsupported", schema.Type())}
+ }
+}
+
+type boolSkipDecoder struct{}
+
+func (*boolSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipBool()
+}
+
+type intSkipDecoder struct{}
+
+func (*intSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipInt()
+}
+
+type longSkipDecoder struct{}
+
+func (*longSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipLong()
+}
+
+type floatSkipDecoder struct{}
+
+func (*floatSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipFloat()
+}
+
+type doubleSkipDecoder struct{}
+
+func (*doubleSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipDouble()
+}
+
+type stringSkipDecoder struct{}
+
+func (*stringSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipString()
+}
+
+type bytesSkipDecoder struct{}
+
+func (c *bytesSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipBytes()
+}
+
+func skipDecoderOfRecord(schema Schema) ValDecoder {
+ rec := schema.(*RecordSchema)
+
+ decoders := make([]ValDecoder, len(rec.Fields()))
+ for i, field := range rec.Fields() {
+ decoders[i] = createSkipDecoder(field.Type())
+ }
+
+ return &recordSkipDecoder{
+ decoders: decoders,
+ }
+}
+
+type recordSkipDecoder struct {
+ decoders []ValDecoder
+}
+
+func (d *recordSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ for _, decoder := range d.decoders {
+ decoder.Decode(nil, r)
+ }
+}
+
+type enumSkipDecoder struct {
+ symbols []string
+}
+
+func (c *enumSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipInt()
+}
+
+func skipDecoderOfArray(schema Schema) ValDecoder {
+ arr := schema.(*ArraySchema)
+ decoder := createSkipDecoder(arr.Items())
+
+ return &sliceSkipDecoder{
+ decoder: decoder,
+ }
+}
+
+type sliceSkipDecoder struct {
+ decoder ValDecoder
+}
+
+func (d *sliceSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ for {
+ l, size := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ if size > 0 {
+ r.SkipNBytes(int(size))
+ continue
+ }
+
+ for range l {
+ d.decoder.Decode(nil, r)
+ }
+ }
+}
+
+func skipDecoderOfMap(schema Schema) ValDecoder {
+ m := schema.(*MapSchema)
+ decoder := createSkipDecoder(m.Values())
+
+ return &mapSkipDecoder{
+ decoder: decoder,
+ }
+}
+
+type mapSkipDecoder struct {
+ decoder ValDecoder
+}
+
+func (d *mapSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ for {
+ l, size := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ if size > 0 {
+ r.SkipNBytes(int(size))
+ continue
+ }
+
+ for range l {
+ r.SkipString()
+ d.decoder.Decode(nil, r)
+ }
+ }
+}
+
+func skipDecoderOfUnion(schema Schema) ValDecoder {
+ union := schema.(*UnionSchema)
+
+ return &unionSkipDecoder{
+ schema: union,
+ }
+}
+
+type unionSkipDecoder struct {
+ schema *UnionSchema
+}
+
+func (d *unionSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ _, resSchema := getUnionSchema(d.schema, r)
+ if resSchema == nil {
+ return
+ }
+
+ // In a null case, just return
+ if resSchema.Type() == Null {
+ return
+ }
+
+ createSkipDecoder(resSchema).Decode(nil, r)
+}
+
+type fixedSkipDecoder struct {
+ size int
+}
+
+func (d *fixedSkipDecoder) Decode(_ unsafe.Pointer, r *Reader) {
+ r.SkipNBytes(d.size)
+}
diff --git a/vendor/github.com/hamba/avro/v2/codec_union.go b/vendor/github.com/hamba/avro/v2/codec_union.go
new file mode 100644
index 0000000..7d80b53
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/codec_union.go
@@ -0,0 +1,460 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "reflect"
+ "strings"
+ "unsafe"
+
+ "github.com/modern-go/reflect2"
+)
+
+func createDecoderOfUnion(d *decoderContext, schema *UnionSchema, typ reflect2.Type) ValDecoder {
+ switch typ.Kind() {
+ case reflect.Map:
+ if typ.(reflect2.MapType).Key().Kind() != reflect.String ||
+ typ.(reflect2.MapType).Elem().Kind() != reflect.Interface {
+ break
+ }
+ return decoderOfMapUnion(d, schema, typ)
+ case reflect.Slice:
+ if !schema.Nullable() {
+ break
+ }
+ return decoderOfNullableUnion(d, schema, typ)
+ case reflect.Ptr:
+ if !schema.Nullable() {
+ break
+ }
+ return decoderOfNullableUnion(d, schema, typ)
+ case reflect.Interface:
+ if _, ok := typ.(*reflect2.UnsafeIFaceType); !ok {
+ dec, err := decoderOfResolvedUnion(d, schema)
+ if err != nil {
+ return &errorDecoder{err: fmt.Errorf("avro: problem resolving decoder for Avro %s: %w", schema.Type(), err)}
+ }
+ return dec
+ }
+ }
+
+ return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
+}
+
+func createEncoderOfUnion(e *encoderContext, schema *UnionSchema, typ reflect2.Type) ValEncoder {
+ switch typ.Kind() {
+ case reflect.Map:
+ if typ.(reflect2.MapType).Key().Kind() != reflect.String ||
+ typ.(reflect2.MapType).Elem().Kind() != reflect.Interface {
+ break
+ }
+ return encoderOfMapUnion(e, schema, typ)
+ case reflect.Slice:
+ if !schema.Nullable() {
+ break
+ }
+ return encoderOfNullableUnion(e, schema, typ)
+ case reflect.Ptr:
+ if !schema.Nullable() {
+ break
+ }
+ return encoderOfNullableUnion(e, schema, typ)
+ }
+ return encoderOfResolverUnion(e, schema, typ)
+}
+
+func decoderOfMapUnion(d *decoderContext, union *UnionSchema, typ reflect2.Type) ValDecoder {
+ mapType := typ.(*reflect2.UnsafeMapType)
+
+ typeDecs := make([]ValDecoder, len(union.Types()))
+ for i, s := range union.Types() {
+ if s.Type() == Null {
+ continue
+ }
+ typeDecs[i] = newEfaceDecoder(d, s)
+ }
+
+ return &mapUnionDecoder{
+ cfg: d.cfg,
+ schema: union,
+ mapType: mapType,
+ elemType: mapType.Elem(),
+ typeDecs: typeDecs,
+ }
+}
+
+type mapUnionDecoder struct {
+ cfg *frozenConfig
+ schema *UnionSchema
+ mapType *reflect2.UnsafeMapType
+ elemType reflect2.Type
+ typeDecs []ValDecoder
+}
+
+func (d *mapUnionDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ idx, resSchema := getUnionSchema(d.schema, r)
+ if resSchema == nil {
+ return
+ }
+
+ // In a null case, just return
+ if resSchema.Type() == Null {
+ return
+ }
+
+ if d.mapType.UnsafeIsNil(ptr) {
+ d.mapType.UnsafeSet(ptr, d.mapType.UnsafeMakeMap(1))
+ }
+
+ key := schemaTypeName(resSchema)
+ keyPtr := reflect2.PtrOf(key)
+
+ elemPtr := d.elemType.UnsafeNew()
+ d.typeDecs[idx].Decode(elemPtr, r)
+
+ d.mapType.UnsafeSetIndex(ptr, keyPtr, elemPtr)
+}
+
+func encoderOfMapUnion(e *encoderContext, union *UnionSchema, _ reflect2.Type) ValEncoder {
+ return &mapUnionEncoder{
+ cfg: e.cfg,
+ schema: union,
+ }
+}
+
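+// mapUnionEncoder encodes a map with at most one entry, {"<type name>": value},
+// as a union: the key selects the union branch. An empty map is treated as
+// null.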
+type mapUnionEncoder struct {
+ cfg *frozenConfig
+ schema *UnionSchema
+}
+
+func (e *mapUnionEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ m := *((*map[string]any)(ptr))
+
+ if len(m) > 1 {
+ w.Error = errors.New("avro: cannot encode union map with multiple entries")
+ return
+ }
+
+ name := "null"
+ val := any(nil)
+ for k, v := range m {
+ name = k
+ val = v
+ break
+ }
+
+ schema, pos := e.schema.Types().Get(name)
+ if schema == nil {
+ w.Error = fmt.Errorf("avro: unknown union type %s", name)
+ return
+ }
+
+ w.WriteInt(int32(pos))
+
+ if schema.Type() == Null && val == nil {
+ return
+ }
+
+ elemType := reflect2.TypeOf(val)
+ elemPtr := reflect2.PtrOf(val)
+
+ encoder := encoderOfType(newEncoderContext(e.cfg), schema, elemType)
+ if elemType.LikePtr() {
+ encoder = &onePtrEncoder{encoder}
+ }
+ encoder.Encode(elemPtr, w)
+}
+
+func decoderOfNullableUnion(d *decoderContext, schema Schema, typ reflect2.Type) ValDecoder {
+ union := schema.(*UnionSchema)
+ _, typeIdx := union.Indices()
+
+ var (
+ baseTyp reflect2.Type
+ isPtr bool
+ )
+ switch v := typ.(type) {
+ case *reflect2.UnsafePtrType:
+ baseTyp = v.Elem()
+ isPtr = true
+ case *reflect2.UnsafeSliceType:
+ baseTyp = v
+ }
+ decoder := decoderOfType(d, union.Types()[typeIdx], baseTyp)
+
+ return &unionNullableDecoder{
+ schema: union,
+ typ: baseTyp,
+ isPtr: isPtr,
+ decoder: decoder,
+ }
+}
+
+type unionNullableDecoder struct {
+ schema *UnionSchema
+ typ reflect2.Type
+ isPtr bool
+ decoder ValDecoder
+}
+
+func (d *unionNullableDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ _, schema := getUnionSchema(d.schema, r)
+ if schema == nil {
+ return
+ }
+
+ if schema.Type() == Null {
+ *((*unsafe.Pointer)(ptr)) = nil
+ return
+ }
+
+ // Handle the non-ptr case separately.
+ if !d.isPtr {
+ if d.typ.UnsafeIsNil(ptr) {
+ // Create a new instance.
+ newPtr := d.typ.UnsafeNew()
+ d.decoder.Decode(newPtr, r)
+ d.typ.UnsafeSet(ptr, newPtr)
+ return
+ }
+
+ // Reuse the existing instance.
+ d.decoder.Decode(ptr, r)
+ return
+ }
+
+ if *((*unsafe.Pointer)(ptr)) == nil {
+ // Create new instance.
+ newPtr := d.typ.UnsafeNew()
+ d.decoder.Decode(newPtr, r)
+ *((*unsafe.Pointer)(ptr)) = newPtr
+ return
+ }
+
+ // Reuse existing instance.
+ d.decoder.Decode(*((*unsafe.Pointer)(ptr)), r)
+}
+
+func encoderOfNullableUnion(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
+ union := schema.(*UnionSchema)
+ nullIdx, typeIdx := union.Indices()
+
+ var (
+ baseTyp reflect2.Type
+ isPtr bool
+ )
+ switch v := typ.(type) {
+ case *reflect2.UnsafePtrType:
+ baseTyp = v.Elem()
+ isPtr = true
+ case *reflect2.UnsafeSliceType:
+ baseTyp = v
+ }
+ encoder := encoderOfType(e, union.Types()[typeIdx], baseTyp)
+
+ return &unionNullableEncoder{
+ schema: union,
+ encoder: encoder,
+ isPtr: isPtr,
+ nullIdx: int32(nullIdx),
+ typeIdx: int32(typeIdx),
+ }
+}
+
+type unionNullableEncoder struct {
+ schema *UnionSchema
+ encoder ValEncoder
+ isPtr bool
+ nullIdx int32
+ typeIdx int32
+}
+
+func (e *unionNullableEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ if *((*unsafe.Pointer)(ptr)) == nil {
+ w.WriteInt(e.nullIdx)
+ return
+ }
+
+ w.WriteInt(e.typeIdx)
+ newPtr := ptr
+ if e.isPtr {
+ newPtr = *((*unsafe.Pointer)(ptr))
+ }
+ e.encoder.Encode(newPtr, w)
+}
+
+func decoderOfResolvedUnion(d *decoderContext, schema Schema) (ValDecoder, error) {
+ union := schema.(*UnionSchema)
+
+ types := make([]reflect2.Type, len(union.Types()))
+ decoders := make([]ValDecoder, len(union.Types()))
+ for i, schema := range union.Types() {
+ name := unionResolutionName(schema)
+
+ typ, err := d.cfg.resolver.Type(name)
+ if err != nil {
+ if d.cfg.config.UnionResolutionError {
+ return nil, err
+ }
+
+ if d.cfg.config.PartialUnionTypeResolution {
+ decoders[i] = nil
+ types[i] = nil
+ continue
+ }
+
+ decoders = []ValDecoder{}
+ types = []reflect2.Type{}
+ break
+ }
+
+ decoder := decoderOfType(d, schema, typ)
+ decoders[i] = decoder
+ types[i] = typ
+ }
+
+ return &unionResolvedDecoder{
+ cfg: d.cfg,
+ schema: union,
+ types: types,
+ decoders: decoders,
+ }, nil
+}
+
+type unionResolvedDecoder struct {
+ cfg *frozenConfig
+ schema *UnionSchema
+ types []reflect2.Type
+ decoders []ValDecoder
+}
+
+func (d *unionResolvedDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
+ i, schema := getUnionSchema(d.schema, r)
+ if schema == nil {
+ return
+ }
+
+ pObj := (*any)(ptr)
+
+ if schema.Type() == Null {
+ *pObj = nil
+ return
+ }
+
+ if i >= len(d.decoders) || d.decoders[i] == nil {
+ if d.cfg.config.UnionResolutionError {
+ r.ReportError("decode union type", "unknown union type")
+ return
+ }
+
+ // We cannot resolve this, set it to the map type
+ name := schemaTypeName(schema)
+ obj := map[string]any{}
+ vTyp, err := genericReceiver(schema)
+ if err != nil {
+ r.ReportError("Union", err.Error())
+ return
+ }
+ obj[name] = genericDecode(vTyp, decoderOfType(newDecoderContext(d.cfg), schema, vTyp), r)
+
+ *pObj = obj
+ return
+ }
+
+ typ := d.types[i]
+ var newPtr unsafe.Pointer
+ switch typ.Kind() {
+ case reflect.Map:
+ mapType := typ.(*reflect2.UnsafeMapType)
+ newPtr = mapType.UnsafeMakeMap(1)
+
+ case reflect.Slice:
+ mapType := typ.(*reflect2.UnsafeSliceType)
+ newPtr = mapType.UnsafeMakeSlice(1, 1)
+
+ case reflect.Ptr:
+ elemType := typ.(*reflect2.UnsafePtrType).Elem()
+ newPtr = elemType.UnsafeNew()
+
+ default:
+ newPtr = typ.UnsafeNew()
+ }
+
+ d.decoders[i].Decode(newPtr, r)
+ *pObj = typ.UnsafeIndirect(newPtr)
+}
+
+func unionResolutionName(schema Schema) string {
+ name := schemaTypeName(schema)
+ switch schema.Type() {
+ case Map:
+ name += ":"
+ valSchema := schema.(*MapSchema).Values()
+ valName := schemaTypeName(valSchema)
+
+ name += valName
+
+ case Array:
+ name += ":"
+ itemSchema := schema.(*ArraySchema).Items()
+ itemName := schemaTypeName(itemSchema)
+
+ name += itemName
+ }
+
+ return name
+}
+
+func encoderOfResolverUnion(e *encoderContext, schema Schema, typ reflect2.Type) ValEncoder {
+ union := schema.(*UnionSchema)
+
+ names, err := e.cfg.resolver.Name(typ)
+ if err != nil {
+ return &errorEncoder{err: err}
+ }
+
+ var pos int
+ for _, name := range names {
+ if idx := strings.Index(name, ":"); idx > 0 {
+ name = name[:idx]
+ }
+
+ schema, pos = union.Types().Get(name)
+ if schema != nil {
+ break
+ }
+ }
+ if schema == nil {
+ return &errorEncoder{err: fmt.Errorf("avro: unknown union type %s", names[0])}
+ }
+
+ encoder := encoderOfType(e, schema, typ)
+
+ return &unionResolverEncoder{
+ pos: pos,
+ encoder: encoder,
+ }
+}
+
+type unionResolverEncoder struct {
+ pos int
+ encoder ValEncoder
+}
+
+func (e *unionResolverEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
+ w.WriteInt(int32(e.pos))
+
+ e.encoder.Encode(ptr, w)
+}
+
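+// getUnionSchema reads the union branch index from r and returns it with the
+// matching schema, or nil if the index is out of range.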
+func getUnionSchema(schema *UnionSchema, r *Reader) (int, Schema) {
+ types := schema.Types()
+
+ idx := int(r.ReadInt())
+ if idx < 0 || idx > len(types)-1 {
+ r.ReportError("decode union type", "unknown union type")
+ return 0, nil
+ }
+
+ return idx, types[idx]
+}
diff --git a/vendor/github.com/hamba/avro/v2/config.go b/vendor/github.com/hamba/avro/v2/config.go
new file mode 100644
index 0000000..57b2769
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config.go
@@ -0,0 +1,282 @@
+package avro
+
+import (
+ "errors"
+ "io"
+ "sync"
+
+ "github.com/modern-go/reflect2"
+)
+
+const (
+ defaultMaxByteSliceSize = 1_048_576 // 1 MiB
+)
+
+// DefaultConfig is the default API.
+var DefaultConfig = Config{}.Freeze()
+
+// Config customises how the codec should behave.
+type Config struct {
+ // TagKey is the struct tag key used when en/decoding structs.
+ // This defaults to "avro".
+ TagKey string
+
+ // BlockLength is the length of blocks for maps and arrays.
+ // This defaults to 100.
+ BlockLength int
+
+	// DisableBlockSizeHeader disables encoding of an array/map size in bytes.
+	// Encoded arrays/maps are then prefixed with only the number of elements,
+	// in contrast to the default behavior, which prefixes them with both the
+	// number of elements and the total number of bytes in the array/map. Both
+	// approaches are valid according to the Avro specification; however, not
+	// all decoders support the latter.
+ DisableBlockSizeHeader bool
+
+ // UnionResolutionError determines if an error will be returned
+ // when a type cannot be resolved while decoding a union.
+ UnionResolutionError bool
+
+ // PartialUnionTypeResolution dictates if the union type resolution
+ // should be attempted even when not all union types are registered.
+ // When enabled, the underlying type will get resolved if it is registered
+ // even if other types of the union are not. If resolution fails, logic
+ // falls back to default union resolution behavior based on the value of
+ // UnionResolutionError.
+ PartialUnionTypeResolution bool
+
+	// DisableCaching disables the caching layer for encoders and decoders,
+	// forcing them to be rebuilt on every call to Marshal and Unmarshal.
+ DisableCaching bool
+
+	// MaxByteSliceSize is the maximum size of `bytes` or `string` types the Reader will create, defaulting to 1 MiB.
+ // If this size is exceeded, the Reader returns an error. This can be disabled by setting a negative number.
+ MaxByteSliceSize int
+
+ // MaxSliceAllocSize is the maximum size that the decoder will allocate, set to the max heap
+ // allocation size by default.
+ // If this size is exceeded, the decoder returns an error.
+ MaxSliceAllocSize int
+}
+
+// Freeze makes the configuration immutable.
+func (c Config) Freeze() API {
+ api := &frozenConfig{
+ config: c,
+ resolver: NewTypeResolver(),
+ }
+
+ api.readerPool = &sync.Pool{
+ New: func() any {
+ return &Reader{
+ cfg: api,
+ reader: nil,
+ buf: nil,
+ head: 0,
+ tail: 0,
+ }
+ },
+ }
+ api.writerPool = &sync.Pool{
+ New: func() any {
+ return &Writer{
+ cfg: api,
+ out: nil,
+ buf: make([]byte, 0, 512),
+ Error: nil,
+ }
+ },
+ }
+
+ return api
+}
+
+// API represents a frozen Config.
+type API interface {
+ // Marshal returns the Avro encoding of v.
+ Marshal(schema Schema, v any) ([]byte, error)
+
+ // Unmarshal parses the Avro encoded data and stores the result in the value pointed to by v.
+ // If v is nil or not a pointer, Unmarshal returns an error.
+ Unmarshal(schema Schema, data []byte, v any) error
+
+ // NewEncoder returns a new encoder that writes to w using schema.
+ NewEncoder(schema Schema, w io.Writer) *Encoder
+
+ // NewDecoder returns a new decoder that reads from reader r using schema.
+ NewDecoder(schema Schema, r io.Reader) *Decoder
+
+ // DecoderOf returns the value decoder for a given schema and type.
+ DecoderOf(schema Schema, typ reflect2.Type) ValDecoder
+
+ // EncoderOf returns the value encoder for a given schema and type.
+	EncoderOf(schema Schema, typ reflect2.Type) ValEncoder
+
+ // Register registers names to their types for resolution. All primitive types are pre-registered.
+ Register(name string, obj any)
+}
+
+type frozenConfig struct {
+ config Config
+
+ decoderCache sync.Map // map[cacheKey]ValDecoder
+ encoderCache sync.Map // map[cacheKey]ValEncoder
+
+ readerPool *sync.Pool
+ writerPool *sync.Pool
+
+ resolver *TypeResolver
+}
+
+func (c *frozenConfig) Marshal(schema Schema, v any) ([]byte, error) {
+ writer := c.borrowWriter()
+ defer c.returnWriter(writer)
+
+ writer.WriteVal(schema, v)
+ if err := writer.Error; err != nil {
+ return nil, err
+ }
+
+ result := writer.Buffer()
+ copied := make([]byte, len(result))
+ copy(copied, result)
+
+ return copied, nil
+}
+
+func (c *frozenConfig) borrowWriter() *Writer {
+ writer := c.writerPool.Get().(*Writer)
+ writer.Reset(nil)
+ return writer
+}
+
+func (c *frozenConfig) returnWriter(writer *Writer) {
+ writer.out = nil
+ writer.Error = nil
+
+ c.writerPool.Put(writer)
+}
+
+func (c *frozenConfig) Unmarshal(schema Schema, data []byte, v any) error {
+ reader := c.borrowReader(data)
+ defer c.returnReader(reader)
+
+ reader.ReadVal(schema, v)
+ err := reader.Error
+
+ if errors.Is(err, io.EOF) {
+ return nil
+ }
+
+ return err
+}
+
+func (c *frozenConfig) borrowReader(data []byte) *Reader {
+ reader := c.readerPool.Get().(*Reader)
+ reader.Reset(data)
+ return reader
+}
+
+func (c *frozenConfig) returnReader(reader *Reader) {
+ reader.Error = nil
+ c.readerPool.Put(reader)
+}
+
+func (c *frozenConfig) NewEncoder(schema Schema, w io.Writer) *Encoder {
+ writer, ok := w.(*Writer)
+ if !ok {
+ writer = NewWriter(w, 512, WithWriterConfig(c))
+ }
+ return &Encoder{
+ s: schema,
+ w: writer,
+ }
+}
+
+func (c *frozenConfig) NewDecoder(schema Schema, r io.Reader) *Decoder {
+ reader := NewReader(r, 512, WithReaderConfig(c))
+ return &Decoder{
+ s: schema,
+ r: reader,
+ }
+}
+
+func (c *frozenConfig) Register(name string, obj any) {
+ c.resolver.Register(name, obj)
+}
+
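+// cacheKey identifies a cached codec by schema fingerprint and Go type.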
+type cacheKey struct {
+ fingerprint [32]byte
+ rtype uintptr
+}
+
+func (c *frozenConfig) addDecoderToCache(fingerprint [32]byte, rtype uintptr, dec ValDecoder) {
+ if c.config.DisableCaching {
+ return
+ }
+ key := cacheKey{fingerprint: fingerprint, rtype: rtype}
+ c.decoderCache.Store(key, dec)
+}
+
+func (c *frozenConfig) getDecoderFromCache(fingerprint [32]byte, rtype uintptr) ValDecoder {
+ if c.config.DisableCaching {
+ return nil
+ }
+ key := cacheKey{fingerprint: fingerprint, rtype: rtype}
+ if dec, ok := c.decoderCache.Load(key); ok {
+ return dec.(ValDecoder)
+ }
+
+ return nil
+}
+
+func (c *frozenConfig) addEncoderToCache(fingerprint [32]byte, rtype uintptr, enc ValEncoder) {
+ if c.config.DisableCaching {
+ return
+ }
+ key := cacheKey{fingerprint: fingerprint, rtype: rtype}
+ c.encoderCache.Store(key, enc)
+}
+
+func (c *frozenConfig) getEncoderFromCache(fingerprint [32]byte, rtype uintptr) ValEncoder {
+ if c.config.DisableCaching {
+ return nil
+ }
+ key := cacheKey{fingerprint: fingerprint, rtype: rtype}
+ if enc, ok := c.encoderCache.Load(key); ok {
+ return enc.(ValEncoder)
+ }
+
+ return nil
+}
+
+func (c *frozenConfig) getTagKey() string {
+ tagKey := c.config.TagKey
+ if tagKey == "" {
+ return "avro"
+ }
+ return tagKey
+}
+
+func (c *frozenConfig) getBlockLength() int {
+ blockSize := c.config.BlockLength
+ if blockSize <= 0 {
+ return 100
+ }
+ return blockSize
+}
+
+func (c *frozenConfig) getMaxByteSliceSize() int {
+ size := c.config.MaxByteSliceSize
+ if size == 0 {
+ return defaultMaxByteSliceSize
+ }
+ return size
+}
+
+func (c *frozenConfig) getMaxSliceAllocSize() int {
+ size := c.config.MaxSliceAllocSize
+ if size > maxAllocSize || size <= 0 {
+ return maxAllocSize
+ }
+ return size
+}
diff --git a/vendor/github.com/hamba/avro/v2/config_386.go b/vendor/github.com/hamba/avro/v2/config_386.go
new file mode 100644
index 0000000..a168fd7
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_386.go
@@ -0,0 +1,8 @@
+package avro
+
+import "math"
+
+// Max allocation size for an array due to the limit in number of bits in a heap address:
+// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L190
+// 32-bit systems accept the full 32-bit address space.
+const maxAllocSize = math.MaxInt
diff --git a/vendor/github.com/hamba/avro/v2/config_arm.go b/vendor/github.com/hamba/avro/v2/config_arm.go
new file mode 100644
index 0000000..a168fd7
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_arm.go
@@ -0,0 +1,8 @@
+package avro
+
+import "math"
+
+// Max allocation size for an array due to the limit in number of bits in a heap address:
+// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L190
+// 32-bit systems accept the full 32-bit address space.
+const maxAllocSize = math.MaxInt
diff --git a/vendor/github.com/hamba/avro/v2/config_x64.go b/vendor/github.com/hamba/avro/v2/config_x64.go
new file mode 100644
index 0000000..cecafd6
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/config_x64.go
@@ -0,0 +1,7 @@
+//go:build !386 && !arm
+
+package avro
+
+// Max allocation size for an array due to the limit in number of bits in a heap address:
+// https://github.com/golang/go/blob/7f76c00fc5678fa782708ba8fece63750cb89d03/src/runtime/malloc.go#L183
+const maxAllocSize = int(1 << 48)
diff --git a/vendor/github.com/hamba/avro/v2/converter.go b/vendor/github.com/hamba/avro/v2/converter.go
new file mode 100644
index 0000000..cc1f17c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/converter.go
@@ -0,0 +1,34 @@
+package avro
+
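+// createLongConverter returns a reader-side promotion of the writer's encoded
+// type to long, as allowed by Avro schema resolution, or nil if no conversion
+// applies.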
+func createLongConverter(typ Type) func(*Reader) int64 {
+ switch typ {
+ case Int:
+ return func(r *Reader) int64 { return int64(r.ReadInt()) }
+ default:
+ return nil
+ }
+}
+
+func createFloatConverter(typ Type) func(*Reader) float32 {
+ switch typ {
+ case Int:
+ return func(r *Reader) float32 { return float32(r.ReadInt()) }
+ case Long:
+ return func(r *Reader) float32 { return float32(r.ReadLong()) }
+ default:
+ return nil
+ }
+}
+
+func createDoubleConverter(typ Type) func(*Reader) float64 {
+ switch typ {
+ case Int:
+ return func(r *Reader) float64 { return float64(r.ReadInt()) }
+ case Long:
+ return func(r *Reader) float64 { return float64(r.ReadLong()) }
+ case Float:
+ return func(r *Reader) float64 { return float64(r.ReadFloat()) }
+ default:
+ return nil
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/decoder.go b/vendor/github.com/hamba/avro/v2/decoder.go
new file mode 100644
index 0000000..0e0ab9d
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/decoder.go
@@ -0,0 +1,49 @@
+package avro
+
+import (
+ "io"
+)
+
+// Decoder reads and decodes Avro values from an input stream.
+type Decoder struct {
+ s Schema
+ r *Reader
+}
+
+// NewDecoder returns a new decoder that reads from reader r using schema s.
+func NewDecoder(s string, r io.Reader) (*Decoder, error) {
+ sch, err := Parse(s)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewDecoderForSchema(sch, r), nil
+}
+
+// NewDecoderForSchema returns a new decoder that reads from r using schema.
+func NewDecoderForSchema(schema Schema, reader io.Reader) *Decoder {
+ return DefaultConfig.NewDecoder(schema, reader)
+}
+
+// Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.
+func (d *Decoder) Decode(v any) error {
+ if d.r.head == d.r.tail && d.r.reader != nil {
+ if !d.r.loadMore() {
+ return io.EOF
+ }
+ }
+
+ d.r.ReadVal(d.s, v)
+
+ //nolint:errorlint // Only direct EOF errors should be discarded.
+ if d.r.Error == io.EOF {
+ return nil
+ }
+ return d.r.Error
+}
+
+// Unmarshal parses the Avro encoded data and stores the result in the value pointed to by v.
+// If v is nil or not a pointer, Unmarshal returns an error.
+func Unmarshal(schema Schema, data []byte, v any) error {
+ return DefaultConfig.Unmarshal(schema, data, v)
+}
diff --git a/vendor/github.com/hamba/avro/v2/doc.go b/vendor/github.com/hamba/avro/v2/doc.go
new file mode 100644
index 0000000..5445573
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/doc.go
@@ -0,0 +1,4 @@
+// Package avro implements encoding and decoding of Avro as defined by the Avro specification.
+//
+// See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/
+package avro
diff --git a/vendor/github.com/hamba/avro/v2/encoder.go b/vendor/github.com/hamba/avro/v2/encoder.go
new file mode 100644
index 0000000..faa285e
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/encoder.go
@@ -0,0 +1,37 @@
+package avro
+
+import (
+ "io"
+)
+
+// Encoder writes Avro values to an output stream.
+type Encoder struct {
+ s Schema
+ w *Writer
+}
+
+// NewEncoder returns a new encoder that writes to w using schema s.
+func NewEncoder(s string, w io.Writer) (*Encoder, error) {
+ sch, err := Parse(s)
+ if err != nil {
+ return nil, err
+ }
+ return NewEncoderForSchema(sch, w), nil
+}
+
+// NewEncoderForSchema returns a new encoder that writes to w using schema.
+func NewEncoderForSchema(schema Schema, w io.Writer) *Encoder {
+ return DefaultConfig.NewEncoder(schema, w)
+}
+
+// Encode writes the Avro encoding of v to the stream.
+func (e *Encoder) Encode(v any) error {
+ e.w.WriteVal(e.s, v)
+ _ = e.w.Flush()
+ return e.w.Error
+}
+
+// Marshal returns the Avro encoding of v.
+func Marshal(schema Schema, v any) ([]byte, error) {
+ return DefaultConfig.Marshal(schema, v)
+}
diff --git a/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go b/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go
new file mode 100644
index 0000000..0e2485c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/internal/bytesx/reset.go
@@ -0,0 +1,38 @@
+// Package bytesx implements bytes extensions.
+package bytesx
+
+import "io"
+
+// ResetReader implements the io.Reader reading from a resettable byte slice.
+type ResetReader struct {
+ buf []byte
+ head int
+ tail int
+}
+
+// NewResetReader returns a new ResetReader reading from b.
+func NewResetReader(b []byte) *ResetReader {
+ r := &ResetReader{}
+ r.Reset(b)
+
+ return r
+}
+
+// Read reads bytes into p.
+func (r *ResetReader) Read(p []byte) (int, error) {
+ if r.head == r.tail {
+ return 0, io.EOF
+ }
+
+ n := copy(p, r.buf[r.head:])
+ r.head += n
+
+ return n, nil
+}
+
+// Reset resets the byte slice being read from.
+func (r *ResetReader) Reset(b []byte) {
+ r.buf = b
+ r.head = 0
+ r.tail = len(b)
+}
diff --git a/vendor/github.com/hamba/avro/v2/noescape.go b/vendor/github.com/hamba/avro/v2/noescape.go
new file mode 100644
index 0000000..8907846
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/noescape.go
@@ -0,0 +1,21 @@
+package avro
+
+import (
+ "unsafe"
+)
+
+// noescape hides a pointer from escape analysis. noescape is
+// the identity function but escape analysis doesn't think the
+// output depends on the input. noescape is inlined and currently
+// compiles down to zero instructions.
+// USE CAREFULLY!
+//
+// This function is taken from Go std lib:
+// https://github.com/golang/go/blob/master/src/runtime/stubs.go#L178
+//
+//nolint:govet,staticcheck
+//go:nosplit
+func noescape(p unsafe.Pointer) unsafe.Pointer {
+ x := uintptr(p)
+ return unsafe.Pointer(x ^ 0)
+}
diff --git a/vendor/github.com/hamba/avro/v2/noescape.s b/vendor/github.com/hamba/avro/v2/noescape.s
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/noescape.s
diff --git a/vendor/github.com/hamba/avro/v2/ocf/codec.go b/vendor/github.com/hamba/avro/v2/ocf/codec.go
new file mode 100644
index 0000000..8d0cab2
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/ocf/codec.go
@@ -0,0 +1,164 @@
+package ocf
+
+import (
+ "bytes"
+ "compress/flate"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "hash/crc32"
+ "io"
+
+ "github.com/golang/snappy"
+ "github.com/klauspost/compress/zstd"
+)
+
+// CodecName represents a compression codec name.
+type CodecName string
+
+// Supported compression codecs.
+const (
+ Null CodecName = "null"
+ Deflate CodecName = "deflate"
+ Snappy CodecName = "snappy"
+ ZStandard CodecName = "zstandard"
+)
+
+type codecOptions struct {
+ DeflateCompressionLevel int
+ ZStandardOptions zstdOptions
+}
+
+type zstdOptions struct {
+ EOptions []zstd.EOption
+ DOptions []zstd.DOption
+}
+
+func resolveCodec(name CodecName, codecOpts codecOptions) (Codec, error) {
+ switch name {
+ case Null, "":
+ return &NullCodec{}, nil
+
+ case Deflate:
+ return &DeflateCodec{compLvl: codecOpts.DeflateCompressionLevel}, nil
+
+ case Snappy:
+ return &SnappyCodec{}, nil
+
+ case ZStandard:
+ return newZStandardCodec(codecOpts.ZStandardOptions), nil
+
+ default:
+ return nil, fmt.Errorf("unknown codec %s", name)
+ }
+}
+
+// Codec represents a compression codec.
+type Codec interface {
+ // Decode decodes the given bytes.
+ Decode([]byte) ([]byte, error)
+ // Encode encodes the given bytes.
+ Encode([]byte) []byte
+}
+
+// NullCodec is a no-op codec.
+type NullCodec struct{}
+
+// Decode decodes the given bytes.
+func (*NullCodec) Decode(b []byte) ([]byte, error) {
+ return b, nil
+}
+
+// Encode encodes the given bytes.
+func (*NullCodec) Encode(b []byte) []byte {
+ return b
+}
+
+// DeflateCodec is a flate compression codec.
+type DeflateCodec struct {
+ compLvl int
+}
+
+// Decode decodes the given bytes.
+func (c *DeflateCodec) Decode(b []byte) ([]byte, error) {
+ r := flate.NewReader(bytes.NewBuffer(b))
+ data, err := io.ReadAll(r)
+ if err != nil {
+ _ = r.Close()
+ return nil, err
+ }
+ _ = r.Close()
+
+ return data, nil
+}
+
+// Encode encodes the given bytes.
+func (c *DeflateCodec) Encode(b []byte) []byte {
+ data := bytes.NewBuffer(make([]byte, 0, len(b)))
+
+ w, _ := flate.NewWriter(data, c.compLvl)
+ _, _ = w.Write(b)
+ _ = w.Close()
+
+ return data.Bytes()
+}
+
+// SnappyCodec is a snappy compression codec.
+type SnappyCodec struct{}
+
+// Decode decodes the given bytes.
+func (*SnappyCodec) Decode(b []byte) ([]byte, error) {
+ l := len(b)
+ if l < 5 {
+ return nil, errors.New("block does not contain snappy checksum")
+ }
+
+ dst, err := snappy.Decode(nil, b[:l-4])
+ if err != nil {
+ return nil, err
+ }
+
+ crc := binary.BigEndian.Uint32(b[l-4:])
+ if crc32.ChecksumIEEE(dst) != crc {
+ return nil, errors.New("snappy checksum mismatch")
+ }
+
+ return dst, nil
+}
+
+// Encode encodes the given bytes.
+func (*SnappyCodec) Encode(b []byte) []byte {
+ dst := snappy.Encode(nil, b)
+
+ dst = append(dst, 0, 0, 0, 0)
+ binary.BigEndian.PutUint32(dst[len(dst)-4:], crc32.ChecksumIEEE(b))
+
+ return dst
+}
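+
+// The snappy block layout, as produced by Encode above and expected by Decode,
+// is:
+//
+//	[snappy-compressed data][4-byte big-endian CRC32 of the uncompressed data]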
+
+// ZStandardCodec is a zstandard compression codec.
+type ZStandardCodec struct {
+ decoder *zstd.Decoder
+ encoder *zstd.Encoder
+}
+
+func newZStandardCodec(opts zstdOptions) *ZStandardCodec {
+ decoder, _ := zstd.NewReader(nil, opts.DOptions...)
+ encoder, _ := zstd.NewWriter(nil, opts.EOptions...)
+ return &ZStandardCodec{
+ decoder: decoder,
+ encoder: encoder,
+ }
+}
+
+// Decode decodes the given bytes.
+func (zstdCodec *ZStandardCodec) Decode(b []byte) ([]byte, error) {
+ defer func() { _ = zstdCodec.decoder.Reset(nil) }()
+ return zstdCodec.decoder.DecodeAll(b, nil)
+}
+
+// Encode encodes the given bytes.
+func (zstdCodec *ZStandardCodec) Encode(b []byte) []byte {
+ defer zstdCodec.encoder.Reset(nil)
+ return zstdCodec.encoder.EncodeAll(b, nil)
+}
diff --git a/vendor/github.com/hamba/avro/v2/ocf/ocf.go b/vendor/github.com/hamba/avro/v2/ocf/ocf.go
new file mode 100644
index 0000000..b3d7408
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/ocf/ocf.go
@@ -0,0 +1,554 @@
+// Package ocf implements encoding and decoding of Avro Object Container Files as defined by the Avro specification.
+//
+// See the Avro specification for an understanding of Avro: http://avro.apache.org/docs/current/
+package ocf
+
+import (
+ "bytes"
+ "compress/flate"
+ "crypto/rand"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/hamba/avro/v2"
+ "github.com/hamba/avro/v2/internal/bytesx"
+ "github.com/klauspost/compress/zstd"
+)
+
+const (
+ schemaKey = "avro.schema"
+ codecKey = "avro.codec"
+)
+
+var (
+ magicBytes = [4]byte{'O', 'b', 'j', 1}
+
+ // HeaderSchema is the Avro schema of a container file header.
+ HeaderSchema = avro.MustParse(`{
+ "type": "record",
+ "name": "org.apache.avro.file.Header",
+ "fields": [
+ {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
+ {"name": "meta", "type": {"type": "map", "values": "bytes"}},
+ {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
+ ]
+}`)
+
+ // DefaultSchemaMarshaler calls the schema's String() method, to produce
+ // a "canonical" schema.
+ DefaultSchemaMarshaler = defaultMarshalSchema
+ // FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
+ // a schema with all details preserved. The "canonical" schema returned by
+ // the default marshaler does not preserve a type's extra properties.
+ FullSchemaMarshaler = fullMarshalSchema
+)
+
+// Header represents an Avro container file header.
+type Header struct {
+ Magic [4]byte `avro:"magic"`
+ Meta map[string][]byte `avro:"meta"`
+ Sync [16]byte `avro:"sync"`
+}
+
+type decoderConfig struct {
+ DecoderConfig avro.API
+ SchemaCache *avro.SchemaCache
+ CodecOptions codecOptions
+}
+
+// DecoderFunc represents a configuration function for Decoder.
+type DecoderFunc func(cfg *decoderConfig)
+
+// WithDecoderConfig sets the value decoder config on the OCF decoder.
+func WithDecoderConfig(wCfg avro.API) DecoderFunc {
+ return func(cfg *decoderConfig) {
+ cfg.DecoderConfig = wCfg
+ }
+}
+
+// WithDecoderSchemaCache sets the schema cache for the decoder.
+// If not specified, defaults to avro.DefaultSchemaCache.
+func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
+ return func(cfg *decoderConfig) {
+ cfg.SchemaCache = cache
+ }
+}
+
+// WithZStandardDecoderOptions sets the options for the ZStandard decoder.
+func WithZStandardDecoderOptions(opts ...zstd.DOption) DecoderFunc {
+ return func(cfg *decoderConfig) {
+ cfg.CodecOptions.ZStandardOptions.DOptions = append(cfg.CodecOptions.ZStandardOptions.DOptions, opts...)
+ }
+}
+
+// Decoder reads and decodes Avro values from a container file.
+type Decoder struct {
+ reader *avro.Reader
+ resetReader *bytesx.ResetReader
+ decoder *avro.Decoder
+ meta map[string][]byte
+ sync [16]byte
+ schema avro.Schema
+
+ codec Codec
+
+ count int64
+}
+
+// NewDecoder returns a new decoder that reads from reader r.
+func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
+ cfg := decoderConfig{
+ DecoderConfig: avro.DefaultConfig,
+ SchemaCache: avro.DefaultSchemaCache,
+ CodecOptions: codecOptions{
+ DeflateCompressionLevel: flate.DefaultCompression,
+ },
+ }
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ reader := avro.NewReader(r, 1024)
+
+ h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
+ if err != nil {
+ return nil, fmt.Errorf("decoder: %w", err)
+ }
+
+ decReader := bytesx.NewResetReader([]byte{})
+
+ return &Decoder{
+ reader: reader,
+ resetReader: decReader,
+ decoder: cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
+ meta: h.Meta,
+ sync: h.Sync,
+ codec: h.Codec,
+ schema: h.Schema,
+ }, nil
+}
+
+// Metadata returns the header metadata.
+func (d *Decoder) Metadata() map[string][]byte {
+ return d.meta
+}
+
+// Schema returns the schema that was parsed from the file's metadata
+// and that is used to interpret the file's contents.
+func (d *Decoder) Schema() avro.Schema {
+ return d.schema
+}
+
+// HasNext determines if there is another value to read.
+func (d *Decoder) HasNext() bool {
+ if d.count <= 0 {
+ count := d.readBlock()
+ d.count = count
+ }
+
+ if d.reader.Error != nil {
+ return false
+ }
+
+ return d.count > 0
+}
+
+// Decode reads the next Avro encoded value from its input and stores it in the value pointed to by v.
+func (d *Decoder) Decode(v any) error {
+ if d.count <= 0 {
+ return errors.New("decoder: no data found, call HasNext first")
+ }
+
+ d.count--
+
+ return d.decoder.Decode(v)
+}
+
+// Error returns the last reader error.
+func (d *Decoder) Error() error {
+ if errors.Is(d.reader.Error, io.EOF) {
+ return nil
+ }
+
+ return d.reader.Error
+}
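+
+// A typical read loop (usage sketch; "file.avro" and the value type are
+// assumptions, not part of this package):
+//
+//	f, _ := os.Open("file.avro")
+//	dec, err := ocf.NewDecoder(f)
+//	if err != nil {
+//		// handle error
+//	}
+//	for dec.HasNext() {
+//		var v map[string]any
+//		if err := dec.Decode(&v); err != nil {
+//			// handle error
+//		}
+//	}
+//	if err := dec.Error(); err != nil {
+//		// handle error
+//	}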
+
+func (d *Decoder) readBlock() int64 {
+ _ = d.reader.Peek()
+ if errors.Is(d.reader.Error, io.EOF) {
+ // There is no next block
+ return 0
+ }
+
+ count := d.reader.ReadLong()
+ size := d.reader.ReadLong()
+
+	// Read the block's data
+ switch {
+ case count > 0:
+ data := make([]byte, size)
+ d.reader.Read(data)
+
+ data, err := d.codec.Decode(data)
+ if err != nil {
+ d.reader.Error = err
+ }
+
+ d.resetReader.Reset(data)
+
+ case size > 0:
+ // Skip the block data when count is 0
+ data := make([]byte, size)
+ d.reader.Read(data)
+ }
+
+ // Read the sync.
+ var sync [16]byte
+ d.reader.Read(sync[:])
+ if d.sync != sync && !errors.Is(d.reader.Error, io.EOF) {
+ d.reader.Error = errors.New("decoder: invalid block")
+ }
+
+ return count
+}
+
+type encoderConfig struct {
+ BlockLength int
+ CodecName CodecName
+ CodecOptions codecOptions
+ Metadata map[string][]byte
+ Sync [16]byte
+ EncodingConfig avro.API
+ SchemaCache *avro.SchemaCache
+ SchemaMarshaler func(avro.Schema) ([]byte, error)
+}
+
+// EncoderFunc represents a configuration function for Encoder.
+type EncoderFunc func(cfg *encoderConfig)
+
+// WithBlockLength sets the block length on the encoder.
+func WithBlockLength(length int) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.BlockLength = length
+ }
+}
+
+// WithCodec sets the compression codec on the encoder.
+func WithCodec(codec CodecName) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.CodecName = codec
+ }
+}
+
+// WithCompressionLevel sets the compression codec to deflate and
+// the compression level on the encoder.
+func WithCompressionLevel(compLvl int) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.CodecName = Deflate
+ cfg.CodecOptions.DeflateCompressionLevel = compLvl
+ }
+}
+
+// WithZStandardEncoderOptions sets the options for the ZStandard encoder.
+func WithZStandardEncoderOptions(opts ...zstd.EOption) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.CodecOptions.ZStandardOptions.EOptions = append(cfg.CodecOptions.ZStandardOptions.EOptions, opts...)
+ }
+}
+
+// WithMetadata sets the metadata on the encoder header.
+func WithMetadata(meta map[string][]byte) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.Metadata = meta
+ }
+}
+
+// WithEncoderSchemaCache sets the schema cache for the encoder.
+// If not specified, defaults to avro.DefaultSchemaCache.
+func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.SchemaCache = cache
+ }
+}
+
+// WithSchemaMarshaler sets the schema marshaler for the encoder.
+// If not specified, defaults to DefaultSchemaMarshaler.
+func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.SchemaMarshaler = m
+ }
+}
+
+// WithSyncBlock sets the sync block.
+func WithSyncBlock(sync [16]byte) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.Sync = sync
+ }
+}
+
+// WithEncodingConfig sets the value encoder config on the OCF encoder.
+func WithEncodingConfig(wCfg avro.API) EncoderFunc {
+ return func(cfg *encoderConfig) {
+ cfg.EncodingConfig = wCfg
+ }
+}
+
+// Encoder writes an Avro container file to an output stream.
+type Encoder struct {
+ writer *avro.Writer
+ buf *bytes.Buffer
+ encoder *avro.Encoder
+ sync [16]byte
+
+ codec Codec
+
+ blockLength int
+ count int
+}
+
+// NewEncoder returns a new encoder that writes to w using schema s.
+//
+// If the writer is an existing ocf file, it will append data using the
+// existing schema.
+func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
+ cfg := computeEncoderConfig(opts)
+ schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
+ if err != nil {
+ return nil, err
+ }
+ return newEncoder(schema, w, cfg)
+}
+
+// NewEncoderWithSchema returns a new encoder that writes to w using the given schema.
+//
+// If the writer is an existing ocf file, it will append data using the
+// existing schema.
+func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
+ return newEncoder(schema, w, computeEncoderConfig(opts))
+}
+
+func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
+ switch file := w.(type) {
+ case nil:
+ return nil, errors.New("writer cannot be nil")
+ case *os.File:
+ info, err := file.Stat()
+ if err != nil {
+ return nil, err
+ }
+
+ if info.Size() > 0 {
+ reader := avro.NewReader(file, 1024)
+ h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
+ if err != nil {
+ return nil, err
+ }
+ if err = skipToEnd(reader, h.Sync); err != nil {
+ return nil, err
+ }
+
+ writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
+ buf := &bytes.Buffer{}
+ e := &Encoder{
+ writer: writer,
+ buf: buf,
+ encoder: cfg.EncodingConfig.NewEncoder(h.Schema, buf),
+ sync: h.Sync,
+ codec: h.Codec,
+ blockLength: cfg.BlockLength,
+ }
+ return e, nil
+ }
+ }
+
+ schemaJSON, err := cfg.SchemaMarshaler(schema)
+ if err != nil {
+ return nil, err
+ }
+
+ cfg.Metadata[schemaKey] = schemaJSON
+ cfg.Metadata[codecKey] = []byte(cfg.CodecName)
+ header := Header{
+ Magic: magicBytes,
+ Meta: cfg.Metadata,
+ }
+ header.Sync = cfg.Sync
+ if header.Sync == [16]byte{} {
+ _, _ = rand.Read(header.Sync[:])
+ }
+
+ codec, err := resolveCodec(cfg.CodecName, cfg.CodecOptions)
+ if err != nil {
+ return nil, err
+ }
+
+ writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
+ writer.WriteVal(HeaderSchema, header)
+ if err = writer.Flush(); err != nil {
+ return nil, err
+ }
+
+ buf := &bytes.Buffer{}
+ e := &Encoder{
+ writer: writer,
+ buf: buf,
+ encoder: cfg.EncodingConfig.NewEncoder(schema, buf),
+ sync: header.Sync,
+ codec: codec,
+ blockLength: cfg.BlockLength,
+ }
+ return e, nil
+}
+
+func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
+ cfg := encoderConfig{
+ BlockLength: 100,
+ CodecName: Null,
+ CodecOptions: codecOptions{
+ DeflateCompressionLevel: flate.DefaultCompression,
+ },
+ Metadata: map[string][]byte{},
+ EncodingConfig: avro.DefaultConfig,
+ SchemaCache: avro.DefaultSchemaCache,
+ SchemaMarshaler: DefaultSchemaMarshaler,
+ }
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+ return cfg
+}
+
+// Write writes p to the internal buffer. This method bypasses the internal
+// encoder, so the caller is responsible for encoding the bytes themselves. No
+// error is returned if the bytes do not conform to the schema given to
+// NewEncoder, but the resulting OCF data will be corrupted.
+func (e *Encoder) Write(p []byte) (n int, err error) {
+ n, err = e.buf.Write(p)
+ if err != nil {
+ return n, err
+ }
+
+ e.count++
+ if e.count >= e.blockLength {
+ if err = e.writerBlock(); err != nil {
+ return n, err
+ }
+ }
+
+ return n, e.writer.Error
+}
+
+// Encode writes the Avro encoding of v to the stream.
+func (e *Encoder) Encode(v any) error {
+ if err := e.encoder.Encode(v); err != nil {
+ return err
+ }
+
+ e.count++
+ if e.count >= e.blockLength {
+ if err := e.writerBlock(); err != nil {
+ return err
+ }
+ }
+
+ return e.writer.Error
+}
+
+// Flush flushes the underlying writer.
+func (e *Encoder) Flush() error {
+ if e.count == 0 {
+ return nil
+ }
+
+ if err := e.writerBlock(); err != nil {
+ return err
+ }
+
+ return e.writer.Error
+}
+
+// Close closes the encoder, flushing the writer.
+func (e *Encoder) Close() error {
+ return e.Flush()
+}
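+
+// A typical write flow (usage sketch; schemaJSON, w and values are
+// assumptions, not part of this package):
+//
+//	enc, err := ocf.NewEncoder(schemaJSON, w, ocf.WithCodec(ocf.Snappy))
+//	if err != nil {
+//		// handle error
+//	}
+//	for _, v := range values {
+//		if err := enc.Encode(v); err != nil {
+//			// handle error
+//		}
+//	}
+//	if err := enc.Close(); err != nil { // flushes the final block
+//		// handle error
+//	}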
+
+func (e *Encoder) writerBlock() error {
+ e.writer.WriteLong(int64(e.count))
+
+ b := e.codec.Encode(e.buf.Bytes())
+
+ e.writer.WriteLong(int64(len(b)))
+ _, _ = e.writer.Write(b)
+
+ _, _ = e.writer.Write(e.sync[:])
+
+ e.count = 0
+ e.buf.Reset()
+ return e.writer.Flush()
+}
+
+type ocfHeader struct {
+ Schema avro.Schema
+ Codec Codec
+ Meta map[string][]byte
+ Sync [16]byte
+}
+
+func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
+ var h Header
+ reader.ReadVal(HeaderSchema, &h)
+ if reader.Error != nil {
+ return nil, fmt.Errorf("unexpected error: %w", reader.Error)
+ }
+
+ if h.Magic != magicBytes {
+ return nil, errors.New("invalid avro file")
+ }
+ schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
+ if err != nil {
+ return nil, err
+ }
+
+ codec, err := resolveCodec(CodecName(h.Meta[codecKey]), codecOpts)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ocfHeader{
+ Schema: schema,
+ Codec: codec,
+ Meta: h.Meta,
+ Sync: h.Sync,
+ }, nil
+}
+
+func skipToEnd(reader *avro.Reader, sync [16]byte) error {
+ for {
+ _ = reader.ReadLong()
+ if errors.Is(reader.Error, io.EOF) {
+ return nil
+ }
+ size := reader.ReadLong()
+ reader.SkipNBytes(int(size))
+ if reader.Error != nil {
+ return reader.Error
+ }
+
+ var synMark [16]byte
+ reader.Read(synMark[:])
+ if sync != synMark && !errors.Is(reader.Error, io.EOF) {
+ reader.Error = errors.New("invalid block")
+ }
+ }
+}
+
+func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
+ return []byte(schema.String()), nil
+}
+
+func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
+ return json.Marshal(schema)
+}
diff --git a/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go b/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go
new file mode 100644
index 0000000..6ca8205
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/pkg/crc64/crc64.go
@@ -0,0 +1,154 @@
+// Package crc64 implements the Avro CRC-64 checksum.
+// See https://avro.apache.org/docs/current/spec.html#schema_fingerprints for information.
+package crc64
+
+import (
+ "hash"
+)
+
+func init() {
+ buildTable()
+}
+
+// Size is the size of a CRC-64 checksum in bytes.
+const Size = 8
+
+// ByteOrder denotes how integers are encoded into bytes. The ByteOrder
+// interface in encoding/binary defeats some optimizations, so a more direct
+// implementation is used here.
+type ByteOrder int
+
+// ByteOrder constants.
+const (
+ LittleEndian ByteOrder = iota
+ BigEndian
+)
+
+// Empty is the empty checksum.
+const Empty = 0xc15d213aa4d7a795
+
+// Table is a 256-word table representing the polynomial for efficient processing.
+type Table [256]uint64
+
+func makeTable() *Table {
+ t := new(Table)
+ for i := range 256 {
+ fp := uint64(i)
+ for range 8 {
+ fp = (fp >> 1) ^ (Empty & -(fp & 1))
+ }
+ t[i] = fp
+ }
+ return t
+}
+
+var crc64Table *Table
+
+func buildTable() {
+ crc64Table = makeTable()
+}
+
+type digest struct {
+ crc uint64
+ tab *Table
+ byteOrder ByteOrder
+}
+
+// New creates a new hash.Hash64 computing the Avro CRC-64 checksum.
+// Its Sum method will lay the value out in big-endian byte order.
+func New() hash.Hash64 {
+ return newDigest(BigEndian)
+}
+
+// NewWithByteOrder creates a new hash.Hash64 computing the Avro CRC-64
+// checksum. Its Sum method will lay the value out in the specified byte order.
+func NewWithByteOrder(byteOrder ByteOrder) hash.Hash64 {
+ return newDigest(byteOrder)
+}
+
+func newDigest(byteOrder ByteOrder) *digest {
+ return &digest{
+ crc: Empty,
+ tab: crc64Table,
+ byteOrder: byteOrder,
+ }
+}
+
+// Size returns the size of the checksum in bytes.
+func (d *digest) Size() int {
+ return Size
+}
+
+// BlockSize returns the block size of the checksum.
+func (d *digest) BlockSize() int {
+ return 1
+}
+
+// Reset resets the hash instance.
+func (d *digest) Reset() {
+ d.crc = Empty
+}
+
+// Write adds the given data to the running checksum.
+func (d *digest) Write(p []byte) (n int, err error) {
+ for i := range p {
+ d.crc = (d.crc >> 8) ^ d.tab[(int)(byte(d.crc)^p[i])&0xff]
+ }
+
+ return len(p), nil
+}
+
+// Sum64 returns the checksum as a uint64.
+func (d *digest) Sum64() uint64 {
+ return d.crc
+}
+
+// Sum appends the checksum to in and returns the resulting byte slice.
+func (d *digest) Sum(in []byte) []byte {
+ b := d.sumBytes()
+ return append(in, b[:]...)
+}
+
+// sumBytes returns the checksum as a byte array in the digest's byte order.
+func (d *digest) sumBytes() [Size]byte {
+ s := d.Sum64()
+
+ switch d.byteOrder {
+ case LittleEndian:
+ return [Size]byte{
+ byte(s),
+ byte(s >> 8),
+ byte(s >> 16),
+ byte(s >> 24),
+ byte(s >> 32),
+ byte(s >> 40),
+ byte(s >> 48),
+ byte(s >> 56),
+ }
+ case BigEndian:
+ return [Size]byte{
+ byte(s >> 56),
+ byte(s >> 48),
+ byte(s >> 40),
+ byte(s >> 32),
+ byte(s >> 24),
+ byte(s >> 16),
+ byte(s >> 8),
+ byte(s),
+ }
+ }
+ panic("unknown byte order")
+}
+
+// Sum returns the CRC64 checksum of the data, in big-endian byte order.
+func Sum(data []byte) [Size]byte {
+ return SumWithByteOrder(data, BigEndian)
+}
+
+// SumWithByteOrder returns the CRC64 checksum of the data, in the specified
+// byte order.
+func SumWithByteOrder(data []byte, byteOrder ByteOrder) [Size]byte {
+ d := newDigest(byteOrder)
+ _, _ = d.Write(data)
+ return d.sumBytes()
+}
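+
+// Usage sketch: the Avro "CRC-64-AVRO" schema fingerprint is this checksum of
+// a schema's canonical form (the schema value here is an assumption):
+//
+//	fp := crc64.Sum([]byte(schema.String()))
+//	// fp holds the 8-byte big-endian CRC-64-AVRO fingerprint.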
diff --git a/vendor/github.com/hamba/avro/v2/protocol.go b/vendor/github.com/hamba/avro/v2/protocol.go
new file mode 100644
index 0000000..3a62baa
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/protocol.go
@@ -0,0 +1,377 @@
+package avro
+
+import (
+ "crypto/md5"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "os"
+
+ jsoniter "github.com/json-iterator/go"
+ "github.com/mitchellh/mapstructure"
+)
+
+var (
+ protocolReserved = []string{"doc", "types", "messages", "protocol", "namespace"}
+ messageReserved = []string{"doc", "response", "request", "errors", "one-way"}
+)
+
+type protocolConfig struct {
+ doc string
+ props map[string]any
+}
+
+// ProtocolOption is a function that sets a protocol option.
+type ProtocolOption func(*protocolConfig)
+
+// WithProtoDoc sets the doc on a protocol.
+func WithProtoDoc(doc string) ProtocolOption {
+ return func(opts *protocolConfig) {
+ opts.doc = doc
+ }
+}
+
+// WithProtoProps sets the properties on a protocol.
+func WithProtoProps(props map[string]any) ProtocolOption {
+ return func(opts *protocolConfig) {
+ opts.props = props
+ }
+}
+
+// Protocol is an Avro protocol.
+type Protocol struct {
+ name
+ properties
+
+ types []NamedSchema
+ messages map[string]*Message
+
+ doc string
+
+ hash string
+}
+
+// NewProtocol creates a protocol instance.
+func NewProtocol(
+	name, namespace string,
+ types []NamedSchema,
+ messages map[string]*Message,
+ opts ...ProtocolOption,
+) (*Protocol, error) {
+ var cfg protocolConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+	n, err := newName(name, namespace, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ p := &Protocol{
+ name: n,
+ properties: newProperties(cfg.props, protocolReserved),
+ types: types,
+ messages: messages,
+ doc: cfg.doc,
+ }
+
+ b := md5.Sum([]byte(p.String()))
+ p.hash = hex.EncodeToString(b[:])
+
+ return p, nil
+}
+
+// Message returns a message with the given name or nil.
+func (p *Protocol) Message(name string) *Message {
+ return p.messages[name]
+}
+
+// Doc returns the protocol doc.
+func (p *Protocol) Doc() string {
+ return p.doc
+}
+
+// Hash returns the MD5 hash of the protocol.
+func (p *Protocol) Hash() string {
+ return p.hash
+}
+
+// Types returns the types of the protocol.
+func (p *Protocol) Types() []NamedSchema {
+ return p.types
+}
+
+// String returns the canonical form of the protocol.
+func (p *Protocol) String() string {
+ types := ""
+ for _, f := range p.types {
+ types += f.String() + ","
+ }
+ if len(types) > 0 {
+ types = types[:len(types)-1]
+ }
+
+ messages := ""
+ for k, m := range p.messages {
+ messages += `"` + k + `":` + m.String() + ","
+ }
+ if len(messages) > 0 {
+ messages = messages[:len(messages)-1]
+ }
+
+ return `{"protocol":"` + p.Name() +
+ `","namespace":"` + p.Namespace() +
+ `","types":[` + types + `],"messages":{` + messages + `}}`
+}
+
+// Message is an Avro protocol message.
+type Message struct {
+ properties
+
+ req *RecordSchema
+ resp Schema
+ errs *UnionSchema
+ oneWay bool
+
+ doc string
+}
+
+// NewMessage creates a protocol message instance.
+func NewMessage(req *RecordSchema, resp Schema, errors *UnionSchema, oneWay bool, opts ...ProtocolOption) *Message {
+ var cfg protocolConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ return &Message{
+ properties: newProperties(cfg.props, messageReserved),
+ req: req,
+ resp: resp,
+ errs: errors,
+ oneWay: oneWay,
+ doc: cfg.doc,
+ }
+}
+
+// Request returns the message request schema.
+func (m *Message) Request() *RecordSchema {
+ return m.req
+}
+
+// Response returns the message response schema.
+func (m *Message) Response() Schema {
+ return m.resp
+}
+
+// Errors returns the message errors union schema.
+func (m *Message) Errors() *UnionSchema {
+ return m.errs
+}
+
+// OneWay reports whether the message is a one-way message.
+func (m *Message) OneWay() bool {
+ return m.oneWay
+}
+
+// Doc returns the message doc.
+func (m *Message) Doc() string {
+ return m.doc
+}
+
+// String returns the canonical form of the message.
+func (m *Message) String() string {
+ fields := ""
+ for _, f := range m.req.fields {
+ fields += f.String() + ","
+ }
+ if len(fields) > 0 {
+ fields = fields[:len(fields)-1]
+ }
+
+ str := `{"request":[` + fields + `]`
+ if m.resp != nil {
+ str += `,"response":` + m.resp.String()
+ }
+ if m.errs != nil && len(m.errs.Types()) > 1 {
+ errs, _ := NewUnionSchema(m.errs.Types()[1:])
+ str += `,"errors":` + errs.String()
+ }
+ str += "}"
+ return str
+}
+
+// ParseProtocolFile parses an Avro protocol from a file.
+func ParseProtocolFile(path string) (*Protocol, error) {
+ s, err := os.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+
+ return ParseProtocol(string(s))
+}
+
+// MustParseProtocol parses an Avro protocol, panicking if there is an error.
+func MustParseProtocol(protocol string) *Protocol {
+ parsed, err := ParseProtocol(protocol)
+ if err != nil {
+ panic(err)
+ }
+
+ return parsed
+}
+
+// ParseProtocol parses an Avro protocol.
+func ParseProtocol(protocol string) (*Protocol, error) {
+ cache := &SchemaCache{}
+
+ var m map[string]any
+ if err := jsoniter.Unmarshal([]byte(protocol), &m); err != nil {
+ return nil, err
+ }
+
+ seen := seenCache{}
+ return parseProtocol(m, seen, cache)
+}
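+
+// A minimal protocol document (illustrative sketch; the protocol, type and
+// message names are assumptions):
+//
+//	p, err := avro.ParseProtocol(`{
+//	    "protocol": "Echo",
+//	    "namespace": "org.example",
+//	    "types": [
+//	        {"type": "record", "name": "Ping", "fields": [{"name": "msg", "type": "string"}]}
+//	    ],
+//	    "messages": {
+//	        "ping": {"request": [{"name": "in", "type": "Ping"}], "response": "string"}
+//	    }
+//	}`)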
+
+type protocol struct {
+ Protocol string `mapstructure:"protocol"`
+ Namespace string `mapstructure:"namespace"`
+ Doc string `mapstructure:"doc"`
+ Types []any `mapstructure:"types"`
+ Messages map[string]map[string]any `mapstructure:"messages"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseProtocol(m map[string]any, seen seenCache, cache *SchemaCache) (*Protocol, error) {
+ var (
+ p protocol
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &p, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding protocol: %w", err)
+ }
+
+ if err := checkParsedName(p.Protocol); err != nil {
+ return nil, err
+ }
+
+ var (
+ types []NamedSchema
+ err error
+ )
+ if len(p.Types) > 0 {
+ types, err = parseProtocolTypes(p.Namespace, p.Types, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ messages := map[string]*Message{}
+ if len(p.Messages) > 0 {
+ for k, msg := range p.Messages {
+ message, err := parseMessage(p.Namespace, msg, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ messages[k] = message
+ }
+ }
+
+ return NewProtocol(p.Protocol, p.Namespace, types, messages, WithProtoDoc(p.Doc), WithProtoProps(p.Props))
+}
+
+func parseProtocolTypes(namespace string, types []any, seen seenCache, cache *SchemaCache) ([]NamedSchema, error) {
+ ts := make([]NamedSchema, len(types))
+ for i, typ := range types {
+ schema, err := parseType(namespace, typ, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ namedSchema, ok := schema.(NamedSchema)
+ if !ok {
+ return nil, errors.New("avro: protocol types must be named schemas")
+ }
+
+ ts[i] = namedSchema
+ }
+
+ return ts, nil
+}
+
+type message struct {
+ Doc string `mapstructure:"doc"`
+ Request []map[string]any `mapstructure:"request"`
+ Response any `mapstructure:"response"`
+ Errors []any `mapstructure:"errors"`
+ OneWay bool `mapstructure:"one-way"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseMessage(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Message, error) {
+ var (
+ msg message
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &msg, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding message: %w", err)
+ }
+
+ fields := make([]*Field, len(msg.Request))
+ for i, f := range msg.Request {
+ field, err := parseField(namespace, f, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+ fields[i] = field
+ }
+ request := &RecordSchema{
+ name: name{},
+ properties: properties{},
+ fields: fields,
+ }
+
+ var response Schema
+ if msg.Response != nil {
+ schema, err := parseType(namespace, msg.Response, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ if schema.Type() != Null {
+ response = schema
+ }
+ }
+
+ types := []Schema{NewPrimitiveSchema(String, nil)}
+ if len(msg.Errors) > 0 {
+ for _, e := range msg.Errors {
+ schema, err := parseType(namespace, e, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ if rec, ok := schema.(*RecordSchema); ok && !rec.IsError() {
+ return nil, errors.New("avro: errors record schema must be of type error")
+ }
+
+ types = append(types, schema)
+ }
+ }
+ errs, err := NewUnionSchema(types)
+ if err != nil {
+ return nil, err
+ }
+
+ oneWay := msg.OneWay
+ if hasKey(meta.Keys, "one-way") && oneWay && (len(errs.Types()) > 1 || response != nil) {
+ return nil, errors.New("avro: one-way messages cannot not have a response or errors")
+ }
+ if !oneWay && len(errs.Types()) <= 1 && response == nil {
+ oneWay = true
+ }
+
+ return NewMessage(request, response, errs, oneWay, WithProtoDoc(msg.Doc), WithProtoProps(msg.Props)), nil
+}
diff --git a/vendor/github.com/hamba/avro/v2/reader.go b/vendor/github.com/hamba/avro/v2/reader.go
new file mode 100644
index 0000000..3c11b18
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/reader.go
@@ -0,0 +1,324 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "unsafe"
+)
+
+const (
+ maxIntBufSize = 5
+ maxLongBufSize = 10
+)
+
+// ReaderFunc is a function used to customize the Reader.
+type ReaderFunc func(r *Reader)
+
+// WithReaderConfig specifies the configuration to use with a reader.
+func WithReaderConfig(cfg API) ReaderFunc {
+ return func(r *Reader) {
+ r.cfg = cfg.(*frozenConfig)
+ }
+}
+
+// Reader is an Avro-specific io.Reader.
+type Reader struct {
+ cfg *frozenConfig
+ reader io.Reader
+ slab []byte
+ buf []byte
+ head int
+ tail int
+ Error error
+}
+
+// NewReader creates a new Reader.
+func NewReader(r io.Reader, bufSize int, opts ...ReaderFunc) *Reader {
+ reader := &Reader{
+ cfg: DefaultConfig.(*frozenConfig),
+ reader: r,
+ buf: make([]byte, bufSize),
+ head: 0,
+ tail: 0,
+ }
+
+ for _, opt := range opts {
+ opt(reader)
+ }
+
+ return reader
+}
+
+// Reset resets the Reader to read from the given byte slice.
+func (r *Reader) Reset(b []byte) *Reader {
+ r.reader = nil
+ r.buf = b
+ r.head = 0
+ r.tail = len(b)
+ return r
+}
+
+// ReportError records an error on the Reader at the current position.
+func (r *Reader) ReportError(operation, msg string) {
+ if r.Error != nil && !errors.Is(r.Error, io.EOF) {
+ return
+ }
+
+ r.Error = fmt.Errorf("avro: %s: %s", operation, msg)
+}
+
+func (r *Reader) loadMore() bool {
+ if r.reader == nil {
+ if r.Error == nil {
+ r.head = r.tail
+ r.Error = io.EOF
+ }
+ return false
+ }
+
+ for {
+ n, err := r.reader.Read(r.buf)
+ if n == 0 {
+ if err != nil {
+ if r.Error == nil {
+ r.Error = err
+ }
+ return false
+ }
+ continue
+ }
+
+ r.head = 0
+ r.tail = n
+ return true
+ }
+}
+
+func (r *Reader) readByte() byte {
+ if r.head == r.tail {
+ if !r.loadMore() {
+ r.Error = io.ErrUnexpectedEOF
+ return 0
+ }
+ }
+
+ b := r.buf[r.head]
+ r.head++
+
+ return b
+}
+
+// Peek returns the next byte in the buffer.
+// The Reader's Error will be io.EOF if no next byte exists.
+func (r *Reader) Peek() byte {
+ if r.head == r.tail {
+ if !r.loadMore() {
+ return 0
+ }
+ }
+ return r.buf[r.head]
+}
+
+// Read reads data into the given bytes.
+func (r *Reader) Read(b []byte) {
+ size := len(b)
+ read := 0
+
+ for read < size {
+ if r.head == r.tail {
+ if !r.loadMore() {
+ r.Error = io.ErrUnexpectedEOF
+ return
+ }
+ }
+
+ n := copy(b[read:], r.buf[r.head:r.tail])
+ r.head += n
+ read += n
+ }
+}
+
+// ReadBool reads a Bool from the Reader.
+func (r *Reader) ReadBool() bool {
+ b := r.readByte()
+
+ if b != 0 && b != 1 {
+ r.ReportError("ReadBool", "invalid bool")
+ }
+ return b == 1
+}
+
+// ReadInt reads an Int from the Reader.
+//
+//nolint:dupl
+func (r *Reader) ReadInt() int32 {
+ if r.Error != nil {
+ return 0
+ }
+
+ var (
+ n int
+ v uint32
+ s uint8
+ )
+
+ for {
+ tail := r.tail
+ if r.tail-r.head+n > maxIntBufSize {
+ tail = r.head + maxIntBufSize - n
+ }
+
+		// Consume what is in the buffer.
+ var i int
+ for _, b := range r.buf[r.head:tail] {
+ v |= uint32(b&0x7f) << s
+ if b&0x80 == 0 {
+ r.head += i + 1
+ return int32((v >> 1) ^ -(v & 1))
+ }
+ s += 7
+ i++
+ }
+ if n >= maxIntBufSize {
+ r.ReportError("ReadInt", "int overflow")
+ return 0
+ }
+ r.head += i
+ n += i
+
+		// We ran out of buffer and are not at the end of the int,
+		// so read more into the buffer.
+ if !r.loadMore() {
+ r.Error = fmt.Errorf("reading int: %w", r.Error)
+ return 0
+ }
+ }
+}
+
+// ReadLong reads a Long from the Reader.
+//
+//nolint:dupl
+func (r *Reader) ReadLong() int64 {
+ if r.Error != nil {
+ return 0
+ }
+
+ var (
+ n int
+ v uint64
+ s uint8
+ )
+
+ for {
+ tail := r.tail
+ if r.tail-r.head+n > maxLongBufSize {
+ tail = r.head + maxLongBufSize - n
+ }
+
+		// Consume what is in the buffer.
+ var i int
+ for _, b := range r.buf[r.head:tail] {
+ v |= uint64(b&0x7f) << s
+ if b&0x80 == 0 {
+ r.head += i + 1
+ return int64((v >> 1) ^ -(v & 1))
+ }
+ s += 7
+ i++
+ }
+ if n >= maxLongBufSize {
+ r.ReportError("ReadLong", "int overflow")
+ return 0
+ }
+ r.head += i
+ n += i
+
+		// We ran out of buffer and are not at the end of the long,
+		// so read more into the buffer.
+ if !r.loadMore() {
+ r.Error = fmt.Errorf("reading long: %w", r.Error)
+ return 0
+ }
+ }
+}
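+
+// Ints and longs are zig-zag varint encoded: 0 -> 0x00, -1 -> 0x01, 1 -> 0x02,
+// -2 -> 0x03, and so on. The low 7 bits of each byte carry data, the high bit
+// marks continuation, and the final step above, (v >> 1) ^ -(v & 1), undoes
+// the zig-zag mapping.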
+
+// ReadFloat reads a Float from the Reader.
+func (r *Reader) ReadFloat() float32 {
+ var buf [4]byte
+ r.Read(buf[:])
+
+ float := *(*float32)(unsafe.Pointer(&buf[0]))
+ return float
+}
+
+// ReadDouble reads a Double from the Reader.
+func (r *Reader) ReadDouble() float64 {
+ var buf [8]byte
+ r.Read(buf[:])
+
+ float := *(*float64)(unsafe.Pointer(&buf[0]))
+ return float
+}
+
+// ReadBytes reads Bytes from the Reader.
+func (r *Reader) ReadBytes() []byte {
+ return r.readBytes("bytes")
+}
+
+// ReadString reads a String from the Reader.
+func (r *Reader) ReadString() string {
+ b := r.readBytes("string")
+ if len(b) == 0 {
+ return ""
+ }
+
+ return *(*string)(unsafe.Pointer(&b))
+}
+
+func (r *Reader) readBytes(op string) []byte {
+ size := int(r.ReadLong())
+ if size < 0 {
+ fnName := "Read" + strings.ToTitle(op)
+ r.ReportError(fnName, "invalid "+op+" length")
+ return nil
+ }
+ if size == 0 {
+ return []byte{}
+ }
+ if maxSize := r.cfg.getMaxByteSliceSize(); maxSize > 0 && size > maxSize {
+ fnName := "Read" + strings.ToTitle(op)
+ r.ReportError(fnName, "size is greater than `Config.MaxByteSliceSize`")
+ return nil
+ }
+
+ // The bytes are entirely in the buffer and of a reasonable size.
+ // Use the byte slab.
+ if r.head+size <= r.tail && size <= 1024 {
+ if cap(r.slab) < size {
+ r.slab = make([]byte, 1024)
+ }
+ dst := r.slab[:size]
+ r.slab = r.slab[size:]
+ copy(dst, r.buf[r.head:r.head+size])
+ r.head += size
+ return dst
+ }
+
+ buf := make([]byte, size)
+ r.Read(buf)
+ return buf
+}
+
+// ReadBlockHeader reads a Block Header from the Reader.
+func (r *Reader) ReadBlockHeader() (int64, int64) {
+ length := r.ReadLong()
+ if length < 0 {
+ size := r.ReadLong()
+
+ return -length, size
+ }
+
+ return length, 0
+}
diff --git a/vendor/github.com/hamba/avro/v2/reader_generic.go b/vendor/github.com/hamba/avro/v2/reader_generic.go
new file mode 100644
index 0000000..32d341c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/reader_generic.go
@@ -0,0 +1,163 @@
+package avro
+
+import (
+ "fmt"
+ "reflect"
+ "time"
+)
+
+// ReadNext reads the next Avro element as a generic interface.
+func (r *Reader) ReadNext(schema Schema) any {
+ var ls LogicalSchema
+ lts, ok := schema.(LogicalTypeSchema)
+ if ok {
+ ls = lts.Logical()
+ }
+
+ switch schema.Type() {
+ case Boolean:
+ return r.ReadBool()
+ case Int:
+ if ls != nil {
+ switch ls.Type() {
+ case Date:
+ i := r.ReadInt()
+ sec := int64(i) * int64(24*time.Hour/time.Second)
+ return time.Unix(sec, 0).UTC()
+
+ case TimeMillis:
+ return time.Duration(r.ReadInt()) * time.Millisecond
+ }
+ }
+ return int(r.ReadInt())
+ case Long:
+ if ls != nil {
+ switch ls.Type() {
+ case TimeMicros:
+ return time.Duration(r.ReadLong()) * time.Microsecond
+
+ case TimestampMillis:
+ i := r.ReadLong()
+ sec := i / 1e3
+ nsec := (i - sec*1e3) * 1e6
+ return time.Unix(sec, nsec).UTC()
+
+ case TimestampMicros:
+ i := r.ReadLong()
+ sec := i / 1e6
+ nsec := (i - sec*1e6) * 1e3
+ return time.Unix(sec, nsec).UTC()
+ }
+ }
+ return r.ReadLong()
+ case Float:
+ return r.ReadFloat()
+ case Double:
+ return r.ReadDouble()
+ case String:
+ return r.ReadString()
+ case Bytes:
+ if ls != nil && ls.Type() == Decimal {
+ dec := ls.(*DecimalLogicalSchema)
+ return ratFromBytes(r.ReadBytes(), dec.Scale())
+ }
+ return r.ReadBytes()
+ case Record:
+ fields := schema.(*RecordSchema).Fields()
+ obj := make(map[string]any, len(fields))
+ for _, field := range fields {
+ obj[field.Name()] = r.ReadNext(field.Type())
+ }
+ return obj
+ case Ref:
+ return r.ReadNext(schema.(*RefSchema).Schema())
+ case Enum:
+ symbols := schema.(*EnumSchema).Symbols()
+ idx := int(r.ReadInt())
+ if idx < 0 || idx >= len(symbols) {
+ r.ReportError("Read", "unknown enum symbol")
+ return nil
+ }
+ return symbols[idx]
+ case Array:
+ arr := []any{}
+ r.ReadArrayCB(func(r *Reader) bool {
+ elem := r.ReadNext(schema.(*ArraySchema).Items())
+ arr = append(arr, elem)
+ return true
+ })
+ return arr
+ case Map:
+ obj := map[string]any{}
+ r.ReadMapCB(func(r *Reader, field string) bool {
+ elem := r.ReadNext(schema.(*MapSchema).Values())
+ obj[field] = elem
+ return true
+ })
+ return obj
+ case Union:
+ types := schema.(*UnionSchema).Types()
+ idx := int(r.ReadLong())
+ if idx < 0 || idx > len(types)-1 {
+ r.ReportError("Read", "unknown union type")
+ return nil
+ }
+ schema = types[idx]
+ if schema.Type() == Null {
+ return nil
+ }
+
+ key := schemaTypeName(schema)
+ obj := map[string]any{}
+ obj[key] = r.ReadNext(types[idx])
+ return obj
+ case Fixed:
+ size := schema.(*FixedSchema).Size()
+ obj := make([]byte, size)
+ r.Read(obj)
+ if ls != nil && ls.Type() == Decimal {
+ dec := ls.(*DecimalLogicalSchema)
+ return ratFromBytes(obj, dec.Scale())
+ }
+ return byteSliceToArray(obj, size)
+ default:
+ r.ReportError("Read", fmt.Sprintf("unexpected schema type: %v", schema.Type()))
+ return nil
+ }
+}
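+
+// For reference, ReadNext maps schema types to Go values as implemented above:
+// boolean -> bool, int -> int, long -> int64, float -> float32, double ->
+// float64, string -> string, bytes -> []byte, record and map ->
+// map[string]any, array -> []any, enum -> string, fixed -> [N]byte, decimal ->
+// *big.Rat, date and timestamps -> time.Time, and time-millis/micros ->
+// time.Duration.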
+
+// ReadArrayCB reads an array with a callback per item.
+func (r *Reader) ReadArrayCB(fn func(*Reader) bool) {
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+ for range l {
+ fn(r)
+ }
+ }
+}
+
+// ReadMapCB reads a map with a callback per field.
+func (r *Reader) ReadMapCB(fn func(*Reader, string) bool) {
+ for {
+ l, _ := r.ReadBlockHeader()
+ if l == 0 {
+ break
+ }
+
+ for range l {
+ field := r.ReadString()
+ fn(r, field)
+ }
+ }
+}
+
+var byteType = reflect.TypeOf((*byte)(nil)).Elem()
+
+func byteSliceToArray(b []byte, size int) any {
+ vArr := reflect.New(reflect.ArrayOf(size, byteType)).Elem()
+ reflect.Copy(vArr, reflect.ValueOf(b))
+ return vArr.Interface()
+}
diff --git a/vendor/github.com/hamba/avro/v2/reader_skip.go b/vendor/github.com/hamba/avro/v2/reader_skip.go
new file mode 100644
index 0000000..94288c8
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/reader_skip.go
@@ -0,0 +1,79 @@
+package avro
+
+// SkipNBytes skips the given number of bytes in the reader.
+func (r *Reader) SkipNBytes(n int) {
+ read := 0
+ for read < n {
+ if r.head == r.tail {
+ if !r.loadMore() {
+ return
+ }
+ }
+
+ if read+r.tail-r.head < n {
+ read += r.tail - r.head
+ r.head = r.tail
+ continue
+ }
+
+ r.head += n - read
+ read += n - read
+ }
+}
+
+// SkipBool skips a Bool in the reader.
+func (r *Reader) SkipBool() {
+ _ = r.readByte()
+}
+
+// SkipInt skips an Int in the reader.
+func (r *Reader) SkipInt() {
+ var n int
+ for r.Error == nil && n < maxIntBufSize {
+ b := r.readByte()
+ if b&0x80 == 0 {
+ break
+ }
+ n++
+ }
+}
+
+// SkipLong skips a Long in the reader.
+func (r *Reader) SkipLong() {
+ var n int
+ for r.Error == nil && n < maxLongBufSize {
+ b := r.readByte()
+ if b&0x80 == 0 {
+ break
+ }
+ n++
+ }
+}
+
+// SkipFloat skips a Float in the reader.
+func (r *Reader) SkipFloat() {
+ r.SkipNBytes(4)
+}
+
+// SkipDouble skips a Double in the reader.
+func (r *Reader) SkipDouble() {
+ r.SkipNBytes(8)
+}
+
+// SkipString skips a String in the reader.
+func (r *Reader) SkipString() {
+ size := r.ReadLong()
+ if size <= 0 {
+ return
+ }
+ r.SkipNBytes(int(size))
+}
+
+// SkipBytes skips Bytes in the reader.
+func (r *Reader) SkipBytes() {
+ size := r.ReadLong()
+ if size <= 0 {
+ return
+ }
+ r.SkipNBytes(int(size))
+}
diff --git a/vendor/github.com/hamba/avro/v2/resolver.go b/vendor/github.com/hamba/avro/v2/resolver.go
new file mode 100644
index 0000000..c1b6ab6
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/resolver.go
@@ -0,0 +1,90 @@
+package avro
+
+import (
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ "github.com/modern-go/reflect2"
+)
+
+// TypeResolver resolves types by name.
+type TypeResolver struct {
+ names sync.Map // map[string]reflect2.Type
+ types sync.Map // map[int][]string
+}
+
+// NewTypeResolver creates a new type resolver with all primitive types
+// registered.
+func NewTypeResolver() *TypeResolver {
+ r := &TypeResolver{}
+
+ // Register basic types
+ r.Register(string(Null), &null{})
+ r.Register(string(Int), int8(0))
+ r.Register(string(Int), int16(0))
+ r.Register(string(Int), int32(0))
+ r.Register(string(Int), int(0))
+ r.Register(string(Long), int(0))
+ r.Register(string(Long), int64(0))
+ r.Register(string(Float), float32(0))
+ r.Register(string(Double), float64(0))
+ r.Register(string(String), "")
+ r.Register(string(Bytes), []byte{})
+ r.Register(string(Boolean), true)
+
+ // Register logical types
+ r.Register(string(Int)+"."+string(Date), time.Time{})
+ r.Register(string(Int)+"."+string(TimeMillis), time.Duration(0))
+ r.Register(string(Long)+"."+string(TimestampMillis), time.Time{})
+ r.Register(string(Long)+"."+string(TimestampMicros), time.Time{})
+ r.Register(string(Long)+"."+string(TimeMicros), time.Duration(0))
+ r.Register(string(Bytes)+"."+string(Decimal), big.NewRat(1, 1))
+ r.Register(string(String)+"."+string(UUID), "")
+
+ return r
+}
+
+// Register registers names to their types for resolution.
+func (r *TypeResolver) Register(name string, obj any) {
+ typ := reflect2.TypeOf(obj)
+ rtype := typ.RType()
+
+ r.names.Store(name, typ)
+
+ raw, ok := r.types.LoadOrStore(rtype, []string{name})
+ if !ok {
+ return
+ }
+ names := raw.([]string)
+ names = append(names, name)
+ r.types.Store(rtype, names)
+}
+
+// Name gets the name for a type, or an error.
+func (r *TypeResolver) Name(typ reflect2.Type) ([]string, error) {
+ rtype := typ.RType()
+
+ names, ok := r.types.Load(rtype)
+ if !ok {
+ return nil, fmt.Errorf("avro: unable to resolve type %s", typ.String())
+ }
+
+ return names.([]string), nil
+}
+
+// Type gets the type for a name, or an error.
+func (r *TypeResolver) Type(name string) (reflect2.Type, error) {
+ typ, ok := r.names.Load(name)
+ if !ok {
+ return nil, fmt.Errorf("avro: unable to resolve type with name %s", name)
+ }
+
+ return typ.(reflect2.Type), nil
+}
+
+// Register registers names to their types for resolution. All primitive types are pre-registered.
+func Register(name string, obj any) {
+ DefaultConfig.Register(name, obj)
+}
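+
+// Usage sketch (the schema name and Go type are assumptions, not part of this
+// package): registering a named type lets generic decoding resolve it to a
+// concrete Go type:
+//
+//	type Event struct {
+//		ID string `avro:"id"`
+//	}
+//
+//	avro.Register("org.example.Event", Event{})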
diff --git a/vendor/github.com/hamba/avro/v2/schema.go b/vendor/github.com/hamba/avro/v2/schema.go
new file mode 100644
index 0000000..84a9fd2
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/schema.go
@@ -0,0 +1,1807 @@
+package avro
+
+import (
+ "bytes"
+ "crypto/md5"
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "github.com/hamba/avro/v2/pkg/crc64"
+ jsoniter "github.com/json-iterator/go"
+)
+
+type nullDefaultType struct{}
+
+func (nullDefaultType) MarshalJSON() ([]byte, error) {
+ return []byte("null"), nil
+}
+
+var nullDefault nullDefaultType = struct{}{}
+
+var (
+ // Note: order matches the order of properties as they are named in the spec.
+ // https://avro.apache.org/docs/1.12.0/specification
+ recordReserved = []string{"type", "name", "namespace", "doc", "aliases", "fields"}
+ fieldReserved = []string{"name", "doc", "type", "order", "aliases", "default"}
+ enumReserved = []string{"type", "name", "namespace", "aliases", "doc", "symbols", "default"}
+ arrayReserved = []string{"type", "items"}
+ mapReserved = []string{"type", "values"}
+ fixedReserved = []string{"type", "name", "namespace", "aliases", "size"}
+ fixedWithLogicalTypeReserved = []string{"type", "name", "namespace", "aliases", "size", "logicalType"}
+ fixedWithDecimalTypeReserved = []string{
+ "type", "name", "namespace", "aliases", "size", "logicalType", "precision", "scale",
+ }
+ primitiveReserved = []string{"type"}
+ primitiveWithLogicalTypeReserved = []string{"type", "logicalType"}
+ primitiveWithDecimalTypeReserved = []string{"type", "logicalType", "precision", "scale"}
+)
+
+// Type is a schema type.
+type Type string
+
+// Schema type constants.
+const (
+ Record Type = "record"
+ Error Type = "error"
+ Ref Type = "<ref>"
+ Enum Type = "enum"
+ Array Type = "array"
+ Map Type = "map"
+ Union Type = "union"
+ Fixed Type = "fixed"
+ String Type = "string"
+ Bytes Type = "bytes"
+ Int Type = "int"
+ Long Type = "long"
+ Float Type = "float"
+ Double Type = "double"
+ Boolean Type = "boolean"
+ Null Type = "null"
+)
+
+// Order is a field order.
+type Order string
+
+// Field orders.
+const (
+ Asc Order = "ascending"
+ Desc Order = "descending"
+ Ignore Order = "ignore"
+)
+
+// LogicalType is a schema logical type.
+type LogicalType string
+
+// Schema logical type constants.
+const (
+ Decimal LogicalType = "decimal"
+ UUID LogicalType = "uuid"
+ Date LogicalType = "date"
+ TimeMillis LogicalType = "time-millis"
+ TimeMicros LogicalType = "time-micros"
+ TimestampMillis LogicalType = "timestamp-millis"
+ TimestampMicros LogicalType = "timestamp-micros"
+ LocalTimestampMillis LogicalType = "local-timestamp-millis"
+ LocalTimestampMicros LogicalType = "local-timestamp-micros"
+ Duration LogicalType = "duration"
+)
+
+// Action is a field action used during decoding process.
+type Action string
+
+// Action type constants.
+const (
+ FieldIgnore Action = "ignore"
+ FieldSetDefault Action = "set_default"
+)
+
+// FingerprintType is a fingerprinting algorithm.
+type FingerprintType string
+
+// Fingerprint type constants.
+const (
+ CRC64Avro FingerprintType = "CRC64-AVRO"
+ CRC64AvroLE FingerprintType = "CRC64-AVRO-LE"
+ MD5 FingerprintType = "MD5"
+ SHA256 FingerprintType = "SHA256"
+)
+
+// SchemaCache is a cache of schemas.
+type SchemaCache struct {
+ cache sync.Map // map[string]Schema
+}
+
+// Add adds a schema to the cache with the given name.
+func (c *SchemaCache) Add(name string, schema Schema) {
+ c.cache.Store(name, schema)
+}
+
+// Get returns the Schema if it exists.
+func (c *SchemaCache) Get(name string) Schema {
+ if v, ok := c.cache.Load(name); ok {
+ return v.(Schema)
+ }
+
+ return nil
+}
+
+// AddAll adds all schemas from the given cache to the current cache.
+func (c *SchemaCache) AddAll(cache *SchemaCache) {
+ if cache == nil {
+ return
+ }
+ cache.cache.Range(func(key, value interface{}) bool {
+ c.cache.Store(key, value)
+ return true
+ })
+}
+
+// Schemas is a slice of Schemas.
+type Schemas []Schema
+
+// Get gets a schema and its position by type, or by name if it is a named schema.
+func (s Schemas) Get(name string) (Schema, int) {
+ for i, schema := range s {
+ if schemaTypeName(schema) == name {
+ return schema, i
+ }
+ }
+
+ return nil, -1
+}
+
+// Schema represents an Avro schema.
+type Schema interface {
+ // Type returns the type of the schema.
+ Type() Type
+
+ // String returns the canonical form of the schema.
+ String() string
+
+ // Fingerprint returns the SHA256 fingerprint of the schema.
+ Fingerprint() [32]byte
+
+ // FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+ FingerprintUsing(FingerprintType) ([]byte, error)
+
+ // CacheFingerprint returns the unique identity of the schema.
+	// This returns a unique identity for schemas resolved from a writer schema; otherwise it returns
+	// the schema's Fingerprint.
+ CacheFingerprint() [32]byte
+}
+
+// LogicalSchema represents an Avro schema with a logical type.
+type LogicalSchema interface {
+ // Type returns the type of the logical schema.
+ Type() LogicalType
+
+ // String returns the canonical form of the logical schema.
+ String() string
+}
+
+// PropertySchema represents a schema with properties.
+type PropertySchema interface {
+ // Prop gets a property from the schema.
+ Prop(string) any
+}
+
+// NamedSchema represents a schema with a name.
+type NamedSchema interface {
+ Schema
+ PropertySchema
+
+ // Name returns the name of the schema.
+ Name() string
+
+ // Namespace returns the namespace of a schema.
+ Namespace() string
+
+ // FullName returns the full qualified name of a schema.
+ FullName() string
+
+ // Aliases returns the full qualified aliases of a schema.
+ Aliases() []string
+}
+
+// LogicalTypeSchema represents a schema that can contain a logical type.
+type LogicalTypeSchema interface {
+ // Logical returns the logical schema or nil.
+ Logical() LogicalSchema
+}
+
+type name struct {
+ name string
+ namespace string
+ full string
+ aliases []string
+}
+
+func newName(n, ns string, aliases []string) (name, error) {
+ if idx := strings.LastIndexByte(n, '.'); idx > -1 {
+ ns = n[:idx]
+ n = n[idx+1:]
+ }
+
+ full := n
+ if ns != "" {
+ full = ns + "." + n
+ }
+
+ for _, part := range strings.Split(full, ".") {
+ if err := validateName(part); err != nil {
+ return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", full, part, err)
+ }
+ }
+
+ a := make([]string, 0, len(aliases))
+ for _, alias := range aliases {
+ if !strings.Contains(alias, ".") {
+ if err := validateName(alias); err != nil {
+ return name{}, fmt.Errorf("avro: invalid name %q: %w", alias, err)
+ }
+ if ns == "" {
+ a = append(a, alias)
+ continue
+ }
+ a = append(a, ns+"."+alias)
+ continue
+ }
+
+ for _, part := range strings.Split(alias, ".") {
+ if err := validateName(part); err != nil {
+ return name{}, fmt.Errorf("avro: invalid name part %q in name %q: %w", full, part, err)
+ }
+ }
+ a = append(a, alias)
+ }
+
+ return name{
+ name: n,
+ namespace: ns,
+ full: full,
+ aliases: a,
+ }, nil
+}
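+
+// For example, per the rules above, newName("x.y.Rec", "ignored", nil) yields
+// name "Rec", namespace "x.y" and full name "x.y.Rec": a dotted name overrides
+// any separately supplied namespace.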
+
+// Name returns the name of a schema.
+func (n name) Name() string {
+ return n.name
+}
+
+// Namespace returns the namespace of a schema.
+func (n name) Namespace() string {
+ return n.namespace
+}
+
+// FullName returns the fully qualified name of a schema.
+func (n name) FullName() string {
+ return n.full
+}
+
+// Aliases returns the fully qualified aliases of a schema.
+func (n name) Aliases() []string {
+ return n.aliases
+}
+
+type fingerprinter struct {
+ fingerprint atomic.Value // [32]byte
+ cache sync.Map // map[FingerprintType][]byte
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (f *fingerprinter) Fingerprint(stringer fmt.Stringer) [32]byte {
+ if v := f.fingerprint.Load(); v != nil {
+ return v.([32]byte)
+ }
+
+ fingerprint := sha256.Sum256([]byte(stringer.String()))
+ f.fingerprint.Store(fingerprint)
+ return fingerprint
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Stringer) ([]byte, error) {
+ if v, ok := f.cache.Load(typ); ok {
+ return v.([]byte), nil
+ }
+
+ data := []byte(stringer.String())
+
+ var fingerprint []byte
+ switch typ {
+ case CRC64Avro:
+ h := crc64.Sum(data)
+ fingerprint = h[:]
+ case CRC64AvroLE:
+ h := crc64.SumWithByteOrder(data, crc64.LittleEndian)
+ fingerprint = h[:]
+ case MD5:
+ h := md5.Sum(data)
+ fingerprint = h[:]
+ case SHA256:
+ h := sha256.Sum256(data)
+ fingerprint = h[:]
+ default:
+ return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ)
+ }
+
+ f.cache.Store(typ, fingerprint)
+ return fingerprint, nil
+}
+
+type cacheFingerprinter struct {
+ writerFingerprint *[32]byte
+
+ cache atomic.Value // [32]byte
+}
+
+// CacheFingerprint returns the SHA256 identity of the schema.
+func (i *cacheFingerprinter) CacheFingerprint(schema Schema, fn func() []byte) [32]byte {
+ if v := i.cache.Load(); v != nil {
+ return v.([32]byte)
+ }
+
+ if i.writerFingerprint == nil {
+ fp := schema.Fingerprint()
+ i.cache.Store(fp)
+ return fp
+ }
+
+ fp := schema.Fingerprint()
+ d := append([]byte{}, fp[:]...)
+ d = append(d, (*i.writerFingerprint)[:]...)
+ if fn != nil {
+ d = append(d, fn()...)
+ }
+ ident := sha256.Sum256(d)
+ i.cache.Store(ident)
+ return ident
+}
+
+type properties struct {
+ props map[string]any
+}
+
+func newProperties(props map[string]any, res []string) properties {
+ p := properties{props: map[string]any{}}
+ for k, v := range props {
+ if isReserved(res, k) {
+ continue
+ }
+ p.props[k] = v
+ }
+ return p
+}
+
+func isReserved(res []string, k string) bool {
+ for _, r := range res {
+ if k == r {
+ return true
+ }
+ }
+ return false
+}
+
+// Prop gets a property from the schema.
+func (p properties) Prop(name string) any {
+ if p.props == nil {
+ return nil
+ }
+
+ return p.props[name]
+}
+
+// Props returns a map that contains all schema custom properties.
+// Any accidental change to the returned map will directly modify the schema's custom properties.
+func (p properties) Props() map[string]any {
+ return p.props
+}
+
+func (p properties) marshalPropertiesToJSON(buf *bytes.Buffer) error {
+ sortedPropertyKeys := make([]string, 0, len(p.props))
+ for k := range p.props {
+ sortedPropertyKeys = append(sortedPropertyKeys, k)
+ }
+ sort.Strings(sortedPropertyKeys)
+ for _, k := range sortedPropertyKeys {
+ vv, err := jsoniter.Marshal(p.props[k])
+ if err != nil {
+ return err
+ }
+ kk, err := jsoniter.Marshal(k)
+ if err != nil {
+ return err
+ }
+ buf.WriteString(`,`)
+ buf.Write(kk)
+ buf.WriteString(`:`)
+ buf.Write(vv)
+ }
+ return nil
+}
+
+type schemaConfig struct {
+ aliases []string
+ doc string
+ def any
+ order Order
+ props map[string]any
+ wfp *[32]byte
+}
+
+// SchemaOption is a function that sets a schema option.
+type SchemaOption func(*schemaConfig)
+
+// WithAliases sets the aliases on a schema.
+func WithAliases(aliases []string) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.aliases = aliases
+ }
+}
+
+// WithDoc sets the doc on a schema.
+func WithDoc(doc string) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.doc = doc
+ }
+}
+
+// WithDefault sets the default on a schema.
+func WithDefault(def any) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.def = def
+ }
+}
+
+// WithOrder sets the order on a schema.
+func WithOrder(order Order) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.order = order
+ }
+}
+
+// WithProps sets the properties on a schema.
+func WithProps(props map[string]any) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.props = props
+ }
+}
+
+func withWriterFingerprint(fp [32]byte) SchemaOption {
+ return func(opts *schemaConfig) {
+ opts.wfp = &fp
+ }
+}
+
+func withWriterFingerprintIfResolved(fp [32]byte, resolved bool) SchemaOption {
+ return func(opts *schemaConfig) {
+ if resolved {
+ opts.wfp = &fp
+ }
+ }
+}
+
+// PrimitiveSchema is an Avro primitive type schema.
+type PrimitiveSchema struct {
+ properties
+ fingerprinter
+ cacheFingerprinter
+
+ typ Type
+ logical LogicalSchema
+
+	// encodedType is the type of the encoded value, if it is different from typ.
+ // It's only used in the context of write-read schema resolution.
+ encodedType Type
+}
+
+// NewPrimitiveSchema creates a new PrimitiveSchema.
+func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *PrimitiveSchema {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+ reservedProps := primitiveReserved
+ if l != nil {
+ if l.Type() == Decimal {
+ reservedProps = primitiveWithDecimalTypeReserved
+ } else {
+ reservedProps = primitiveWithLogicalTypeReserved
+ }
+ }
+ return &PrimitiveSchema{
+ properties: newProperties(cfg.props, reservedProps),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ typ: t,
+ logical: l,
+ }
+}
+
+// Type returns the type of the schema.
+func (s *PrimitiveSchema) Type() Type {
+ return s.typ
+}
+
+// Logical returns the logical schema or nil.
+func (s *PrimitiveSchema) Logical() LogicalSchema {
+ return s.logical
+}
+
+// String returns the canonical form of the schema.
+func (s *PrimitiveSchema) String() string {
+ if s.logical == nil {
+ return `"` + string(s.typ) + `"`
+ }
+
+ return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) {
+ if s.logical == nil && len(s.props) == 0 {
+ return jsoniter.Marshal(s.typ)
+ }
+
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"type":"` + string(s.typ) + `"`)
+ if s.logical != nil {
+ buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
+ if d, ok := s.logical.(*DecimalLogicalSchema); ok {
+ buf.WriteString(`,"precision":` + strconv.Itoa(d.prec))
+ if d.scale > 0 {
+ buf.WriteString(`,"scale":` + strconv.Itoa(d.scale))
+ }
+ }
+ }
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *PrimitiveSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *PrimitiveSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *PrimitiveSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, nil)
+}
+
+// RecordSchema is an Avro record type schema.
+type RecordSchema struct {
+ name
+ properties
+ fingerprinter
+ cacheFingerprinter
+ isError bool
+ fields []*Field
+ doc string
+}
+
+// NewRecordSchema creates a new record schema instance.
+func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ n, err := newName(name, namespace, cfg.aliases)
+ if err != nil {
+ return nil, err
+ }
+
+ return &RecordSchema{
+ name: n,
+ properties: newProperties(cfg.props, recordReserved),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ fields: fields,
+ doc: cfg.doc,
+ }, nil
+}
+
+// NewErrorRecordSchema creates a new error record schema instance.
+func NewErrorRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOption) (*RecordSchema, error) {
+ rec, err := NewRecordSchema(name, namespace, fields, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ rec.isError = true
+
+ return rec, nil
+}
+
+// Type returns the type of the schema.
+func (s *RecordSchema) Type() Type {
+ return Record
+}
+
+// Doc returns the documentation of a record.
+func (s *RecordSchema) Doc() string {
+ return s.doc
+}
+
+// IsError reports whether this is an error record.
+func (s *RecordSchema) IsError() bool {
+ return s.isError
+}
+
+// Fields returns the fields of a record.
+func (s *RecordSchema) Fields() []*Field {
+ return s.fields
+}
+
+// String returns the canonical form of the schema.
+func (s *RecordSchema) String() string {
+ typ := "record"
+ if s.isError {
+ typ = "error"
+ }
+
+ fields := ""
+ for _, f := range s.fields {
+ fields += f.String() + ","
+ }
+ if len(fields) > 0 {
+ fields = fields[:len(fields)-1]
+ }
+
+ return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *RecordSchema) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"name":"` + s.full + `"`)
+ if len(s.aliases) > 0 {
+ aliasesJSON, err := jsoniter.Marshal(s.aliases)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"aliases":`)
+ buf.Write(aliasesJSON)
+ }
+ if s.doc != "" {
+ docJSON, err := jsoniter.Marshal(s.doc)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"doc":`)
+ buf.Write(docJSON)
+ }
+ if s.isError {
+ buf.WriteString(`,"type":"error"`)
+ } else {
+ buf.WriteString(`,"type":"record"`)
+ }
+ fieldsJSON, err := jsoniter.Marshal(s.fields)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"fields":`)
+ buf.Write(fieldsJSON)
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *RecordSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *RecordSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *RecordSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, func() []byte {
+ var defs []any
+ for _, field := range s.fields {
+ if !field.HasDefault() {
+ continue
+ }
+ defs = append(defs, field.Default())
+ }
+ b, _ := jsoniter.Marshal(defs)
+ return b
+ })
+}
+
+// Field is an Avro record type field.
+type Field struct {
+ properties
+
+ name string
+ aliases []string
+ doc string
+ typ Schema
+ hasDef bool
+ def any
+ order Order
+
+	// action is mainly used when decoding data that lacks the field, for schema evolution purposes.
+ action Action
+	// encodedDef is mainly used when decoding data that lacks the field, for schema evolution purposes.
+	// Its value remains empty unless the field's encodeDefault function is called.
+ encodedDef atomic.Value
+}
+
+type noDef struct{}
+
+// NoDefault is used when no default exists for a field.
+var NoDefault = noDef{}
+
+// NewField creates a new field instance.
+func NewField(name string, typ Schema, opts ...SchemaOption) (*Field, error) {
+ cfg := schemaConfig{def: NoDefault}
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ if err := validateName(name); err != nil {
+ return nil, err
+ }
+ for _, a := range cfg.aliases {
+ if err := validateName(a); err != nil {
+ return nil, err
+ }
+ }
+
+ switch cfg.order {
+ case "":
+ cfg.order = Asc
+ case Asc, Desc, Ignore:
+ default:
+ return nil, fmt.Errorf("avro: field %q order %q is invalid", name, cfg.order)
+ }
+
+ f := &Field{
+ properties: newProperties(cfg.props, fieldReserved),
+ name: name,
+ aliases: cfg.aliases,
+ doc: cfg.doc,
+ typ: typ,
+ order: cfg.order,
+ }
+
+ if cfg.def != NoDefault {
+ def, err := validateDefault(name, typ, cfg.def)
+ if err != nil {
+ return nil, err
+ }
+ f.def = def
+ f.hasDef = true
+ }
+
+ return f, nil
+}
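+
+// A usage sketch (illustrative names): the default passed via WithDefault is
+// validated against the field type, so an int default is accepted for an int
+// field while, say, a string default would return an error.
+//
+//	f, err := NewField("count", NewPrimitiveSchema(Int, nil), WithDefault(10))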
+
+// Name returns the name of a field.
+func (f *Field) Name() string {
+ return f.name
+}
+
+// Aliases returns the field aliases.
+func (f *Field) Aliases() []string {
+ return f.aliases
+}
+
+// Type returns the schema of a field.
+func (f *Field) Type() Schema {
+ return f.typ
+}
+
+// HasDefault determines if the field has a default value.
+func (f *Field) HasDefault() bool {
+ return f.hasDef
+}
+
+// Default returns the default of a field or nil.
+//
+// The only time a nil default is valid is for a Null type.
+func (f *Field) Default() any {
+ if f.def == nullDefault {
+ return nil
+ }
+
+ return f.def
+}
+
+func (f *Field) encodeDefault(encode func(any) ([]byte, error)) ([]byte, error) {
+ if v := f.encodedDef.Load(); v != nil {
+ return v.([]byte), nil
+ }
+ if !f.hasDef {
+ return nil, fmt.Errorf("avro: '%s' field must have a non-empty default value", f.name)
+ }
+ if encode == nil {
+ return nil, fmt.Errorf("avro: failed to encode '%s' default value", f.name)
+ }
+ b, err := encode(f.Default())
+ if err != nil {
+ return nil, err
+ }
+ f.encodedDef.Store(b)
+
+ return b, nil
+}
+
+// Doc returns the documentation of a field.
+func (f *Field) Doc() string {
+ return f.doc
+}
+
+// Order returns the field order.
+func (f *Field) Order() Order {
+ return f.order
+}
+
+// String returns the canonical form of a field.
+func (f *Field) String() string {
+ return `{"name":"` + f.name + `","type":` + f.typ.String() + `}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (f *Field) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"name":"` + f.name + `"`)
+ if len(f.aliases) > 0 {
+ aliasesJSON, err := jsoniter.Marshal(f.aliases)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"aliases":`)
+ buf.Write(aliasesJSON)
+ }
+ if f.doc != "" {
+ docJSON, err := jsoniter.Marshal(f.doc)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"doc":`)
+ buf.Write(docJSON)
+ }
+ typeJSON, err := jsoniter.Marshal(f.typ)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"type":`)
+ buf.Write(typeJSON)
+ if f.hasDef {
+ defaultValueJSON, err := jsoniter.Marshal(f.Default())
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"default":`)
+ buf.Write(defaultValueJSON)
+ }
+ if f.order != "" && f.order != Asc {
+ buf.WriteString(`,"order":"` + string(f.order) + `"`)
+ }
+ if err := f.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// EnumSchema is an Avro enum type schema.
+type EnumSchema struct {
+ name
+ properties
+ fingerprinter
+ cacheFingerprinter
+
+ symbols []string
+ def string
+ doc string
+
+	// encodedSymbols holds the symbols of the encoded value.
+ // It's only used in the context of write-read schema resolution.
+ encodedSymbols []string
+}
+
+// NewEnumSchema creates a new enum schema instance.
+func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOption) (*EnumSchema, error) {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ n, err := newName(name, namespace, cfg.aliases)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(symbols) == 0 {
+ return nil, errors.New("avro: enum must have a non-empty array of symbols")
+ }
+ for _, sym := range symbols {
+ if err = validateName(sym); err != nil {
+ return nil, fmt.Errorf("avro: invalid symbol %q", sym)
+ }
+ }
+
+ var def string
+ if d, ok := cfg.def.(string); ok && d != "" {
+ if !hasSymbol(symbols, d) {
+ return nil, fmt.Errorf("avro: symbol default %q must be a symbol", d)
+ }
+ def = d
+ }
+
+ return &EnumSchema{
+ name: n,
+ properties: newProperties(cfg.props, enumReserved),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ symbols: symbols,
+ def: def,
+ doc: cfg.doc,
+ }, nil
+}
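+
+// A usage sketch (illustrative names): the default must itself be one of the
+// declared symbols, otherwise NewEnumSchema returns an error.
+//
+//	enum, err := NewEnumSchema("Suit", "example", []string{"SPADES", "HEARTS"}, WithDefault("SPADES"))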
+
+func hasSymbol(symbols []string, sym string) bool {
+ for _, s := range symbols {
+ if s == sym {
+ return true
+ }
+ }
+ return false
+}
+
+// Type returns the type of the schema.
+func (s *EnumSchema) Type() Type {
+ return Enum
+}
+
+// Doc returns the schema doc.
+func (s *EnumSchema) Doc() string {
+ return s.doc
+}
+
+// Symbols returns the symbols of an enum.
+func (s *EnumSchema) Symbols() []string {
+ return s.symbols
+}
+
+// Symbol returns the symbol for the given index.
+// It might return the default value in the context of write-read schema resolution.
+func (s *EnumSchema) Symbol(i int) (string, bool) {
+ resolv := len(s.encodedSymbols) > 0
+ symbols := s.symbols
+ if resolv {
+ // A different set of symbols is encoded.
+ symbols = s.encodedSymbols
+ }
+
+ if i < 0 || i >= len(symbols) {
+ return "", false
+ }
+
+ symbol := symbols[i]
+ if resolv && !hasSymbol(s.symbols, symbol) {
+ if !s.HasDefault() {
+ return "", false
+ }
+ return s.Default(), true
+ }
+ return symbol, true
+}
+
+// Default returns the default of an enum or an empty string.
+func (s *EnumSchema) Default() string {
+ return s.def
+}
+
+// HasDefault determines if the schema has a default value.
+func (s *EnumSchema) HasDefault() bool {
+ return s.def != ""
+}
+
+// String returns the canonical form of the schema.
+func (s *EnumSchema) String() string {
+ symbols := ""
+ for _, sym := range s.symbols {
+ symbols += `"` + sym + `",`
+ }
+ if len(symbols) > 0 {
+ symbols = symbols[:len(symbols)-1]
+ }
+
+ return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *EnumSchema) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"name":"` + s.full + `"`)
+ if len(s.aliases) > 0 {
+ aliasesJSON, err := jsoniter.Marshal(s.aliases)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"aliases":`)
+ buf.Write(aliasesJSON)
+ }
+ if s.doc != "" {
+ docJSON, err := jsoniter.Marshal(s.doc)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"doc":`)
+ buf.Write(docJSON)
+ }
+ buf.WriteString(`,"type":"enum"`)
+ symbolsJSON, err := jsoniter.Marshal(s.symbols)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"symbols":`)
+ buf.Write(symbolsJSON)
+ if s.def != "" {
+ buf.WriteString(`,"default":"` + s.def + `"`)
+ }
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *EnumSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *EnumSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *EnumSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, func() []byte {
+ if !s.HasDefault() {
+ return []byte{}
+ }
+ return []byte(s.Default())
+ })
+}
+
+// ArraySchema is an Avro array type schema.
+type ArraySchema struct {
+ properties
+ fingerprinter
+ cacheFingerprinter
+
+ items Schema
+}
+
+// NewArraySchema creates an array schema instance.
+func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ return &ArraySchema{
+ properties: newProperties(cfg.props, arrayReserved),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ items: items,
+ }
+}
+
+// Type returns the type of the schema.
+func (s *ArraySchema) Type() Type {
+ return Array
+}
+
+// Items returns the items schema of an array.
+func (s *ArraySchema) Items() Schema {
+ return s.items
+}
+
+// String returns the canonical form of the schema.
+func (s *ArraySchema) String() string {
+ return `{"type":"array","items":` + s.items.String() + `}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *ArraySchema) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"type":"array"`)
+ itemsJSON, err := jsoniter.Marshal(s.items)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"items":`)
+ buf.Write(itemsJSON)
+ if err = s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *ArraySchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *ArraySchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *ArraySchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, nil)
+}
+
+// MapSchema is an Avro map type schema.
+type MapSchema struct {
+ properties
+ fingerprinter
+ cacheFingerprinter
+
+ values Schema
+}
+
+// NewMapSchema creates a map schema instance.
+func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ return &MapSchema{
+ properties: newProperties(cfg.props, mapReserved),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ values: values,
+ }
+}
+
+// Type returns the type of the schema.
+func (s *MapSchema) Type() Type {
+ return Map
+}
+
+// Values returns the values schema of a map.
+func (s *MapSchema) Values() Schema {
+ return s.values
+}
+
+// String returns the canonical form of the schema.
+func (s *MapSchema) String() string {
+ return `{"type":"map","values":` + s.values.String() + `}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *MapSchema) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"type":"map"`)
+ valuesJSON, err := jsoniter.Marshal(s.values)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"values":`)
+ buf.Write(valuesJSON)
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *MapSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *MapSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *MapSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, nil)
+}
+
+// UnionSchema is an Avro union type schema.
+type UnionSchema struct {
+ fingerprinter
+ cacheFingerprinter
+
+ types Schemas
+}
+
+// NewUnionSchema creates a union schema instance.
+func NewUnionSchema(types []Schema, opts ...SchemaOption) (*UnionSchema, error) {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ seen := map[string]bool{}
+ for _, schema := range types {
+ if schema.Type() == Union {
+ return nil, errors.New("avro: union type cannot be a union")
+ }
+
+ strType := schemaTypeName(schema)
+
+ if seen[strType] {
+ return nil, errors.New("avro: union type must be unique")
+ }
+ seen[strType] = true
+ }
+
+ return &UnionSchema{
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ types: types,
+ }, nil
+}
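+
+// A usage sketch: a nullable union. Member types must be unique and must not
+// themselves be unions.
+//
+//	u, err := NewUnionSchema([]Schema{NewNullSchema(), NewPrimitiveSchema(String, nil)})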
+
+// Type returns the type of the schema.
+func (s *UnionSchema) Type() Type {
+ return Union
+}
+
+// Types returns the types of a union.
+func (s *UnionSchema) Types() Schemas {
+ return s.types
+}
+
+// Nullable reports whether the union is nullable: a union of exactly two
+// schemas, one of which is null.
+func (s *UnionSchema) Nullable() bool {
+	if len(s.types) != 2 || (s.types[0].Type() != Null && s.types[1].Type() != Null) {
+ return false
+ }
+
+ return true
+}
+
+// Indices returns the index of the null and type schemas for a
+// nullable schema. For non-nullable schemas 0 is returned for
+// both.
+func (s *UnionSchema) Indices() (null, typ int) {
+ if !s.Nullable() {
+ return 0, 0
+ }
+ if s.types[0].Type() == Null {
+ return 0, 1
+ }
+ return 1, 0
+}
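+
+// For example, a ["null", "string"] union yields null index 0 and type
+// index 1, whereas ["string", "null"] yields null index 1 and type index 0:
+//
+//	u, _ := NewUnionSchema([]Schema{NewNullSchema(), NewPrimitiveSchema(String, nil)})
+//	nullIdx, typIdx := u.Indices() // 0, 1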
+
+// String returns the canonical form of the schema.
+func (s *UnionSchema) String() string {
+ types := ""
+ for _, typ := range s.types {
+ types += typ.String() + ","
+ }
+ if len(types) > 0 {
+ types = types[:len(types)-1]
+ }
+
+ return `[` + types + `]`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *UnionSchema) MarshalJSON() ([]byte, error) {
+ return jsoniter.Marshal(s.types)
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *UnionSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *UnionSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *UnionSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, nil)
+}
+
+// FixedSchema is an Avro fixed type schema.
+type FixedSchema struct {
+ name
+ properties
+ fingerprinter
+ cacheFingerprinter
+
+ size int
+ logical LogicalSchema
+}
+
+// NewFixedSchema creates a new fixed schema instance.
+func NewFixedSchema(
+ name, namespace string,
+ size int,
+ logical LogicalSchema,
+ opts ...SchemaOption,
+) (*FixedSchema, error) {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ n, err := newName(name, namespace, cfg.aliases)
+ if err != nil {
+ return nil, err
+ }
+
+ reservedProps := fixedReserved
+ if logical != nil {
+ if logical.Type() == Decimal {
+ reservedProps = fixedWithDecimalTypeReserved
+ } else {
+ reservedProps = fixedWithLogicalTypeReserved
+ }
+ }
+ return &FixedSchema{
+ name: n,
+ properties: newProperties(cfg.props, reservedProps),
+ cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
+ size: size,
+ logical: logical,
+ }, nil
+}
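+
+// A usage sketch (illustrative names): a 16-byte fixed carrying a decimal
+// logical type with precision 4 and scale 2.
+//
+//	dec := NewDecimalLogicalSchema(4, 2)
+//	f, err := NewFixedSchema("money", "example", 16, dec)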
+
+// Type returns the type of the schema.
+func (s *FixedSchema) Type() Type {
+ return Fixed
+}
+
+// Size returns the number of bytes of the fixed schema.
+func (s *FixedSchema) Size() int {
+ return s.size
+}
+
+// Logical returns the logical schema or nil.
+func (s *FixedSchema) Logical() LogicalSchema {
+ return s.logical
+}
+
+// String returns the canonical form of the schema.
+func (s *FixedSchema) String() string {
+ size := strconv.Itoa(s.size)
+
+ var logical string
+ if s.logical != nil {
+ logical = "," + s.logical.String()
+ }
+
+ return `{"name":"` + s.FullName() + `","type":"fixed","size":` + size + logical + `}`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *FixedSchema) MarshalJSON() ([]byte, error) {
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"name":"` + s.full + `"`)
+ if len(s.aliases) > 0 {
+ aliasesJSON, err := jsoniter.Marshal(s.aliases)
+ if err != nil {
+ return nil, err
+ }
+ buf.WriteString(`,"aliases":`)
+ buf.Write(aliasesJSON)
+ }
+ buf.WriteString(`,"type":"fixed"`)
+ buf.WriteString(`,"size":` + strconv.Itoa(s.size))
+ if s.logical != nil {
+ buf.WriteString(`,"logicalType":"` + string(s.logical.Type()) + `"`)
+ if d, ok := s.logical.(*DecimalLogicalSchema); ok {
+ buf.WriteString(`,"precision":` + strconv.Itoa(d.prec))
+ if d.scale > 0 {
+ buf.WriteString(`,"scale":` + strconv.Itoa(d.scale))
+ }
+ }
+ }
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *FixedSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *FixedSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *FixedSchema) CacheFingerprint() [32]byte {
+ return s.cacheFingerprinter.CacheFingerprint(s, nil)
+}
+
+// NullSchema is an Avro null type schema.
+type NullSchema struct {
+ properties
+ fingerprinter
+}
+
+// NewNullSchema creates a new NullSchema.
+func NewNullSchema(opts ...SchemaOption) *NullSchema {
+ var cfg schemaConfig
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ return &NullSchema{
+ properties: newProperties(cfg.props, primitiveReserved),
+ }
+}
+
+// Type returns the type of the schema.
+func (s *NullSchema) Type() Type {
+ return Null
+}
+
+// String returns the canonical form of the schema.
+func (s *NullSchema) String() string {
+ return `"null"`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *NullSchema) MarshalJSON() ([]byte, error) {
+ if len(s.props) == 0 {
+ return []byte(`"null"`), nil
+ }
+ buf := new(bytes.Buffer)
+ buf.WriteString(`{"type":"null"`)
+ if err := s.marshalPropertiesToJSON(buf); err != nil {
+ return nil, err
+ }
+ buf.WriteString("}")
+ return buf.Bytes(), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *NullSchema) Fingerprint() [32]byte {
+ return s.fingerprinter.Fingerprint(s)
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *NullSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.fingerprinter.FingerprintUsing(typ, s)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *NullSchema) CacheFingerprint() [32]byte {
+ return s.Fingerprint()
+}
+
+// RefSchema is a reference to a named Avro schema.
+type RefSchema struct {
+ actual NamedSchema
+}
+
+// NewRefSchema creates a ref schema instance.
+func NewRefSchema(schema NamedSchema) *RefSchema {
+ return &RefSchema{
+ actual: schema,
+ }
+}
+
+// Type returns the type of the schema.
+func (s *RefSchema) Type() Type {
+ return Ref
+}
+
+// Schema returns the schema being referenced.
+func (s *RefSchema) Schema() NamedSchema {
+ return s.actual
+}
+
+// String returns the canonical form of the schema.
+func (s *RefSchema) String() string {
+ return `"` + s.actual.FullName() + `"`
+}
+
+// MarshalJSON marshals the schema to json.
+func (s *RefSchema) MarshalJSON() ([]byte, error) {
+ return []byte(`"` + s.actual.FullName() + `"`), nil
+}
+
+// Fingerprint returns the SHA256 fingerprint of the schema.
+func (s *RefSchema) Fingerprint() [32]byte {
+ return s.actual.Fingerprint()
+}
+
+// FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error.
+func (s *RefSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {
+ return s.actual.FingerprintUsing(typ)
+}
+
+// CacheFingerprint returns the unique identity of the schema.
+func (s *RefSchema) CacheFingerprint() [32]byte {
+ return s.actual.CacheFingerprint()
+}
+
+// PrimitiveLogicalSchema is a logical type with no properties.
+type PrimitiveLogicalSchema struct {
+ typ LogicalType
+}
+
+// NewPrimitiveLogicalSchema creates a new primitive logical schema instance.
+func NewPrimitiveLogicalSchema(typ LogicalType) *PrimitiveLogicalSchema {
+ return &PrimitiveLogicalSchema{
+ typ: typ,
+ }
+}
+
+// Type returns the type of the logical schema.
+func (s *PrimitiveLogicalSchema) Type() LogicalType {
+ return s.typ
+}
+
+// String returns the canonical form of the logical schema.
+func (s *PrimitiveLogicalSchema) String() string {
+ return `"logicalType":"` + string(s.typ) + `"`
+}
+
+// DecimalLogicalSchema is a decimal logical type.
+type DecimalLogicalSchema struct {
+ prec int
+ scale int
+}
+
+// NewDecimalLogicalSchema creates a new decimal logical schema instance.
+func NewDecimalLogicalSchema(prec, scale int) *DecimalLogicalSchema {
+ return &DecimalLogicalSchema{
+ prec: prec,
+ scale: scale,
+ }
+}
+
+// Type returns the type of the logical schema.
+func (s *DecimalLogicalSchema) Type() LogicalType {
+ return Decimal
+}
+
+// Precision returns the precision of the decimal logical schema.
+func (s *DecimalLogicalSchema) Precision() int {
+ return s.prec
+}
+
+// Scale returns the scale of the decimal logical schema.
+func (s *DecimalLogicalSchema) Scale() int {
+ return s.scale
+}
+
+// String returns the canonical form of the logical schema.
+func (s *DecimalLogicalSchema) String() string {
+ var scale string
+ if s.scale > 0 {
+ scale = `,"scale":` + strconv.Itoa(s.scale)
+ }
+ precision := strconv.Itoa(s.prec)
+
+ return `"logicalType":"` + string(Decimal) + `","precision":` + precision + scale
+}
+
+func invalidNameFirstChar(r rune) bool {
+ return (r < 'A' || r > 'Z') && (r < 'a' || r > 'z') && r != '_'
+}
+
+func invalidNameOtherChar(r rune) bool {
+ return invalidNameFirstChar(r) && (r < '0' || r > '9')
+}
+
+func validateName(name string) error {
+ if name == "" {
+ return errors.New("name must be a non-empty")
+ }
+
+ if SkipNameValidation {
+ return nil
+ }
+
+ if strings.IndexFunc(name[:1], invalidNameFirstChar) > -1 {
+ return fmt.Errorf("invalid name %s", name)
+ }
+ if strings.IndexFunc(name[1:], invalidNameOtherChar) > -1 {
+ return fmt.Errorf("invalid name %s", name)
+ }
+
+ return nil
+}
+
+func validateDefault(name string, schema Schema, def any) (any, error) {
+ def, ok := isValidDefault(schema, def)
+ if !ok {
+ return nil, fmt.Errorf("avro: invalid default for field %s. %+v not a %s", name, def, schema.Type())
+ }
+ return def, nil
+}
+
+func isValidDefault(schema Schema, def any) (any, bool) {
+ switch schema.Type() {
+ case Ref:
+ ref := schema.(*RefSchema)
+ return isValidDefault(ref.Schema(), def)
+ case Null:
+ return nullDefault, def == nil
+ case Enum:
+ v, ok := def.(string)
+ if !ok || len(v) == 0 {
+ return def, false
+ }
+
+ var found bool
+ for _, sym := range schema.(*EnumSchema).symbols {
+ if def == sym {
+ found = true
+ break
+ }
+ }
+ return def, found
+ case String:
+ if _, ok := def.(string); ok {
+ return def, true
+ }
+ case Bytes, Fixed:
+ // Spec: Default values for bytes and fixed fields are JSON strings,
+ // where Unicode code points 0-255 are mapped to unsigned 8-bit byte values 0-255.
+ if d, ok := def.(string); ok {
+ if b, ok := isValidDefaultBytes(d); ok {
+ if schema.Type() == Fixed {
+ return byteSliceToArray(b, schema.(*FixedSchema).Size()), true
+ }
+ return b, true
+ }
+ }
+ case Boolean:
+ if _, ok := def.(bool); ok {
+ return def, true
+ }
+ case Int:
+ if i, ok := def.(int8); ok {
+ return int(i), true
+ }
+ if i, ok := def.(int16); ok {
+ return int(i), true
+ }
+ if i, ok := def.(int32); ok {
+ return int(i), true
+ }
+ if _, ok := def.(int); ok {
+ return def, true
+ }
+ if f, ok := def.(float64); ok {
+ return int(f), true
+ }
+ case Long:
+ if _, ok := def.(int64); ok {
+ return def, true
+ }
+ if f, ok := def.(float64); ok {
+ return int64(f), true
+ }
+ case Float:
+ if _, ok := def.(float32); ok {
+ return def, true
+ }
+ if f, ok := def.(float64); ok {
+ return float32(f), true
+ }
+ case Double:
+ if _, ok := def.(float64); ok {
+ return def, true
+ }
+ case Array:
+ arr, ok := def.([]any)
+ if !ok {
+ return nil, false
+ }
+
+ as := schema.(*ArraySchema)
+ for i, v := range arr {
+ v, ok := isValidDefault(as.Items(), v)
+ if !ok {
+ return nil, false
+ }
+ arr[i] = v
+ }
+ return arr, true
+ case Map:
+ m, ok := def.(map[string]any)
+ if !ok {
+ return nil, false
+ }
+
+ ms := schema.(*MapSchema)
+ for k, v := range m {
+ v, ok := isValidDefault(ms.Values(), v)
+ if !ok {
+ return nil, false
+ }
+
+ m[k] = v
+ }
+ return m, true
+ case Union:
+ unionSchema := schema.(*UnionSchema)
+ return isValidDefault(unionSchema.Types()[0], def)
+ case Record:
+ m, ok := def.(map[string]any)
+ if !ok {
+ return nil, false
+ }
+
+ for _, field := range schema.(*RecordSchema).Fields() {
+ fieldDef := field.Default()
+ if newDef, ok := m[field.Name()]; ok {
+ fieldDef = newDef
+ }
+
+ v, ok := isValidDefault(field.Type(), fieldDef)
+ if !ok {
+ return nil, false
+ }
+
+ m[field.Name()] = v
+ }
+ return m, true
+ }
+ return nil, false
+}
+
+func schemaTypeName(schema Schema) string {
+ if schema.Type() == Ref {
+ schema = schema.(*RefSchema).Schema()
+ }
+
+ if n, ok := schema.(NamedSchema); ok {
+ return n.FullName()
+ }
+
+ sname := string(schema.Type())
+ if lt := getLogicalType(schema); lt != "" {
+ sname += "." + string(lt)
+ }
+ return sname
+}
+
+func isValidDefaultBytes(def string) ([]byte, bool) {
+ runes := []rune(def)
+ l := len(runes)
+ b := make([]byte, l)
+ for i := range l {
+ if runes[i] < 0 || runes[i] > 255 {
+ return nil, false
+ }
+ b[i] = byte(runes[i])
+ }
+ return b, true
+}
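+
+// For example, per the spec mapping above, the JSON string default
+// "\u00ff\u0000" yields the bytes {0xff, 0x00}, while any code point above
+// 255 (e.g. "\u0100") is rejected.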
diff --git a/vendor/github.com/hamba/avro/v2/schema_compatibility.go b/vendor/github.com/hamba/avro/v2/schema_compatibility.go
new file mode 100644
index 0000000..0b1d9ac
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/schema_compatibility.go
@@ -0,0 +1,487 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+)
+
+type recursionError struct{}
+
+func (e recursionError) Error() string {
+ return ""
+}
+
+type compatKey struct {
+ reader [32]byte
+ writer [32]byte
+}
+
+// SchemaCompatibility determines the compatibility of schemas.
+type SchemaCompatibility struct {
+ cache sync.Map // map[compatKey]error
+}
+
+// NewSchemaCompatibility creates a new schema compatibility instance.
+func NewSchemaCompatibility() *SchemaCompatibility {
+ return &SchemaCompatibility{}
+}
+
+// Compatible determines the compatibility of the reader and writer schemas.
+func (c *SchemaCompatibility) Compatible(reader, writer Schema) error {
+ return c.compatible(reader, writer)
+}
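+
+// A usage sketch: int data can be promoted to long, so a long reader is
+// compatible with an int writer; the reverse direction returns an error.
+//
+//	c := NewSchemaCompatibility()
+//	err := c.Compatible(MustParse(`"long"`), MustParse(`"int"`)) // nil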
+
+func (c *SchemaCompatibility) compatible(reader, writer Schema) error {
+ key := compatKey{reader: reader.Fingerprint(), writer: writer.Fingerprint()}
+ if err, ok := c.cache.Load(key); ok {
+ if _, ok := err.(recursionError); ok {
+ // Break the recursion here.
+ return nil
+ }
+
+ if err == nil {
+ return nil
+ }
+
+ return err.(error)
+ }
+
+ c.cache.Store(key, recursionError{})
+ err := c.match(reader, writer)
+ if err != nil {
+		// We don't want to pay the cost of fmt.Errorf every time.
+ err = errors.New(err.Error())
+ }
+ c.cache.Store(key, err)
+ return err
+}
+
+func (c *SchemaCompatibility) match(reader, writer Schema) error {
+ // If the schema is a reference, get the actual schema
+ if reader.Type() == Ref {
+ reader = reader.(*RefSchema).Schema()
+ }
+ if writer.Type() == Ref {
+ writer = writer.(*RefSchema).Schema()
+ }
+
+ if reader.Type() != writer.Type() {
+ if writer.Type() == Union {
+ // Reader must be compatible with all types in writer
+ for _, schema := range writer.(*UnionSchema).Types() {
+ if err := c.compatible(reader, schema); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ }
+
+ if reader.Type() == Union {
+ // Writer must be compatible with at least one reader schema
+ var err error
+ for _, schema := range reader.(*UnionSchema).Types() {
+ err = c.compatible(schema, writer)
+ if err == nil {
+ return nil
+ }
+ }
+
+ return fmt.Errorf("reader union lacking writer schema %s", writer.Type())
+ }
+
+ switch writer.Type() {
+ case Int:
+ if reader.Type() == Long || reader.Type() == Float || reader.Type() == Double {
+ return nil
+ }
+
+ case Long:
+ if reader.Type() == Float || reader.Type() == Double {
+ return nil
+ }
+
+ case Float:
+ if reader.Type() == Double {
+ return nil
+ }
+
+ case String:
+ if reader.Type() == Bytes {
+ return nil
+ }
+
+ case Bytes:
+ if reader.Type() == String {
+ return nil
+ }
+ }
+
+ return fmt.Errorf("reader schema %s not compatible with writer schema %s", reader.Type(), writer.Type())
+ }
+
+ switch reader.Type() {
+ case Array:
+ return c.compatible(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items())
+
+ case Map:
+ return c.compatible(reader.(*MapSchema).Values(), writer.(*MapSchema).Values())
+
+ case Fixed:
+ r := reader.(*FixedSchema)
+ w := writer.(*FixedSchema)
+
+ if err := c.checkSchemaName(r, w); err != nil {
+ return err
+ }
+
+ if err := c.checkFixedSize(r, w); err != nil {
+ return err
+ }
+
+ case Enum:
+ r := reader.(*EnumSchema)
+ w := writer.(*EnumSchema)
+
+ if err := c.checkSchemaName(r, w); err != nil {
+ return err
+ }
+
+ if err := c.checkEnumSymbols(r, w); err != nil {
+ if r.HasDefault() {
+ return nil
+ }
+ return err
+ }
+
+ case Record:
+ r := reader.(*RecordSchema)
+ w := writer.(*RecordSchema)
+
+ if err := c.checkSchemaName(r, w); err != nil {
+ return err
+ }
+
+ if err := c.checkRecordFields(r, w); err != nil {
+ return err
+ }
+
+ case Union:
+ for _, schema := range writer.(*UnionSchema).Types() {
+ if err := c.compatible(reader, schema); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (c *SchemaCompatibility) checkSchemaName(reader, writer NamedSchema) error {
+ if reader.Name() != writer.Name() {
+ if c.contains(reader.Aliases(), writer.FullName()) {
+ return nil
+ }
+ return fmt.Errorf("reader schema %s and writer schema %s names do not match", reader.FullName(), writer.FullName())
+ }
+
+ return nil
+}
+
+func (c *SchemaCompatibility) checkFixedSize(reader, writer *FixedSchema) error {
+ if reader.Size() != writer.Size() {
+ return fmt.Errorf("%s reader and writer fixed sizes do not match", reader.FullName())
+ }
+
+ return nil
+}
+
+func (c *SchemaCompatibility) checkEnumSymbols(reader, writer *EnumSchema) error {
+ for _, symbol := range writer.Symbols() {
+ if !c.contains(reader.Symbols(), symbol) {
+ return fmt.Errorf("reader %s is missing symbol %s", reader.FullName(), symbol)
+ }
+ }
+
+ return nil
+}
+
+func (c *SchemaCompatibility) checkRecordFields(reader, writer *RecordSchema) error {
+ for _, field := range reader.Fields() {
+ f, ok := c.getField(writer.Fields(), field, func(gfo *getFieldOptions) {
+ gfo.fieldAlias = true
+ })
+ if !ok {
+ if field.HasDefault() {
+ continue
+ }
+
+ return fmt.Errorf("reader field %s is missing in writer schema and has no default", field.Name())
+ }
+
+ if err := c.compatible(field.Type(), f.Type()); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (c *SchemaCompatibility) contains(a []string, s string) bool {
+ for _, str := range a {
+ if str == s {
+ return true
+ }
+ }
+
+ return false
+}
+
+type getFieldOptions struct {
+ fieldAlias bool
+ elemAlias bool
+}
+
+func (c *SchemaCompatibility) getField(a []*Field, f *Field, optFns ...func(*getFieldOptions)) (*Field, bool) {
+ opt := getFieldOptions{}
+ for _, fn := range optFns {
+ fn(&opt)
+ }
+ for _, field := range a {
+ if field.Name() == f.Name() {
+ return field, true
+ }
+ if opt.fieldAlias {
+ if c.contains(f.Aliases(), field.Name()) {
+ return field, true
+ }
+ }
+ if opt.elemAlias {
+ if c.contains(field.Aliases(), f.Name()) {
+ return field, true
+ }
+ }
+ }
+
+ return nil, false
+}
+
+// Resolve returns a composite schema that allows decoding data written by the writer schema,
+// and makes necessary adjustments to support the reader schema.
+//
+// It fails if the writer and reader schemas are not compatible.
+func (c *SchemaCompatibility) Resolve(reader, writer Schema) (Schema, error) {
+ if err := c.compatible(reader, writer); err != nil {
+ return nil, err
+ }
+
+ schema, _, err := c.resolve(reader, writer)
+ return schema, err
+}
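+
+// A usage sketch: resolving a long reader against an int writer yields a
+// long schema that records the encoded type, so decoders promote the int
+// values that were written to long.
+//
+//	c := NewSchemaCompatibility()
+//	s, err := c.Resolve(MustParse(`"long"`), MustParse(`"int"`))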
+
+// resolve requires the reader's schema to be already compatible with the writer's.
+func (c *SchemaCompatibility) resolve(reader, writer Schema) (schema Schema, resolved bool, err error) {
+ if reader.Type() == Ref {
+ reader = reader.(*RefSchema).Schema()
+ }
+ if writer.Type() == Ref {
+ writer = writer.(*RefSchema).Schema()
+ }
+
+ if writer.Type() != reader.Type() {
+ if reader.Type() == Union {
+ for _, schema := range reader.(*UnionSchema).Types() {
+ // Compatibility is not guaranteed for every Union reader schema.
+ // Therefore, we need to check compatibility in every iteration.
+ if err := c.compatible(schema, writer); err != nil {
+ continue
+ }
+ sch, _, err := c.resolve(schema, writer)
+ if err != nil {
+ continue
+ }
+ return sch, true, nil
+ }
+
+ return nil, false, fmt.Errorf("reader union lacking writer schema %s", writer.Type())
+ }
+
+ if writer.Type() == Union {
+ schemas := make([]Schema, 0)
+ for _, schema := range writer.(*UnionSchema).Types() {
+ sch, _, err := c.resolve(reader, schema)
+ if err != nil {
+ return nil, false, err
+ }
+ schemas = append(schemas, sch)
+ }
+ s, err := NewUnionSchema(schemas, withWriterFingerprint(writer.Fingerprint()))
+ return s, true, err
+ }
+
+ if isPromotable(writer.Type(), reader.Type()) {
+ r := NewPrimitiveSchema(reader.Type(), reader.(*PrimitiveSchema).Logical(),
+ withWriterFingerprint(writer.Fingerprint()),
+ )
+ r.encodedType = writer.Type()
+ return r, true, nil
+ }
+
+ return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type())
+ }
+
+ if isNative(writer.Type()) {
+ return reader, false, nil
+ }
+
+ if writer.Type() == Enum {
+ r := reader.(*EnumSchema)
+ w := writer.(*EnumSchema)
+ if err = c.checkEnumSymbols(r, w); err != nil {
+ if r.HasDefault() {
+ enum, _ := NewEnumSchema(r.Name(), r.Namespace(), r.Symbols(),
+ WithAliases(r.Aliases()),
+ WithDefault(r.Default()),
+ withWriterFingerprint(w.Fingerprint()),
+ )
+ enum.encodedSymbols = w.Symbols()
+ return enum, true, nil
+ }
+
+ return nil, false, err
+ }
+ return reader, false, nil
+ }
+
+ if writer.Type() == Fixed {
+ return reader, false, nil
+ }
+
+ if writer.Type() == Union {
+ schemas := make([]Schema, 0)
+ for _, s := range writer.(*UnionSchema).Types() {
+ sch, resolv, err := c.resolve(reader, s)
+ if err != nil {
+ return nil, false, err
+ }
+ schemas = append(schemas, sch)
+ resolved = resolv || resolved
+ }
+ s, err := NewUnionSchema(schemas, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved))
+ if err != nil {
+ return nil, false, err
+ }
+ return s, resolved, nil
+ }
+
+ if writer.Type() == Array {
+ schema, resolved, err = c.resolve(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items())
+ if err != nil {
+ return nil, false, err
+ }
+ return NewArraySchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil
+ }
+
+ if writer.Type() == Map {
+ schema, resolved, err = c.resolve(reader.(*MapSchema).Values(), writer.(*MapSchema).Values())
+ if err != nil {
+ return nil, false, err
+ }
+ return NewMapSchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil
+ }
+
+ if writer.Type() == Record {
+ return c.resolveRecord(reader, writer)
+ }
+
+ return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type())
+}
+
+func (c *SchemaCompatibility) resolveRecord(reader, writer Schema) (Schema, bool, error) {
+ w := writer.(*RecordSchema)
+ r := reader.(*RecordSchema)
+
+ fields := make([]*Field, 0)
+ seen := make(map[string]struct{})
+
+ var resolved bool
+ for _, wf := range w.Fields() {
+ rf, ok := c.getField(r.Fields(), wf, func(gfo *getFieldOptions) {
+ gfo.elemAlias = true
+ })
+ if !ok {
+			// The field was not found in the reader schema, so it should be ignored.
+ f, _ := NewField(wf.Name(), wf.Type(), WithAliases(wf.aliases), WithOrder(wf.order))
+ f.def = wf.def
+ f.hasDef = wf.hasDef
+ f.action = FieldIgnore
+ fields = append(fields, f)
+
+ resolved = true
+ continue
+ }
+
+ ft, resolv, err := c.resolve(rf.Type(), wf.Type())
+ if err != nil {
+ return nil, false, err
+ }
+ f, _ := NewField(rf.Name(), ft, WithAliases(rf.aliases), WithOrder(rf.order))
+ f.def = rf.def
+ f.hasDef = rf.hasDef
+ fields = append(fields, f)
+ resolved = resolv || resolved
+
+ seen[rf.Name()] = struct{}{}
+ }
+
+ for _, rf := range r.Fields() {
+ if _, ok := seen[rf.Name()]; ok {
+ // This field has already been seen.
+ continue
+ }
+
+		// The schemas are already known to be compatible, so the reader field must
+		// have a default. Use the default.
+
+ f, _ := NewField(rf.Name(), rf.Type(), WithAliases(rf.aliases), WithOrder(rf.order))
+ f.def = rf.def
+ f.hasDef = rf.hasDef
+ f.action = FieldSetDefault
+ fields = append(fields, f)
+
+ resolved = true
+ }
+
+ schema, err := NewRecordSchema(r.Name(), r.Namespace(), fields,
+ WithAliases(r.Aliases()),
+ withWriterFingerprintIfResolved(writer.Fingerprint(), resolved),
+ )
+ return schema, resolved, err
+}
+
+func isNative(typ Type) bool {
+ switch typ {
+ case Null, Boolean, Int, Long, Float, Double, Bytes, String:
+ return true
+ default:
+ return false
+ }
+}
+
+func isPromotable(writerTyp, readerType Type) bool {
+ switch writerTyp {
+ case Int:
+ return readerType == Long || readerType == Float || readerType == Double
+ case Long:
+ return readerType == Float || readerType == Double
+ case Float:
+ return readerType == Double
+ case String:
+ return readerType == Bytes
+ case Bytes:
+ return readerType == String
+ default:
+ return false
+ }
+}
diff --git a/vendor/github.com/hamba/avro/v2/schema_parse.go b/vendor/github.com/hamba/avro/v2/schema_parse.go
new file mode 100644
index 0000000..630f202
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/schema_parse.go
@@ -0,0 +1,643 @@
+package avro
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "os"
+ "path/filepath"
+ "strings"
+
+ jsoniter "github.com/json-iterator/go"
+ "github.com/mitchellh/mapstructure"
+)
+
+// DefaultSchemaCache is the default cache for schemas.
+var DefaultSchemaCache = &SchemaCache{}
+
+// SkipNameValidation sets whether to skip name validation.
+// The Avro spec imposes a strict naming convention for names and aliases; however, the official
+// Avro tools do not follow it. More info:
+// https://lists.apache.org/thread/39v98os6wdpyr6w31xdkz0yzol51fsrr
+// https://github.com/apache/avro/pull/1995
+var SkipNameValidation = false
+
+// Parse parses a schema string.
+func Parse(schema string) (Schema, error) {
+ return ParseBytes([]byte(schema))
+}
+
+// ParseWithCache parses a schema string using the given namespace and schema cache.
+func ParseWithCache(schema, namespace string, cache *SchemaCache) (Schema, error) {
+ return ParseBytesWithCache([]byte(schema), namespace, cache)
+}
+
+// MustParse parses a schema string, panicking if there is an error.
+func MustParse(schema string) Schema {
+ parsed, err := Parse(schema)
+ if err != nil {
+ panic(err)
+ }
+
+ return parsed
+}
+
+// ParseFiles parses the schemas in the files, in the order they appear, returning the last schema.
+//
+// This is useful when your schemas rely on other schemas.
+func ParseFiles(paths ...string) (Schema, error) {
+ var schema Schema
+ for _, path := range paths {
+ s, err := os.ReadFile(filepath.Clean(path))
+ if err != nil {
+ return nil, err
+ }
+
+ schema, err = Parse(string(s))
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return schema, nil
+}
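+
+// A usage sketch (file names are hypothetical): later files may reference
+// named types declared in earlier ones, and only the last schema is returned.
+//
+//	schema, err := ParseFiles("address.avsc", "person.avsc")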
+
+// ParseBytes parses a schema byte slice.
+func ParseBytes(schema []byte) (Schema, error) {
+ return ParseBytesWithCache(schema, "", DefaultSchemaCache)
+}
+
+// ParseBytesWithCache parses a schema byte slice using the given namespace and schema cache.
+func ParseBytesWithCache(schema []byte, namespace string, cache *SchemaCache) (Schema, error) {
+ var json any
+ if err := jsoniter.Unmarshal(schema, &json); err != nil {
+ json = string(schema)
+ }
+
+ internalCache := &SchemaCache{}
+ internalCache.AddAll(cache)
+
+ seen := seenCache{}
+ s, err := parseType(namespace, json, seen, internalCache)
+ if err != nil {
+ return nil, err
+ }
+
+ cache.AddAll(internalCache)
+
+ return derefSchema(s), nil
+}
+
+func parseType(namespace string, v any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ switch val := v.(type) {
+ case nil:
+ return &NullSchema{}, nil
+
+ case string:
+ return parsePrimitiveType(namespace, val, cache)
+
+ case map[string]any:
+ return parseComplexType(namespace, val, seen, cache)
+
+ case []any:
+ return parseUnion(namespace, val, seen, cache)
+ }
+
+ return nil, fmt.Errorf("avro: unknown type: %v", v)
+}
+
+func parsePrimitiveType(namespace, s string, cache *SchemaCache) (Schema, error) {
+ typ := Type(s)
+ switch typ {
+ case Null:
+ return &NullSchema{}, nil
+
+ case String, Bytes, Int, Long, Float, Double, Boolean:
+ return parsePrimitive(typ, nil)
+
+ default:
+ schema := cache.Get(fullName(namespace, s))
+ if schema != nil {
+ return schema, nil
+ }
+
+ return nil, fmt.Errorf("avro: unknown type: %s", s)
+ }
+}
+
+func parseComplexType(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ if val, ok := m["type"].([]any); ok {
+ // Note: According to the spec, this is not allowed:
+ // https://avro.apache.org/docs/1.12.0/specification/#schema-declaration
+ // The "type" property in an object must be a string. A union type will be a slice,
+ // but NOT an object with a "type" property that is a slice.
+ // Might be advisable to remove this call (tradeoff between better conformance
+ // with the spec vs. possible backwards-compatibility issue).
+ return parseUnion(namespace, val, seen, cache)
+ }
+
+ str, ok := m["type"].(string)
+ if !ok {
+ return nil, fmt.Errorf("avro: unknown type: %+v", m)
+ }
+ typ := Type(str)
+
+ switch typ {
+ case String, Bytes, Int, Long, Float, Double, Boolean, Null:
+ return parsePrimitive(typ, m)
+
+ case Record, Error:
+ return parseRecord(typ, namespace, m, seen, cache)
+
+ case Enum:
+ return parseEnum(namespace, m, seen, cache)
+
+ case Array:
+ return parseArray(namespace, m, seen, cache)
+
+ case Map:
+ return parseMap(namespace, m, seen, cache)
+
+ case Fixed:
+ return parseFixed(namespace, m, seen, cache)
+
+ default:
+ return parseType(namespace, string(typ), seen, cache)
+ }
+}
+
+type primitiveSchema struct {
+ Type string `mapstructure:"type"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parsePrimitive(typ Type, m map[string]any) (Schema, error) {
+ if len(m) == 0 {
+ if typ == Null {
+ return &NullSchema{}, nil
+ }
+ return NewPrimitiveSchema(typ, nil), nil
+ }
+
+ var (
+ p primitiveSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &p, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding primitive: %w", err)
+ }
+
+ var logical LogicalSchema
+ if logicalType := logicalTypeProperty(p.Props); logicalType != "" {
+ logical = parsePrimitiveLogicalType(typ, logicalType, p.Props)
+ if logical != nil {
+ delete(p.Props, "logicalType")
+ }
+ }
+
+ if typ == Null {
+ return NewNullSchema(WithProps(p.Props)), nil
+ }
+ return NewPrimitiveSchema(typ, logical, WithProps(p.Props)), nil
+}
+
+func parsePrimitiveLogicalType(typ Type, lt string, props map[string]any) LogicalSchema {
+ ltyp := LogicalType(lt)
+ if (typ == String && ltyp == UUID) ||
+ (typ == Int && ltyp == Date) ||
+ (typ == Int && ltyp == TimeMillis) ||
+ (typ == Long && ltyp == TimeMicros) ||
+ (typ == Long && ltyp == TimestampMillis) ||
+ (typ == Long && ltyp == TimestampMicros) ||
+ (typ == Long && ltyp == LocalTimestampMillis) ||
+ (typ == Long && ltyp == LocalTimestampMicros) {
+ return NewPrimitiveLogicalSchema(ltyp)
+ }
+
+ if typ == Bytes && ltyp == Decimal {
+ return parseDecimalLogicalType(-1, props)
+ }
+
+ return nil // otherwise, not a recognized logical type
+}
+
+type recordSchema struct {
+ Type string `mapstructure:"type"`
+ Name string `mapstructure:"name"`
+ Namespace string `mapstructure:"namespace"`
+ Aliases []string `mapstructure:"aliases"`
+ Doc string `mapstructure:"doc"`
+ Fields []map[string]any `mapstructure:"fields"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var (
+ r recordSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &r, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding record: %w", err)
+ }
+
+ if err := checkParsedName(r.Name); err != nil {
+ return nil, err
+ }
+ if r.Namespace == "" {
+ r.Namespace = namespace
+ }
+
+ if !hasKey(meta.Keys, "fields") {
+ return nil, errors.New("avro: record must have an array of fields")
+ }
+ fields := make([]*Field, len(r.Fields))
+
+ var (
+ rec *RecordSchema
+ err error
+ )
+ switch typ {
+ case Record:
+ rec, err = NewRecordSchema(r.Name, r.Namespace, fields,
+ WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props),
+ )
+ case Error:
+ rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields,
+ WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props),
+ )
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ if err = seen.Add(rec.FullName()); err != nil {
+ return nil, err
+ }
+
+ ref := NewRefSchema(rec)
+ cache.Add(rec.FullName(), ref)
+ for _, alias := range rec.Aliases() {
+ cache.Add(alias, ref)
+ }
+
+ for i, f := range r.Fields {
+ field, err := parseField(rec.namespace, f, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+ fields[i] = field
+ }
+
+ return rec, nil
+}
+
+type fieldSchema struct {
+ Name string `mapstructure:"name"`
+ Aliases []string `mapstructure:"aliases"`
+ Type any `mapstructure:"type"`
+ Doc string `mapstructure:"doc"`
+ Default any `mapstructure:"default"`
+ Order Order `mapstructure:"order"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseField(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Field, error) {
+ var (
+ f fieldSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &f, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding field: %w", err)
+ }
+
+ if err := checkParsedName(f.Name); err != nil {
+ return nil, err
+ }
+
+ if !hasKey(meta.Keys, "type") {
+ return nil, errors.New("avro: field requires a type")
+ }
+ typ, err := parseType(namespace, f.Type, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ if !hasKey(meta.Keys, "default") {
+ f.Default = NoDefault
+ }
+
+ field, err := NewField(f.Name, typ,
+ WithDefault(f.Default), WithAliases(f.Aliases), WithDoc(f.Doc), WithOrder(f.Order), WithProps(f.Props),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return field, nil
+}
+
+type enumSchema struct {
+ Name string `mapstructure:"name"`
+ Namespace string `mapstructure:"namespace"`
+ Aliases []string `mapstructure:"aliases"`
+ Type string `mapstructure:"type"`
+ Doc string `mapstructure:"doc"`
+ Symbols []string `mapstructure:"symbols"`
+ Default string `mapstructure:"default"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseEnum(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var (
+ e enumSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &e, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding enum: %w", err)
+ }
+
+ if err := checkParsedName(e.Name); err != nil {
+ return nil, err
+ }
+ if e.Namespace == "" {
+ e.Namespace = namespace
+ }
+
+ enum, err := NewEnumSchema(e.Name, e.Namespace, e.Symbols,
+ WithDefault(e.Default), WithAliases(e.Aliases), WithDoc(e.Doc), WithProps(e.Props),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ if err = seen.Add(enum.FullName()); err != nil {
+ return nil, err
+ }
+
+ ref := NewRefSchema(enum)
+ cache.Add(enum.FullName(), ref)
+ for _, alias := range enum.Aliases() {
+		cache.Add(alias, ref)
+ }
+
+ return enum, nil
+}
+
+type arraySchema struct {
+ Type string `mapstructure:"type"`
+ Items any `mapstructure:"items"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseArray(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var (
+ a arraySchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &a, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding array: %w", err)
+ }
+
+ if !hasKey(meta.Keys, "items") {
+ return nil, errors.New("avro: array must have an items key")
+ }
+ schema, err := parseType(namespace, a.Items, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewArraySchema(schema, WithProps(a.Props)), nil
+}
+
+type mapSchema struct {
+ Type string `mapstructure:"type"`
+ Values any `mapstructure:"values"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseMap(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var (
+ ms mapSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &ms, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding map: %w", err)
+ }
+
+ if !hasKey(meta.Keys, "values") {
+ return nil, errors.New("avro: map must have an values key")
+ }
+ schema, err := parseType(namespace, ms.Values, seen, cache)
+ if err != nil {
+ return nil, err
+ }
+
+ return NewMapSchema(schema, WithProps(ms.Props)), nil
+}
+
+func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var err error
+ types := make([]Schema, len(v))
+ for i := range v {
+ types[i], err = parseType(namespace, v[i], seen, cache)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return NewUnionSchema(types)
+}
+
+type fixedSchema struct {
+ Name string `mapstructure:"name"`
+ Namespace string `mapstructure:"namespace"`
+ Aliases []string `mapstructure:"aliases"`
+ Type string `mapstructure:"type"`
+ Size int `mapstructure:"size"`
+ Props map[string]any `mapstructure:",remain"`
+}
+
+func parseFixed(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
+ var (
+ f fixedSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(m, &f, &meta); err != nil {
+ return nil, fmt.Errorf("avro: error decoding fixed: %w", err)
+ }
+
+ if err := checkParsedName(f.Name); err != nil {
+ return nil, err
+ }
+ if f.Namespace == "" {
+ f.Namespace = namespace
+ }
+
+ if !hasKey(meta.Keys, "size") {
+ return nil, errors.New("avro: fixed must have a size")
+ }
+
+ var logical LogicalSchema
+ if logicalType := logicalTypeProperty(f.Props); logicalType != "" {
+ logical = parseFixedLogicalType(f.Size, logicalType, f.Props)
+ if logical != nil {
+ delete(f.Props, "logicalType")
+ }
+ }
+
+ fixed, err := NewFixedSchema(f.Name, f.Namespace, f.Size, logical, WithAliases(f.Aliases), WithProps(f.Props))
+ if err != nil {
+ return nil, err
+ }
+
+ if err = seen.Add(fixed.FullName()); err != nil {
+ return nil, err
+ }
+
+ ref := NewRefSchema(fixed)
+ cache.Add(fixed.FullName(), ref)
+ for _, alias := range fixed.Aliases() {
+		cache.Add(alias, ref)
+ }
+
+ return fixed, nil
+}
+
+func parseFixedLogicalType(size int, lt string, props map[string]any) LogicalSchema {
+ ltyp := LogicalType(lt)
+ switch {
+ case ltyp == Duration && size == 12:
+ return NewPrimitiveLogicalSchema(Duration)
+ case ltyp == Decimal:
+ return parseDecimalLogicalType(size, props)
+ }
+
+ return nil
+}
+
+type decimalSchema struct {
+ Precision int `mapstructure:"precision"`
+ Scale int `mapstructure:"scale"`
+}
+
+func parseDecimalLogicalType(size int, props map[string]any) LogicalSchema {
+ var (
+ d decimalSchema
+ meta mapstructure.Metadata
+ )
+ if err := decodeMap(props, &d, &meta); err != nil {
+ return nil
+ }
+ decType := newDecimalLogicalType(size, d.Precision, d.Scale)
+ if decType != nil {
+ // Remove the properties that we consumed
+ delete(props, "precision")
+ delete(props, "scale")
+ }
+ return decType
+}
+
+func newDecimalLogicalType(size, prec, scale int) LogicalSchema {
+ if prec <= 0 {
+ return nil
+ }
+
+ if size > 0 {
+ maxPrecision := int(math.Round(math.Floor(math.Log10(2) * (8*float64(size) - 1))))
+ if prec > maxPrecision {
+ return nil
+ }
+ }
+
+ if scale < 0 {
+ return nil
+ }
+
+ // Scale may not be bigger than precision
+ if scale > prec {
+ return nil
+ }
+
+ return NewDecimalLogicalSchema(prec, scale)
+}
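+
+// Worked example (illustrative): for a fixed of size 16 bytes, the
+// maximum precision is floor(log10(2) * (8*16 - 1)) =
+// floor(0.30103 * 127) = 38, the largest number of decimal digits that
+// fits in a 127-bit two's-complement integer.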
+
+func fullName(namespace, name string) string {
+ if len(namespace) == 0 || strings.ContainsRune(name, '.') {
+ return name
+ }
+
+ return namespace + "." + name
+}
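+
+// For example (illustrative): fullName("org.example", "Foo") returns
+// "org.example.Foo", while fullName("org.example", "other.Bar") returns
+// "other.Bar" unchanged, since a dotted name is already fully qualified.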
+
+func checkParsedName(name string) error {
+ if name == "" {
+ return errors.New("avro: non-empty name key required")
+ }
+ return nil
+}
+
+func hasKey(keys []string, k string) bool {
+ for _, key := range keys {
+ if key == k {
+ return true
+ }
+ }
+ return false
+}
+
+func decodeMap(in, v any, meta *mapstructure.Metadata) error {
+ cfg := &mapstructure.DecoderConfig{
+ ZeroFields: true,
+ Metadata: meta,
+ Result: v,
+ }
+
+ decoder, _ := mapstructure.NewDecoder(cfg)
+ return decoder.Decode(in)
+}
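+
+// Callers use meta.Keys to tell a key that was absent apart from one
+// that decoded to its zero value; see the hasKey checks on "items",
+// "values" and "size" in parseArray, parseMap and parseFixed above.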
+
+func derefSchema(schema Schema) Schema {
+ seen := map[string]struct{}{}
+
+ return walkSchema(schema, func(schema Schema) Schema {
+ if ns, ok := schema.(NamedSchema); ok {
+ if _, hasSeen := seen[ns.FullName()]; hasSeen {
+ // This NamedSchema has already been seen in this run, so
+ // it needs to be turned into a reference. It may have been
+ // dereferenced in a previous run.
+ return NewRefSchema(ns)
+ }
+
+ seen[ns.FullName()] = struct{}{}
+ return schema
+ }
+
+ ref, isRef := schema.(*RefSchema)
+ if !isRef {
+ return schema
+ }
+
+ if _, haveSeen := seen[ref.Schema().FullName()]; !haveSeen {
+ seen[ref.Schema().FullName()] = struct{}{}
+ return ref.Schema()
+ }
+ return schema
+ })
+}
+
+type seenCache map[string]struct{}
+
+func (c seenCache) Add(name string) error {
+ if _, ok := c[name]; ok {
+ return fmt.Errorf("duplicate name %q", name)
+ }
+ c[name] = struct{}{}
+ return nil
+}
+
+func logicalTypeProperty(props map[string]any) string {
+ if lt, ok := props["logicalType"].(string); ok {
+ return lt
+ }
+ return ""
+}
diff --git a/vendor/github.com/hamba/avro/v2/schema_walk.go b/vendor/github.com/hamba/avro/v2/schema_walk.go
new file mode 100644
index 0000000..253740c
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/schema_walk.go
@@ -0,0 +1,21 @@
+package avro
+
+func walkSchema(schema Schema, fn func(Schema) Schema) Schema {
+ schema = fn(schema)
+
+ switch s := schema.(type) {
+ case *RecordSchema:
+ for _, f := range s.Fields() {
+ f.typ = walkSchema(f.typ, fn)
+ }
+ case *ArraySchema:
+ s.items = walkSchema(s.items, fn)
+ case *MapSchema:
+ s.values = walkSchema(s.values, fn)
+ case *UnionSchema:
+ for i, st := range s.types {
+ s.types[i] = walkSchema(st, fn)
+ }
+ }
+ return schema
+}
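+
+// A usage sketch (illustrative; "root" is a stand-in schema): counting
+// named schemas in a tree with an identity transform:
+//
+//	n := 0
+//	walkSchema(root, func(s Schema) Schema {
+//		if _, ok := s.(NamedSchema); ok {
+//			n++
+//		}
+//		return s
+//	})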
diff --git a/vendor/github.com/hamba/avro/v2/types.go b/vendor/github.com/hamba/avro/v2/types.go
new file mode 100644
index 0000000..931ea99
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/types.go
@@ -0,0 +1,9 @@
+package avro
+
+// LogicalDuration represents the `duration` logical type, as defined in
+// https://avro.apache.org/docs/1.11.1/specification/#duration
+type LogicalDuration struct {
+ Months uint32
+ Days uint32
+ Milliseconds uint32
+}
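+
+// On the wire a duration is a fixed of size 12, holding three
+// little-endian unsigned 32-bit integers in the field order above:
+// months, then days, then milliseconds.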
diff --git a/vendor/github.com/hamba/avro/v2/writer.go b/vendor/github.com/hamba/avro/v2/writer.go
new file mode 100644
index 0000000..a12d6d0
--- /dev/null
+++ b/vendor/github.com/hamba/avro/v2/writer.go
@@ -0,0 +1,194 @@
+package avro
+
+import (
+ "encoding/binary"
+ "io"
+ "math"
+)
+
+// WriterFunc is a function used to customize the Writer.
+type WriterFunc func(w *Writer)
+
+// WithWriterConfig specifies the configuration to use with a writer.
+func WithWriterConfig(cfg API) WriterFunc {
+ return func(w *Writer) {
+ w.cfg = cfg.(*frozenConfig)
+ }
+}
+
+// Writer is an Avro specific io.Writer.
+type Writer struct {
+ cfg *frozenConfig
+ out io.Writer
+ buf []byte
+ Error error
+}
+
+// NewWriter creates a new Writer.
+func NewWriter(out io.Writer, bufSize int, opts ...WriterFunc) *Writer {
+ writer := &Writer{
+ cfg: DefaultConfig.(*frozenConfig),
+ out: out,
+ buf: make([]byte, 0, bufSize),
+ Error: nil,
+ }
+
+ for _, opt := range opts {
+ opt(writer)
+ }
+
+ return writer
+}
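+
+// A minimal usage sketch (illustrative; buf is a stand-in sink):
+//
+//	var buf bytes.Buffer
+//	w := NewWriter(&buf, 512)
+//	w.WriteString("hello")
+//	if err := w.Flush(); err != nil {
+//		// handle the write error
+//	}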
+
+// Reset resets the Writer with a new io.Writer attached.
+func (w *Writer) Reset(out io.Writer) {
+ w.out = out
+ w.buf = w.buf[:0]
+}
+
+// Buffered returns the number of buffered bytes.
+func (w *Writer) Buffered() int {
+ return len(w.buf)
+}
+
+// Buffer gets the Writer buffer.
+func (w *Writer) Buffer() []byte {
+ return w.buf
+}
+
+// Flush writes any buffered data to the underlying io.Writer.
+func (w *Writer) Flush() error {
+ if w.out == nil {
+ return nil
+ }
+ if w.Error != nil {
+ return w.Error
+ }
+
+ n, err := w.out.Write(w.buf)
+ if n < len(w.buf) && err == nil {
+ err = io.ErrShortWrite
+ }
+ if err != nil {
+ if w.Error == nil {
+ w.Error = err
+ }
+ return err
+ }
+
+ w.buf = w.buf[:0]
+
+ return nil
+}
+
+func (w *Writer) writeByte(b byte) {
+ w.buf = append(w.buf, b)
+}
+
+// Write writes raw bytes to the Writer.
+func (w *Writer) Write(b []byte) (int, error) {
+ w.buf = append(w.buf, b...)
+ return len(b), nil
+}
+
+// WriteBool writes a Bool to the Writer.
+func (w *Writer) WriteBool(b bool) {
+ if b {
+ w.writeByte(0x01)
+ return
+ }
+ w.writeByte(0x00)
+}
+
+// WriteInt writes an Int to the Writer.
+func (w *Writer) WriteInt(i int32) {
+ e := uint64((uint32(i) << 1) ^ uint32(i>>31))
+ w.encodeInt(e)
+}
+
+// WriteLong writes a Long to the Writer.
+func (w *Writer) WriteLong(i int64) {
+ e := (uint64(i) << 1) ^ uint64(i>>63)
+ w.encodeInt(e)
+}
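+
+// Both writers zig-zag encode first, mapping small magnitudes of either
+// sign to small unsigned values before variable-length encoding
+// (illustrative): 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4.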
+
+func (w *Writer) encodeInt(i uint64) {
+ if i == 0 {
+ w.writeByte(0)
+ return
+ }
+
+ for i > 0 {
+ b := byte(i) & 0x7F
+ i >>= 7
+
+ if i != 0 {
+ b |= 0x80
+ }
+ w.writeByte(b)
+ }
+}
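+
+// Illustrative encoding: the unsigned value 300 (0b1_0010_1100) is
+// emitted low group first, 7 bits per byte with the high bit as a
+// continuation flag, giving the bytes 0xAC 0x02.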
+
+// WriteFloat writes a Float to the Writer.
+func (w *Writer) WriteFloat(f float32) {
+ b := make([]byte, 4)
+ binary.LittleEndian.PutUint32(b, math.Float32bits(f))
+
+ w.buf = append(w.buf, b...)
+}
+
+// WriteDouble writes a Double to the Writer.
+func (w *Writer) WriteDouble(f float64) {
+ b := make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, math.Float64bits(f))
+
+ w.buf = append(w.buf, b...)
+}
+
+// WriteBytes writes Bytes to the Writer.
+func (w *Writer) WriteBytes(b []byte) {
+ w.WriteLong(int64(len(b)))
+ w.buf = append(w.buf, b...)
+}
+
+// WriteString writes a String to the Writer.
+func (w *Writer) WriteString(s string) {
+ w.WriteLong(int64(len(s)))
+ w.buf = append(w.buf, s...)
+}
+
+// WriteBlockHeader writes a Block Header to the Writer.
+func (w *Writer) WriteBlockHeader(l, s int64) {
+ if s > 0 && !w.cfg.config.DisableBlockSizeHeader {
+ w.WriteLong(-l)
+ w.WriteLong(s)
+ return
+ }
+ w.WriteLong(l)
+}
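+
+// Per the Avro spec, a negative block count signals that the block's
+// byte size follows, letting readers skip the block without decoding
+// it. Illustrative: WriteBlockHeader(3, 120) emits the longs -3 and
+// 120, unless the size header is disabled in the config.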
+
+// WriteBlockCB writes a block using the callback.
+func (w *Writer) WriteBlockCB(callback func(w *Writer) int64) int64 {
+ var dummyHeader [18]byte
+ headerStart := len(w.buf)
+
+ // Write dummy header
+ _, _ = w.Write(dummyHeader[:])
+
+ // Write block data
+ capturedAt := len(w.buf)
+ length := callback(w)
+ size := int64(len(w.buf) - capturedAt)
+
+ // Take a reference to the block data
+ captured := w.buf[capturedAt:]
+
+ // Rewrite the header
+ w.buf = w.buf[:headerStart]
+ w.WriteBlockHeader(length, size)
+
+ // Copy the block data back to its position
+ w.buf = append(w.buf, captured...)
+
+ return length
+}
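+
+// A usage sketch (illustrative; "items" is a stand-in slice): writing
+// an array block whose item count is only known to the callback:
+//
+//	w.WriteBlockCB(func(w *Writer) int64 {
+//		for _, s := range items {
+//			w.WriteString(s)
+//		}
+//		return int64(len(items))
+//	})
+//	w.WriteLong(0) // end-of-blocks marker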