Skip to content

Commit

Permalink
open-telemetry#32732 - Initial implementation
Browse files Browse the repository at this point in the history
This adds the implementation for netflow receiver
Introduces:

* The OtelLogsProducerWrapper
* A parser function to convert proto to otel semantics
* The listener implementation as well as the error handling function
  • Loading branch information
dlopes7 committed Dec 15, 2024
1 parent 396c63d commit f7ff90b
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 11 deletions.
2 changes: 1 addition & 1 deletion receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
defaultQueueSize = 1_000
)

// NewFactory creates a factory for netflow receiver.
Expand Down
4 changes: 3 additions & 1 deletion receiver/netflowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflo
go 1.22.0

require (
github.com/netsampler/goflow2/v2 v2.2.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.0
go.opentelemetry.io/collector/component/componenttest v0.115.0
go.opentelemetry.io/collector/confmap v1.21.0
go.opentelemetry.io/collector/consumer v1.21.0
go.opentelemetry.io/collector/consumer/consumertest v0.115.0
go.opentelemetry.io/collector/pdata v1.21.0
go.opentelemetry.io/collector/receiver v0.115.0
go.opentelemetry.io/collector/receiver/receivertest v0.115.0
go.uber.org/goleak v1.3.0
Expand All @@ -26,6 +28,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -34,7 +37,6 @@ require (
go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.0 // indirect
go.opentelemetry.io/collector/pdata v1.21.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect
go.opentelemetry.io/collector/pipeline v0.115.0 // indirect
go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions receiver/netflowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 135 additions & 4 deletions receiver/netflowreceiver/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,140 @@

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"errors"
"fmt"
"net"

"github.com/netsampler/goflow2/v2/decoders/netflow"

protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
"github.com/netsampler/goflow2/v2/utils"
"github.com/netsampler/goflow2/v2/utils/debug"

"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
)

type Listener struct {
// config Config
// logger *zap.Logger
// recv *utils.UDPReceiver
// logConsumer consumer.Logs
config Config
logger *zap.Logger
recv *utils.UDPReceiver
logConsumer consumer.Logs
}

func (l *Listener) Dropped(msg utils.Message) {
l.logger.Warn("Dropped netflow message", zap.Any("msg", msg))
}

func NewListener(config Config, logger *zap.Logger, logConsumer consumer.Logs) *Listener {
return &Listener{config: config, logger: logger, logConsumer: logConsumer}
}

func (l *Listener) Start() error {
l.logger.Info("Creating the netflow UDP listener", zap.Any("config", l.config))
cfg := &utils.UDPReceiverConfig{
Sockets: l.config.Sockets,
Workers: l.config.Workers,
QueueSize: l.config.QueueSize,
Blocking: false,
ReceiverCallback: l,
}
recv, err := utils.NewUDPReceiver(cfg)
if err != nil {
return err
}
l.recv = recv

decodeFunc, err := l.buildDecodeFunc()
if err != nil {
return err
}

l.logger.Info("Start listening for NetFlow", zap.Any("config", l.config))
if err := l.recv.Start(l.config.Hostname, l.config.Port, decodeFunc); err != nil {
return err
}

go l.handleErrors()

return nil
}

// handleErrors handles errors from the listener
func (l *Listener) handleErrors() {
for err := range l.recv.Errors() {
if errors.Is(err, net.ErrClosed) {
l.logger.Info("receiver closed")
continue
} else if !errors.Is(err, netflow.ErrorTemplateNotFound) && !errors.Is(err, debug.PanicError) {
l.logger.Error("receiver error", zap.Error(err))
continue
} else if errors.Is(err, netflow.ErrorTemplateNotFound) {
l.logger.Warn("template was not found for this message")
continue
} else if errors.Is(err, debug.PanicError) {
var pErrMsg *debug.PanicErrorMessage
if errors.As(err, &pErrMsg) {
l.logger.Error("panic error", zap.String("panic", pErrMsg.Inner))
l.logger.Error("receiver stacktrace", zap.String("stack", string(pErrMsg.Stacktrace)))
l.logger.Error("receiver msg", zap.Any("error", pErrMsg.Msg))
}
l.logger.Error("receiver panic", zap.Error(err))

continue
}
}
}

// buildDecodeFunc creates a decode function based on the scheme
func (l *Listener) buildDecodeFunc() (utils.DecoderFunc, error) {

// Eventually this can be used to configure mappings
cfgProducer := &protoproducer.ProducerConfig{}
cfgm, err := cfgProducer.Compile() // converts configuration into a format that can be used by a protobuf producer
if err != nil {
return nil, err
}
// We use a goflow2 proto producer to produce messages using protobuf format
protoProducer, err := protoproducer.CreateProtoProducer(cfgm, protoproducer.CreateSamplingSystem)
if err != nil {
return nil, err
}

// the otel log producer converts those messages into OpenTelemetry logs
otelLogsProducer := NewOtelLogsProducer(protoProducer, l.logConsumer)

cfgPipe := &utils.PipeConfig{
Producer: otelLogsProducer,
// Format: &format.Format{
// FormatDriver: &format.JSONFormatDriver{},
// },
}

var decodeFunc utils.DecoderFunc
var p utils.FlowPipe
if l.config.Scheme == "sflow" {
p = utils.NewSFlowPipe(cfgPipe)
} else if l.config.Scheme == "netflow" {
p = utils.NewNetFlowPipe(cfgPipe)
} else if l.config.Scheme == "flow" {
p = utils.NewFlowPipe(cfgPipe)
} else {
return nil, fmt.Errorf("scheme does not exist: %s", l.config.Scheme)
}

decodeFunc = p.DecodeFlow

// We wrap panics while decoding the message to habndle them later
decodeFunc = debug.PanicDecoderWrapper(decodeFunc)

return decodeFunc, nil
}

func (l *Listener) Shutdown() error {
if l.recv != nil {
return l.recv.Stop()
}
return nil
}
2 changes: 1 addition & 1 deletion receiver/netflowreceiver/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ func TestCreateValidDefaultListener(t *testing.T) {
assert.Equal(t, 2055, receiver.(*netflowReceiver).config.Port)
assert.Equal(t, 1, receiver.(*netflowReceiver).config.Sockets)
assert.Equal(t, 2, receiver.(*netflowReceiver).config.Workers)
assert.Equal(t, 1_000_000, receiver.(*netflowReceiver).config.QueueSize)
assert.Equal(t, 1_000, receiver.(*netflowReceiver).config.QueueSize)
}
138 changes: 138 additions & 0 deletions receiver/netflowreceiver/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"errors"
"net/netip"
"time"

"github.com/netsampler/goflow2/v2/producer"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
)

