Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archive events in OpenSearch #24

Merged
merged 1 commit into from
Jan 28, 2025
Merged

Conversation

jjmerchante
Copy link
Collaborator

This PR implements the archiving of events in an OpenSearch backend.

It introduces a new subscriber group that reads from the events stream and stores events in an OpenSearch instance.

Additionally, the worker pool now supports execution on specific task queues, enabling different workers to process tasks based on their queue.

@jjmerchante jjmerchante force-pushed the archive-opensearch branch 2 times, most recently from 439efdd to d4b08fa Compare January 23, 2025 09:25
Copy link
Member

@sduenas sduenas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested it and works good. I would say we need to add a mapping for the events and address the comments I added below.

Regarding the mapping, not sure if we can convert the time field to a datetime, so we can order events in OpenSearch.

@click.option('--workers',
default=10,
show_default=True,
help="Number of workers to run in the pool.")
def eventizers(workers: int):
"""Start a pool of eventizer workers.
def worker_pool(task_types: str, workers: int):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep eventizers command and create a new one (maybe archivists) to store the data (in this case on OpenSearch). I know your proposal it's more generic but the other one gives us more flexibility in terms of configuration and usability because users will know what to run.

"""Test if events are stored correctly"""

# Add some sample events to the stream
events = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say to rename events to expected_events, so it's clear later to understand what's going on.

self.assertEqual(result.consumer_name, 'consumer_1')
self.assertEqual(result.total, len(events))

result_events = MockStorageBackend.get_events()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we can also rename these ones to stored_events.

@jjmerchante jjmerchante force-pushed the archive-opensearch branch 4 times, most recently from b1f500d to 2d03da4 Compare January 28, 2025 09:54
This commit implements archiving of events in an OpenSearch backend.

It introduces a new subscriber group that reads from the events stream
and stores events in an OpenSearch instance.

Additionally, the worker pool now supports execution on specific task
queues, enabling different workers to process tasks based on their queue.

Signed-off-by: Jose Javier Merchante <[email protected]>
Copy link
Member

@sduenas sduenas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sduenas sduenas merged commit 58f3692 into chaoss:main Jan 28, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants