Rename serializer methods #133
@marcosschroh, what do you think of a `@functools.singledispatchmethod` approach?

    import functools
    import struct
    import sys
    import traceback
    import typing

    # SerializerError, ClientError, BaseSchema, ContextStringIO and MAGIC_BYTE come from
    # the existing serializer module; the functions below are methods of the serializer class.


    @functools.singledispatchmethod
    def serialize(self, schema_or_id: typing.Any, record: dict, *_) -> bytes:
        # Fallback: only reached when no registered overload matches the first argument's type.
        raise TypeError(f"Unsupported schema or schema ID type: {type(schema_or_id).__name__}")


    @serialize.register
    def _serialize_by_id(self, schema_id: int, record: dict) -> bytes:
        """
        Encode a record with a given schema ID. The record must
        be a Python dictionary.

        Args:
            schema_id (int): Integer schema ID
            record (dict): An object to serialize

        Returns:
            bytes: Encoded record with schema ID as bytes
        """
        # use slow avro
        if schema_id not in self.id_to_writers:
            try:
                schema = self.schemaregistry_client.get_by_id(schema_id)
                if not schema:
                    raise SerializerError("Schema does not exist")
                self.id_to_writers[schema_id] = self._get_encoder_func(schema)
            except ClientError:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                raise SerializerError(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))

        writer = self.id_to_writers[schema_id]
        with ContextStringIO() as outf:
            # Write the magic byte and schema ID in network byte order (big endian)
            outf.write(struct.pack(">bI", MAGIC_BYTE, schema_id))
            # Write the record to the rest of the buffer
            writer(record, outf)
            return outf.getvalue()


    @serialize.register
    def _serialize_with_schema_and_subject(self, schema: BaseSchema, record: dict, subject: str) -> bytes:
        """
        Given a parsed schema, register it under `subject` and encode a record.
        The record is expected to be a dictionary.

        Args:
            schema (BaseSchema): Parsed schema
            record (dict): An object to serialize
            subject (str): Subject name, e.g. 'topic-value'

        Returns:
            bytes: Encoded record with schema ID as bytes
        """
        # Try to register the schema
        schema_id = self.schemaregistry_client.register(subject, schema, schema_type=self._serializer_schema_type)

        # Cache the writer
        if schema_id not in self.id_to_writers:
            self.id_to_writers[schema_id] = self._get_encoder_func(schema)

        return self._serialize_by_id(schema_id, record)
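The dispatch behaviour proposed above can be tried out in isolation. This is a minimal, self-contained sketch rather than the library's code: `ToySerializer`, `ToySchema` and `InMemoryRegistry` are hypothetical stand-ins, JSON replaces Avro so that only the standard library is needed, and `MAGIC_BYTE = 0` assumes the Confluent-style framing (one magic byte followed by a 4-byte big-endian schema ID). `singledispatchmethod` chooses the overload from the type of the first argument after `self`, so an `int` takes the by-ID path and a schema object takes the register-then-serialize path.

```python
import functools
import json
import struct
import typing
from dataclasses import dataclass, field

MAGIC_BYTE = 0  # assumption: Confluent-style framing, 1 magic byte + 4-byte schema ID


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical stand-in for BaseSchema."""
    definition: str


@dataclass
class InMemoryRegistry:
    """Hypothetical stand-in for the schema registry client."""
    schemas: typing.Dict[int, ToySchema] = field(default_factory=dict)

    def register(self, subject: str, schema: ToySchema) -> int:
        # The subject is accepted for API symmetry but not used for lookup in this toy.
        for schema_id, known in self.schemas.items():
            if known == schema:
                return schema_id  # already registered: reuse its ID
        schema_id = len(self.schemas) + 1
        self.schemas[schema_id] = schema
        return schema_id

    def get_by_id(self, schema_id: int) -> typing.Optional[ToySchema]:
        return self.schemas.get(schema_id)


class ToySerializer:
    def __init__(self, registry: InMemoryRegistry) -> None:
        self.registry = registry

    @functools.singledispatchmethod
    def serialize(self, schema_or_id: typing.Any, record: dict, *_) -> bytes:
        # Fallback for any first-argument type without a registered overload.
        raise TypeError(f"Cannot serialize with {type(schema_or_id).__name__}")

    @serialize.register
    def _serialize_by_id(self, schema_id: int, record: dict) -> bytes:
        if self.registry.get_by_id(schema_id) is None:
            raise ValueError(f"Schema {schema_id} does not exist")
        header = struct.pack(">bI", MAGIC_BYTE, schema_id)  # big-endian 5-byte header
        return header + json.dumps(record, sort_keys=True).encode()

    @serialize.register
    def _serialize_with_schema_and_subject(self, schema: ToySchema, record: dict, subject: str) -> bytes:
        schema_id = self.registry.register(subject, schema)
        return self._serialize_by_id(schema_id, record)
```

Usage: `ToySerializer(InMemoryRegistry()).serialize(ToySchema("..."), {"a": 1}, "topic-value")` registers the schema (ID 1 in a fresh registry) and returns `b'\x00\x00\x00\x00\x01{"a": 1}'`; calling `serialize(1, {"a": 1})` afterwards yields the same bytes. Because `register` returns the decorated function unchanged, `self._serialize_by_id` remains callable directly, which is what lets the schema overload delegate to the ID overload.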
@bboggs-streambit I like the idea. The …
@marcosschroh, I've just about got an initial PR ready. Do you want to start a new release branch for v3, or incrementally release the new methods alongside the existing ones, maintaining backward compatibility until v3 is ready?
@bboggs-streambit let's create a …
Sounds good. I don't think I have permission to create the branch on this repository. My PR is ready as soon as the v3 target branch exists.
@bboggs-streambit I have created the new branch.
- `decode_message` --> `deserialize`
- `encode_record_with_schema_id` --> `serialize`. Potentially, only one serialization method should be exposed, accepting either a `schema_id` or a `Schema` object.
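If the renames ship before v3, one common way to keep the old names working is to leave them as thin aliases that emit a `DeprecationWarning` and forward to the new methods. This is a hypothetical sketch: `deprecated_alias` and `RenamedSerializer` are illustrative names, and the method bodies are placeholders, not the library's real encoding and decoding logic.

```python
import warnings
from typing import Any, Callable


def deprecated_alias(new_name: str) -> Callable[..., Any]:
    """Build a method that warns, then forwards to the method named `new_name`."""

    def alias(self: Any, *args: Any, **kwargs: Any) -> Any:
        warnings.warn(
            f"This method is deprecated; use {new_name}() instead.",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not at this wrapper
        )
        return getattr(self, new_name)(*args, **kwargs)

    return alias


class RenamedSerializer:
    def serialize(self, schema_id: int, record: dict) -> bytes:
        # Placeholder body; the real method would encode the record.
        return repr((schema_id, record)).encode()

    def deserialize(self, message: bytes) -> Any:
        # Placeholder body; the real method would decode the message.
        return message

    # Old names kept as deprecated aliases until the next major release.
    encode_record_with_schema_id = deprecated_alias("serialize")
    decode_message = deprecated_alias("deserialize")
```

Forwarding through `getattr(self, new_name)` at call time means a subclass that overrides `serialize` or `deserialize` is also picked up by the old names.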