Rename serializer methods #133

Open
marcosschroh opened this issue May 19, 2022 · 6 comments
Labels
enhancement New feature or request good first issue Good for newcomers

@marcosschroh
Owner

decode_message --> deserialize

encode_record_with_schema_id --> serialize. Ideally, there should be a single serialization method that accepts either a schema_id or a Schema object.

@marcosschroh marcosschroh added enhancement New feature or request good first issue Good for newcomers labels May 19, 2022
@marcosschroh marcosschroh added this to the Version 3 milestone May 19, 2022
@bboggs-streambit
Contributor

@marcosschroh, what do you think of a @singledispatchmethod implementation for serialize? The actual method implementations could basically stay the same; you'd just have a single entry point that dispatches to one or the other depending on the type of the first argument.
Something like this:

    @functools.singledispatchmethod
    def serialize(self, schema_or_id: typing.Any, record: dict, *_) -> bytes:
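        # Fallback for unregistered types: fail loudly instead of returning None
        raise NotImplementedError(f"Cannot serialize with {type(schema_or_id).__name__}")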

    @serialize.register
    def _serialize_by_id(self, schema_id: int, record: dict) -> bytes:
        """
        Encode a record with a given schema id. The record must
        be a python dictionary.
        Args:
            schema_id (int): integer ID
            record (dict): An object to serialize
        Returns:
            bytes: Encoded record with schema ID as bytes
        """
        # use slow avro
        if schema_id not in self.id_to_writers:
            try:
                schema = self.schemaregistry_client.get_by_id(schema_id)
                if not schema:
                    raise SerializerError("Schema does not exist")
                self.id_to_writers[schema_id] = self._get_encoder_func(schema)
            except ClientError:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                raise SerializerError(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))

        writer = self.id_to_writers[schema_id]
        with ContextStringIO() as outf:
            # Write the magic byte and schema ID in network byte order (big endian)
            outf.write(struct.pack(">bI", MAGIC_BYTE, schema_id))

            # write the record to the rest of the buffer
            writer(record, outf)

            return outf.getvalue()

    @serialize.register
    def _serialize_with_schema_and_subject(self, schema: BaseSchema, record: dict, subject: str) -> bytes:
        """
        Given a parsed avro schema, encode a record for the given subject.
        The record is expected to be a dictionary.
        The schema is registered with the subject of 'topic-value'
        Args:
            subject (str): Subject name
            schema (BaseSchema): Parsed schema
            record (dict): An object to serialize
        Returns:
            bytes: Encoded record with schema ID as bytes
        """
        # Try to register the schema
        schema_id = self.schemaregistry_client.register(subject, schema, schema_type=self._serializer_schema_type)

        # cache writer
        if not self.id_to_writers.get(schema_id):
            self.id_to_writers[schema_id] = self._get_encoder_func(schema)

        return self._serialize_by_id(schema_id, record)
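
For anyone who wants to try the dispatch mechanics in isolation, here is a minimal, self-contained sketch. It is not the library's implementation: `FakeSchema`, `DemoSerializer`, the in-memory ID table, and the `repr`-based payload are hypothetical stand-ins for the real schema class, registry client, and Avro encoder, and `MAGIC_BYTE = 0` is an assumption. It only shows that `functools.singledispatchmethod` (Python 3.8+) picks the implementation from the type of the first positional argument after `self`.

```python
import functools
import struct
from dataclasses import dataclass

MAGIC_BYTE = 0  # assumed value; the snippet above only references the name


@dataclass(frozen=True)
class FakeSchema:
    """Hypothetical stand-in for a parsed schema object."""

    name: str


class DemoSerializer:
    """Toy serializer; only the dispatch mechanics mirror the proposal."""

    def __init__(self) -> None:
        # Hypothetical in-memory replacement for the schema registry client
        self._ids: dict = {}

    @functools.singledispatchmethod
    def serialize(self, schema_or_id, record: dict, *args) -> bytes:
        # Reached only when no registered type matches the first argument
        raise TypeError(f"unsupported type: {type(schema_or_id).__name__}")

    @serialize.register
    def _serialize_by_id(self, schema_id: int, record: dict) -> bytes:
        # Placeholder payload; a real serializer would write Avro/JSON here
        payload = repr(sorted(record.items())).encode()
        return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

    @serialize.register
    def _serialize_with_schema(self, schema: FakeSchema, record: dict, subject: str) -> bytes:
        # "Register" the schema under the subject, then reuse the by-id path
        schema_id = self._ids.setdefault((subject, schema.name), len(self._ids) + 1)
        return self._serialize_by_id(schema_id, record)


serializer = DemoSerializer()
by_id = serializer.serialize(7, {"a": 1})
by_schema = serializer.serialize(FakeSchema("User"), {"a": 1}, "user-value")
```

Note that dispatch only looks at positional arguments, so `serializer.serialize(schema_id=7, record={...})` would fail; callers would have to pass the schema or ID positionally.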

@marcosschroh
Owner Author

@bboggs-streambit I like the idea. decode_message should be renamed as well.
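
As a rough illustration of what the decoding side has to undo, the sketch below splits a message into schema ID and payload, mirroring the `struct.pack(">bI", MAGIC_BYTE, schema_id)` framing shown earlier in the thread. `split_message` is a hypothetical helper, not an existing method of the library, and `MAGIC_BYTE = 0` is an assumption; a real `deserialize` would go on to look up the schema and decode the payload.

```python
import struct

MAGIC_BYTE = 0  # assumed value; the thread only references the name
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte unsigned schema ID, big endian


def split_message(message: bytes):
    """Return (schema_id, payload) for a framed message (hypothetical helper)."""
    if len(message) < HEADER.size:
        raise ValueError("message is too short to contain a header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    # Everything after the 5-byte header is the encoded record
    return schema_id, message[HEADER.size:]


framed = struct.pack(">bI", MAGIC_BYTE, 42) + b"payload"
schema_id, payload = split_message(framed)
```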

@bboggs-streambit
Contributor

@marcosschroh, I've just about got an initial PR ready. Do you want to start a new release branch for v3, or release the new methods incrementally alongside the existing ones, maintaining backwards compatibility until v3 is ready?

@marcosschroh
Owner Author

@bboggs-streambit let's create a v3 branch for the new changes so we can refactor and introduce the breaking changes. When v3 is finished, we can release it.

@ghost

ghost commented Apr 21, 2023

Sounds good. I don't think I have permission to create the branch on my side. My PR is ready as soon as the v3 target branch is created.
Thanks!

@marcosschroh
Owner Author

@bboggs-streambit I have created the new branch release/v3
