-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Archive events in OpenSearch #24
Conversation
439efdd
to
d4b08fa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tested it and works good. I would say we need to add a mapping for the events and address the comments I added below.
Regarding the mapping, not sure if we can convert the time
field to a datetime, so we can order events in OpenSearch.
@click.option('--workers', | ||
default=10, | ||
show_default=True, | ||
help="Number of workers to run in the pool.") | ||
def eventizers(workers: int): | ||
"""Start a pool of eventizer workers. | ||
def worker_pool(task_types: str, workers: int): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should keep eventizers
command and create a new one (maybe archivists
) to store the data (in this case on OpenSearch). I know your proposal it's more generic but the other one gives us more flexibility in terms of configuration and usability because users will know what to run.
"""Test if events are stored correctly""" | ||
|
||
# Add some sample events to the stream | ||
events = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say to rename events
to expected_events
, so it's clear later to understand what's going on.
self.assertEqual(result.consumer_name, 'consumer_1') | ||
self.assertEqual(result.total, len(events)) | ||
|
||
result_events = MockStorageBackend.get_events() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And we can also rename these ones to stored_events
.
b1f500d
to
2d03da4
Compare
This commit implements archiving of events in an OpenSearch backend. It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance. Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue. Signed-off-by: Jose Javier Merchante <[email protected]>
2d03da4
to
cc3dd1a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR implements the archiving of events in an OpenSearch backend.
It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance.
Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue.