-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[receiver/kafka] backoff in case of next consumer error #37009
base: main
Are you sure you want to change the base?
Changes from all commits
78a77fe
ca00ad8
704560b
6009470
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,9 +9,12 @@ import ( | |
"fmt" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/IBM/sarama" | ||
"github.com/cenkalti/backoff/v4" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config/configretry" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
|
@@ -35,6 +38,8 @@ const ( | |
|
||
var errInvalidInitialOffset = errors.New("invalid initial offset") | ||
|
||
var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage") | ||
|
||
// kafkaTracesConsumer uses sarama to consume and handle messages from kafka. | ||
type kafkaTracesConsumer struct { | ||
config Config | ||
|
@@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro | |
messageMarking: c.messageMarking, | ||
headerExtractor: &nopHeaderExtractor{}, | ||
telemetryBuilder: c.telemetryBuilder, | ||
backOff: newExponentialBackOff(c.config.ErrorBackOff), | ||
} | ||
if c.headerExtraction { | ||
consumerGroup.headerExtractor = &headerExtractor{ | ||
|
@@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err | |
messageMarking: c.messageMarking, | ||
headerExtractor: &nopHeaderExtractor{}, | ||
telemetryBuilder: c.telemetryBuilder, | ||
backOff: newExponentialBackOff(c.config.ErrorBackOff), | ||
} | ||
if c.headerExtraction { | ||
metricsConsumerGroup.headerExtractor = &headerExtractor{ | ||
|
@@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error | |
messageMarking: c.messageMarking, | ||
headerExtractor: &nopHeaderExtractor{}, | ||
telemetryBuilder: c.telemetryBuilder, | ||
backOff: newExponentialBackOff(c.config.ErrorBackOff), | ||
} | ||
if c.headerExtraction { | ||
logsConsumerGroup.headerExtractor = &headerExtractor{ | ||
|
@@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct { | |
autocommitEnabled bool | ||
messageMarking MessageMarking | ||
headerExtractor HeaderExtractor | ||
backOff *backoff.ExponentialBackOff | ||
} | ||
|
||
type metricsConsumerGroupHandler struct { | ||
|
@@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct { | |
autocommitEnabled bool | ||
messageMarking MessageMarking | ||
headerExtractor HeaderExtractor | ||
backOff *backoff.ExponentialBackOff | ||
} | ||
|
||
type logsConsumerGroupHandler struct { | ||
|
@@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct { | |
autocommitEnabled bool | ||
messageMarking MessageMarking | ||
headerExtractor HeaderExtractor | ||
backOff *backoff.ExponentialBackOff | ||
} | ||
|
||
var ( | ||
|
@@ -541,6 +552,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe | |
if !c.autocommitEnabled { | ||
defer session.Commit() | ||
} | ||
if c.backOff != nil { | ||
c.backOff.Reset() | ||
} | ||
for { | ||
select { | ||
case message, ok := <-claim.Messages(): | ||
|
@@ -579,10 +593,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe | |
err = c.nextConsumer.ConsumeTraces(session.Context(), traces) | ||
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err) | ||
if err != nil { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
if errorRequiresBackoff(err) && c.backOff != nil { | ||
backOffDelay := c.backOff.NextBackOff() | ||
if backOffDelay == backoff.Stop { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
} | ||
select { | ||
case <-session.Context().Done(): | ||
return nil | ||
case <-time.After(backOffDelay): | ||
if !c.messageMarking.After { | ||
// Unmark the message so it can be retried | ||
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") | ||
} | ||
Comment on lines
+608
to
+611
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If If |
||
} | ||
} else { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm a little bit confused that returning the error here will interrupt the for loop and return from `ConsumeClaim` |
||
} | ||
return err | ||
} | ||
if c.messageMarking.After { | ||
session.MarkMessage(message, "") | ||
|
@@ -600,6 +633,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe | |
} | ||
} | ||
|
||
func errorRequiresBackoff(err error) bool { | ||
return errors.Is(err, errMemoryLimiterDataRefused) | ||
} | ||
|
||
func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { | ||
c.readyCloser.Do(func() { | ||
close(c.ready) | ||
|
@@ -618,6 +655,9 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS | |
if !c.autocommitEnabled { | ||
defer session.Commit() | ||
} | ||
if c.backOff != nil { | ||
c.backOff.Reset() | ||
} | ||
for { | ||
select { | ||
case message, ok := <-claim.Messages(): | ||
|
@@ -656,10 +696,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS | |
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics) | ||
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err) | ||
if err != nil { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
if errorRequiresBackoff(err) && c.backOff != nil { | ||
backOffDelay := c.backOff.NextBackOff() | ||
if backOffDelay == backoff.Stop { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
} | ||
select { | ||
case <-session.Context().Done(): | ||
return nil | ||
case <-time.After(backOffDelay): | ||
if !c.messageMarking.After { | ||
// Unmark the message so it can be retried | ||
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") | ||
} | ||
} | ||
} else { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
} | ||
return err | ||
} | ||
if c.messageMarking.After { | ||
session.MarkMessage(message, "") | ||
|
@@ -695,6 +754,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess | |
if !c.autocommitEnabled { | ||
defer session.Commit() | ||
} | ||
if c.backOff != nil { | ||
c.backOff.Reset() | ||
} | ||
for { | ||
select { | ||
case message, ok := <-claim.Messages(): | ||
|
@@ -732,10 +794,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess | |
err = c.nextConsumer.ConsumeLogs(session.Context(), logs) | ||
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err) | ||
if err != nil { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
if errorRequiresBackoff(err) && c.backOff != nil { | ||
backOffDelay := c.backOff.NextBackOff() | ||
if backOffDelay == backoff.Stop { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
} | ||
select { | ||
case <-session.Context().Done(): | ||
return nil | ||
case <-time.After(backOffDelay): | ||
if !c.messageMarking.After { | ||
// Unmark the message so it can be retried | ||
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") | ||
} | ||
} | ||
} else { | ||
if c.messageMarking.After && c.messageMarking.OnError { | ||
session.MarkMessage(message, "") | ||
} | ||
return err | ||
} | ||
return err | ||
} | ||
if c.messageMarking.After { | ||
session.MarkMessage(message, "") | ||
|
@@ -753,6 +834,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess | |
} | ||
} | ||
|
||
func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff { | ||
if !config.Enabled { | ||
return nil | ||
} | ||
backOff := backoff.NewExponentialBackOff() | ||
backOff.InitialInterval = config.InitialInterval | ||
backOff.RandomizationFactor = config.RandomizationFactor | ||
backOff.Multiplier = config.Multiplier | ||
backOff.MaxInterval = config.MaxInterval | ||
backOff.MaxElapsedTime = config.MaxElapsedTime | ||
backOff.Reset() | ||
return backOff | ||
} | ||
|
||
func toSaramaInitialOffset(initialOffset string) (int64, error) { | ||
switch initialOffset { | ||
case offsetEarliest: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If max elapsed time of backoff config is exceeded, mark the message and return the error.