
MessageSerializer.encode... functions not serializing proper Avro #87

Open
xgamer4 opened this issue Dec 3, 2020 · 4 comments
Labels
bug (Something isn't working) · help wanted (Extra attention is needed)

Comments

@xgamer4
Contributor

xgamer4 commented Dec 3, 2020

Describe the bug
The MessageSerializer.encode... functions don't output valid Avro. When a record is serialized with MessageSerializer/FaustSerializer, the resulting binary doesn't conform to the Avro spec and isn't parsed correctly when read back by anything that isn't a MessageSerializer instance.

This is particularly problematic when using python-schema-registry-client with Faust, as the recommended Faust integration examples use MessageSerializer. The expected behavior there is that MessageSerializer takes the record and serializes it to Avro, which then gets passed to whatever Faust uses for its backend (likely Kafka). Because the output isn't actually valid Avro, any other program not backed by a MessageSerializer will try to read and parse the message (because from all appearances it should be proper Avro backed by a schema in a schema registry) and fail.

To Reproduce

schema = {"type": "record",
          "name": "Example",
          "fields": [{"name": "ID", "type": "long"}],
          }
schema = AvroSchema(schema)
schema_client = SchemaRegistryClient(url='http://schema-registry:8081')
serializer = MessageSerializer(schema_client)
maybe_avro = serializer.encode_record_with_schema('example', schema, {'ID': 42})
fastavro_deserialized = fastavro.schemaless_reader(io.BytesIO(maybe_avro), schema.schema)
serializer_deserialized = serializer.decode_message(maybe_avro)
print(fastavro_deserialized)
print(serializer_deserialized)
exit()

This outputs:

{'ID': 0}
{'ID': 42}

Expected behavior
The above code should output:

{'ID': 42}
{'ID': 42}

Thoughts
So it looks like the reason this happens is that MessageSerializer prepends the schema id to the binary output (message_serializer.py, lines 103 onward):

        with ContextStringIO() as outf:
            # Write the magic byte and schema ID in network byte order (big endian)
            outf.write(struct.pack(">bI", MAGIC_BYTE, schema_id))

            # write the record to the rest of the buffer
            writer(record, outf)

            return outf.getvalue()

This looks like it's done to keep track of the schema id for deserialization, but it breaks every other way of deserializing the message.

I don't have any particularly compelling proposals for a fix, unfortunately. Avro defines a header in the object container format, and the proper/ideal way to handle this would be to include the id in that header (or just use the predefined avro.schema key to store the schema itself). fastavro.writer supports header metadata, but that would require transitioning the serialization logic from fastavro.schemaless_writer to fastavro.writer. Probably simple... maybe... but I'm not sure how reading would work or what else would need to change.
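For illustration, a minimal sketch of what that container-file route might look like, assuming fastavro's writer/reader API (the "schema.id" metadata key here is made up, not part of any spec):

    import io

    import fastavro

    schema = {"type": "record",
              "name": "Example",
              "fields": [{"name": "ID", "type": "long"}]}

    buf = io.BytesIO()
    # fastavro.writer emits a full Avro Object Container File; its header
    # carries the schema itself plus any extra metadata key/value pairs.
    fastavro.writer(buf, schema, [{"ID": 42}], metadata={"schema.id": "1"})

    buf.seek(0)
    # Any spec-compliant reader can now decode it without outside help.
    for record in fastavro.reader(buf):
        print(record)  # {'ID': 42}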

Alternatively, if this break is intended, the Faust integration document should be updated to note that this generates out-of-spec Avro that must either be read by a MessageSerializer or manually modified to recover valid Avro.

@xgamer4
Contributor Author

xgamer4 commented Dec 9, 2020

Apparently Debezium also has the same behavior with a different tech stack, which makes me think this behavior is intentional - in which case it's not a bug.

My assumption - though I definitely don't know for sure - is that it's a form of single-object encoding meant to preserve a reference to the schema id in the registry. In that case, the specification does define a prepended header - the header is just defined wildly differently:

"
Single object encoding specification

Single Avro objects are encoded as follows:

  • A two-byte marker, C3 01, to show that the message is Avro and uses this single-record format (version 1).
  • The 8-byte little-endian CRC-64-AVRO fingerprint of the object's schema
  • The Avro object encoded using Avro's binary encoding

Implementations use the 2-byte marker to determine whether a payload is Avro. This check helps avoid expensive lookups that resolve the schema from a fingerprint, when the message is not an encoded Avro payload.
"

Whereas this library (and, I'm assuming, whatever backs Debezium) uses the format - if I'm understanding it correctly - of a 00 byte followed by the schema id as a 4-byte big-endian integer. I'd guess this deviation exists because stamping the schema id into the message itself is easy to work with, easy to understand, lets you go directly from a message to its schema, and accomplishes the same goals as the official spec without the extra work of fingerprinting a schema for less functionality.

Am I correct here? Is that what's happening?

If so, with two different implementations deviating from the spec in the same way, this doesn't make sense as a bug. If anyone has any links to documentation or anything describing why this is done, that'd be appreciated.
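To make the comparison concrete, here's a sketch of the two framings side by side (the fingerprint below is a placeholder value, not a real CRC-64-AVRO computation):

    import struct

    schema_id = 1  # id assigned by the schema registry

    # Confluent-style framing, as this library emits it:
    # 0x00 magic byte + 4-byte big-endian schema id, then the Avro binary body.
    confluent_header = struct.pack(">bI", 0, schema_id)
    assert len(confluent_header) == 5

    # Avro single-object encoding, per the spec:
    # 2-byte marker C3 01 + 8-byte little-endian CRC-64-AVRO schema fingerprint.
    fingerprint = 0x1234567890ABCDEF  # placeholder; a real one is computed from the schema
    single_object_header = b"\xc3\x01" + struct.pack("<Q", fingerprint)
    assert len(single_object_header) == 10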

Then again, if this is correct, here's what I'd propose:

  • Updating the documentation of the FaustSerializer to make it clear that it serializes in the form 00 + schema id as a 4-byte big-endian int + message (or the proper description thereof), so deserializing a message outside the context of the FaustSerializer may require removing/skipping the first 5 bytes (see the sketch after this list)

  • [Less important?/Nice-to-have] An argument to serialize with the spec-defined prepended header.

  • [Less important/Nice-to-have] An argument to serialize without any prepended header.
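For the first bullet, the workaround is small enough to sketch inline, reusing maybe_avro and the schema from the reproduction above:

    import io
    import struct

    import fastavro

    def decode_confluent_framed(payload, reader_schema):
        """Strip the 5-byte Confluent header, then read the rest as plain Avro."""
        magic, schema_id = struct.unpack(">bI", payload[:5])
        if magic != 0:
            raise ValueError("not a Confluent-framed message")
        # schema_id could be used to fetch the writer schema from the registry;
        # here the caller is assumed to already know the schema.
        return fastavro.schemaless_reader(io.BytesIO(payload[5:]), reader_schema)

    print(decode_confluent_framed(maybe_avro, schema.schema))  # {'ID': 42}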

@xgamer4
Contributor Author

xgamer4 commented Dec 10, 2020

Just noting that I found the documentation describing the format - it's the Confluent Platform's Wire Format.

@marcosschroh marcosschroh added the bug Something isn't working label Feb 12, 2021
@marcosschroh marcosschroh added the help wanted Extra attention is needed label Apr 13, 2021
@marcosschroh
Owner

Hi @xgamer4

Sorry for the delay. As you said, it's not a bug.

When you mention the header, are you referring to the Kafka headers? What I would like to do in a future version is add the schema-id and the serialization-type to the Kafka headers, so non-Confluent consumers will have more context about the message. Ideally, I would like a more robust serializer that could interpret both Confluent and non-Confluent events.
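Roughly something like this (a sketch with kafka-python; the header names are made up and consumers would need to agree on them):

    from kafka import KafkaProducer  # assumes kafka-python

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    # The record {'ID': 42} under the Example schema, schemaless-encoded
    # with no prepended wire-format header (long 42 zigzag-encodes to 0x54).
    plain_avro_payload = b"\x54"

    # Ship the payload as plain Avro and carry the registry metadata in
    # Kafka headers instead of inside the message body.
    producer.send(
        "example-topic",
        value=plain_avro_payload,
        headers=[
            ("schema-id", b"1"),
            ("serialization-type", b"avro"),
        ],
    )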

@xgamer4
Contributor Author

xgamer4 commented Jun 9, 2022

@marcosschroh
Oh man, 1.5 years and a new job later, it's been a bit. Let's see what I remember and where I was thinking about going with it.

So I think in the original comment I was referring specifically to the Apache Avro specification. It defines a container object that includes a header (the "Object Container Files" section of the spec), and parts of that header can refer to the schema used for the message. That would be one standards-compliant way to attach the schema.

There's also the single-object encoding I mentioned in my second comment, which is a standards-compliant way of doing what Confluent is doing.

I hadn't considered sending them in the Kafka headers - I think at the time I was mostly treating all the Avro stuff as distinct from the Kafka implementation. It would definitely work though.

I'm really not sure what an easy solution would be. There's an obvious benefit to using the Confluent wire format, and an equally obvious problem with the messages not being deserializable out of the box by fastavro and other deserializers. But it's also an easy fix (chop off a few leading bytes), and the information is valuable (a reference to the schema needed to deserialize the message), so as long as it's known that the raw message needs fixing up to work, maybe it's not that big a deal to be technically non-compliant?

At the very least, writing a deserializer that can read multiple formats should be pretty straightforward: if the leading byte is 0 and the next 4 bytes are reasonably interpretable as an int, treat it as Confluent framing; otherwise assume there's no framing and hope you have the schema (a rough sketch below). A few other formats could be added (single-object encoding, for example), but those would require more dev work, I imagine (single-object encoding would require being able to look up a schema by fingerprint, for example).
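Something like this, assuming fastavro and a caller-supplied fallback schema (the registry lookup is stubbed out):

    import io
    import struct

    import fastavro

    MAGIC_BYTE = 0

    def decode_any(payload, fallback_schema):
        """Best-effort decode: detect Confluent framing, else assume raw Avro."""
        if len(payload) > 5 and payload[0] == MAGIC_BYTE:
            _, schema_id = struct.unpack(">bI", payload[:5])
            # A real implementation would fetch the writer schema from the
            # registry by schema_id; here the caller's schema is used instead.
            return fastavro.schemaless_reader(io.BytesIO(payload[5:]), fallback_schema)
        # No recognizable framing: hope the caller's schema matches the writer's.
        return fastavro.schemaless_reader(io.BytesIO(payload), fallback_schema)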

Hopefully that was somehow helpful.
