Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/netflow] Netflow receiver implementation - PR 2 #36865

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
f7ff90b
#32732 - Initial implementation
dlopes7 Dec 15, 2024
9871936
#32732 - Make internal methods private
dlopes7 Dec 15, 2024
09a91d1
#32732 - netflow - make checks
dlopes7 Dec 15, 2024
26d5aec
#32732 - netflow - simplify implementation, remove listener
dlopes7 Dec 16, 2024
64acc94
netflow - add parser tests
dlopes7 Dec 16, 2024
658c414
netflow - add more tests
dlopes7 Dec 16, 2024
0a50dbb
netflow - add test to check for udp receiver creation
dlopes7 Dec 16, 2024
dd6a006
netflow - gofmt
dlopes7 Dec 16, 2024
363f93b
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 16, 2024
3854269
netflow - make checks
dlopes7 Dec 16, 2024
0fe206e
netflowreceiver - add changelog for implementation
dlopes7 Dec 17, 2024
d7c0782
netflow - fix license header
dlopes7 Dec 17, 2024
b343ab4
netflowreceiver - lint
dlopes7 Dec 17, 2024
f504466
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 17, 2024
d31a81e
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
8d07a6a
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
f7b1512
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 20, 2024
33d0cee
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 27, 2024
5e178ac
netflow - update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 2, 2025
ebfa73d
netflow - remove scaffold issue
dlopes7 Jan 2, 2025
86701b9
netflow - add comment about default queue size value
dlopes7 Jan 2, 2025
52bca59
netflow - fix duplicate test case
dlopes7 Jan 2, 2025
edbbbc0
netflow - fix typo
dlopes7 Jan 2, 2025
d68e826
netflow - better error handling messages
dlopes7 Jan 8, 2025
053543c
netflow - use a simple log record instead of struct
dlopes7 Jan 8, 2025
169373f
netflow - use context.Background() on producer
dlopes7 Jan 8, 2025
5cf7625
netflow - type the dropHandler interface
dlopes7 Jan 8, 2025
10fc878
Update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 8, 2025
2281a06
Merge branch 'netflow-receiver-implementation' of github.com:dlopes7/…
dlopes7 Jan 8, 2025
54d8b60
netflow - more concise startup log message
dlopes7 Jan 8, 2025
d2c1c1f
netflow - sflow and netflow schema documentation
dlopes7 Jan 8, 2025
41b68a2
netflow - remove 'flow' scheme
dlopes7 Jan 8, 2025
7dbde9b
netflow - add sflow to sample yaml config
dlopes7 Jan 8, 2025
b33920b
netflow - fix default queue size
dlopes7 Jan 8, 2025
12e8425
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
77ec06a
netflow - go mod tidy
dlopes7 Jan 8, 2025
a1f6422
netflow - format
dlopes7 Jan 8, 2025
c738398
netflow - lint and typing
dlopes7 Jan 8, 2025
ccf6e7c
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
c090043
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 9, 2025
3ef5c92
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 9, 2025
6c92bc5
netflow - better wording on log
dlopes7 Jan 13, 2025
b767ec6
netflow - rename fields
dlopes7 Jan 13, 2025
416efa7
netflow - remove unecessary log line
dlopes7 Jan 13, 2025
9e4aa70
netflow - better reference in comment
dlopes7 Jan 13, 2025
583915e
netflow - remove unecessary log line
dlopes7 Jan 13, 2025
64aed8c
netflow - move flow keys to 'flow.' namespace
dlopes7 Jan 13, 2025
ab9b6c1
netflow - update readme keys
dlopes7 Jan 13, 2025
dd8f7d9
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
fed610b
netflow - adjust validSchemes slice size
dlopes7 Jan 13, 2025
f77489b
netflow - clarify minimum queue size
dlopes7 Jan 13, 2025
44e134e
netflow - simplify panic handling
dlopes7 Jan 13, 2025
f6526d6
netflow - formatting and remove a comment
dlopes7 Jan 13, 2025
5c07873
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
59633be
Update receiver/netflowreceiver/producer.go
dlopes7 Jan 13, 2025
75cba97
Update receiver/netflowreceiver/producer.go
dlopes7 Jan 13, 2025
af1e72b
netflow - fix panic handling and test for it
dlopes7 Jan 13, 2025
da9b68e
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
e72e3f3
Merge branch 'netflow-receiver-implementation' of github.com:dlopes7/…
dlopes7 Jan 14, 2025
22c12b2
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 14, 2025
075d87b
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 14, 2025
9049e77
netflow - add tests for parsed timestamps
dlopes7 Jan 16, 2025
735ab5a
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 16, 2025
9eb1955
netflow - go mod tidy
dlopes7 Jan 16, 2025
838e479
netflow - update semconv
dlopes7 Jan 16, 2025
bbd7c38
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 16, 2025
41c116f
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
evan-bradley Jan 22, 2025
a67b9ad
netflow - update semconv
dlopes7 Jan 22, 2025
40cc5ec
go mod tidy
evan-bradley Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the implementation of the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732, 97279]
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Implement the UDP listener and the producer, as well as adding tests.
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
50 changes: 49 additions & 1 deletion receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,47 @@ func TestLoadConfig(t *testing.T) {
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
{
id: component.NewIDWithName(metadata.Type, "sflow"),
expected: &Config{
Scheme: "sflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "flow"),
expected: &Config{
Scheme: "flow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
}
Expand Down Expand Up @@ -74,6 +114,14 @@ func TestInvalidConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_sockets"),
err: "sockets must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_workers"),
err: "workers must be greater than 0",
},
}

