Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/netflow] Netflow receiver implementation - PR 2 #36865

Merged
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
f7ff90b
#32732 - Initial implementation
dlopes7 Dec 15, 2024
9871936
#32732 - Make internal methods private
dlopes7 Dec 15, 2024
09a91d1
#32732 - netflow - make checks
dlopes7 Dec 15, 2024
26d5aec
#32732 - netflow - simplify implementation, remove listener
dlopes7 Dec 16, 2024
64acc94
netflow - add parser tests
dlopes7 Dec 16, 2024
658c414
netflow - add more tests
dlopes7 Dec 16, 2024
0a50dbb
netflow - add test to check for udp receiver creation
dlopes7 Dec 16, 2024
dd6a006
netflow - gofmt
dlopes7 Dec 16, 2024
363f93b
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 16, 2024
3854269
netflow - make checks
dlopes7 Dec 16, 2024
0fe206e
netflowreceiver - add changelog for implementation
dlopes7 Dec 17, 2024
d7c0782
netflow - fix license header
dlopes7 Dec 17, 2024
b343ab4
netflowreceiver - lint
dlopes7 Dec 17, 2024
f504466
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 17, 2024
d31a81e
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
8d07a6a
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 19, 2024
f7b1512
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Dec 20, 2024
33d0cee
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Dec 27, 2024
5e178ac
netflow - update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 2, 2025
ebfa73d
netflow - remove scaffold issue
dlopes7 Jan 2, 2025
86701b9
netflow - add comment about default queue size value
dlopes7 Jan 2, 2025
52bca59
netflow - fix duplicate test case
dlopes7 Jan 2, 2025
edbbbc0
netflow - fix typo
dlopes7 Jan 2, 2025
d68e826
netflow - better error handling messages
dlopes7 Jan 8, 2025
053543c
netflow - use a simple log record instead of struct
dlopes7 Jan 8, 2025
169373f
netflow - use context.Background() on producer
dlopes7 Jan 8, 2025
5cf7625
netflow - type the dropHandler interface
dlopes7 Jan 8, 2025
10fc878
Update .chloggen/netflow-receiver-implementation.yaml
dlopes7 Jan 8, 2025
2281a06
Merge branch 'netflow-receiver-implementation' of github.com:dlopes7/…
dlopes7 Jan 8, 2025
54d8b60
netflow - more concise startup log message
dlopes7 Jan 8, 2025
d2c1c1f
netflow - sflow and netflow schema documentation
dlopes7 Jan 8, 2025
41b68a2
netflow - remove 'flow' scheme
dlopes7 Jan 8, 2025
7dbde9b
netflow - add sflow to sample yaml config
dlopes7 Jan 8, 2025
b33920b
netflow - fix default queue size
dlopes7 Jan 8, 2025
12e8425
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
77ec06a
netflow - go mod tidy
dlopes7 Jan 8, 2025
a1f6422
netflow - format
dlopes7 Jan 8, 2025
c738398
netflow - lint and typing
dlopes7 Jan 8, 2025
ccf6e7c
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 8, 2025
c090043
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 9, 2025
3ef5c92
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 9, 2025
6c92bc5
netflow - better wording on log
dlopes7 Jan 13, 2025
b767ec6
netflow - rename fields
dlopes7 Jan 13, 2025
416efa7
netflow - remove unecessary log line
dlopes7 Jan 13, 2025
9e4aa70
netflow - better reference in comment
dlopes7 Jan 13, 2025
583915e
netflow - remove unecessary log line
dlopes7 Jan 13, 2025
64aed8c
netflow - move flow keys to 'flow.' namespace
dlopes7 Jan 13, 2025
ab9b6c1
netflow - update readme keys
dlopes7 Jan 13, 2025
dd8f7d9
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
fed610b
netflow - adjust validSchemes slice size
dlopes7 Jan 13, 2025
f77489b
netflow - clarify minimum queue size
dlopes7 Jan 13, 2025
44e134e
netflow - simplify panic handling
dlopes7 Jan 13, 2025
f6526d6
netflow - formatting and remove a comment
dlopes7 Jan 13, 2025
5c07873
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
59633be
Update receiver/netflowreceiver/producer.go
dlopes7 Jan 13, 2025
75cba97
Update receiver/netflowreceiver/producer.go
dlopes7 Jan 13, 2025
af1e72b
netflow - fix panic handling and test for it
dlopes7 Jan 13, 2025
da9b68e
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 13, 2025
e72e3f3
Merge branch 'netflow-receiver-implementation' of github.com:dlopes7/…
dlopes7 Jan 14, 2025
22c12b2
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 14, 2025
075d87b
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 14, 2025
9049e77
netflow - add tests for parsed timestamps
dlopes7 Jan 16, 2025
735ab5a
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
dlopes7 Jan 16, 2025
9eb1955
netflow - go mod tidy
dlopes7 Jan 16, 2025
838e479
netflow - update semconv
dlopes7 Jan 16, 2025
bbd7c38
Merge branch 'main' into netflow-receiver-implementation
dlopes7 Jan 16, 2025
41c116f
Merge remote-tracking branch 'upstream/main' into netflow-receiver-im…
evan-bradley Jan 22, 2025
a67b9ad
netflow - update semconv
dlopes7 Jan 22, 2025
40cc5ec
go mod tidy
evan-bradley Jan 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the implementation of the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The receiver now supports receiving NetFlow v5, NetFow v9, IPFIX, and sFlow v5 logs.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
81 changes: 48 additions & 33 deletions receiver/netflowreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ receivers:
port: 2055
sockets: 16
workers: 32
netflow/sflow:
- scheme: sflow
port: 6343
sockets: 16
workers: 32

