diff options
| author | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
|---|---|---|
| committer | mo khan <mo@mokhan.ca> | 2025-07-22 17:35:49 -0600 |
| commit | 20ef0d92694465ac86b550df139e8366a0a2b4fa (patch) | |
| tree | 3f14589e1ce6eb9306a3af31c3a1f9e1af5ed637 /vendor/github.com/hamba/avro/v2/reader.go | |
| parent | 44e0d272c040cdc53a98b9f1dc58ae7da67752e6 (diff) | |
feat: connect to spicedb
Diffstat (limited to 'vendor/github.com/hamba/avro/v2/reader.go')
| -rw-r--r-- | vendor/github.com/hamba/avro/v2/reader.go | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/vendor/github.com/hamba/avro/v2/reader.go b/vendor/github.com/hamba/avro/v2/reader.go new file mode 100644 index 0000000..3c11b18 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/reader.go @@ -0,0 +1,324 @@ +package avro + +import ( + "errors" + "fmt" + "io" + "strings" + "unsafe" +) + +const ( + maxIntBufSize = 5 + maxLongBufSize = 10 +) + +// ReaderFunc is a function used to customize the Reader. +type ReaderFunc func(r *Reader) + +// WithReaderConfig specifies the configuration to use with a reader. +func WithReaderConfig(cfg API) ReaderFunc { + return func(r *Reader) { + r.cfg = cfg.(*frozenConfig) + } +} + +// Reader is an Avro specific io.Reader. +type Reader struct { + cfg *frozenConfig + reader io.Reader + slab []byte + buf []byte + head int + tail int + Error error +} + +// NewReader creates a new Reader. +func NewReader(r io.Reader, bufSize int, opts ...ReaderFunc) *Reader { + reader := &Reader{ + cfg: DefaultConfig.(*frozenConfig), + reader: r, + buf: make([]byte, bufSize), + head: 0, + tail: 0, + } + + for _, opt := range opts { + opt(reader) + } + + return reader +} + +// Reset resets a Reader with a new byte array attached. +func (r *Reader) Reset(b []byte) *Reader { + r.reader = nil + r.buf = b + r.head = 0 + r.tail = len(b) + return r +} + +// ReportError record an error in iterator instance with current position. +func (r *Reader) ReportError(operation, msg string) { + if r.Error != nil && !errors.Is(r.Error, io.EOF) { + return + } + + r.Error = fmt.Errorf("avro: %s: %s", operation, msg) +} + +func (r *Reader) loadMore() bool { + if r.reader == nil { + if r.Error == nil { + r.head = r.tail + r.Error = io.EOF + } + return false + } + + for { + n, err := r.reader.Read(r.buf) + if n == 0 { + if err != nil { + if r.Error == nil { + r.Error = err + } + return false + } + continue + } + + r.head = 0 + r.tail = n + return true + } +} + +func (r *Reader) readByte() byte { + if r.head == r.tail { + if !r.loadMore() { + r.Error = io.ErrUnexpectedEOF + return 0 + } + } + + b := r.buf[r.head] + r.head++ + + return b +} + +// Peek returns the next byte in the buffer. +// The Reader Error will be io.EOF if no next byte exists. +func (r *Reader) Peek() byte { + if r.head == r.tail { + if !r.loadMore() { + return 0 + } + } + return r.buf[r.head] +} + +// Read reads data into the given bytes. +func (r *Reader) Read(b []byte) { + size := len(b) + read := 0 + + for read < size { + if r.head == r.tail { + if !r.loadMore() { + r.Error = io.ErrUnexpectedEOF + return + } + } + + n := copy(b[read:], r.buf[r.head:r.tail]) + r.head += n + read += n + } +} + +// ReadBool reads a Bool from the Reader. +func (r *Reader) ReadBool() bool { + b := r.readByte() + + if b != 0 && b != 1 { + r.ReportError("ReadBool", "invalid bool") + } + return b == 1 +} + +// ReadInt reads an Int from the Reader. +// +//nolint:dupl +func (r *Reader) ReadInt() int32 { + if r.Error != nil { + return 0 + } + + var ( + n int + v uint32 + s uint8 + ) + + for { + tail := r.tail + if r.tail-r.head+n > maxIntBufSize { + tail = r.head + maxIntBufSize - n + } + + // Consume what it is in the buffer. + var i int + for _, b := range r.buf[r.head:tail] { + v |= uint32(b&0x7f) << s + if b&0x80 == 0 { + r.head += i + 1 + return int32((v >> 1) ^ -(v & 1)) + } + s += 7 + i++ + } + if n >= maxIntBufSize { + r.ReportError("ReadInt", "int overflow") + return 0 + } + r.head += i + n += i + + // We ran out of buffer and are not at the end of the int, + // Read more into the buffer. + if !r.loadMore() { + r.Error = fmt.Errorf("reading int: %w", r.Error) + return 0 + } + } +} + +// ReadLong reads a Long from the Reader. +// +//nolint:dupl +func (r *Reader) ReadLong() int64 { + if r.Error != nil { + return 0 + } + + var ( + n int + v uint64 + s uint8 + ) + + for { + tail := r.tail + if r.tail-r.head+n > maxLongBufSize { + tail = r.head + maxLongBufSize - n + } + + // Consume what it is in the buffer. + var i int + for _, b := range r.buf[r.head:tail] { + v |= uint64(b&0x7f) << s + if b&0x80 == 0 { + r.head += i + 1 + return int64((v >> 1) ^ -(v & 1)) + } + s += 7 + i++ + } + if n >= maxLongBufSize { + r.ReportError("ReadLong", "int overflow") + return 0 + } + r.head += i + n += i + + // We ran out of buffer and are not at the end of the long, + // Read more into the buffer. + if !r.loadMore() { + r.Error = fmt.Errorf("reading long: %w", r.Error) + return 0 + } + } +} + +// ReadFloat reads a Float from the Reader. +func (r *Reader) ReadFloat() float32 { + var buf [4]byte + r.Read(buf[:]) + + float := *(*float32)(unsafe.Pointer(&buf[0])) + return float +} + +// ReadDouble reads a Double from the Reader. +func (r *Reader) ReadDouble() float64 { + var buf [8]byte + r.Read(buf[:]) + + float := *(*float64)(unsafe.Pointer(&buf[0])) + return float +} + +// ReadBytes reads Bytes from the Reader. +func (r *Reader) ReadBytes() []byte { + return r.readBytes("bytes") +} + +// ReadString reads a String from the Reader. +func (r *Reader) ReadString() string { + b := r.readBytes("string") + if len(b) == 0 { + return "" + } + + return *(*string)(unsafe.Pointer(&b)) +} + +func (r *Reader) readBytes(op string) []byte { + size := int(r.ReadLong()) + if size < 0 { + fnName := "Read" + strings.ToTitle(op) + r.ReportError(fnName, "invalid "+op+" length") + return nil + } + if size == 0 { + return []byte{} + } + if maxSize := r.cfg.getMaxByteSliceSize(); maxSize > 0 && size > maxSize { + fnName := "Read" + strings.ToTitle(op) + r.ReportError(fnName, "size is greater than `Config.MaxByteSliceSize`") + return nil + } + + // The bytes are entirely in the buffer and of a reasonable size. + // Use the byte slab. + if r.head+size <= r.tail && size <= 1024 { + if cap(r.slab) < size { + r.slab = make([]byte, 1024) + } + dst := r.slab[:size] + r.slab = r.slab[size:] + copy(dst, r.buf[r.head:r.head+size]) + r.head += size + return dst + } + + buf := make([]byte, size) + r.Read(buf) + return buf +} + +// ReadBlockHeader reads a Block Header from the Reader. +func (r *Reader) ReadBlockHeader() (int64, int64) { + length := r.ReadLong() + if length < 0 { + size := r.ReadLong() + + return -length, size + } + + return length, 0 +} |
