Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to decode AVRO message: cannot decode binary record #1917

Open
robinhuiser opened this issue May 20, 2023 · 6 comments
Open

Unable to decode AVRO message: cannot decode binary record #1917

robinhuiser opened this issue May 20, 2023 · 6 comments
Labels
enhancement processors Any tasks or issues relating specifically to processors

Comments

@robinhuiser
Copy link

robinhuiser commented May 20, 2023

Given the Avro schema schema.avsc:

{"namespace": "com.mycompany.myproduct.avro.message.output",
 "type": "record",
  "name": "FeeDetail",
  "fields": [
    {"name" : "FEEIDENTIFIER_", "aliases" : ["feeIdentifier"], "type" : "string"},
    {"name" : "BILLNUMBER_", "aliases" : ["billNumber"], "type" : ["null", "string"], "default" : null}
    ]
}

and the benthos_config:

input:
  label: "generate_data"
  generate:
    count: 1
    mapping: |
      #!blobl
      root = [ "2000164",                                 # FEEIDENTIFIER_
               "000000010047"                             # BILLNUMBER_
              ]
pipeline:
  threads: -1
  processors:
    - label: "map_fields"
      mapping: |
        #!blobl
        root.FEEIDENTIFIER_ = this.index(0)
        root.BILLNUMBER_ = this.index(1)

    - label: "convert_to_avro"
      avro:
        operator: from_json
        encoding: binary
        schema_path: "file://${PWD}/schema.avsc"
output:
  label: "push_to_kafka"
  kafka:
    addresses: 
      - "${KAFKA_URL:localhost:9092}"
    topic: "${KAFKA_TOPIC:fee_topic}"

and consuming the topic using plumber:

plumber read kafka \
    --topics=fee_topic \
    --decode-type=avro --avro-schema-file=$(PWD)/schema.avsc \
    --pretty --read-offset=0 -f

an error is printed by plumber when BILLNUMBER_ has a string value assigned:

>> ERROR: unable to decode AVRO message: cannot decode binary record "com.mycompany.myproduct.avro.message.output.FeeDetail" field "FEEIDENTIFIER_": cannot decode binary string: cannot decode binary bytes: negative size: -62
>> ERROR: unable to decode message payload for backend 'kafka': unable to decode avro message: cannot decode binary record "com.mycompany.myproduct.avro.message.output.FeeDetail" field "FEEIDENTIFIER_": cannot decode binary string: cannot decode binary bytes: negative size: -62

Commenting out the line in Benthos config root.BILLNUMBER_ = this.index(1) (hence the value is now null) results into a valid Avro encoded message:

------------- [Count: 3 Received at: 2023-05-22T10:39:38+01:00] -------------------

+------------------------+--------------------------------------------------------------+
| Key                    |                                                         NONE |
| topic                  |                                                    fee_topic |
| Offset                 |                                                           17 |
| Partition              |                                                            0 |
| Header(s)              |                                                         NONE |
+------------------------+--------------------------------------------------------------+

{
  "BILLNUMBER_": null,
  "FEEIDENTIFIER_": "2000164"
}

It seems there could be a bug in Benthos (or dependent library handling Avro) when:

  • the Avro schema defines multiple types for a field
  • the default value is not matching the field type of the data received
# Using benthos Docker (jeffail/benthos:4.13) as well as local CLI:
$ benthos -v                 
Version: 4.13.0
Date: 2023-03-15T15:24:54Z
@robinhuiser
Copy link
Author

Test case: benthos-1917.tgz

Please make sure you have plumber and Docker Compose installed.

# Terminal 1
$ docker-compose up kafka -d
$ plumber read kafka \  
    --topics=avro_issue \
    --decode-type=avro --avro-schema-file=$(PWD)/schema_avro_issue.avsc \
    --pretty --read-offset=0 -f

# Terminal 2
# This works (BILLNUMBER_ has no value assigned)
$ benthos -c ./config_avro_issue_OK.yaml

# This does not work (BILLNUMBER_ is assigned a string)
$ benthos -c ./config_avro_issue_NOK.yaml

@Jeffail
Copy link
Collaborator

Jeffail commented May 22, 2023

Hey @robinhuiser, there's a bit more info about avro-json versus actual json here: https://www.benthos.dev/docs/components/processors/schema_registry_decode#avro-json-format, we've expanded the schema_registry_decode and _encode variant with the ability to toggle between standard JSON and avro JSON, but we also need to add those options to the avro processor in order to support what you're going for.

@Jeffail Jeffail added enhancement processors Any tasks or issues relating specifically to processors labels May 22, 2023
@robinhuiser
Copy link
Author

Hi @Jeffail - thanks for the quick follow up!

I will check with the cloud team in case we can leverage the Confluent Schema Registry in production so I can use the schema_registry_encode processor instead.

There is a bug mentioned in the processor - from what I understand this applies to logicalType. Do I understand correctly from the linked goavro issue if I use binary format I should be good to go (hence it supports both logical as non-logical types)?

@mihaitodor
Copy link
Collaborator

There is a bug mentioned in the processor - from what I understand this applies to logicalType. Do I understand correctly from the linked goavro issue if I use binary format I should be good to go (hence it supports both logical as non-logical types)?

It should work, yeah. I'm only aware of this inconsistency in Goavro when decoding binary avro with logical types, which applies to schema_registry_decode. Please report back if you see any other issues.

@robinhuiser
Copy link
Author

I got it to work - thanks for the support @Jeffail @mihaitodor .

Silly question - how do I enable binary encoding for this processor (as this was an option for the avro processor)?

@mihaitodor
Copy link
Collaborator

Not sure I get the question @robinhuiser, but there's a matching schema_registry_encode processor which encodes messages to binary AVRO. Hope that helps.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement processors Any tasks or issues relating specifically to processors
Projects
None yet
Development

No branches or pull requests

3 participants