for _, tt := range tests {
Expand Down
17 changes: 10 additions & 7 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
defaultQueueSize = 1_000
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
)

// NewFactory creates a factory for netflow receiver.
Expand All @@ -27,6 +27,8 @@ func NewFactory() receiver.Factory {
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

// Config defines configuration for netflow receiver.
// By default we listen for netflow traffic on port 2055
func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Expand All @@ -37,14 +39,15 @@ func createDefaultConfig() component.Config {
}
}

// createLogsReceiver creates a netflow receiver.
// We also create the UDP receiver, which is the piece of software that actually listens
// for incoming netflow traffic on an UDP port.
func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)
conf := *(cfg.(*Config))

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
nr, err := newNetflowLogsReceiver(params, conf, consumer)
if err != nil {
return nil, err
}

return nr, nil
Expand Down
4 changes: 3 additions & 1 deletion receiver/netflowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo
go 1.22.0

require (
github.com/netsampler/goflow2/v2 v2.2.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/component/componenttest v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/confmap v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/consumer v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/consumer/consumertest v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/pdata v1.22.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/receiver v0.116.1-0.20241220212031-7c2639723f67
go.opentelemetry.io/collector/receiver/receivertest v0.116.1-0.20241220212031-7c2639723f67
go.uber.org/goleak v1.3.0
Expand All @@ -26,6 +28,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -34,7 +37,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pdata v1.22.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/pipeline v0.116.1-0.20241220212031-7c2639723f67 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.116.1-0.20241220212031-7c2639723f67 // indirect
Expand Down
4 changes: 4 additions & 0 deletions receiver/netflowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions receiver/netflowreceiver/listener.go

This file was deleted.

27 changes: 0 additions & 27 deletions receiver/netflowreceiver/listener_test.go

This file was deleted.

139 changes: 139 additions & 0 deletions receiver/netflowreceiver/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"errors"
"net/netip"
"time"

"github.com/netsampler/goflow2/v2/producer"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
)

var (
etypeName = map[uint32]string{
0x806: "ARP",
0x800: "IPv4",
0x86dd: "IPv6",
}
protoName = map[uint32]string{
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6",
132: "SCTP",
}

flowTypeName = map[int32]string{
0: "UNKNOWN",
1: "SFLOW_5",
2: "NETFLOW_V5",
3: "NETFLOW_V9",
4: "IPFIX",
}
)

type NetworkAddress struct {
Address string `json:"address,omitempty"`
Port uint32 `json:"port,omitempty"`
}

type Flow struct {
Type string `json:"type,omitempty"`
TimeReceived time.Time `json:"time_received,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
SequenceNum uint32 `json:"sequence_num,omitempty"`
SamplingRate uint64 `json:"sampling_rate,omitempty"`
SamplerAddress string `json:"sampler_address,omitempty"`
}

type Protocol struct {
Name []byte `json:"name,omitempty"` // Layer 7
}

type NetworkIO struct {
Bytes uint64 `json:"bytes,omitempty"`
Packets uint64 `json:"packets,omitempty"`
}

type OtelNetworkMessage struct {
Source NetworkAddress `json:"source,omitempty"`
Destination NetworkAddress `json:"destination,omitempty"`
Transport string `json:"transport,omitempty"` // Layer 4
Type string `json:"type,omitempty"` // Layer 3
IO NetworkIO `json:"io,omitempty"`
Flow Flow `json:"flow,omitempty"`
}

func getEtypeName(etype uint32) string {
if name, ok := etypeName[etype]; ok {
return name
}
return "unknown"
}

func getProtoName(proto uint32) string {
if name, ok := protoName[proto]; ok {
return name
}
return "unknown"
}

func getFlowTypeName(flowType int32) string {
if name, ok := flowTypeName[flowType]; ok {
return name
}
return "unknown"
}

// convertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage
func convertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) {
// we know msg is ProtoProducerMessage because that is the parent producer
pm, ok := m.(*protoproducer.ProtoProducerMessage)
if !ok {
return nil, errors.New("message is not ProtoProducerMessage")
}

// Parse IP addresses bytes to netip.Addr
srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr)
dstAddr, _ := netip.AddrFromSlice(pm.DstAddr)
samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress)

// Time the receiver received the message
receivedTime := time.Unix(0, int64(pm.TimeReceivedNs))
startTime := time.Unix(0, int64(pm.TimeFlowStartNs))
endTime := time.Unix(0, int64(pm.TimeFlowEndNs))

// Construct the actual log record based on the otel semantic conventions
// see https://opentelemetry.io/docs/specs/semconv/general/attributes/
otelMessage := OtelNetworkMessage{
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
Source: NetworkAddress{
Address: srcAddr.String(),
Port: pm.SrcPort,
},
Destination: NetworkAddress{
Address: dstAddr.String(),
Port: pm.DstPort,
},
Type: getEtypeName(pm.Etype), // Layer 3
Transport: getProtoName(pm.Proto), // Layer 4
IO: NetworkIO{
Bytes: pm.Bytes,
Packets: pm.Packets,
},
Flow: Flow{
Type: getFlowTypeName(int32(pm.Type)),
TimeReceived: receivedTime,
Start: startTime,
End: endTime,
SequenceNum: pm.SequenceNum,
SamplingRate: pm.SamplingRate,
SamplerAddress: samplerAddr.String(),
},
}

return &otelMessage, nil
}
Loading
Loading