Skip to content

Commit

Permalink
[pkg/ottl] enhance flatten editor to resolve attribute key conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT committed Jan 8, 2025
1 parent 2aa1d00 commit 9de8e9c
Show file tree
Hide file tree
Showing 5 changed files with 477 additions and 21 deletions.
27 changes: 27 additions & 0 deletions .chloggen/flatten-conflict.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Enhance flatten() editor to resolve attribute key conflicts by adding a number suffix to the conflicting keys."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35793]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
46 changes: 46 additions & 0 deletions pkg/ottl/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,21 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("total.string")
tCtx.GetLogRecord().Attributes().Remove("foo")
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
},
},
{
statement: `flatten(attributes)`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().Remove("foo")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
tCtx.GetLogRecord().Attributes().PutStr("foo.bar", "pass")
tCtx.GetLogRecord().Attributes().PutStr("foo.flags", "pass")
tCtx.GetLogRecord().Attributes().PutStr("foo.slice.0", "val")
tCtx.GetLogRecord().Attributes().PutStr("foo.nested.test", "pass")
tCtx.GetLogRecord().Attributes().PutStr("conflict.conflict1.conflict2", "nopass")

tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().PutStr("things.0.name", "foo")
Expand All @@ -93,6 +98,7 @@ func Test_e2e_editors(t *testing.T) {
m.PutStr("test.foo.flags", "pass")
m.PutStr("test.foo.slice.0", "val")
m.PutStr("test.foo.nested.test", "pass")
m.PutStr("test.conflict.conflict1.conflict2", "nopass")

m.PutStr("test.things.0.name", "foo")
m.PutInt("test.things.0.value", 2)
Expand All @@ -101,6 +107,34 @@ func Test_e2e_editors(t *testing.T) {
m.CopyTo(tCtx.GetLogRecord().Attributes())
},
},
{
statement: `flatten(attributes, "test", resolveConflicts=true)`,
want: func(tCtx ottllog.TransformContext) {
m := pcommon.NewMap()
m.PutStr("test.http.method", "get")
m.PutStr("test.http.path", "/health")
m.PutStr("test.http.url", "http://localhost/health")
m.PutStr("test.flags", "A|B|C")
m.PutStr("test.total.string", "123456789")
m.PutStr("test.foo.bar", "pass")
m.PutStr("test.foo.flags", "pass")
m.PutStr("test.foo.bar", "pass")
m.PutStr("test.foo.flags", "pass")
m.PutStr("test.foo.slice", "val")
m.PutStr("test.foo.nested.test", "pass")

m.PutStr("test.conflict.conflict1.conflict2", "pass")
m.PutStr("test.conflict.conflict1.conflict2.0", "nopass")

m.PutStr("test.things.0.name", "foo")
m.PutInt("test.things.0.value", 2)

m.PutStr("test.things.1.name", "bar")
m.PutInt("test.things.1.value", 5)

m.CopyTo(tCtx.GetLogRecord().Attributes())
},
},
{
statement: `flatten(attributes, depth=1)`,
want: func(tCtx ottllog.TransformContext) {
Expand All @@ -115,6 +149,9 @@ func Test_e2e_editors(t *testing.T) {
m.PutStr("foo.bar", "pass")
m.PutStr("foo.flags", "pass")
m.PutEmptySlice("foo.slice").AppendEmpty().SetStr("val")
m.PutStr("conflict.conflict1.conflict2", "nopass")
mm := m.PutEmptyMap("conflict.conflict1")
mm.PutStr("conflict2", "pass")

m1 := m.PutEmptyMap("things.0")
m1.PutStr("name", "foo")
Expand All @@ -137,6 +174,8 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("http.url")
tCtx.GetLogRecord().Attributes().Remove("foo")
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
},
},
{
Expand All @@ -152,6 +191,8 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("flags")
tCtx.GetLogRecord().Attributes().Remove("foo")
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
},
},
{
Expand Down Expand Up @@ -1143,6 +1184,11 @@ func constructLogTransformContext() ottllog.TransformContext {
logRecord.Attributes().PutStr("http.url", "http://localhost/health")
logRecord.Attributes().PutStr("flags", "A|B|C")
logRecord.Attributes().PutStr("total.string", "123456789")
mm := logRecord.Attributes().PutEmptyMap("conflict")
mm1 := mm.PutEmptyMap("conflict1")
mm1.PutStr("conflict2", "pass")
mmm := logRecord.Attributes().PutEmptyMap("conflict.conflict1")
mmm.PutStr("conflict2", "nopass")
m := logRecord.Attributes().PutEmptyMap("foo")
m.PutStr("bar", "pass")
m.PutStr("flags", "pass")
Expand Down
44 changes: 42 additions & 2 deletions pkg/ottl/ottlfuncs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ Examples:

### flatten

`flatten(target, Optional[prefix], Optional[depth])`
`flatten(target, Optional[prefix], Optional[depth], Optional[resolveConflicts])`

The `flatten` function flattens a `pcommon.Map` by moving items from nested maps to the root.

`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int.
`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int, `resolveConflicts` resolves the potential conflicts in the map keys by adding a number suffix starting with `0` from the first duplicated key.


For example, the following map

Expand Down Expand Up @@ -199,6 +200,42 @@ the result would be

A `depth` of `0` means that no flattening will occur.

If `resolveConflicts` is set to `true`, conflicts within the map will be resolved

```json
{
"address": {
"street": {
"number": "first",
},
"house": "1234",
},
"address.street": {
"number": ["second", "third"],
},
"address.street.number": "fourth",
"occupants": [
"user 1",
"user 2",
],
}
```

the result would be

```json
{
"address.street.number": "first",
"address.house": "1234",
"address.street.number.0": "second",
"address.street.number.1": "third",
"occupants": "user 1",
"occupants.0": "user 2",
"address.street.number.2": "fourth",
}

```

Examples:

- `flatten(attributes)`
Expand All @@ -210,6 +247,9 @@ Examples:
- `flatten(body, depth=2)`


- `flatten(body, resolveConflicts=true)`


### keep_keys

`keep_keys(target, keys[])`
Expand Down
61 changes: 45 additions & 16 deletions pkg/ottl/ottlfuncs/func_flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"context"
"fmt"
"math"
"strconv"

"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

type FlattenArguments[K any] struct {
Target ottl.PMapGetter[K]
Prefix ottl.Optional[string]
Depth ottl.Optional[int64]
Target ottl.PMapGetter[K]
Prefix ottl.Optional[string]
Depth ottl.Optional[int64]
ResolveConflicts ottl.Optional[bool]
}

func NewFlattenFactory[K any]() ottl.Factory[K] {
Expand All @@ -30,10 +32,10 @@ func createFlattenFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments)
return nil, fmt.Errorf("FlattenFactory args must be of type *FlattenArguments[K]")
}

return flatten(args.Target, args.Prefix, args.Depth)
return flatten(args.Target, args.Prefix, args.Depth, args.ResolveConflicts)
}

func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64]) (ottl.ExprFunc[K], error) {
func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64], c ottl.Optional[bool]) (ottl.ExprFunc[K], error) {
depth := int64(math.MaxInt64)
if !d.IsEmpty() {
depth = d.Get()
Expand All @@ -47,52 +49,79 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O
prefix = p.Get()
}

conflict := false
if !c.IsEmpty() {
conflict = c.Get()
}

return func(ctx context.Context, tCtx K) (any, error) {
m, err := target.Get(ctx, tCtx)
if err != nil {
return nil, err
}

result := pcommon.NewMap()
flattenMap(m, result, prefix, 0, depth)
existingKeys := map[string]int{}
flattenMap(m, result, prefix, 0, depth, conflict, existingKeys)
result.MoveTo(m)

return nil, nil
}, nil
}

