Skip to content

Commit

Permalink
[processor/tailsampling] allow setting only min_value or `max_value…
Browse files Browse the repository at this point in the history
…` for the `numeric_attribute` (#37328)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR makes the `numeric_attribute` more flexible and allows to set
only `min_value` or `max_value`, without the need to set both. This is
useful to have simple configurations like these:

```
{
  type: numeric_attribute,
  numeric_attribute: {
    key: http.status_code,
    min_value: 400
  }
}
```

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

N/A

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Added unit tests to cover all changes and scenarios.

<!--Describe the documentation added.-->
#### Documentation

I fixed an example on the documentation to have just the `min_value` set
<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
bmbferreira authored Feb 8, 2025
1 parent 61d70f8 commit d1d4d69
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: makes the `numeric_attribute` more flexible and allows to set only `min_value` or `max_value`, without the need to set both

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37328]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This is useful to have simple configurations like these:
```
{
type: numeric_attribute,
numeric_attribute: {
key: http.status_code,
min_value: 400
}
}
```
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ processors:
{
name: test-composite-policy-1,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
numeric_attribute: {key: key1, min_value: 50}
},
{
name: test-composite-policy-2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ func newTraceWithKV(traceID pcommon.TraceID, key string, val int64) *TraceData {

func TestCompositeEvaluatorNotSampled(t *testing.T) {
// Create 2 policies which do not match any trace
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 200, 300, false)
min0 := int64(0)
max100 := int64(100)
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})

trace := createTrace()
Expand All @@ -75,7 +77,9 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) {

func TestCompositeEvaluatorSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
min0 := int64(0)
max100 := int64(100)
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})

Expand All @@ -93,7 +97,9 @@ func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) {
timeProvider := &FakeTimeProvider{second: 0}

// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
min0 := int64(0)
max100 := int64(100)
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider)

Expand Down Expand Up @@ -126,7 +132,9 @@ func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) {

func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
min0 := int64(0)
max100 := int64(100)
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{})

Expand Down Expand Up @@ -201,7 +209,9 @@ func TestCompositeEvaluatorThrottling(t *testing.T) {
}

func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) {
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
min0 := int64(0)
max100 := int64(100)
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
timeProvider := &FakeTimeProvider{second: 0}
const totalSPS = 10
Expand Down Expand Up @@ -262,3 +272,26 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) {
assert.Equal(t, expected, decision)
}
}

func TestComposite(t *testing.T) {
// Define the int64 values
min0 := int64(0)
max100 := int64(100)

// Update the policy creation calls
filter1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", &min0, &max100, false)

// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := filter1
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})

trace := createTrace()

decision, err := c.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate composite policy: %v", err)

// The second policy is AlwaysSample, so the decision should be Sampled.
expected := Sampled
assert.Equal(t, expected, decision)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sampling // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"context"
"math"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -13,17 +14,23 @@ import (
)

type numericAttributeFilter struct {
key string
minValue, maxValue int64
logger *zap.Logger
invertMatch bool
key string
minValue *int64
maxValue *int64
logger *zap.Logger
invertMatch bool
}

var _ PolicyEvaluator = (*numericAttributeFilter)(nil)

