diff options
Diffstat (limited to 'vendor/github.com/hamba/avro/v2/schema_parse.go')
| -rw-r--r-- | vendor/github.com/hamba/avro/v2/schema_parse.go | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/vendor/github.com/hamba/avro/v2/schema_parse.go b/vendor/github.com/hamba/avro/v2/schema_parse.go new file mode 100644 index 0000000..630f202 --- /dev/null +++ b/vendor/github.com/hamba/avro/v2/schema_parse.go @@ -0,0 +1,643 @@ +package avro + +import ( + "errors" + "fmt" + "math" + "os" + "path/filepath" + "strings" + + jsoniter "github.com/json-iterator/go" + "github.com/mitchellh/mapstructure" +) + +// DefaultSchemaCache is the default cache for schemas. +var DefaultSchemaCache = &SchemaCache{} + +// SkipNameValidation sets whether to skip name validation. +// Avro spec incurs a strict naming convention for names and aliases, however official Avro tools do not follow that +// More info: +// https://lists.apache.org/thread/39v98os6wdpyr6w31xdkz0yzol51fsrr +// https://github.com/apache/avro/pull/1995 +var SkipNameValidation = false + +// Parse parses a schema string. +func Parse(schema string) (Schema, error) { + return ParseBytes([]byte(schema)) +} + +// ParseWithCache parses a schema string using the given namespace and schema cache. +func ParseWithCache(schema, namespace string, cache *SchemaCache) (Schema, error) { + return ParseBytesWithCache([]byte(schema), namespace, cache) +} + +// MustParse parses a schema string, panicking if there is an error. +func MustParse(schema string) Schema { + parsed, err := Parse(schema) + if err != nil { + panic(err) + } + + return parsed +} + +// ParseFiles parses the schemas in the files, in the order they appear, returning the last schema. +// +// This is useful when your schemas rely on other schemas. +func ParseFiles(paths ...string) (Schema, error) { + var schema Schema + for _, path := range paths { + s, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return nil, err + } + + schema, err = Parse(string(s)) + if err != nil { + return nil, err + } + } + + return schema, nil +} + +// ParseBytes parses a schema byte slice. +func ParseBytes(schema []byte) (Schema, error) { + return ParseBytesWithCache(schema, "", DefaultSchemaCache) +} + +// ParseBytesWithCache parses a schema byte slice using the given namespace and schema cache. +func ParseBytesWithCache(schema []byte, namespace string, cache *SchemaCache) (Schema, error) { + var json any + if err := jsoniter.Unmarshal(schema, &json); err != nil { + json = string(schema) + } + + internalCache := &SchemaCache{} + internalCache.AddAll(cache) + + seen := seenCache{} + s, err := parseType(namespace, json, seen, internalCache) + if err != nil { + return nil, err + } + + cache.AddAll(internalCache) + + return derefSchema(s), nil +} + +func parseType(namespace string, v any, seen seenCache, cache *SchemaCache) (Schema, error) { + switch val := v.(type) { + case nil: + return &NullSchema{}, nil + + case string: + return parsePrimitiveType(namespace, val, cache) + + case map[string]any: + return parseComplexType(namespace, val, seen, cache) + + case []any: + return parseUnion(namespace, val, seen, cache) + } + + return nil, fmt.Errorf("avro: unknown type: %v", v) +} + +func parsePrimitiveType(namespace, s string, cache *SchemaCache) (Schema, error) { + typ := Type(s) + switch typ { + case Null: + return &NullSchema{}, nil + + case String, Bytes, Int, Long, Float, Double, Boolean: + return parsePrimitive(typ, nil) + + default: + schema := cache.Get(fullName(namespace, s)) + if schema != nil { + return schema, nil + } + + return nil, fmt.Errorf("avro: unknown type: %s", s) + } +} + +func parseComplexType(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + if val, ok := m["type"].([]any); ok { + // Note: According to the spec, this is not allowed: + // https://avro.apache.org/docs/1.12.0/specification/#schema-declaration + // The "type" property in an object must be a string. A union type will be a slice, + // but NOT an object with a "type" property that is a slice. + // Might be advisable to remove this call (tradeoff between better conformance + // with the spec vs. possible backwards-compatibility issue). + return parseUnion(namespace, val, seen, cache) + } + + str, ok := m["type"].(string) + if !ok { + return nil, fmt.Errorf("avro: unknown type: %+v", m) + } + typ := Type(str) + + switch typ { + case String, Bytes, Int, Long, Float, Double, Boolean, Null: + return parsePrimitive(typ, m) + + case Record, Error: + return parseRecord(typ, namespace, m, seen, cache) + + case Enum: + return parseEnum(namespace, m, seen, cache) + + case Array: + return parseArray(namespace, m, seen, cache) + + case Map: + return parseMap(namespace, m, seen, cache) + + case Fixed: + return parseFixed(namespace, m, seen, cache) + + default: + return parseType(namespace, string(typ), seen, cache) + } +} + +type primitiveSchema struct { + Type string `mapstructure:"type"` + Props map[string]any `mapstructure:",remain"` +} + +func parsePrimitive(typ Type, m map[string]any) (Schema, error) { + if len(m) == 0 { + if typ == Null { + return &NullSchema{}, nil + } + return NewPrimitiveSchema(typ, nil), nil + } + + var ( + p primitiveSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &p, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding primitive: %w", err) + } + + var logical LogicalSchema + if logicalType := logicalTypeProperty(p.Props); logicalType != "" { + logical = parsePrimitiveLogicalType(typ, logicalType, p.Props) + if logical != nil { + delete(p.Props, "logicalType") + } + } + + if typ == Null { + return NewNullSchema(WithProps(p.Props)), nil + } + return NewPrimitiveSchema(typ, logical, WithProps(p.Props)), nil +} + +func parsePrimitiveLogicalType(typ Type, lt string, props map[string]any) LogicalSchema { + ltyp := LogicalType(lt) + if (typ == String && ltyp == UUID) || + (typ == Int && ltyp == Date) || + (typ == Int && ltyp == TimeMillis) || + (typ == Long && ltyp == TimeMicros) || + (typ == Long && ltyp == TimestampMillis) || + (typ == Long && ltyp == TimestampMicros) || + (typ == Long && ltyp == LocalTimestampMillis) || + (typ == Long && ltyp == LocalTimestampMicros) { + return NewPrimitiveLogicalSchema(ltyp) + } + + if typ == Bytes && ltyp == Decimal { + return parseDecimalLogicalType(-1, props) + } + + return nil // otherwise, not a recognized logical type +} + +type recordSchema struct { + Type string `mapstructure:"type"` + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Doc string `mapstructure:"doc"` + Fields []map[string]any `mapstructure:"fields"` + Props map[string]any `mapstructure:",remain"` +} + +func parseRecord(typ Type, namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + r recordSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &r, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding record: %w", err) + } + + if err := checkParsedName(r.Name); err != nil { + return nil, err + } + if r.Namespace == "" { + r.Namespace = namespace + } + + if !hasKey(meta.Keys, "fields") { + return nil, errors.New("avro: record must have an array of fields") + } + fields := make([]*Field, len(r.Fields)) + + var ( + rec *RecordSchema + err error + ) + switch typ { + case Record: + rec, err = NewRecordSchema(r.Name, r.Namespace, fields, + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + ) + case Error: + rec, err = NewErrorRecordSchema(r.Name, r.Namespace, fields, + WithAliases(r.Aliases), WithDoc(r.Doc), WithProps(r.Props), + ) + } + if err != nil { + return nil, err + } + + if err = seen.Add(rec.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(rec) + cache.Add(rec.FullName(), ref) + for _, alias := range rec.Aliases() { + cache.Add(alias, ref) + } + + for i, f := range r.Fields { + field, err := parseField(rec.namespace, f, seen, cache) + if err != nil { + return nil, err + } + fields[i] = field + } + + return rec, nil +} + +type fieldSchema struct { + Name string `mapstructure:"name"` + Aliases []string `mapstructure:"aliases"` + Type any `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Default any `mapstructure:"default"` + Order Order `mapstructure:"order"` + Props map[string]any `mapstructure:",remain"` +} + +func parseField(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (*Field, error) { + var ( + f fieldSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &f, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding field: %w", err) + } + + if err := checkParsedName(f.Name); err != nil { + return nil, err + } + + if !hasKey(meta.Keys, "type") { + return nil, errors.New("avro: field requires a type") + } + typ, err := parseType(namespace, f.Type, seen, cache) + if err != nil { + return nil, err + } + + if !hasKey(meta.Keys, "default") { + f.Default = NoDefault + } + + field, err := NewField(f.Name, typ, + WithDefault(f.Default), WithAliases(f.Aliases), WithDoc(f.Doc), WithOrder(f.Order), WithProps(f.Props), + ) + if err != nil { + return nil, err + } + + return field, nil +} + +type enumSchema struct { + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Type string `mapstructure:"type"` + Doc string `mapstructure:"doc"` + Symbols []string `mapstructure:"symbols"` + Default string `mapstructure:"default"` + Props map[string]any `mapstructure:",remain"` +} + +func parseEnum(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + e enumSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &e, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding enum: %w", err) + } + + if err := checkParsedName(e.Name); err != nil { + return nil, err + } + if e.Namespace == "" { + e.Namespace = namespace + } + + enum, err := NewEnumSchema(e.Name, e.Namespace, e.Symbols, + WithDefault(e.Default), WithAliases(e.Aliases), WithDoc(e.Doc), WithProps(e.Props), + ) + if err != nil { + return nil, err + } + + if err = seen.Add(enum.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(enum) + cache.Add(enum.FullName(), ref) + for _, alias := range enum.Aliases() { + cache.Add(alias, enum) + } + + return enum, nil +} + +type arraySchema struct { + Type string `mapstructure:"type"` + Items any `mapstructure:"items"` + Props map[string]any `mapstructure:",remain"` +} + +func parseArray(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + a arraySchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &a, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding array: %w", err) + } + + if !hasKey(meta.Keys, "items") { + return nil, errors.New("avro: array must have an items key") + } + schema, err := parseType(namespace, a.Items, seen, cache) + if err != nil { + return nil, err + } + + return NewArraySchema(schema, WithProps(a.Props)), nil +} + +type mapSchema struct { + Type string `mapstructure:"type"` + Values any `mapstructure:"values"` + Props map[string]any `mapstructure:",remain"` +} + +func parseMap(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + ms mapSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &ms, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding map: %w", err) + } + + if !hasKey(meta.Keys, "values") { + return nil, errors.New("avro: map must have an values key") + } + schema, err := parseType(namespace, ms.Values, seen, cache) + if err != nil { + return nil, err + } + + return NewMapSchema(schema, WithProps(ms.Props)), nil +} + +func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (Schema, error) { + var err error + types := make([]Schema, len(v)) + for i := range v { + types[i], err = parseType(namespace, v[i], seen, cache) + if err != nil { + return nil, err + } + } + + return NewUnionSchema(types) +} + +type fixedSchema struct { + Name string `mapstructure:"name"` + Namespace string `mapstructure:"namespace"` + Aliases []string `mapstructure:"aliases"` + Type string `mapstructure:"type"` + Size int `mapstructure:"size"` + Props map[string]any `mapstructure:",remain"` +} + +func parseFixed(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) { + var ( + f fixedSchema + meta mapstructure.Metadata + ) + if err := decodeMap(m, &f, &meta); err != nil { + return nil, fmt.Errorf("avro: error decoding fixed: %w", err) + } + + if err := checkParsedName(f.Name); err != nil { + return nil, err + } + if f.Namespace == "" { + f.Namespace = namespace + } + + if !hasKey(meta.Keys, "size") { + return nil, errors.New("avro: fixed must have a size") + } + + var logical LogicalSchema + if logicalType := logicalTypeProperty(f.Props); logicalType != "" { + logical = parseFixedLogicalType(f.Size, logicalType, f.Props) + if logical != nil { + delete(f.Props, "logicalType") + } + } + + fixed, err := NewFixedSchema(f.Name, f.Namespace, f.Size, logical, WithAliases(f.Aliases), WithProps(f.Props)) + if err != nil { + return nil, err + } + + if err = seen.Add(fixed.FullName()); err != nil { + return nil, err + } + + ref := NewRefSchema(fixed) + cache.Add(fixed.FullName(), ref) + for _, alias := range fixed.Aliases() { + cache.Add(alias, fixed) + } + + return fixed, nil +} + +func parseFixedLogicalType(size int, lt string, props map[string]any) LogicalSchema { + ltyp := LogicalType(lt) + switch { + case ltyp == Duration && size == 12: + return NewPrimitiveLogicalSchema(Duration) + case ltyp == Decimal: + return parseDecimalLogicalType(size, props) + } + + return nil +} + +type decimalSchema struct { + Precision int `mapstructure:"precision"` + Scale int `mapstructure:"scale"` +} + +func parseDecimalLogicalType(size int, props map[string]any) LogicalSchema { + var ( + d decimalSchema + meta mapstructure.Metadata + ) + if err := decodeMap(props, &d, &meta); err != nil { + return nil + } + decType := newDecimalLogicalType(size, d.Precision, d.Scale) + if decType != nil { + // Remove the properties that we consumed + delete(props, "precision") + delete(props, "scale") + } + return decType +} + +func newDecimalLogicalType(size, prec, scale int) LogicalSchema { + if prec <= 0 { + return nil + } + + if size > 0 { + maxPrecision := int(math.Round(math.Floor(math.Log10(2) * (8*float64(size) - 1)))) + if prec > maxPrecision { + return nil + } + } + + if scale < 0 { + return nil + } + + // Scale may not be bigger than precision + if scale > prec { + return nil + } + + return NewDecimalLogicalSchema(prec, scale) +} + +func fullName(namespace, name string) string { + if len(namespace) == 0 || strings.ContainsRune(name, '.') { + return name + } + + return namespace + "." + name +} + +func checkParsedName(name string) error { + if name == "" { + return errors.New("avro: non-empty name key required") + } + return nil +} + +func hasKey(keys []string, k string) bool { + for _, key := range keys { + if key == k { + return true + } + } + return false +} + +func decodeMap(in, v any, meta *mapstructure.Metadata) error { + cfg := &mapstructure.DecoderConfig{ + ZeroFields: true, + Metadata: meta, + Result: v, + } + + decoder, _ := mapstructure.NewDecoder(cfg) + return decoder.Decode(in) +} + +func derefSchema(schema Schema) Schema { + seen := map[string]struct{}{} + + return walkSchema(schema, func(schema Schema) Schema { + if ns, ok := schema.(NamedSchema); ok { + if _, hasSeen := seen[ns.FullName()]; hasSeen { + // This NamedSchema has been seen in this run, it needs + // to be turned into a reference. It is possible it was + // dereferenced in a previous run. + return NewRefSchema(ns) + } + + seen[ns.FullName()] = struct{}{} + return schema + } + + ref, isRef := schema.(*RefSchema) + if !isRef { + return schema + } + + if _, haveSeen := seen[ref.Schema().FullName()]; !haveSeen { + seen[ref.Schema().FullName()] = struct{}{} + return ref.Schema() + } + return schema + }) +} + +type seenCache map[string]struct{} + +func (c seenCache) Add(name string) error { + if _, ok := c[name]; ok { + return fmt.Errorf("duplicate name %q", name) + } + c[name] = struct{}{} + return nil +} + +func logicalTypeProperty(props map[string]any) string { + if lt, ok := props["logicalType"].(string); ok { + return lt + } + return "" +} |