var (
etypeName = map[uint32]string{
0x806: "ARP",
0x800: "IPv4",
0x86dd: "IPv6",
}
protoName = map[uint32]string{
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6",
132: "SCTP",
}

flowTypeName = map[int32]string{
0: "UNKNOWN",
1: "SFLOW_5",
2: "NETFLOW_V5",
3: "NETFLOW_V9",
4: "IPFIX",
}
)

type NetworkAddress struct {
Address string `json:"address,omitempty"`
Port uint32 `json:"port,omitempty"`
}

type Flow struct {
Type string `json:"type,omitempty"`
TimeReceived time.Time `json:"time_received,omitempty"`
Start time.Time `json:"start,omitempty"`
End time.Time `json:"end,omitempty"`
SequenceNum uint32 `json:"sequence_num,omitempty"`
SamplingRate uint64 `json:"sampling_rate,omitempty"`
SamplerAddress string `json:"sampler_address,omitempty"`
}

type Protocol struct {
Name []byte `json:"name,omitempty"` // Layer 7
}

type NetworkIO struct {
Bytes uint64 `json:"bytes,omitempty"`
Packets uint64 `json:"packets,omitempty"`
}

type OtelNetworkMessage struct {
Source NetworkAddress `json:"source,omitempty"`
Destination NetworkAddress `json:"destination,omitempty"`
Transport string `json:"transport,omitempty"` // Layer 4
Type string `json:"type,omitempty"` // Layer 3
IO NetworkIO `json:"io,omitempty"`
Flow Flow `json:"flow,omitempty"`
}

func getEtypeName(etype uint32) string {
if name, ok := etypeName[etype]; ok {
return name
}
return "unknown"
}

func getProtoName(proto uint32) string {
if name, ok := protoName[proto]; ok {
return name
}
return "unknown"
}

func getFlowTypeName(flowType int32) string {
if name, ok := flowTypeName[flowType]; ok {
return name
}
return "unknown"
}

// ConvertToOtel converts a ProtoProducerMessage to an OtelNetworkMessage
func ConvertToOtel(m producer.ProducerMessage) (*OtelNetworkMessage, error) {

// we know msg is ProtoProducerMessage because that is the parent producer
pm, ok := m.(*protoproducer.ProtoProducerMessage)
if !ok {
return nil, errors.New("message is not ProtoProducerMessage")
}

// Parse IP addresses bytes to netip.Addr
srcAddr, _ := netip.AddrFromSlice(pm.SrcAddr)
dstAddr, _ := netip.AddrFromSlice(pm.DstAddr)
samplerAddr, _ := netip.AddrFromSlice(pm.SamplerAddress)

// Time the receiver received the message
receivedTime := time.Unix(0, int64(pm.TimeReceivedNs))
startTime := time.Unix(0, int64(pm.TimeFlowStartNs))
endTime := time.Unix(0, int64(pm.TimeFlowEndNs))

// Construct the actual log record based on the otel semantic conventions
// see https://opentelemetry.io/docs/specs/semconv/general/attributes/
otelMessage := OtelNetworkMessage{
Source: NetworkAddress{
Address: srcAddr.String(),
Port: pm.SrcPort,
},
Destination: NetworkAddress{
Address: dstAddr.String(),
Port: pm.DstPort,
},
Type: getEtypeName(pm.Etype), // Layer 3
Transport: getProtoName(pm.Proto), // Layer 4
IO: NetworkIO{
Bytes: pm.Bytes,
Packets: pm.Packets,
},
Flow: Flow{
Type: getFlowTypeName(int32(pm.Type)),
TimeReceived: receivedTime,
Start: startTime,
End: endTime,
SequenceNum: pm.SequenceNum,
SamplingRate: pm.SamplingRate,
SamplerAddress: samplerAddr.String(),
},
}

return &otelMessage, nil

}
Loading

0 comments on commit f7ff90b

Please sign in to comment.