processors:
batch:
Expand All @@ -45,7 +50,7 @@ exporters:
service:
pipelines:
logs:
receivers: [netflow]
receivers: [netflow, netflow/sflow]
processors: [batch]
exporters: [debug]
telemetry:
Expand All @@ -61,42 +66,52 @@ You would then configure your network devices to send netflow, sflow, or ipfix d

| Field | Description | Examples | Default |
|-------|-------------|--------| ------- |
| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` |
| scheme | The type of flow data that to receive | `sflow`, `netflow` | `netflow` |
| hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` |
| port | The port to bind to | `2055` or `6343` | `2055` |
| sockets | The number of sockets to use | 1 | 1 |
| workers | The number of workers used to decode incoming flow messages | 2 | 2 |
| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 |
| queue_size | The size of the incoming netflow packets queue | 1000 | 1000 |

## Data format

The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The output will adhere the format:

```json
{
"destination": {
"address": "192.168.0.1",
"port": 22
},
"flow": {
"end": 1731073104662487000,
"sampler_address": "192.168.0.2",
"sequence_num": 49,
"start": 1731073077662487000,
"time_received": 1731073138662487000,
"type": "NETFLOW_V5"
},
"io": {
"bytes": 529,
"packets": 378
},
"source": {
"address": "192.168.0.3",
"port": 40
},
"transport": "TCP",
"type": "IPv4"
}
```
The netflow data is standardized for the different schemas and is converted to OpenTelemetry log records following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The log record will have the following attributes (with examples):

* **source.address**: Str(132.189.238.100)
* **source.port**: Int(1255)
* **destination.address**: Str(241.171.33.110)
* **destination.port**: Int(64744)
* **network.transport**: Str(tcp)
* **network.type**: Str(ipv4)
* **network.io.bytes**: Int(853)
* **network.io.packets**: Int(83)
* **network.flow.type**: Str(netflow_v5)
* **network.flow.sequence_num**: Int(191)
* **network.flow.time_received**: Int(1736309689918929427)
* **network.flow.start**: Int(1736309689830846400)
* **network.flow.end**: Int(1736309689871846400)
* **network.flow.sampling_rate**: Int(0)
* **network.flow.sampler_address**: Str(172.28.176.1)
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved

The log record timestamps will be:

* **Observed timestamp**: The time the flow was received.
* **Timestamp**: The flow `start` field.

### Schema support

#### netflow

* Process [Template Records](https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html) if present
* Process Netflow V5, V9, and IPFIX messages
* Extract the attributes documented above
* Mapping of custom fields is not yet supported

#### sflow