func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64) {
func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64, conflict bool, existingKeys map[string]int) {
if len(prefix) > 0 {
prefix += "."
}
m.Range(func(k string, v pcommon.Value) bool {
return flattenValue(k, v, currentDepth, maxDepth, result, prefix)
return flattenValue(k, v, currentDepth, maxDepth, result, prefix, conflict, existingKeys)
})
}

func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64) {
func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64, conflict bool, existingKeys map[string]int) {
for i := 0; i < s.Len(); i++ {
flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix)
flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix, conflict, existingKeys)
}
}

func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string) bool {
func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string, conflict bool, existingKeys map[string]int) bool {
switch {
case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth:
flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth)
flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth, conflict, existingKeys)
case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth:
for i := 0; i < v.Slice().Len(); i++ {
switch {
case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth:
flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth)
flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys)
case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth:
flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth)
flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys)
default:
v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", prefix+k, i)))
key := prefix + k
if conflict {
handleConflict(existingKeys, key, v.Slice().At(i), &result)
} else {
v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", key, i)))
}
}
}
default:
v.CopyTo(result.PutEmpty(prefix + k))
key := prefix + k
if conflict {
handleConflict(existingKeys, key, v, &result)
} else {
v.CopyTo(result.PutEmpty(key))
}
}
return true
}

func handleConflict(existingKeys map[string]int, key string, v pcommon.Value, result *pcommon.Map) {
if _, exists := result.Get(key); exists {
newKey := key + "." + strconv.Itoa(existingKeys[key])
existingKeys[key]++
v.CopyTo(result.PutEmpty(newKey))
} else {
existingKeys[key] = 0
v.CopyTo(result.PutEmpty(key))
}
}
Loading

0 comments on commit 9de8e9c

Please sign in to comment.