Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kafka] backoff in case of next consumer error #37009

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ The following settings can be optionally configured:
- `extract_headers` (default = false): Allows the user to attach header fields to resource attributes in the otel pipeline
- `headers` (default = []): List of header names to extract from Kafka records.
**Note: Matching pattern will be `exact`. Regexes are not supported as of now.**
- `error_backoff`: [BackOff](https://github.com/open-telemetry/opentelemetry-collector/blob/v0.116.0/config/configretry/backoff.go#L27-L43) configuration in case of errors
- `enabled`: (default = false) Whether to enable backoff when next consumers return errors
- `initial_interval`: The time to wait after the first error before retrying
- `max_interval`: The upper bound on backoff interval between consecutive retries
- `multiplier`: The value multiplied by the backoff interval bounds
- `randomization_factor`: A random factor used to calculate next backoff. Randomized interval = RetryInterval * (1 ± RandomizationFactor)
- `max_elapsed_time`: The maximum total time spent backing off before giving up. If set to 0, retries are never stopped.

Example:

Expand Down
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
Expand Down Expand Up @@ -85,6 +86,8 @@ type Config struct {
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0", no limit)
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
// In case of some errors returned by the next consumer, the receiver will wait before consuming the next message
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

const (
Expand Down
11 changes: 11 additions & 0 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"

Expand Down Expand Up @@ -65,6 +66,9 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
},
},
{
Expand Down Expand Up @@ -101,6 +105,13 @@ func TestLoadConfig(t *testing.T) {
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ErrorBackOff: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 1 * time.Minute,
Multiplier: 1.5,
},
},
},
}
Expand Down
113 changes: 104 additions & 9 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -35,6 +38,8 @@ const (

var errInvalidInitialOffset = errors.New("invalid initial offset")

var errMemoryLimiterDataRefused = errors.New("data refused due to high memory usage")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
config Config
Expand Down Expand Up @@ -205,6 +210,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -313,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
metricsConsumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -424,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
telemetryBuilder: c.telemetryBuilder,
backOff: newExponentialBackOff(c.config.ErrorBackOff),
}
if c.headerExtraction {
logsConsumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -481,6 +489,7 @@ type tracesConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type metricsConsumerGroupHandler struct {
Expand All @@ -498,6 +507,7 @@ type metricsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

type logsConsumerGroupHandler struct {
Expand All @@ -515,6 +525,7 @@ type logsConsumerGroupHandler struct {
autocommitEnabled bool
messageMarking MessageMarking
headerExtractor HeaderExtractor
backOff *backoff.ExponentialBackOff
}

var (
Expand All @@ -541,6 +552,9 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if !c.autocommitEnabled {
defer session.Commit()
}
if c.backOff != nil {
c.backOff.Reset()
}
for {
select {
case message, ok := <-claim.Messages():
Expand Down Expand Up @@ -579,10 +593,29 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
Comment on lines +598 to +603
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If max elapsed time of backoff config is exceeded, mark the message and return the error.

select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
Comment on lines +608 to +611
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If c.messageMarking.After == false, message is already marked before the pipeline execution https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/37009/files#diff-8fa2a92bedeb4eca30c174611f6204e98b4d80570d7b0c37a3c8b2fdf4e75ef7L554-L556, reset the offset to the message offset so that it can be consumed again in the next for loop. Do not return the error which would interrupt the loop.

If c.messageMarking.After == true then do nothing

}
} else {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little bit confused that returning the error here will interrupt the for loop and return from ConsumeClaim function, how do we get back to ConsumeClaim again to consume the next messages?

}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
Expand All @@ -600,6 +633,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}
}

// errorRequiresBackoff reports whether err should send the consumer into
// its backoff-and-retry path instead of being returned immediately.
// Currently only the memory limiter's refusal error qualifies; all other
// errors propagate without a retry delay.
func errorRequiresBackoff(err error) bool {
	switch {
	case errors.Is(err, errMemoryLimiterDataRefused):
		return true
	default:
		return false
	}
}

func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
c.readyCloser.Do(func() {
close(c.ready)
Expand All @@ -618,6 +655,9 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
if !c.autocommitEnabled {
defer session.Commit()
}
if c.backOff != nil {
c.backOff.Reset()
}
for {
select {
case message, ok := <-claim.Messages():
Expand Down Expand Up @@ -656,10 +696,29 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
}
} else {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
Expand Down Expand Up @@ -695,6 +754,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
if !c.autocommitEnabled {
defer session.Commit()
}
if c.backOff != nil {
c.backOff.Reset()
}
for {
select {
case message, ok := <-claim.Messages():
Expand Down Expand Up @@ -732,10 +794,29 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
if errorRequiresBackoff(err) && c.backOff != nil {
backOffDelay := c.backOff.NextBackOff()
if backOffDelay == backoff.Stop {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
select {
case <-session.Context().Done():
return nil
case <-time.After(backOffDelay):
if !c.messageMarking.After {
// Unmark the message so it can be retried
session.ResetOffset(claim.Topic(), claim.Partition(), message.Offset, "")
}
}
} else {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
}
return err
}
return err
}
if c.messageMarking.After {
session.MarkMessage(message, "")
Expand All @@ -753,6 +834,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}
}

// newExponentialBackOff translates the receiver's error_backoff settings
// into a *backoff.ExponentialBackOff. It returns nil when backoff is
// disabled; callers treat a nil backoff as "fail immediately, no retry
// delay".
func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
	if !config.Enabled {
		return nil
	}
	// Start from the library's defaults (clock, stop sentinel, …) and
	// overlay only the user-configurable knobs.
	expBackOff := backoff.NewExponentialBackOff()
	expBackOff.InitialInterval = config.InitialInterval
	expBackOff.RandomizationFactor = config.RandomizationFactor
	expBackOff.Multiplier = config.Multiplier
	expBackOff.MaxInterval = config.MaxInterval
	expBackOff.MaxElapsedTime = config.MaxElapsedTime
	// Reset so the first NextBackOff starts from InitialInterval.
	expBackOff.Reset()
	return expBackOff
}

func toSaramaInitialOffset(initialOffset string) (int64, error) {
switch initialOffset {
case offsetEarliest:
Expand Down
Loading
Loading