* Process [sFlow version 5](https://sflow.org/sflow_version_5.txt) datagrams
* `flow_sample` and `flow_sample_expanded` are supported.
* `counter_sample` and `counter_sample_expanded` are NOT yet supported.
* Mapping of custom fields is not yet supported
4 changes: 2 additions & 2 deletions receiver/netflowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Config struct {

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
validSchemes := [3]string{"sflow", "netflow", "flow"}
validSchemes := [3]string{"sflow", "netflow"}

validScheme := false
for _, scheme := range validSchemes {
Expand All @@ -42,7 +42,7 @@ func (cfg *Config) Validate() error {
}
}
if !validScheme {
return fmt.Errorf("scheme must be one of sflow, netflow, or flow")
return fmt.Errorf("scheme must be netflow or sflow")
}

if cfg.Sockets <= 0 {
Expand Down
32 changes: 30 additions & 2 deletions receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,27 @@ func TestLoadConfig(t *testing.T) {
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
QueueSize: 1000,
},
},
{
id: component.NewIDWithName(metadata.Type, "zero_queue"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
},
},
{
id: component.NewIDWithName(metadata.Type, "sflow"),
expected: &Config{
Scheme: "sflow",
Port: 6343,
Sockets: 1,
Workers: 1,
QueueSize: 1000,
},
},
}
Expand Down Expand Up @@ -68,12 +88,20 @@ func TestInvalidConfig(t *testing.T) {
}{
{
id: component.NewIDWithName(metadata.Type, "invalid_schema"),
err: "scheme must be one of sflow, netflow, or flow",
err: "scheme must be netflow or sflow",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_sockets"),
err: "sockets must be greater than 0",
},
{
id: component.NewIDWithName(metadata.Type, "zero_workers"),
err: "workers must be greater than 0",
},
}

for _, tt := range tests {
Expand Down
24 changes: 15 additions & 9 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
)

const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
defaultSockets = 1
defaultWorkers = 2
// The default UDP packet buffer size in GoFlow2 is 9000 bytes, which means
// that for a full queue of 1000 messages, the size in memory will be 9MB.
// Source: https://github.com/netsampler/goflow2/blob/v2.2.1/README.md#security-notes-and-assumptions
defaultQueueSize = 1_000
dlopes7 marked this conversation as resolved.
Show resolved Hide resolved
)

// NewFactory creates a factory for netflow receiver.
Expand All @@ -27,6 +30,8 @@ func NewFactory() receiver.Factory {
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

// Config defines configuration for netflow receiver.
// By default we listen for netflow traffic on port 2055
func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Expand All @@ -37,14 +42,15 @@ func createDefaultConfig() component.Config {
}
}

// createLogsReceiver creates a netflow receiver.
// We also create the UDP receiver, which is the piece of software that actually listens
// for incoming netflow traffic on an UDP port.
func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)
conf := *(cfg.(*Config))

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
nr, err := newNetflowLogsReceiver(params, conf, consumer)
if err != nil {
return nil, err
}

return nr, nil
Expand Down
5 changes: 4 additions & 1 deletion receiver/netflowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo
go 1.22.0

require (
github.com/netsampler/goflow2/v2 v2.2.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.117.0
go.opentelemetry.io/collector/component/componenttest v0.117.0
go.opentelemetry.io/collector/confmap v1.23.0
go.opentelemetry.io/collector/consumer v1.23.0
go.opentelemetry.io/collector/consumer/consumertest v0.117.0
go.opentelemetry.io/collector/pdata v1.23.0
go.opentelemetry.io/collector/receiver v0.117.0
go.opentelemetry.io/collector/receiver/receivertest v0.117.0
go.opentelemetry.io/collector/semconv v0.117.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand All @@ -26,6 +29,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -34,7 +38,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.117.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.117.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.117.0 // indirect
go.opentelemetry.io/collector/pdata v1.23.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.117.0 // indirect
go.opentelemetry.io/collector/pipeline v0.117.0 // indirect
go.opentelemetry.io/collector/receiver/xreceiver v0.117.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions receiver/netflowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions receiver/netflowreceiver/listener.go

This file was deleted.

27 changes: 0 additions & 27 deletions receiver/netflowreceiver/listener_test.go

This file was deleted.

Loading
Loading