diff --git a/.chloggen/netflow-receiver-implementation.yaml b/.chloggen/netflow-receiver-implementation.yaml new file mode 100644 index 000000000000..9d1216640442 --- /dev/null +++ b/.chloggen/netflow-receiver-implementation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: netflowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the implementation of the netflow receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32732] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The receiver now supports receiving NetFlow v5, NetFlow v9, IPFIX, and sFlow v5 logs. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/receiver/netflowreceiver/README.md b/receiver/netflowreceiver/README.md index 30443c6d1199..e7959cb917e5 100644 --- a/receiver/netflowreceiver/README.md +++ b/receiver/netflowreceiver/README.md @@ -32,6 +32,11 @@ receivers: port: 2055 sockets: 16 workers: 32 + netflow/sflow: + - scheme: sflow + port: 6343 + sockets: 16 + workers: 32 processors: batch: @@ -45,7 +50,7 @@ exporters: service: pipelines: logs: - receivers: [netflow] + receivers: [netflow, netflow/sflow] processors: [batch] exporters: [debug] telemetry: @@ -61,42 +66,52 @@ You would then configure your network devices to send netflow, sflow, or ipfix d | Field | Description | Examples | Default | |-------|-------------|--------| ------- | -| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` | +| scheme | The type of flow data to receive | `sflow`, `netflow` | `netflow` | | hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` | | port | The port to bind to | `2055` or `6343` | `2055` | | sockets | The number of sockets to use | 1 | 1 | | workers | The number of workers used to decode incoming flow messages | 2 | 2 | -| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 | +| queue_size | The size of the incoming netflow packets queue; it will always be at least 1000. 
| 5000 | 1000 | ## Data format -The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes) - -The output will adhere the format: - -```json -{ - "destination": { - "address": "192.168.0.1", - "port": 22 - }, - "flow": { - "end": 1731073104662487000, - "sampler_address": "192.168.0.2", - "sequence_num": 49, - "start": 1731073077662487000, - "time_received": 1731073138662487000, - "type": "NETFLOW_V5" - }, - "io": { - "bytes": 529, - "packets": 378 - }, - "source": { - "address": "192.168.0.3", - "port": 40 - }, - "transport": "TCP", - "type": "IPv4" -} -``` +The netflow data is standardized for the different schemas and is converted to OpenTelemetry log records following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes) + +The log record will have the following attributes (with examples): + +* **source.address**: Str(132.189.238.100) +* **source.port**: Int(1255) +* **destination.address**: Str(241.171.33.110) +* **destination.port**: Int(64744) +* **network.transport**: Str(tcp) +* **network.type**: Str(ipv4) +* **flow.io.bytes**: Int(853) +* **flow.io.packets**: Int(83) +* **flow.type**: Str(netflow_v5) +* **flow.sequence_num**: Int(191) +* **flow.time_received**: Int(1736309689918929427) +* **flow.start**: Int(1736309689830846400) +* **flow.end**: Int(1736309689871846400) +* **flow.sampling_rate**: Int(0) +* **flow.sampler_address**: Str(172.28.176.1) + +The log record timestamps will be: + +* **Observed timestamp**: The time the flow was received. +* **Timestamp**: The flow `start` field. 
+ +### Schema support + +#### netflow + +* Process [Template Records](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html) if present +* Process Netflow V5, V9, and IPFIX messages +* Extract the attributes documented above +* Mapping of custom fields is not yet supported + +#### sflow + +* Process [sFlow version 5](https://sflow.org/sflow_version_5.txt) datagrams +* `flow_sample` and `flow_sample_expanded` are supported. +* `counter_sample` and `counter_sample_expanded` are NOT yet supported. +* Mapping of custom fields is not yet supported diff --git a/receiver/netflowreceiver/config.go b/receiver/netflowreceiver/config.go index e92eb79feb2e..61d6e6952add 100644 --- a/receiver/netflowreceiver/config.go +++ b/receiver/netflowreceiver/config.go @@ -32,7 +32,7 @@ type Config struct { // Validate checks if the receiver configuration is valid func (cfg *Config) Validate() error { - validSchemes := [3]string{"sflow", "netflow", "flow"} + validSchemes := [2]string{"sflow", "netflow"} validScheme := false for _, scheme := range validSchemes { @@ -42,7 +42,7 @@ func (cfg *Config) Validate() error { } } if !validScheme { - return fmt.Errorf("scheme must be one of sflow, netflow, or flow") + return fmt.Errorf("scheme must be netflow or sflow") } if cfg.Sockets <= 0 { diff --git a/receiver/netflowreceiver/config_test.go b/receiver/netflowreceiver/config_test.go index 167eaadaaf1e..2a51b8b783e1 100644 --- a/receiver/netflowreceiver/config_test.go +++ b/receiver/netflowreceiver/config_test.go @@ -36,7 +36,27 @@ func TestLoadConfig(t *testing.T) { Port: 2055, Sockets: 1, Workers: 1, - QueueSize: 1000000, + QueueSize: 1000, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "zero_queue"), + expected: &Config{ + Scheme: "netflow", + Port: 2055, + Sockets: 1, + Workers: 1, + QueueSize: 1000, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "sflow"), + expected: &Config{ + Scheme: "sflow", + Port: 6343, + Sockets: 1, + 
Workers: 1, + QueueSize: 1000, }, }, } @@ -68,12 +88,20 @@ func TestInvalidConfig(t *testing.T) { }{ { id: component.NewIDWithName(metadata.Type, "invalid_schema"), - err: "scheme must be one of sflow, netflow, or flow", + err: "scheme must be netflow or sflow", }, { id: component.NewIDWithName(metadata.Type, "invalid_port"), err: "port must be greater than 0", }, + { + id: component.NewIDWithName(metadata.Type, "zero_sockets"), + err: "sockets must be greater than 0", + }, + { + id: component.NewIDWithName(metadata.Type, "zero_workers"), + err: "workers must be greater than 0", + }, } for _, tt := range tests { diff --git a/receiver/netflowreceiver/factory.go b/receiver/netflowreceiver/factory.go index 87c27e9b76ed..f9a150f3c9e4 100644 --- a/receiver/netflowreceiver/factory.go +++ b/receiver/netflowreceiver/factory.go @@ -14,9 +14,12 @@ import ( ) const ( - defaultSockets = 1 - defaultWorkers = 2 - defaultQueueSize = 1_000_000 + defaultSockets = 1 + defaultWorkers = 2 + // The default UDP packet buffer size in GoFlow2 is 9000 bytes, which means + // that for a full queue of 1000 messages, the size in memory will be 9MB. + // Source: https://github.com/netsampler/goflow2/blob/v2.2.1/README.md#security-notes-and-assumptions + defaultQueueSize = 1_000 ) // NewFactory creates a factory for netflow receiver. @@ -27,6 +30,8 @@ func NewFactory() receiver.Factory { receiver.WithLogs(createLogsReceiver, metadata.LogsStability)) } +// createDefaultConfig returns the default configuration for the netflow receiver. +// By default we listen for netflow traffic on port 2055 func createDefaultConfig() component.Config { return &Config{ Scheme: "netflow", @@ -37,14 +42,15 @@ func createDefaultConfig() component.Config { } } +// createLogsReceiver creates a netflow receiver. +// We also create the UDP receiver, which is the piece of software that actually listens +// for incoming netflow traffic on a UDP port. 
func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) { - logger := params.Logger - conf := cfg.(*Config) + conf := *(cfg.(*Config)) - nr := &netflowReceiver{ - logger: logger, - logConsumer: consumer, - config: conf, + nr, err := newNetflowLogsReceiver(params, conf, consumer) + if err != nil { + return nil, err } return nr, nil diff --git a/receiver/netflowreceiver/go.mod b/receiver/netflowreceiver/go.mod index 992f5f2fc28c..acfd024e4a56 100644 --- a/receiver/netflowreceiver/go.mod +++ b/receiver/netflowreceiver/go.mod @@ -3,14 +3,17 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo go 1.22.0 require ( + github.com/netsampler/goflow2/v2 v2.2.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/confmap v1.24.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/consumer v1.24.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/receiver v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/semconv v0.118.1-0.20250121185328-fbefb22cc2b3 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -26,6 +29,7 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -35,7 +39,6 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect - go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect diff --git a/receiver/netflowreceiver/go.sum b/receiver/netflowreceiver/go.sum index 1607e51fdf1b..4767b6ec09b9 100644 --- a/receiver/netflowreceiver/go.sum +++ b/receiver/netflowreceiver/go.sum @@ -31,6 +31,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s= +github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -40,6 +42,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/netsampler/goflow2/v2 v2.2.1 h1:QzrtWS/meXsqCLv68hdouL+09NfuLKrCoVDJ1xfmuoE= +github.com/netsampler/goflow2/v2 v2.2.1/go.mod h1:057wOc/Xp7c+hUwRDB7wRqrx55m0r3vc7J0k4NrlFbM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= @@ -82,6 +86,8 @@ go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250121185328-fb go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:x9N91YI3onF0+enjYegcHYOb50Of2xO05c8EyE/baJ0= go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250121185328-fbefb22cc2b3 h1:lSOxA/PFNKwCCf0bYwOkTtvYn4Ch4QADFVJU/kuye08= go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:WLPXXIuodY7quBgqCz3OIsPNdBMLDej5nUIbiyyfoUc= +go.opentelemetry.io/collector/semconv v0.118.1-0.20250121185328-fbefb22cc2b3 h1:pkPYgLJrfMwFtLwu+TMjuD53lBQN68uT/OMQ8c7EtxU= +go.opentelemetry.io/collector/semconv v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= diff --git a/receiver/netflowreceiver/listener.go b/receiver/netflowreceiver/listener.go deleted file mode 100644 index d4507cadec8f..000000000000 --- a/receiver/netflowreceiver/listener.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package netflowreceiver // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" - -type Listener struct { - // config Config - // logger *zap.Logger - // recv *utils.UDPReceiver - // logConsumer consumer.Logs -} diff --git a/receiver/netflowreceiver/listener_test.go b/receiver/netflowreceiver/listener_test.go deleted file mode 100644 index 9fbae90c700c..000000000000 --- a/receiver/netflowreceiver/listener_test.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package netflowreceiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func TestCreateValidDefaultListener(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - set := receivertest.NewNopSettings() - receiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) - assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, receiver, "receiver creation failed") - assert.Equal(t, "netflow", receiver.(*netflowReceiver).config.Scheme) - assert.Equal(t, 2055, receiver.(*netflowReceiver).config.Port) - assert.Equal(t, 1, receiver.(*netflowReceiver).config.Sockets) - assert.Equal(t, 2, receiver.(*netflowReceiver).config.Workers) - assert.Equal(t, 1_000_000, receiver.(*netflowReceiver).config.QueueSize) -} diff --git a/receiver/netflowreceiver/parser.go b/receiver/netflowreceiver/parser.go new file mode 100644 index 000000000000..59895c0d15d8 --- /dev/null +++ b/receiver/netflowreceiver/parser.go @@ -0,0 +1,249 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "errors" + "net/netip" + "time" + + "github.com/netsampler/goflow2/v2/producer" + protoproducer 
"github.com/netsampler/goflow2/v2/producer/proto" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" +) + +var ( + + // https://en.wikipedia.org/wiki/EtherType + etypeNames = map[uint32]string{ + 0x806: "arp", + 0x800: "ipv4", + 0x86dd: "ipv6", + } + + // https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml + transportProtocolNames = map[uint32]string{ + 0: "hopopt", + 1: "icmp", + 2: "igmp", + 3: "ggp", + 4: "ipv4", + 5: "st", + 6: "tcp", + 7: "cbt", + 8: "egp", + 9: "igp", + 10: "bbn-rcc-mon", + 11: "nvp-ii", + 12: "pup", + 13: "argus", + 14: "emcon", + 15: "xnet", + 16: "chaos", + 17: "udp", + 18: "mux", + 19: "dcn-meas", + 20: "hmp", + 21: "prm", + 22: "xns-idp", + 23: "trunk-1", + 24: "trunk-2", + 25: "leaf-1", + 26: "leaf-2", + 27: "rdp", + 28: "irtp", + 29: "iso-tp4", + 30: "netblt", + 31: "mfe-nsp", + 32: "merit-inp", + 33: "dccp", + 34: "3pc", + 35: "idpr", + 36: "xtp", + 37: "ddp", + 38: "idpr-cmtp", + 39: "tp++", + 40: "il", + 41: "ipv6", + 42: "sdrp", + 43: "ipv6-route", + 44: "ipv6-frag", + 45: "idrp", + 46: "rsvp", + 47: "gre", + 48: "dsr", + 49: "bna", + 50: "esp", + 51: "ah", + 52: "i-nlsp", + 53: "swipe", + 54: "narp", + 55: "min-ipv4", + 56: "tlsp", + 57: "skip", + 58: "ipv6-icmp", + 59: "ipv6-nonxt", + 60: "ipv6-opts", + 61: "any-host-internal-protocol", + 62: "cftp", + 63: "any-local-network", + 64: "sat-expak", + 65: "kryptolan", + 66: "rvd", + 67: "ippc", + 68: "any-distributed-file-system", + 69: "sat-mon", + 70: "visa", + 71: "ipcv", + 72: "cpnx", + 73: "cphb", + 74: "wsn", + 75: "pvp", + 76: "br-sat-mon", + 77: "sun-nd", + 78: "wb-mon", + 79: "wb-expak", + 80: "iso-ip", + 81: "vmtp", + 82: "secure-vmtp", + 83: "vines", + 84: "iptm", + 85: "nsfnet-igp", + 86: "dgp", + 87: "tcf", + 88: "eigrp", + 89: "ospfigp", + 90: "sprite-rpc", + 91: "larp", + 92: "mtp", + 93: "ax.25", + 94: "ipip", + 95: "micp", + 96: "scc-sp", + 97: 
"etherip", + 98: "encap", + 99: "any-private-encryption-scheme", + 100: "gmtp", + 101: "ifmp", + 102: "pnni", + 103: "pim", + 104: "aris", + 105: "scps", + 106: "qnx", + 107: "a/n", + 108: "ipcomp", + 109: "snp", + 110: "compaq-peer", + 111: "ipx-in-ip", + 112: "vrrp", + 113: "pgm", + 114: "any-0-hop-protocol", + 115: "l2tp", + 116: "ddx", + 117: "iatp", + 118: "stp", + 119: "srp", + 120: "uti", + 121: "smp", + 122: "sm", + 123: "ptp", + 124: "isis over ipv4", + 125: "fire", + 126: "crtp", + 127: "crudp", + 128: "sscopmce", + 129: "iplt", + 130: "sps", + 131: "pipe", + 132: "sctp", + 133: "fc", + 134: "rsvp-e2e-ignore", + 135: "mobility header", + 136: "udplite", + 137: "mpls-in-ip", + 138: "manet", + 139: "hip", + 140: "shim6", + 141: "wesp", + 142: "rohc", + 143: "ethernet", + 144: "aggfrag", + 145: "nsh", + } + + flowTypeNames = map[int32]string{ + 0: "unknown", + 1: "sflow_5", + 2: "netflow_v5", + 3: "netflow_v9", + 4: "ipfix", + } +) + +func getEtypeName(etype uint32) string { + if name, ok := etypeNames[etype]; ok { + return name + } + return "unknown" +} + +func getTransportName(proto uint32) string { + if name, ok := transportProtocolNames[proto]; ok { + return name + } + return "unknown" +} + +func getFlowTypeName(flowType int32) string { + if name, ok := flowTypeNames[flowType]; ok { + return name + } + return "unknown" +} + +// addMessageAttributes parses the message attributes and adds them to the log record +func addMessageAttributes(m producer.ProducerMessage, r *plog.LogRecord) error { + // we know msg is ProtoProducerMessage because that is the parent producer + pm, ok := m.(*protoproducer.ProtoProducerMessage) + if !ok { + return errors.New("this flow message is not ProtoProducerMessage, this is not expected") + } + + // Parse IP addresses bytes to netip.Addr + srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr) + dstAddr, _ := netip.AddrFromSlice(pm.DstAddr) + samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress) + + // Time the receiver received the 
message + receivedTime := time.Unix(0, int64(pm.TimeReceivedNs)) + startTime := time.Unix(0, int64(pm.TimeFlowStartNs)) + + r.SetObservedTimestamp(pcommon.NewTimestampFromTime(receivedTime)) + r.SetTimestamp(pcommon.NewTimestampFromTime(startTime)) + + // Source and destination attributes + r.Attributes().PutStr(semconv.AttributeSourceAddress, srcAddr.String()) + r.Attributes().PutInt(semconv.AttributeSourcePort, int64(pm.SrcPort)) + r.Attributes().PutStr(semconv.AttributeDestinationAddress, dstAddr.String()) + r.Attributes().PutInt(semconv.AttributeDestinationPort, int64(pm.DstPort)) + + // Network attributes + r.Attributes().PutStr(semconv.AttributeNetworkTransport, getTransportName(pm.Proto)) + r.Attributes().PutStr(semconv.AttributeNetworkType, getEtypeName(pm.Etype)) + + // There is no semconv as of today for these + r.Attributes().PutInt("flow.io.bytes", int64(pm.Bytes)) + r.Attributes().PutInt("flow.io.packets", int64(pm.Packets)) + r.Attributes().PutStr("flow.type", getFlowTypeName(int32(pm.Type))) + r.Attributes().PutInt("flow.sequence_num", int64(pm.SequenceNum)) + r.Attributes().PutInt("flow.time_received", int64(pm.TimeReceivedNs)) + r.Attributes().PutInt("flow.start", int64(pm.TimeFlowStartNs)) + r.Attributes().PutInt("flow.end", int64(pm.TimeFlowEndNs)) + r.Attributes().PutInt("flow.sampling_rate", int64(pm.SamplingRate)) + r.Attributes().PutStr("flow.sampler_address", samplerAddr.String()) + + return nil +} diff --git a/receiver/netflowreceiver/parser_test.go b/receiver/netflowreceiver/parser_test.go new file mode 100644 index 000000000000..f41b9bfc8c0f --- /dev/null +++ b/receiver/netflowreceiver/parser_test.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package netflowreceiver + +import ( + "net/netip" + "testing" + + flowpb "github.com/netsampler/goflow2/v2/pb" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/stretchr/testify/assert" + 
"go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" +) + +func TestGetProtoName(t *testing.T) { + tests := []struct { + proto uint32 + want string + }{ + {proto: 1, want: "icmp"}, + {proto: 6, want: "tcp"}, + {proto: 17, want: "udp"}, + {proto: 58, want: "ipv6-icmp"}, + {proto: 132, want: "sctp"}, + {proto: 0, want: "hopopt"}, + {proto: 400, want: "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + got := getTransportName(tt.proto) + if got != tt.want { + t.Errorf("getProtoName(%d) = %s; want %s", tt.proto, got, tt.want) + } + }) + } +} + +func TestConvertToOtel(t *testing.T) { + pm := &protoproducer.ProtoProducerMessage{ + FlowMessage: flowpb.FlowMessage{ + SrcAddr: netip.MustParseAddr("192.168.1.1").AsSlice(), + SrcPort: 0, + DstAddr: netip.MustParseAddr("192.168.1.2").AsSlice(), + DstPort: 2055, + SamplerAddress: netip.MustParseAddr("192.168.1.100").AsSlice(), + Type: 3, + Etype: 0x800, + Proto: 6, + Bytes: 100, + Packets: 1, + TimeReceivedNs: 1000000000, + TimeFlowStartNs: 1000000100, + TimeFlowEndNs: 1000000200, + SequenceNum: 1, + SamplingRate: 1, + }, + } + + record := plog.NewLogRecord() + err := addMessageAttributes(pm, &record) + if err != nil { + t.Errorf("TestConvertToOtel() error = %v", err) + return + } + + assert.Equal(t, int64(1000000100), record.Timestamp().AsTime().UnixNano()) + assert.Equal(t, int64(1000000000), record.ObservedTimestamp().AsTime().UnixNano()) + + expectedAttributes := pcommon.NewMap() + expectedAttributes.PutStr(semconv.AttributeSourceAddress, "192.168.1.1") + expectedAttributes.PutInt(semconv.AttributeSourcePort, 0) + expectedAttributes.PutStr(semconv.AttributeDestinationAddress, "192.168.1.2") + expectedAttributes.PutInt(semconv.AttributeDestinationPort, 2055) + expectedAttributes.PutStr(semconv.AttributeNetworkTransport, getTransportName(6)) + 
expectedAttributes.PutStr(semconv.AttributeNetworkType, getEtypeName(0x800)) + expectedAttributes.PutInt("flow.io.bytes", 100) + expectedAttributes.PutInt("flow.io.packets", 1) + expectedAttributes.PutStr("flow.type", getFlowTypeName(3)) + expectedAttributes.PutInt("flow.sequence_num", 1) + expectedAttributes.PutInt("flow.time_received", 1000000000) + expectedAttributes.PutInt("flow.start", 1000000100) + expectedAttributes.PutInt("flow.end", 1000000200) + expectedAttributes.PutInt("flow.sampling_rate", 1) + expectedAttributes.PutStr("flow.sampler_address", "192.168.1.100") + + assert.Equal(t, expectedAttributes, record.Attributes()) +} + +func TestEmptyConvertToOtel(t *testing.T) { + pm := &protoproducer.ProtoProducerMessage{} + + record := plog.NewLogRecord() + err := addMessageAttributes(pm, &record) + if err != nil { + t.Errorf("TestConvertToOtel() error = %v", err) + return + } + + assert.Equal(t, int64(0), record.Timestamp().AsTime().UnixNano()) + assert.Equal(t, int64(0), record.ObservedTimestamp().AsTime().UnixNano()) + + expectedAttributes := pcommon.NewMap() + expectedAttributes.PutStr(semconv.AttributeSourceAddress, "invalid IP") + expectedAttributes.PutInt(semconv.AttributeSourcePort, 0) + expectedAttributes.PutStr(semconv.AttributeDestinationAddress, "invalid IP") + expectedAttributes.PutInt(semconv.AttributeDestinationPort, 0) + expectedAttributes.PutStr(semconv.AttributeNetworkTransport, "hopopt") + expectedAttributes.PutStr(semconv.AttributeNetworkType, "unknown") + expectedAttributes.PutInt("flow.io.bytes", 0) + expectedAttributes.PutInt("flow.io.packets", 0) + expectedAttributes.PutStr("flow.type", "unknown") + expectedAttributes.PutInt("flow.sequence_num", 0) + expectedAttributes.PutInt("flow.time_received", 0) + expectedAttributes.PutInt("flow.start", 0) + expectedAttributes.PutInt("flow.end", 0) + expectedAttributes.PutInt("flow.sampling_rate", 0) + expectedAttributes.PutStr("flow.sampler_address", "invalid IP") + + assert.Equal(t, 
expectedAttributes, record.Attributes()) +} diff --git a/receiver/netflowreceiver/producer.go b/receiver/netflowreceiver/producer.go new file mode 100644 index 000000000000..31e9ad3b3949 --- /dev/null +++ b/receiver/netflowreceiver/producer.go @@ -0,0 +1,82 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver" + +import ( + "context" + + "github.com/netsampler/goflow2/v2/producer" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata" +) + +// OtelLogsProducerWrapper is a wrapper around a producer.ProducerInterface that sends the messages to a log consumer +type OtelLogsProducerWrapper struct { + wrapped producer.ProducerInterface + logConsumer consumer.Logs + logger *zap.Logger +} + +// Produce converts the message into a list of log records and sends them to the log consumer +func (o *OtelLogsProducerWrapper) Produce(msg any, args *producer.ProduceArgs) ([]producer.ProducerMessage, error) { + defer func() { + if pErr := recover(); pErr != nil { + errMessage, _ := pErr.(string) + o.logger.Error("unexpected error processing the message", zap.String("error", errMessage)) + } + }() + + // First we let the proto producer parse the message + // All the netflow protocol and structure is handled by the proto producer + flowMessageSet, err := o.wrapped.Produce(msg, args) + if err != nil { + return flowMessageSet, err + } + + // Create the otel log structure to hold our messages + log := plog.NewLogs() + scopeLog := log.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty() + scopeLog.Scope().SetName(metadata.ScopeName) + scopeLog.Scope().Attributes().PutStr("receiver", metadata.Type.String()) + logRecords := scopeLog.LogRecords() + + // A single netflow packet can contain 
multiple flow messages + for _, msg := range flowMessageSet { + logRecord := logRecords.AppendEmpty() + parseErr := addMessageAttributes(msg, &logRecord) + if parseErr != nil { + continue + } + } + + if len(flowMessageSet) == 0 { + o.logger.Info("received a packet with no flow messages from", zap.String("agent", args.SamplerAddress.String())) + } + + err = o.logConsumer.ConsumeLogs(context.Background(), log) + if err != nil { + return flowMessageSet, err + } + + return flowMessageSet, nil +} + +func (o *OtelLogsProducerWrapper) Close() { + o.wrapped.Close() +} + +func (o *OtelLogsProducerWrapper) Commit(flowMessageSet []producer.ProducerMessage) { + o.wrapped.Commit(flowMessageSet) +} + +func newOtelLogsProducer(wrapped producer.ProducerInterface, logConsumer consumer.Logs, logger *zap.Logger) producer.ProducerInterface { + return &OtelLogsProducerWrapper{ + wrapped: wrapped, + logConsumer: logConsumer, + logger: logger, + } +} diff --git a/receiver/netflowreceiver/producer_test.go b/receiver/netflowreceiver/producer_test.go new file mode 100644 index 000000000000..06262750f84f --- /dev/null +++ b/receiver/netflowreceiver/producer_test.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package netflowreceiver + +import ( + "net/netip" + "testing" + + "github.com/netsampler/goflow2/v2/decoders/netflow" + flowpb "github.com/netsampler/goflow2/v2/pb" + "github.com/netsampler/goflow2/v2/producer" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestProduce(t *testing.T) { + // list of netflow.DataFlowSet + message := &netflow.NFv9Packet{ + Version: 9, + Count: 1, + SystemUptime: 0xb3bff683, + UnixSeconds: 0x618aa3a8, + SequenceNumber: 838987416, + SourceId: 256, + FlowSets: []any{ + 
netflow.DataFlowSet{ + FlowSetHeader: netflow.FlowSetHeader{ + Id: 260, + Length: 1372, + }, + Records: []netflow.DataRecord{ + { + Values: []netflow.DataField{ + { + PenProvided: false, + Type: 2, + Pen: 0, + Value: []uint8{0x00, 0x00, 0x00, 0x01}, + }, + }, + }, + }, + }, + }, + } + + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + require.NoError(t, err) + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + require.NoError(t, err) + + otelLogsProducer := newOtelLogsProducer(protoProducer, consumertest.NewNop(), zap.NewNop()) + messages, err := otelLogsProducer.Produce(message, &producer.ProduceArgs{}) + require.NoError(t, err) + require.NotNil(t, messages) + assert.Len(t, messages, 1) + + pm, ok := messages[0].(*protoproducer.ProtoProducerMessage) + assert.True(t, ok) + assert.Equal(t, flowpb.FlowMessage_NETFLOW_V9, pm.Type) + assert.Equal(t, uint64(1), pm.Packets) + assert.Equal(t, uint32(256), pm.ObservationDomainId) + assert.Equal(t, uint32(838987416), pm.SequenceNum) +} + +// This PanicProducer replaces the ProtoProducer, to simulate it producing a panic +type PanicProducer struct{} + +func (m *PanicProducer) Produce(_ any, _ *producer.ProduceArgs) ([]producer.ProducerMessage, error) { + panic("producer panic!") +} + +func (m *PanicProducer) Close() {} + +func (m *PanicProducer) Commit(_ []producer.ProducerMessage) {} + +func TestProducerPanic(t *testing.T) { + // Create a mock logger that can capture logged messages + observedZapCore, observedLogs := observer.New(zap.InfoLevel) + logger := zap.New(observedZapCore) + + // Create a mock consumer + mockConsumer := consumertest.NewNop() + + // Wrap a PanicProducer (instead of ProtoProducer) in the OtelLogsProducerWrapper + wrapper := newOtelLogsProducer(&PanicProducer{}, 
mockConsumer, logger) + + // Call Produce which should recover from panic + messages, err := wrapper.Produce(nil, &producer.ProduceArgs{ + SamplerAddress: netip.MustParseAddr("127.0.0.1"), + }) + + // Verify that no error is returned (since panic was recovered) + assert.NoError(t, err) + assert.Empty(t, messages) + + // Verify that the error was logged + log := observedLogs.All()[0] + assert.Equal(t, "unexpected error processing the message", log.Message) + assert.Equal(t, "producer panic!", log.ContextMap()["error"]) +} diff --git a/receiver/netflowreceiver/receiver.go b/receiver/netflowreceiver/receiver.go index c31bcbe7baaf..0c44e6997a40 100644 --- a/receiver/netflowreceiver/receiver.go +++ b/receiver/netflowreceiver/receiver.go @@ -5,25 +5,147 @@ package netflowreceiver // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "errors" + "fmt" + "net" + "github.com/netsampler/goflow2/v2/decoders/netflow" + protoproducer "github.com/netsampler/goflow2/v2/producer/proto" + "github.com/netsampler/goflow2/v2/utils" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" ) +var _ utils.ReceiverCallback = (*dropHandler)(nil) + +type dropHandler struct { + logger *zap.Logger +} + +func (d dropHandler) Dropped(msg utils.Message) { + d.logger.Warn("Dropped netflow message", zap.Any("msg", msg)) +} + type netflowReceiver struct { - // host component.Host - // cancel context.CancelFunc - config *Config - logConsumer consumer.Logs + config Config logger *zap.Logger - // listeners []*Listener + udpReceiver *utils.UDPReceiver + logConsumer consumer.Logs +} + +func newNetflowLogsReceiver(params receiver.Settings, cfg Config, consumer consumer.Logs) (receiver.Logs, error) { + // UDP receiver configuration + udpCfg := &utils.UDPReceiverConfig{ + Sockets: cfg.Sockets, + Workers: cfg.Workers, + QueueSize: cfg.QueueSize, + Blocking: false, + ReceiverCallback: &dropHandler{ + 
logger: params.Logger, + }, + } + udpReceiver, err := utils.NewUDPReceiver(udpCfg) + if err != nil { + return nil, err + } + + nr := &netflowReceiver{ + logger: params.Logger, + config: cfg, + logConsumer: consumer, + udpReceiver: udpReceiver, + } + + return nr, nil } func (nr *netflowReceiver) Start(_ context.Context, _ component.Host) error { + // The function that will decode packets + decodeFunc, err := nr.buildDecodeFunc() + if err != nil { + return err + } + + nr.logger.Info("Starting UDP listener", zap.String("scheme", nr.config.Scheme), zap.Int("port", nr.config.Port)) + if err := nr.udpReceiver.Start(nr.config.Hostname, nr.config.Port, decodeFunc); err != nil { + return err + } + + // This runs until the receiver is stopped, consuming from an error channel + go nr.handleErrors() + return nil } -func (nr *netflowReceiver) Shutdown(_ context.Context) error { +func (nr *netflowReceiver) Shutdown(context.Context) error { + if nr.udpReceiver == nil { + return nil + } + err := nr.udpReceiver.Stop() + if err != nil { + nr.logger.Warn("Error stopping UDP receiver", zap.Error(err)) + } return nil } + +// buildDecodeFunc creates a decode function based on the scheme +// This is the function that will be invoked for every netflow packet received +// The function depends on the configured scheme; only netflow and sflow are accepted, any other value returns an error +func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) { + // Eventually this can be used to configure mappings + cfgProducer := &protoproducer.ProducerConfig{} + cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer + if err != nil { + return nil, err + } + // We use a goflow2 proto producer to produce messages using protobuf format + protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem) + if err != nil { + return nil, err + } + + // the otel log producer converts those messages into OpenTelemetry logs + // it is a wrapper around the 
protobuf producer + otelLogsProducer := newOtelLogsProducer(protoProducer, nr.logConsumer, nr.logger) + + cfgPipe := &utils.PipeConfig{ + Producer: otelLogsProducer, + } + + var p utils.FlowPipe + switch nr.config.Scheme { + case "sflow": + p = utils.NewSFlowPipe(cfgPipe) + case "netflow": + p = utils.NewNetFlowPipe(cfgPipe) + default: + return nil, fmt.Errorf("scheme does not exist: %s", nr.config.Scheme) + } + return p.DecodeFlow, nil +} + +// handleErrors drains the UDP receiver's error channel, logging each error instead of stopping the receiver; it returns once net.ErrClosed is seen +// NOTE(review): the default branch below is unreachable, since the two ErrorTemplateNotFound cases are complementary and cover every remaining error +func (nr *netflowReceiver) handleErrors() { + for err := range nr.udpReceiver.Errors() { + switch { + case errors.Is(err, net.ErrClosed): + nr.logger.Info("UDP receiver closed, exiting error handler") + return + + case !errors.Is(err, netflow.ErrorTemplateNotFound): + nr.logger.Error("received a generic error while processing a flow message via GoFlow2 for the netflow receiver", zap.Error(err)) + continue + + case errors.Is(err, netflow.ErrorTemplateNotFound): + nr.logger.Warn("we could not find a template for a flow message, this error is expected from time to time until the device sends a template", zap.Error(err)) + continue + + default: + nr.logger.Error("unexpected error processing the message", zap.Error(err)) + continue + } + } +} diff --git a/receiver/netflowreceiver/receiver_test.go b/receiver/netflowreceiver/receiver_test.go index ad37d7feebf0..1f96f96e507d 100644 --- a/receiver/netflowreceiver/receiver_test.go +++ b/receiver/netflowreceiver/receiver_test.go @@ -19,6 +19,5 @@ func TestCreateValidDefaultReceiver(t *testing.T) { receiver, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, receiver, "receiver creation failed") - // TODO - Will be added on the following PR - // assert.NotNil(t, "sflow", receiver.(*netflowReceiver).listeners[0].recv) + assert.NotNil(t, 
receiver.(*netflowReceiver).udpReceiver) } diff --git a/receiver/netflowreceiver/testdata/config.yaml b/receiver/netflowreceiver/testdata/config.yaml index cbc5f657b145..1cc4164c1fbd 100644 --- a/receiver/netflowreceiver/testdata/config.yaml +++ b/receiver/netflowreceiver/testdata/config.yaml @@ -14,3 +14,29 @@ netflow/invalid_port: sockets: 1 workers: 1 port: 0 + +netflow/zero_sockets: + scheme: netflow + port: 2055 + sockets: 0 + workers: 1 + +netflow/zero_workers: + scheme: netflow + port: 2055 + sockets: 1 + workers: 0 + +netflow/zero_queue: + scheme: netflow + port: 2055 + sockets: 1 + workers: 1 + queue_size: 0 + +netflow/sflow: + scheme: sflow + port: 6343 + sockets: 1 + workers: 1 + queue_size: 0