Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kafka] backoff in case of next consumer error #37009

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

yiquanzhou
Copy link

@yiquanzhou yiquanzhou commented Jan 2, 2025

Description

Currently if the next consumer returns an error, kafka receiver simply returns the error and will consume the next message without any backpressure. This behavior is not optimal in case of some errors. For example the memory limiter could return data refused error when the memory usage of the collector is too high. Keeping consuming and sending messages to the memory limiter could further increase the memory usage and cause OutOfMemory error in the collector.

This PR provides an optional error backoff config which allows to wait before consuming the next message in case of errors that require backoff.

Testing

  • Added a test case to TestXXXConsumerGroupHandler_error_nextConsumer tests with an error that requires backoff.

Documentation

  • Added the configuration for error_backoff

Copy link

linux-foundation-easycla bot commented Jan 2, 2025

CLA Signed

The committers listed above are authorized under a signed CLA.

@yiquanzhou yiquanzhou changed the title Kafka receiver backoff [receiver/kafka] backoff in case of next consumer error Jan 2, 2025
@yiquanzhou yiquanzhou changed the title [receiver/kafka] backoff in case of next consumer error [receiver/kafka] backoff in case of next consumer error (WIP) Jan 2, 2025
@yiquanzhou yiquanzhou force-pushed the kafka-receiver-backoff branch from 79afbda to 78a77fe Compare January 3, 2025 08:40
@yiquanzhou yiquanzhou changed the title [receiver/kafka] backoff in case of next consumer error (WIP) [receiver/kafka] backoff in case of next consumer error Jan 3, 2025
@yiquanzhou yiquanzhou marked this pull request as ready for review January 3, 2025 13:50
@yiquanzhou yiquanzhou requested a review from a team as a code owner January 3, 2025 13:50
@@ -97,7 +97,13 @@ The following settings can be optionally configured:
- `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel piepline
- `headers` (default = []): List of headers they'd like to extract from kafka record.
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**

- `error_backoff`:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this key the right name?

Copy link
Author

@yiquanzhou yiquanzhou Jan 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, at least it's what's specified in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/37009/files#diff-e5c86c16738dde4f187cd556f10c126b0c0ceee6f0fdf6f0434d817b084430d3R91

ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`

Do you have any suggestion for a better naming ?

- `max_interval`: The upper bound on backoff interval between consecutive message consumption
- `multiplier`: The value multiplied by the backoff interval bounds
- `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
- `max_elapsed_time`: The maximum time trying to backoff before giving up. If set to 0, the backoff is never stopped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link to configretry since you're taking from there

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -582,8 +593,22 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should messages be marked if you're going to retry them?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently if the next consumer returns an error, the kafka receiver simply drops the message and returns the error. At the beginning I intended to keep this behavior and only implement backoff delays without retrying the message. But then I went through the code again and I think we could implement the retry without introducing too much complexity.

I've updated the code and added some comments to explain. Let me know if that makes sense.

I've updated the unit test but I wonder how I could test this change in a integration or e2e test, in particular the retry logic if the offset is correctly reset and the failed message is consumed again in the next loop. Do you have any suggestion?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @atoulme , could you please take another look at the changes that I added for the retry logic ? Also I'd appreciate some guidance on the testing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atoulme could you please check out the changes and my comments? I'd like to move this PR forward

Comment on lines +598 to +603
if backOffDelay == backoff.Stop {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If max elapsed time of backoff config is exceeded, mark the message and return the error.

Comment on lines +608 to +611
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If c.messageMarking.After == false, message is already marked before the pipeline execution https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/37009/files#diff-8fa2a92bedeb4eca30c174611f6204e98b4d80570d7b0c37a3c8b2fdf4e75ef7L554-L556, reset the offset to the message offset so that it can be consumed again in the next for loop. Do not return the error which would interrupt the loop.

If c.messageMarking.After == true then do nothing

if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little bit confused that returning the error here will interrupt the for loop and return from ConsumeClaim function, how do we get back to ConsumeClaim again to consume the next messages?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants