diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index b7b25c4eadb6..f956bb06d294 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -97,6 +97,13 @@ The following settings can be optionally configured: - `extract_headers` (default = false): Allows user to attach header fields to resource attributes in otel piepline - `headers` (default = []): List of headers they'd like to extract from kafka record. **Note: Matching pattern will be `exact`. Regexes are not supported as of now.** +- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors + - `enabled`: (default = false) Whether to enable backoff when next consumers return errors + - `initial_interval`: The time to wait after the first error before retrying + - `max_interval`: The upper bound on backoff interval between consecutive retries + - `multiplier`: The value multiplied by the backoff interval bounds + - `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor) + - `max_elapsed_time`: The maximum amount of time trying to backoff before giving up. If set to 0, the retries are never stopped. Example: diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 5b25c7c7c5f3..20b7acec1588 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -7,6 +7,7 @@ import ( "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" @@ -85,6 +86,8 @@ type Config struct { DefaultFetchSize int32 `mapstructure:"default_fetch_size"` // The maximum bytes per fetch from Kafka (default "0", no limit) MaxFetchSize int32 `mapstructure:"max_fetch_size"` + // In case of some errors returned by the next consumer, the receiver will wait before consuming the next message + ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"` } const ( diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index b5e7faa1dc93..cee1cce72207 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" @@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, }, }, { @@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) { MinFetchSize: 1, DefaultFetchSize: 1048576, MaxFetchSize: 0, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 1 * time.Second, + MaxInterval: 10 * time.Second, + MaxElapsedTime: 1 * time.Minute, + Multiplier: 1.5, + }, }, }, } diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 1ec2d5aca6e9..2f10c297c058 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -9,9 +9,12 @@ import ( "fmt" "strconv" "sync" + "time" 
"github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -35,6 +38,8 @@ const ( var errInvalidInitialOffset = errors.New("invalid initial offset") +var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage") + // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. type kafkaTracesConsumer struct { config Config @@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { consumerGroup.headerExtractor = &headerExtractor{ @@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { metricsConsumerGroup.headerExtractor = &headerExtractor{ @@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error messageMarking: c.messageMarking, headerExtractor: &nopHeaderExtractor{}, telemetryBuilder: c.telemetryBuilder, + backOff: newExponentialBackOff(c.config.ErrorBackOff), } if c.headerExtraction { logsConsumerGroup.headerExtractor = &headerExtractor{ @@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } type metricsConsumerGroupHandler struct { @@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } type logsConsumerGroupHandler struct { @@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking headerExtractor HeaderExtractor + backOff *backoff.ExponentialBackOff } var ( @@ -541,6 +552,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe if !c.autocommitEnabled { defer session.Commit() } + if c.backOff != nil { + c.backOff.Reset() + } for { select { case message, ok := <-claim.Messages(): @@ -579,10 +593,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe err = c.nextConsumer.ConsumeTraces(session.Context(), traces) c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err) if err != nil { - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay == backoff.Stop { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err + } + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + } + } else { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err } - return err } if c.messageMarking.After { 
session.MarkMessage(message, "") @@ -600,6 +633,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } } +func errorRequiresBackoff(err error) bool { + return errors.Is(err, errMemoryLimiterDataRefused) +} + func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error { c.readyCloser.Do(func() { close(c.ready) @@ -618,6 +655,9 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS if !c.autocommitEnabled { defer session.Commit() } + if c.backOff != nil { + c.backOff.Reset() + } for { select { case message, ok := <-claim.Messages(): @@ -656,10 +696,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics) c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err) if err != nil { - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay == backoff.Stop { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err + } + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + } + } else { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err } - return err } if c.messageMarking.After { session.MarkMessage(message, "") @@ -695,6 +754,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess if !c.autocommitEnabled { defer session.Commit() } + if c.backOff != nil { + c.backOff.Reset() + } for { select { case message, ok := <-claim.Messages(): @@ -732,10 +794,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess err = c.nextConsumer.ConsumeLogs(session.Context(), logs) c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err) if err != nil { - if c.messageMarking.After && c.messageMarking.OnError { - session.MarkMessage(message, "") + if errorRequiresBackoff(err) && c.backOff != nil { + backOffDelay := c.backOff.NextBackOff() + if backOffDelay == backoff.Stop { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err + } + select { + case <-session.Context().Done(): + return nil + case <-time.After(backOffDelay): + if !c.messageMarking.After { + // Unmark the message so it can be retried + session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "") + } + } + } else { + if c.messageMarking.After && c.messageMarking.OnError { + session.MarkMessage(message, "") + } + return err } - return err } if c.messageMarking.After { session.MarkMessage(message, "") @@ -753,6 +834,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } } +func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff { + if !config.Enabled { + return nil + } + backOff := backoff.NewExponentialBackOff() + backOff.InitialInterval = config.InitialInterval + backOff.RandomizationFactor = config.RandomizationFactor + backOff.Multiplier = config.Multiplier + backOff.MaxInterval = config.MaxInterval + backOff.MaxElapsedTime = config.MaxElapsedTime + backOff.Reset() + return backOff +} + func 
toSaramaInitialOffset(initialOffset string) (int64, error) { switch initialOffset { case offsetEarliest: diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index e353974acd91..c4034cea766b 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/IBM/sarama" + "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -340,35 +341,69 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) - c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: nil, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - td := ptrace.NewTraces() - td.ResourceSpans().AppendEmpty() - unmarshaler := &ptrace.ProtoMarshaler{} - bts, err := unmarshaler.MarshalTraces(td) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := tracesConsumerGroupHandler{ + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty() + unmarshaler := &ptrace.ProtoMarshaler{} + bts, err := unmarshaler.MarshalTraces(td) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } func TestTracesReceiver_encoding_extension(t *testing.T) { @@ -684,34 +719,68 @@ 
func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) - c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: nil, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - ld := testdata.GenerateMetrics(1) - unmarshaler := &pmetric.ProtoMarshaler{} - bts, err := unmarshaler.MarshalMetrics(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := metricsConsumerGroupHandler{ + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + ld := testdata.GenerateMetrics(1) + unmarshaler := &pmetric.ProtoMarshaler{} + bts, err := unmarshaler.MarshalMetrics(ld) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } func TestMetricsReceiver_encoding_extension(t *testing.T) { @@ -1043,34 +1112,68 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := errors.New("failed to consume") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()}) require.NoError(t, err) - c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, - headerExtractor: &nopHeaderExtractor{}, - telemetryBuilder: nopTelemetryBuilder(t), - } - wg := 
sync.WaitGroup{} - wg.Add(1) - groupClaim := &testConsumerGroupClaim{ - messageChan: make(chan *sarama.ConsumerMessage), + tests := []struct { + name string + err, expectedError error + expectedBackoff time.Duration + }{ + { + name: "memory limiter data refused error", + err: errMemoryLimiterDataRefused, + expectedError: nil, + expectedBackoff: backoff.DefaultInitialInterval, + }, + { + name: "consumer error that does not require backoff", + err: consumerError, + expectedError: consumerError, + expectedBackoff: 0, + }, } - go func() { - e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) - assert.EqualError(t, e, consumerError.Error()) - wg.Done() - }() - ld := testdata.GenerateLogs(1) - unmarshaler := &plog.ProtoMarshaler{} - bts, err := unmarshaler.MarshalLogs(ld) - require.NoError(t, err) - groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} - close(groupClaim.messageChan) - wg.Wait() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backOff := backoff.NewExponentialBackOff() + backOff.RandomizationFactor = 0 + c := logsConsumerGroupHandler{ + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(tt.err), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, + telemetryBuilder: nopTelemetryBuilder(t), + backOff: backOff, + } + + wg := sync.WaitGroup{} + wg.Add(1) + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + go func() { + start := time.Now() + e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim) + end := time.Now() + if tt.expectedError != nil { + assert.EqualError(t, e, tt.expectedError.Error()) + } else { + assert.NoError(t, e) + } + assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond) + wg.Done() + }() + + ld := testdata.GenerateLogs(1) + unmarshaler := &plog.ProtoMarshaler{} + bts, err := unmarshaler.MarshalLogs(ld) + require.NoError(t, err) + groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts} + close(groupClaim.messageChan) + wg.Wait() + }) + } } // Test unmarshaler for different charsets and encodings. @@ -1305,7 +1408,6 @@ func (t testConsumerGroupSession) MarkOffset(string, int32, int64, string) { } func (t testConsumerGroupSession) ResetOffset(string, int32, int64, string) { - panic("implement me") } func (t testConsumerGroupSession) MarkMessage(*sarama.ConsumerMessage, string) {} diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index a0a744764602..835b50193198 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -35,3 +35,9 @@ kafka/logs: retry: max: 10 backoff: 5s + error_backoff: + enabled: true + initial_interval: 1s + max_interval: 10s + max_elapsed_time: 1m + multiplier: 1.5 \ No newline at end of file
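
---

Reviewer note (not part of the patch): for readers of the README portion of this change, a minimal receiver configuration exercising the new setting might look like the sketch below. The `error_backoff` keys and values mirror the ones added to `testdata/config.yaml`; `randomization_factor` is optional and is shown here with an illustrative value of 0.5 (the backoff library's usual default). The surrounding `receivers:`/`kafka:` nesting is the standard collector layout and is assumed, not taken from this diff.

```yaml
receivers:
  kafka:
    # Wait and retry (instead of returning the error and re-fetching immediately)
    # when the next consumer refuses data due to memory pressure
    # (errMemoryLimiterDataRefused in this patch).
    error_backoff:
      enabled: true          # disabled by default
      initial_interval: 1s   # wait after the first refused message
      multiplier: 1.5        # growth factor between consecutive waits
      max_interval: 10s      # cap on a single wait
      max_elapsed_time: 1m   # give up after this much total backoff time (0 = never)
      randomization_factor: 0.5
```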
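The wait pattern those settings produce can be reproduced with the same `github.com/cenkalti/backoff/v4` library the receiver uses in `newExponentialBackOff`. The following is a standalone, illustrative sketch only (the `main` package, the loop, and the printed output are not part of the change); it assumes the testdata values and disables jitter so the sequence is deterministic.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Same fields newExponentialBackOff copies from configretry.BackOffConfig.
	expBackOff := backoff.NewExponentialBackOff()
	expBackOff.InitialInterval = 1 * time.Second
	expBackOff.Multiplier = 1.5
	expBackOff.MaxInterval = 10 * time.Second
	expBackOff.MaxElapsedTime = 1 * time.Minute
	expBackOff.RandomizationFactor = 0 // no jitter, for a deterministic printout
	expBackOff.Reset()

	// Each NextBackOff call grows by the multiplier until MaxInterval caps it;
	// once MaxElapsedTime has passed, backoff.Stop is returned and the handler
	// in this patch marks the message (if configured) and returns the error.
	for i := 0; i < 6; i++ {
		delay := expBackOff.NextBackOff()
		if delay == backoff.Stop {
			fmt.Println("giving up")
			break
		}
		fmt.Printf("retry %d after %s\n", i+1, delay)
	}
}
```

With these values the first waits are 1s, 1.5s, 2.25s, 3.375s, 5.0625s, 7.59375s. Note that the handlers call `Reset()` at the start of `ConsumeClaim` and, on a successful retry window, rely on `ResetOffset` to re-deliver the refused message, so the backoff sequence starts over for each new claim.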