// NewNumericAttributeFilter creates a policy evaluator that samples all traces with
// the given attribute in the given numeric range.
func NewNumericAttributeFilter(settings component.TelemetrySettings, key string, minValue, maxValue int64, invertMatch bool) PolicyEvaluator {
// the given attribute in the given numeric range. If minValue is nil, it will use math.MinInt64.
// If maxValue is nil, it will use math.MaxInt64. At least one of minValue or maxValue must be set.
func NewNumericAttributeFilter(settings component.TelemetrySettings, key string, minValue, maxValue *int64, invertMatch bool) PolicyEvaluator {
if minValue == nil && maxValue == nil {
settings.Logger.Error("At least one of minValue or maxValue must be set")
return nil
}
return &numericAttributeFilter{
key: key,
minValue: minValue,
Expand All @@ -39,13 +46,23 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
defer trace.Unlock()
batches := trace.ReceivedBatches

// Get the effective min/max values
minVal := int64(math.MinInt64)
if naf.minValue != nil {
minVal = *naf.minValue
}
maxVal := int64(math.MaxInt64)
if naf.maxValue != nil {
maxVal = *naf.maxValue
}

if naf.invertMatch {
return invertHasResourceOrSpanWithCondition(
batches,
func(resource pcommon.Resource) bool {
if v, ok := resource.Attributes().Get(naf.key); ok {
value := v.Int()
if value >= naf.minValue && value <= naf.maxValue {
if value >= minVal && value <= maxVal {
return false
}
}
Expand All @@ -54,7 +71,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
func(span ptrace.Span) bool {
if v, ok := span.Attributes().Get(naf.key); ok {
value := v.Int()
if value >= naf.minValue && value <= naf.maxValue {
if value >= minVal && value <= maxVal {
return false
}
}
Expand All @@ -67,7 +84,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
func(resource pcommon.Resource) bool {
if v, ok := resource.Attributes().Get(naf.key); ok {
value := v.Int()
if value >= naf.minValue && value <= naf.maxValue {
if value >= minVal && value <= maxVal {
return true
}
}
Expand All @@ -76,7 +93,7 @@ func (naf *numericAttributeFilter) Evaluate(_ context.Context, _ pcommon.TraceID
func(span ptrace.Span) bool {
if v, ok := span.Attributes().Get(naf.key); ok {
value := v.Int()
if value >= naf.minValue && value <= naf.maxValue {
if value >= minVal && value <= maxVal {
return true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestNumericTagFilter(t *testing.T) {
empty := map[string]any{}
filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", math.MinInt32, math.MaxInt32, false)
minVal := int64(math.MinInt32)
maxVal := int64(math.MaxInt32)
filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", &minVal, &maxVal, false)

resAttr := map[string]any{}
resAttr["example"] = 8
Expand Down Expand Up @@ -86,7 +89,9 @@ func TestNumericTagFilter(t *testing.T) {

func TestNumericTagFilterInverted(t *testing.T) {
empty := map[string]any{}
filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", math.MinInt32, math.MaxInt32, true)
minVal := int64(math.MinInt32)
maxVal := int64(math.MaxInt32)
filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", &minVal, &maxVal, true)

resAttr := map[string]any{}
resAttr["example"] = 8
Expand Down Expand Up @@ -153,6 +158,111 @@ func TestNumericTagFilterInverted(t *testing.T) {
}
}

func TestNumericTagFilterOptionalBounds(t *testing.T) {
tests := []struct {
name string
min *int64
max *int64
value int64
invertMatch bool
want Decision
}{
{
name: "only min set - value above min",
min: ptr(int64(100)),
max: nil,
value: 200,
want: Sampled,
},
{
name: "only min set - value below min",
min: ptr(int64(100)),
max: nil,
value: 50,
want: NotSampled,
},
{
name: "only max set - value below max",
min: nil,
max: ptr(int64(100)),
value: 50,
want: Sampled,
},
{
name: "only max set - value above max",
min: nil,
max: ptr(int64(100)),
value: 200,
want: NotSampled,
},
{
name: "both set - value in range",
min: ptr(int64(100)),
max: ptr(int64(200)),
value: 150,
want: Sampled,
},
{
name: "both set - value out of range",
min: ptr(int64(100)),
max: ptr(int64(200)),
value: 50,
want: NotSampled,
},
{
name: "inverted match - only min set - value above min",
min: ptr(int64(100)),
max: nil,
value: 200,
invertMatch: true,
want: InvertNotSampled,
},
{
name: "inverted match - only max set - value below max",
min: nil,
max: ptr(int64(100)),
value: 50,
invertMatch: true,
want: InvertNotSampled,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filter := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "example", tt.min, tt.max, tt.invertMatch)
require.NotNil(t, filter, "filter should not be nil")

trace := newTraceIntAttrs(map[string]any{}, "example", tt.value)
decision, err := filter.Evaluate(context.Background(), pcommon.TraceID{}, trace)
assert.NoError(t, err)
assert.Equal(t, tt.want, decision)
})
}
}

func TestNumericTagFilterNilBounds(t *testing.T) {
settings := componenttest.NewNopTelemetrySettings()
filter := NewNumericAttributeFilter(settings, "example", nil, nil, false)
assert.Nil(t, filter, "filter should be nil when both bounds are nil")

// Test that the filter is created successfully when at least one bound is set
minBound := int64(100)
filter = NewNumericAttributeFilter(settings, "example", &minBound, nil, false)
assert.NotNil(t, filter, "filter should not be nil when min is set")

maxBound := int64(200)
filter = NewNumericAttributeFilter(settings, "example", nil, &maxBound, false)
assert.NotNil(t, filter, "filter should not be nil when max is set")

filter = NewNumericAttributeFilter(settings, "example", &minBound, &maxBound, false)
assert.NotNil(t, filter, "filter should not be nil when both bounds are set")
}

// helper function to create int64 pointer
func ptr(i int64) *int64 {
return &i
}

func newTraceIntAttrs(nodeAttrs map[string]any, spanAttrKey string, spanAttrValue int64) *TraceData {
traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
Expand Down
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedP
return sampling.NewLatency(settings, lfCfg.ThresholdMs, lfCfg.UpperThresholdmsMs), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(settings, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue, nafCfg.InvertMatch), nil
minValue := nafCfg.MinValue
maxValue := nafCfg.MaxValue
return sampling.NewNumericAttributeFilter(settings, nafCfg.Key, &minValue, &maxValue, nafCfg.InvertMatch), nil
case Probabilistic:
pCfg := cfg.ProbabilisticCfg
return sampling.NewProbabilisticSampler(settings, pCfg.HashSalt, pCfg.SamplingPercentage), nil
Expand Down

0 comments on commit d1d4d69

Please sign in to comment.