From 07580655c8e487e762d6aa67da92934c61bee03d Mon Sep 17 00:00:00 2001 From: Brian McQueen Date: Fri, 24 Sep 2021 14:58:17 -0700 Subject: [PATCH] added nativeFromTextual support for standard json and added avro roundtrip tool --- array.go | 4 +- codec.go | 54 ++++-- enum_test.go | 75 ++++++++ examples/roundtrip/main.go | 176 +++++++++++++++++ map.go | 4 +- record.go | 4 +- text_test.go | 17 +- union.go | 382 +++++++++++++++++++++++++++---------- union_test.go | 115 +++++++++++ 9 files changed, 711 insertions(+), 120 deletions(-) create mode 100644 examples/roundtrip/main.go diff --git a/array.go b/array.go index fee440d..de58946 100644 --- a/array.go +++ b/array.go @@ -16,13 +16,13 @@ import ( "reflect" ) -func makeArrayCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) { +func makeArrayCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) { // array type must have items itemSchema, ok := schemaMap["items"] if !ok { return nil, fmt.Errorf("Array ought to have items key") } - itemCodec, err := buildCodec(st, enclosingNamespace, itemSchema) + itemCodec, err := buildCodec(st, enclosingNamespace, itemSchema, cb) if err != nil { return nil, fmt.Errorf("Array items ought to be valid Avro type: %s", err) } diff --git a/codec.go b/codec.go index 51f4fd7..9e3aac4 100644 --- a/codec.go +++ b/codec.go @@ -60,6 +60,15 @@ type Codec struct { Rabin uint64 } +// codecBuilder holds the 3 kinds of codec builders so they can be +// replaced if needed +// and so they can be passed down the call stack during codec building +type codecBuilder struct { + mapBuilder func(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) + stringBuilder func(st map[string]*Codec, enclosingNamespace string, typeName string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) + sliceBuilder func(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) +} + // NewCodec returns a Codec used to translate between a byte slice of either // binary or textual Avro data and native Go data. // @@ -87,6 +96,22 @@ type Codec struct { // fmt.Println(err) // } func NewCodec(schemaSpecification string) (*Codec, error) { + return NewCodecFrom(schemaSpecification, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySlice, + }) +} + +func NewCodecForStandardJSON(schemaSpecification string) (*Codec, error) { + return NewCodecFrom(schemaSpecification, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySliceJSON, + }) +} + +func NewCodecFrom(schemaSpecification string, cb *codecBuilder) (*Codec, error) { var schema interface{} if err := json.Unmarshal([]byte(schemaSpecification), &schema); err != nil { @@ -96,7 +121,7 @@ func NewCodec(schemaSpecification string) (*Codec, error) { // bootstrap a symbol table with primitive type codecs for the new codec st := newSymbolTable() - c, err := buildCodec(st, nullNamespace, schema) + c, err := buildCodec(st, nullNamespace, schema, cb) if err != nil { return nil, err } @@ -494,25 +519,26 @@ func (c *Codec) SchemaCRC64Avro() int64 { // convert a schema data structure to a codec, prefixing with specified // namespace -func buildCodec(st map[string]*Codec, enclosingNamespace string, schema interface{}) (*Codec, error) { +func buildCodec(st map[string]*Codec, enclosingNamespace string, schema interface{}, cb *codecBuilder) (*Codec, error) { switch schemaType := schema.(type) { case map[string]interface{}: - return buildCodecForTypeDescribedByMap(st, enclosingNamespace, schemaType) + return cb.mapBuilder(st, enclosingNamespace, schemaType, cb) case string: - return buildCodecForTypeDescribedByString(st, enclosingNamespace, schemaType, nil) + return cb.stringBuilder(st, enclosingNamespace, schemaType, nil, cb) case []interface{}: - return buildCodecForTypeDescribedBySlice(st, enclosingNamespace, schemaType) + return cb.sliceBuilder(st, enclosingNamespace, schemaType, cb) default: return nil, fmt.Errorf("unknown schema type: %T", schema) } } // Reach into the map, grabbing its "type". Use that to create the codec. -func buildCodecForTypeDescribedByMap(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) { +func buildCodecForTypeDescribedByMap(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) { t, ok := schemaMap["type"] if !ok { return nil, fmt.Errorf("missing type: %v", schemaMap) } + switch v := t.(type) { case string: // Already defined types may be abbreviated with its string name. @@ -522,17 +548,17 @@ func buildCodecForTypeDescribedByMap(st map[string]*Codec, enclosingNamespace st // EXAMPLE: "type":"int" // EXAMPLE: "type":"record" // EXAMPLE: "type":"somePreviouslyDefinedCustomTypeString" - return buildCodecForTypeDescribedByString(st, enclosingNamespace, v, schemaMap) + return cb.stringBuilder(st, enclosingNamespace, v, schemaMap, cb) case map[string]interface{}: - return buildCodecForTypeDescribedByMap(st, enclosingNamespace, v) + return cb.mapBuilder(st, enclosingNamespace, v, cb) case []interface{}: - return buildCodecForTypeDescribedBySlice(st, enclosingNamespace, v) + return cb.sliceBuilder(st, enclosingNamespace, v, cb) default: return nil, fmt.Errorf("type ought to be either string, map[string]interface{}, or []interface{}; received: %T", t) } } -func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace string, typeName string, schemaMap map[string]interface{}) (*Codec, error) { +func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace string, typeName string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) { isLogicalType := false searchType := typeName // logicalType will be non-nil for those fields without a logicalType property set @@ -557,15 +583,15 @@ func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace // There are only a small handful of complex Avro data types. switch searchType { case "array": - return makeArrayCodec(st, enclosingNamespace, schemaMap) + return makeArrayCodec(st, enclosingNamespace, schemaMap, cb) case "enum": return makeEnumCodec(st, enclosingNamespace, schemaMap) case "fixed": return makeFixedCodec(st, enclosingNamespace, schemaMap) case "map": - return makeMapCodec(st, enclosingNamespace, schemaMap) + return makeMapCodec(st, enclosingNamespace, schemaMap, cb) case "record": - return makeRecordCodec(st, enclosingNamespace, schemaMap) + return makeRecordCodec(st, enclosingNamespace, schemaMap, cb) case "bytes.decimal": return makeDecimalBytesCodec(st, enclosingNamespace, schemaMap) case "fixed.decimal": @@ -573,7 +599,7 @@ func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace default: if isLogicalType { delete(schemaMap, "logicalType") - return buildCodecForTypeDescribedByString(st, enclosingNamespace, typeName, schemaMap) + return buildCodecForTypeDescribedByString(st, enclosingNamespace, typeName, schemaMap, cb) } return nil, fmt.Errorf("unknown type name: %q", searchType) } diff --git a/enum_test.go b/enum_test.go index 84dbdd8..1b0ede9 100644 --- a/enum_test.go +++ b/enum_test.go @@ -10,6 +10,8 @@ package goavro import ( + "encoding/json" + "fmt" "testing" ) @@ -59,3 +61,76 @@ func TestEnumTextCodec(t *testing.T) { testTextEncodeFail(t, `{"type":"enum","name":"e1","symbols":["alpha","bravo"]}`, "charlie", `cannot encode textual enum "e1": value ought to be member of symbols`) testTextDecodeFail(t, `{"type":"enum","name":"e1","symbols":["alpha","bravo"]}`, []byte(`"charlie"`), `cannot decode textual enum "e1": value ought to be member of symbols`) } + +func TestGH233(t *testing.T) { + // here's the fail case + // testTextCodecPass(t, `{"type":"record","name":"FooBar","namespace":"com.foo.bar","fields":[{"name":"event","type":["null",{"type":"enum","name":"FooBarEvent","symbols":["CREATED","UPDATED"]}]}]}`, map[string]interface{}{"event": Union("FooBarEvent", "CREATED")}, []byte(`{"event":{"FooBarEvent":"CREATED"}}`)) + // remove the namespace and it passes + testTextCodecPass(t, `{"type":"record","name":"FooBar","fields":[{"name":"event","type":["null",{"type":"enum","name":"FooBarEvent","symbols":["CREATED","UPDATED"]}]}]}`, map[string]interface{}{"event": Union("FooBarEvent", "CREATED")}, []byte(`{"event":{"FooBarEvent":"CREATED"}}`)) + // experiments + // the basic enum + testTextCodecPass(t, `{"type":"enum","name":"FooBarEvent","symbols":["CREATED","UPDATED"]}`, "CREATED", []byte(`"CREATED"`)) + // the basic enum with namespace + testTextCodecPass(t, `{"type":"enum","name":"FooBarEvent","namespace":"com.foo.bar","symbols":["CREATED","UPDATED"]}`, "CREATED", []byte(`"CREATED"`)) + // union with enum + testTextCodecPass(t, `["null",{"type":"enum","name":"FooBarEvent","symbols":["CREATED","UPDATED"]}]`, Union("FooBarEvent", "CREATED"), []byte(`{"FooBarEvent":"CREATED"}`)) + // FAIL: union with enum with namespace: cannot determine codec: "FooBarEvent" + // testTextCodecPass(t, `["null",{"type":"enum","name":"FooBarEvent","namespace":"com.foo.bar","symbols":["CREATED","UPDATED"]}]`, Union("FooBarEvent", "CREATED"), []byte(`{"FooBarEvent":"CREATED"}`)) + // conclusion, union is not handling namespaces correctly + // try union with record instead of enum (records and enums both have namespaces) + // get a basic record going + testTextCodecPass(t, `{"type":"record","name":"LongList","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": Union("LongList", map[string]interface{}{"next": nil})}, []byte(`{"next":{"LongList":{"next":null}}}`)) + // add a namespace to the record + // fails in the same way cannot determine codec: "LongList" for key: "next" + // testTextCodecPass(t, `{"type":"record","name":"LongList","namespace":"com.foo.bar","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": Union("LongList", map[string]interface{}{"next": nil})}, []byte(`{"next":{"LongList":{"next":null}}}`)) + // + // experiments on syntax solutions + // testTextCodecPass(t, `["null",{"type":"enum","name":"com.foo.bar.FooBarEvent","symbols":["CREATED","UPDATED"]}]`, Union("com.foo.bar.FooBarEvent", "CREATED"), []byte(`{"FooBarEvent":"CREATED"}`)) + // thie TestUnionMapRecordFitsInRecord tests binary from Native, but not native from textual + // that's where the error is happening + // if the namespace is specified in the incoming name it works + testTextCodecPass(t, `{"type":"record","name":"ns1.LongList","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": Union("ns1.LongList", map[string]interface{}{"next": nil})}, []byte(`{"next":{"ns1.LongList":{"next":null}}}`)) + + // try the failcase with the namespace specified on the input + testTextCodecPass(t, `{"type":"record","name":"FooBar","namespace":"com.foo.bar","fields":[{"name":"event","type":["null",{"type":"enum","name":"FooBarEvent","symbols":["CREATED","UPDATED"]}]}]}`, map[string]interface{}{"event": Union("com.foo.bar.FooBarEvent", "CREATED")}, []byte(`{"event":{"com.foo.bar.FooBarEvent":"CREATED"}}`)) + +} + +func ExampleCheckSolutionGH233() { + const avroSchema = ` + { + "type": "record", + "name": "FooBar", + "namespace": "com.foo.bar", + "fields": [ + { + "name": "event", + "type": [ + "null", + { + "type": "enum", + "name": "FooBarEvent", + "symbols": ["CREATED", "UPDATED"] + } + ] + } + ] + } + ` + codec, _ := NewCodec(avroSchema) + + const avroJson = `{"event":{"com.foo.bar.FooBarEvent":"CREATED"}}` + + native, _, err := codec.NativeFromTextual([]byte(avroJson)) + if err != nil { + panic(err) + } + + blob, err := json.Marshal(native) + if err != nil { + panic(err) + } + fmt.Println(string(blob)) + // Output: {"event":{"com.foo.bar.FooBarEvent":"CREATED"}} + +} diff --git a/examples/roundtrip/main.go b/examples/roundtrip/main.go new file mode 100644 index 0000000..19139da --- /dev/null +++ b/examples/roundtrip/main.go @@ -0,0 +1,176 @@ +package main + +import ( + "bufio" + bin "encoding/binary" + hex "encoding/hex" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + + "github.com/linkedin/goavro/v2" +) + +// roundtrip is a tool for checking avro +// +// incoming data is assumed to be standard json +// incoming json is required to be one json object per line +// use `jq -c .` if you need to. get it into one line +// +// you can write out your avro in binary form and stop there +// which is useful for cases where you might want to send it off into other tools +// +// you can also do a roundtrip of decode/encode +// which allows you to see if your avro schema matches your expectations +// +// If you want to use an encoded schemaid then specify a schemid with -sid +// it will be encoded per a common standard (one null byte, 16 bytes of schemaid) +// Its NOT the standard SOE +// SOE should be added +// Probably OCF should be added too +// +// EXAMPLE +// +// kubectl get events -w -o json | jq -c . | ./roundtrip -sid aa6b1ca0e1ee2d885bfbc747f4a4011b -avsc event-schema.json ) -rt + +func MakeAvroHeader(schemaid string) (header []byte, err error) { + dst, err := hex.DecodeString(schemaid) + if err != nil { + return + } + header = append(header, byte(0)) + header = append(header, dst...) + return +} +func main() { + + var avsc = flag.String("avsc", "", "the avro schema") + var data = flag.String("data", "-", "(default stdin) the data that corresponds to the avro schema or error - ONE LINE PER DATA ITEM") + var schemaid = flag.String("sid", "", "the schemaid which is normally the md5hash of rht schema itself") + var roundtrip = flag.Bool("rt", false, "do full round trip to try to rebuild the original data string") + var xxd = flag.String("bin", "", "write out the binary data to this file - look at it with xxd if you want to") + var appendBin = flag.Bool("append", false, "append to the output binary file instead of trunc") + + flag.Parse() + + _avsc, err := ioutil.ReadFile(*avsc) + if err != nil { + panic(fmt.Sprintf("Failed to read avsc file:%s:error:%v:", *avsc, err)) + } + + codec, err := goavro.NewCodecForStandardJSON(string(_avsc)) + if err != nil { + panic(err) + } + + var _data io.Reader + if *data == "-" { + _data = os.Stdin + } else { + file, err := os.Open(*data) + if err != nil { + panic(fmt.Sprintf("Failed to open data file:%s:error:%v:", *data, err)) + } + _data = bufio.NewReader(file) + defer file.Close() + } + + binOut := struct { + file *os.File + do bool + }{} + if len(*xxd) > 0 { + bits := os.O_WRONLY | os.O_CREATE + if *appendBin { + bits |= os.O_APPEND + } else { + bits |= os.O_TRUNC + } + + binOut.file, err = os.OpenFile(*xxd, bits, 0600) + if err != nil { + panic(err) + } + defer binOut.file.Close() + binOut.do = true + } + + scanner := bufio.NewScanner(_data) + + for scanner.Scan() { + + dat := scanner.Text() + if len(dat) == 0 { + fmt.Println("skipping empty line") + continue + } + + fmt.Println("RT in") + fmt.Println(dat) + + textual := []byte(dat) + + fmt.Printf("encoding for schemaid:%s:\n", *schemaid) + avroNative, _, err := codec.NativeFromTextual(textual) + + if err != nil { + fmt.Println(dat) + panic(err) + } + + header, err := MakeAvroHeader(*schemaid) + if err != nil { + fmt.Println(string(textual)) + panic(err) + } + + avrobin, err := codec.BinaryFromNative(nil, avroNative) + if err != nil { + fmt.Println(dat) + panic(err) + } + + // trying to minimize operations within the loop + // so do only a quick boolean check here + if binOut.do { + for _, buf := range [][]byte{header, avrobin} { + err = bin.Write(binOut.file, bin.LittleEndian, buf) + if err != nil { + fmt.Println(dat) + panic(err) + } + } + } + + if *roundtrip { + // this will scramble the order + // since it makes new go maps + // when it takes the binary into native + rtnativeval, _, err := codec.NativeFromBinary(avrobin) + if err != nil { + fmt.Println(dat) + panic(err) + } + + // Convert native Go form to textual Avro data + textual, err = codec.TextualFromNative(nil, rtnativeval) + if err != nil { + fmt.Println(dat) + panic(err) + } + + fmt.Println("RT out") + fmt.Println(string(textual)) + } + + } + if err := scanner.Err(); err != nil { + fmt.Println("scanner error") + panic(err) + } + + fmt.Println("Done with loop - no more data") + +} diff --git a/map.go b/map.go index 8f50137..cfda161 100644 --- a/map.go +++ b/map.go @@ -17,13 +17,13 @@ import ( "reflect" ) -func makeMapCodec(st map[string]*Codec, namespace string, schemaMap map[string]interface{}) (*Codec, error) { +func makeMapCodec(st map[string]*Codec, namespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) { // map type must have values valueSchema, ok := schemaMap["values"] if !ok { return nil, errors.New("Map ought to have values key") } - valueCodec, err := buildCodec(st, namespace, valueSchema) + valueCodec, err := buildCodec(st, namespace, valueSchema, cb) if err != nil { return nil, fmt.Errorf("Map values ought to be valid Avro type: %s", err) } diff --git a/record.go b/record.go index 63ad985..ed788ea 100644 --- a/record.go +++ b/record.go @@ -13,7 +13,7 @@ import ( "fmt" ) -func makeRecordCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) { +func makeRecordCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) { // NOTE: To support recursive data types, create the codec and register it // using the specified name, and fill in the codec functions later. c, err := registerNewCodec(st, schemaMap, enclosingNamespace) @@ -44,7 +44,7 @@ func makeRecordCodec(st map[string]*Codec, enclosingNamespace string, schemaMap // NOTE: field names are not registered in the symbol table, because // field names are not individually addressable codecs. - fieldCodec, err := buildCodecForTypeDescribedByMap(st, c.typeName.namespace, fieldSchemaMap) + fieldCodec, err := buildCodec(st, c.typeName.namespace, fieldSchemaMap, cb) if err != nil { return nil, fmt.Errorf("Record %q field %d ought to be valid Avro named type: %s", c.typeName, i+1, err) } diff --git a/text_test.go b/text_test.go index b4010f4..8f7adf9 100644 --- a/text_test.go +++ b/text_test.go @@ -61,7 +61,22 @@ func testTextDecodePass(t *testing.T, schema string, datum interface{}, encoded if err != nil { t.Fatalf("schema: %s; %s", schema, err) } - + toNativeAndCompare(t, schema, datum, encoded, codec) +} +func testJSONDecodePass(t *testing.T, schema string, datum interface{}, encoded []byte) { + t.Helper() + codec, err := NewCodecFrom(schema, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySliceJSON, + }) + if err != nil { + t.Fatalf("schema: %s; %s", schema, err) + } + toNativeAndCompare(t, schema, datum, encoded, codec) +} +func toNativeAndCompare(t *testing.T, schema string, datum interface{}, encoded []byte, codec *Codec) { + t.Helper() decoded, remaining, err := codec.NativeFromTextual(encoded) if err != nil { t.Fatalf("schema: %s; %s", schema, err) diff --git a/union.go b/union.go index 1a44cd6..c15b53d 100644 --- a/union.go +++ b/union.go @@ -11,10 +11,21 @@ package goavro import ( "bytes" + "encoding/json" "errors" "fmt" + "sort" ) +// codecInfo is a set of quick lookups it holds all the lookup info for the +// all the schemas we need to handle the list of types for this union +type codecInfo struct { + allowedTypes []string + codecFromIndex []*Codec + codecFromName map[string]*Codec + indexFromName map[string]int +} + // Union wraps a datum value in a map for encoding as a Union, as required by // Union encoder. // @@ -43,24 +54,23 @@ func Union(name string, datum interface{}) interface{} { return map[string]interface{}{name: datum} } -func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}) (*Codec, error) { - if len(schemaArray) == 0 { - return nil, errors.New("Union ought to have one or more members") - } - +// makeCodecInfo takes the schema array +// and builds some lookup indices +// returning a codecInfo +func makeCodecInfo(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (codecInfo, error) { allowedTypes := make([]string, len(schemaArray)) // used for error reporting when encoder receives invalid datum type codecFromIndex := make([]*Codec, len(schemaArray)) codecFromName := make(map[string]*Codec, len(schemaArray)) indexFromName := make(map[string]int, len(schemaArray)) for i, unionMemberSchema := range schemaArray { - unionMemberCodec, err := buildCodec(st, enclosingNamespace, unionMemberSchema) + unionMemberCodec, err := buildCodec(st, enclosingNamespace, unionMemberSchema, cb) if err != nil { - return nil, fmt.Errorf("Union item %d ought to be valid Avro type: %s", i+1, err) + return codecInfo{}, fmt.Errorf("Union item %d ought to be valid Avro type: %s", i+1, err) } fullName := unionMemberCodec.typeName.fullName if _, ok := indexFromName[fullName]; ok { - return nil, fmt.Errorf("Union item %d ought to be unique type: %s", i+1, unionMemberCodec.typeName) + return codecInfo{}, fmt.Errorf("Union item %d ought to be unique type: %s", i+1, unionMemberCodec.typeName) } allowedTypes[i] = fullName codecFromIndex[i] = unionMemberCodec @@ -68,112 +78,286 @@ func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace indexFromName[fullName] = i } - return &Codec{ - // NOTE: To support record field default values, union schema set to the - // type name of first member - // TODO: add/change to schemaCanonical below - schemaOriginal: codecFromIndex[0].typeName.fullName, + return codecInfo{ + allowedTypes: allowedTypes, + codecFromIndex: codecFromIndex, + codecFromName: codecFromName, + indexFromName: indexFromName, + }, nil + +} - typeName: &name{"union", nullNamespace}, - nativeFromBinary: func(buf []byte) (interface{}, []byte, error) { - var decoded interface{} - var err error +func nativeFromBinary(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) { - decoded, buf, err = longNativeFromBinary(buf) - if err != nil { - return nil, nil, err + return func(buf []byte) (interface{}, []byte, error) { + var decoded interface{} + var err error + + decoded, buf, err = longNativeFromBinary(buf) + if err != nil { + return nil, nil, err + } + index := decoded.(int64) // longDecoder always returns int64, so elide error checking + if index < 0 || index >= int64(len(cr.codecFromIndex)) { + return nil, nil, fmt.Errorf("cannot decode binary union: index ought to be between 0 and %d; read index: %d", len(cr.codecFromIndex)-1, index) + } + c := cr.codecFromIndex[index] + decoded, buf, err = c.nativeFromBinary(buf) + if err != nil { + return nil, nil, fmt.Errorf("cannot decode binary union item %d: %s", index+1, err) + } + if decoded == nil { + // do not wrap a nil value in a map + return nil, buf, nil + } + // Non-nil values are wrapped in a map with single key set to type name of value + return Union(cr.allowedTypes[index], decoded), buf, nil + } +} +func binaryFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) { + return func(buf []byte, datum interface{}) ([]byte, error) { + switch v := datum.(type) { + case nil: + index, ok := cr.indexFromName["null"] + if !ok { + return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum) } - index := decoded.(int64) // longDecoder always returns int64, so elide error checking - if index < 0 || index >= int64(len(codecFromIndex)) { - return nil, nil, fmt.Errorf("cannot decode binary union: index ought to be between 0 and %d; read index: %d", len(codecFromIndex)-1, index) + return longBinaryFromNative(buf, index) + case map[string]interface{}: + if len(v) != 1 { + return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum) } - c := codecFromIndex[index] - decoded, buf, err = c.nativeFromBinary(buf) - if err != nil { - return nil, nil, fmt.Errorf("cannot decode binary union item %d: %s", index+1, err) + // will execute exactly once + for key, value := range v { + index, ok := cr.indexFromName[key] + if !ok { + return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum) + } + c := cr.codecFromIndex[index] + buf, _ = longBinaryFromNative(buf, index) + return c.binaryFromNative(buf, value) } - if decoded == nil { - // do not wrap a nil value in a map - return nil, buf, nil + } + return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum) + } +} +func nativeFromTextual(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) { + return func(buf []byte) (interface{}, []byte, error) { + if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) { + if _, ok := cr.indexFromName["null"]; ok { + return nil, buf[4:], nil + } + } + + var datum interface{} + var err error + datum, buf, err = genericMapTextDecoder(buf, nil, cr.codecFromName) + if err != nil { + return nil, nil, fmt.Errorf("cannot decode textual union: %s", err) + } + + return datum, buf, nil + } +} +func textualFromNative(cr *codecInfo) func(buf []byte, datum interface{}) ([]byte, error) { + return func(buf []byte, datum interface{}) ([]byte, error) { + switch v := datum.(type) { + case nil: + _, ok := cr.indexFromName["null"] + if !ok { + return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum) + } + return append(buf, "null"...), nil + case map[string]interface{}: + if len(v) != 1 { + return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum) } - // Non-nil values are wrapped in a map with single key set to type name of value - return Union(allowedTypes[index], decoded), buf, nil - }, - binaryFromNative: func(buf []byte, datum interface{}) ([]byte, error) { - switch v := datum.(type) { - case nil: - index, ok := indexFromName["null"] + // will execute exactly once + for key, value := range v { + index, ok := cr.indexFromName[key] if !ok { - return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", allowedTypes, datum) + return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", cr.allowedTypes, datum) } - return longBinaryFromNative(buf, index) - case map[string]interface{}: - if len(v) != 1 { - return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", allowedTypes, datum) + buf = append(buf, '{') + var err error + buf, err = stringTextualFromNative(buf, key) + if err != nil { + return nil, fmt.Errorf("cannot encode textual union: %s", err) } - // will execute exactly once - for key, value := range v { - index, ok := indexFromName[key] - if !ok { - return nil, fmt.Errorf("cannot encode binary union: no member schema types support datum: allowed types: %v; received: %T", allowedTypes, datum) - } - c := codecFromIndex[index] - buf, _ = longBinaryFromNative(buf, index) - return c.binaryFromNative(buf, value) + buf = append(buf, ':') + c := cr.codecFromIndex[index] + buf, err = c.textualFromNative(buf, value) + if err != nil { + return nil, fmt.Errorf("cannot encode textual union: %s", err) } + return append(buf, '}'), nil } - return nil, fmt.Errorf("cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", allowedTypes, datum) - }, - nativeFromTextual: func(buf []byte) (interface{}, []byte, error) { + } + return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", cr.allowedTypes, datum) + } +} +func buildCodecForTypeDescribedBySlice(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) { + if len(schemaArray) == 0 { + return nil, errors.New("Union ought to have one or more members") + } + + cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb) + if err != nil { + return nil, err + } + + rv := &Codec{ + // NOTE: To support record field default values, union schema set to the + // type name of first member + // TODO: add/change to schemaCanonical below + schemaOriginal: cr.codecFromIndex[0].typeName.fullName, + + typeName: &name{"union", nullNamespace}, + nativeFromBinary: nativeFromBinary(&cr), + binaryFromNative: binaryFromNative(&cr), + nativeFromTextual: nativeFromTextual(&cr), + textualFromNative: textualFromNative(&cr), + } + return rv, nil +} + +// Standard JSON +// +// The default avro library supports a json that would result from your data into json +// instead of serializing it into binary +// +// JSON in the wild differs from that in one critical way - unions +// the avro spec requires unions to have their type indicated +// which means every value that is of a union type +// is actually sent as a small map {"string", "some string"} +// instead of simply as the value itself, which is the way of wild JSON +// https://avro.apache.org/docs/current/spec.html#json_encoding +// +// In order to use this to avro encode standard json the unions have to be rewritten +// so the can encode into unions as expected by the avro schema +// +// so the technique is to read in the json in the usual way +// when a union type is found, read the next json object +// try to figure out if it fits into any of the types +// that are specified for the union per the supplied schema +// if so, then wrap the value into a map and return the expected Union +// +// the json is morphed on the read side +// and then it will remain avro-json object +// avro data is not serialized back into standard json +// the data goes to avro-json and stays that way +func buildCodecForTypeDescribedBySliceJSON(st map[string]*Codec, enclosingNamespace string, schemaArray []interface{}, cb *codecBuilder) (*Codec, error) { + if len(schemaArray) == 0 { + return nil, errors.New("Union ought to have one or more members") + } + + cr, err := makeCodecInfo(st, enclosingNamespace, schemaArray, cb) + if err != nil { + return nil, err + } + + rv := &Codec{ + // NOTE: To support record field default values, union schema set to the + // type name of first member + // TODO: add/change to schemaCanonical below + schemaOriginal: cr.codecFromIndex[0].typeName.fullName, + + typeName: &name{"union", nullNamespace}, + nativeFromBinary: nativeFromBinary(&cr), + binaryFromNative: binaryFromNative(&cr), + nativeFromTextual: nativeAvroFromTextualJson(&cr), + textualFromNative: textualFromNative(&cr), + } + return rv, nil +} + +func checkAll(allowedTypes []string, cr *codecInfo, buf []byte) (interface{}, []byte, error) { + for _, name := range cr.allowedTypes { + if name == "null" { + // skip null since we know we already got type float64 + continue + } + theCodec, ok := cr.codecFromName[name] + if !ok { + continue + } + rv, rb, err := theCodec.NativeFromTextual(buf) + if err != nil { + continue + } + return map[string]interface{}{name: rv}, rb, nil + } + return nil, buf, fmt.Errorf("could not decode any json data in input %v", string(buf)) +} +func nativeAvroFromTextualJson(cr *codecInfo) func(buf []byte) (interface{}, []byte, error) { + return func(buf []byte) (interface{}, []byte, error) { + + reader := bytes.NewReader(buf) + dec := json.NewDecoder(reader) + var m interface{} + + // i should be able to grab the next json "value" with decoder.Decode() + // https://pkg.go.dev/encoding/json#Decoder.Decode + // that dec.More() loop will give the next + // whatever then dec.Decode(&m) + // if m is interface{} + // it goes one legit json object at a time like this + // json.Delim: [ + // Q:map[string]interface {}: map[Name:Ed Text:Knock knock.] + // Q:map[string]interface {}: map[Name:Sam Text:Who's there?] + // Q:map[string]interface {}: map[Name:Ed Text:Go fmt.] + // Q:map[string]interface {}: map[Name:Sam Text:Go fmt who?] + // Q:map[string]interface {}: map[Name:Ed Text:Go fmt yourself!] + // string: eewew + // bottom:json.Delim: ] + // + // so right here, grab whatever this object is + // grab the object specified as the value + // and try to figure out what it is and handle it + err := dec.Decode(&m) + if err != nil { + return nil, buf, err + } + + allowedTypes := cr.allowedTypes + + switch m.(type) { + case nil: if len(buf) >= 4 && bytes.Equal(buf[:4], []byte("null")) { - if _, ok := indexFromName["null"]; ok { + if _, ok := cr.codecFromName["null"]; ok { return nil, buf[4:], nil } } + case float64: + // dec.Decode turns them all into float64 + // avro spec knows about int, long (variable length zig-zag) + // and then float and double (32 bits, 64 bits) + // https://avro.apache.org/docs/current/spec.html#binary_encode_primitive + // - var datum interface{} - var err error - datum, buf, err = genericMapTextDecoder(buf, nil, codecFromName) - if err != nil { - return nil, nil, fmt.Errorf("cannot decode textual union: %s", err) - } + // double + // doubleNativeFromTextual + // float + // floatNativeFromTextual + // long + // longNativeFromTextual + // int + // intNativeFromTextual - return datum, buf, nil - }, - textualFromNative: func(buf []byte, datum interface{}) ([]byte, error) { - switch v := datum.(type) { - case nil: - _, ok := indexFromName["null"] - if !ok { - return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", allowedTypes, datum) - } - return append(buf, "null"...), nil - case map[string]interface{}: - if len(v) != 1 { - return nil, fmt.Errorf("cannot encode textual union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", allowedTypes, datum) - } - // will execute exactly once - for key, value := range v { - index, ok := indexFromName[key] - if !ok { - return nil, fmt.Errorf("cannot encode textual union: no member schema types support datum: allowed types: %v; received: %T", allowedTypes, datum) - } - buf = append(buf, '{') - var err error - buf, err = stringTextualFromNative(buf, key) - if err != nil { - return nil, fmt.Errorf("cannot encode textual union: %s", err) - } - buf = append(buf, ':') - c := codecFromIndex[index] - buf, err = c.textualFromNative(buf, value) - if err != nil { - return nil, fmt.Errorf("cannot encode textual union: %s", err) - } - return append(buf, '}'), nil - } - } - return nil, fmt.Errorf("cannot encode textual union: non-nil values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: %v; received: %T", allowedTypes, datum) - }, - }, nil + // sorted so it would be + // double, float, int, long + // that makes the priorities right by chance + sort.Strings(cr.allowedTypes) + + case map[string]interface{}: + + // try to decode it as a map + // because a map should fail faster than a record + // if that fails assume record and return it + sort.Strings(cr.allowedTypes) + } + + return checkAll(allowedTypes, cr, buf) + + } } diff --git a/union_test.go b/union_test.go index b3b6e85..54db374 100644 --- a/union_test.go +++ b/union_test.go @@ -220,3 +220,118 @@ func ExampleUnion3() { fmt.Println(value) // Output: decoded string: NaN } + +func ExampleJSONUnion() { + codec, err := NewCodec(`["null","string","int"]`) + if err != nil { + fmt.Println(err) + } + buf, err := codec.TextualFromNative(nil, Union("string", "some string")) + if err != nil { + fmt.Println(err) + } + fmt.Println(string(buf)) + // Output: {"string":"some string"} +} + +// +// The following examples show the way to put a new codec into use +// Currently the only new codec is ont that supports standard json +// which does not indicate unions in any way +// so standard json data needs to be guided into avro unions + +// show how to use the default codec via the NewCodecFrom mechanism +func ExampleCustomCodec() { + codec, err := NewCodecFrom(`"string"`, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySlice, + }) + if err != nil { + fmt.Println(err) + } + buf, err := codec.TextualFromNative(nil, "some string 22") + if err != nil { + fmt.Println(err) + } + fmt.Println(string(buf)) + // Output: "some string 22" +} + +// Use the standard JSON codec instead +func ExampleJSONStringToTextual() { + codec, err := NewCodecFrom(`["null","string","int"]`, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySliceJSON, + }) + if err != nil { + fmt.Println(err) + } + buf, err := codec.TextualFromNative(nil, Union("string", "some string")) + if err != nil { + fmt.Println(err) + } + fmt.Println(string(buf)) + // Output: {"string":"some string"} +} + +func ExampleJSONStringToNative() { + codec, err := NewCodecFrom(`["null","string","int"]`, &codecBuilder{ + buildCodecForTypeDescribedByMap, + buildCodecForTypeDescribedByString, + buildCodecForTypeDescribedBySliceJSON, + }) + if err != nil { + fmt.Println(err) + } + // send in a legit json string + t, _, err := codec.NativeFromTextual([]byte("\"some string one\"")) + if err != nil { + fmt.Println(err) + } + // see it parse into a map like the avro encoder does + o, ok := t.(map[string]interface{}) + if !ok { + fmt.Printf("its a %T not a map[string]interface{}", t) + } + + // pull out the string to show its all good + _v := o["string"] + v, ok := _v.(string) + fmt.Println(v) + // Output: some string one +} + +func TestUnionJSON(t *testing.T) { + testJSONDecodePass(t, `["null","int"]`, nil, []byte("null")) + testJSONDecodePass(t, `["null","int","long"]`, Union("int", 3), []byte(`3`)) + testJSONDecodePass(t, `["null","long","int"]`, Union("int", 3), []byte(`3`)) + testJSONDecodePass(t, `["null","int","long"]`, Union("long", 333333333333333), []byte(`333333333333333`)) + testJSONDecodePass(t, `["null","long","int"]`, Union("long", 333333333333333), []byte(`333333333333333`)) + testJSONDecodePass(t, `["null","float","int","long"]`, Union("float", 6.77), []byte(`6.77`)) + testJSONDecodePass(t, `["null","int","float","long"]`, Union("float", 6.77), []byte(`6.77`)) + testJSONDecodePass(t, `["null","double","int","long"]`, Union("double", 6.77), []byte(`6.77`)) + testJSONDecodePass(t, `["null","int","float","double","long"]`, Union("double", 6.77), []byte(`6.77`)) + testJSONDecodePass(t, `["null",{"type":"array","items":"int"}]`, Union("array", []interface{}{1, 2}), []byte(`[1,2]`)) + testJSONDecodePass(t, `["null",{"type":"map","values":"int"}]`, Union("map", map[string]interface{}{"k1": 13}), []byte(`{"k1":13}`)) + testJSONDecodePass(t, `["null",{"name":"r1","type":"record","fields":[{"name":"field1","type":"string"},{"name":"field2","type":"string"}]}]`, Union("r1", map[string]interface{}{"field1": "value1", "field2": "value2"}), []byte(`{"field1": "value1", "field2": "value2"}`)) + testJSONDecodePass(t, `["null","boolean"]`, Union("boolean", true), []byte(`true`)) + testJSONDecodePass(t, `["null","boolean"]`, Union("boolean", false), []byte(`false`)) + testJSONDecodePass(t, `["null",{"type":"enum","name":"e1","symbols":["alpha","bravo"]}]`, Union("e1", "bravo"), []byte(`"bravo"`)) + testJSONDecodePass(t, `["null", "bytes"]`, Union("bytes", []byte("")), []byte("\"\"")) + testJSONDecodePass(t, `["null", "bytes", "string"]`, Union("bytes", []byte("")), []byte("\"\"")) + testJSONDecodePass(t, `["null", "string", "bytes"]`, Union("string", "value1"), []byte(`"value1"`)) + testJSONDecodePass(t, `["null", {"type":"enum","name":"e1","symbols":["alpha","bravo"]}, "string"]`, Union("e1", "bravo"), []byte(`"bravo"`)) + testJSONDecodePass(t, `["null", {"type":"fixed","name":"f1","size":4}]`, Union("f1", []byte(`abcd`)), []byte(`"abcd"`)) + testJSONDecodePass(t, `"string"`, "abcd", []byte(`"abcd"`)) + testJSONDecodePass(t, `{"type":"record","name":"kubeEvents","fields":[{"name":"field1","type":"string","default":""}]}`, map[string]interface{}{"field1": "value1"}, []byte(`{"field1":"value1"}`)) + testJSONDecodePass(t, `{"type":"record","name":"kubeEvents","fields":[{"name":"field1","type":"string","default":""},{"name":"field2","type":"string"}]}`, map[string]interface{}{"field1": "", "field2": "deef"}, []byte(`{"field2": "deef"}`)) + testJSONDecodePass(t, `{"type":"record","name":"kubeEvents","fields":[{"name":"field1","type":["string","null"],"default":""}]}`, map[string]interface{}{"field1": Union("string", "value1")}, []byte(`{"field1":"value1"}`)) + testJSONDecodePass(t, `{"type":"record","name":"kubeEvents","fields":[{"name":"field1","type":["string","null"],"default":""}]}`, map[string]interface{}{"field1": nil}, []byte(`{"field1":null}`)) + // union of null which has minimal syntax + testJSONDecodePass(t, `{"type":"record","name":"LongList","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": nil}, []byte(`{"next": null}`)) + // record containing union of record (recursive record) + testJSONDecodePass(t, `{"type":"record","name":"LongList","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": Union("LongList", map[string]interface{}{"next": nil})}, []byte(`{"next":{"next":null}}`)) + testJSONDecodePass(t, `{"type":"record","name":"LongList","fields":[{"name":"next","type":["null","LongList"],"default":null}]}`, map[string]interface{}{"next": Union("LongList", map[string]interface{}{"next": Union("LongList", map[string]interface{}{"next": nil})})}, []byte(`{"next":{"next":{"next":null}}}`)) +}