diff --git a/pkg/udev/doc.go b/pkg/udev/doc.go new file mode 100644 index 000000000..21e392a63 --- /dev/null +++ b/pkg/udev/doc.go @@ -0,0 +1,140 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package udev + +// Notes: +// The implementation of Reader was inspired a lot by the existing +// but mostly unmaintained github.com/s-urbaniak/uevent golang package. + +// +// Examples: +// +// Reading udev events using udev.Reader: +// +// package main +// +// import ( +// "github.com/containers/nri-plugins/pkg/udev" +// "sigs.k8s.io/yaml" +// +// logger "github.com/containers/nri-plugins/pkg/log" +// ) +// +// var ( +// log = logger.Get("udev") +// ) +// +// func main() { +// r, err := udev.NewEventReader() +// if err != nil { +// log.Fatalf("failed to create udev event reader: %v", err) +// } +// +// events := make(chan *udev.Event, 64) +// +// go func() { +// for evt := range events { +// dump(evt) +// } +// }() +// +// for { +// e, err := r.Read() +// if err != nil { +// log.Fatalf("failed to read udev event: %v", err) +// } +// events <- e +// } +// } +// +// func dump(e *udev.Event) { +// dump, err := yaml.Marshal(e) +// if err != nil { +// log.Errorf("failed to marshal event: %v\n", err) +// return +// } +// log.InfoBlock("udev reader ", "%s", dump) +// } +// +// Filtered reading of udev events using udev.Monitor: +// +// package main +// +// import ( +// "os" +// "strings" +// +// "github.com/containers/nri-plugins/pkg/udev" +// "sigs.k8s.io/yaml" +// +// logger "github.com/containers/nri-plugins/pkg/log" +// ) +// +// var ( +// log = logger.Get("udev") +// ) +// +// func main() { +// var ( +// filters = parseFilters() +// events = make(chan *udev.Event, 64) +// ) +// +// m, err := udev.NewMonitor(udev.WithFilters(filters...)) +// if err != nil { +// log.Fatalf("failed to create udev event reader: %v", err) +// } +// +// m.Start(events) +// +// for evt := range events { +// dump(evt) +// } +// } +// +// func parseFilters() []map[string]string { +// var filters []map[string]string +// +// for _, arg := range os.Args[1:] { +// if !strings.Contains(arg, "=") { +// continue +// } +// +// filter := map[string]string{} +// for _, expr := range strings.Split(arg, ",") { +// kv := strings.SplitN(expr, "=", 2) +// if len(kv) != 2 { +// log.Fatalf("invalid filter expression %s (in %s)", expr, arg) +// } +// filter[strings.ToUpper(kv[0])] = kv[1] +// } +// if len(filter) > 0 { +// log.Info("using parsed filter: %v", filter) +// filters = append(filters, filter) +// } +// } +// +// return filters +// } +// +// func dump(e *udev.Event) { +// dump, err := yaml.Marshal(e) +// if err != nil { +// log.Errorf("failed to marshal event: %v\n", err) +// return +// } +// log.InfoBlock("monitor ", "%s", dump) +// } +// diff --git a/pkg/udev/monitor.go b/pkg/udev/monitor.go new file mode 100644 index 000000000..a9e1b3d82 --- /dev/null +++ b/pkg/udev/monitor.go @@ -0,0 +1,161 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package udev + +import ( + "fmt" + "path" +) + +// MonitorOption is an opaque option which can be applied to a Monitor. +type MonitorOption func(*Monitor) + +// WithFilters returns a MonitorOption for filtering events by properties. +// Properties within a map have AND semantics: the map matches an event if +// all key-value pairs match the event. Multiple maps have OR semantics: +// they match an event if at least one map matches the event. Events which +// are matched are passed through. Others are filtered out. +func WithFilters(filters ...map[string]string) MonitorOption { + return func(m *Monitor) { + m.filters = append(m.filters, filters...) + } +} + +// WithGlobFilters returns a MonitorOption for filtering events by properties. +// Semantics are similar to WithFilters, but properties are matched using glob +// patterns instead of verbatim comparison. +func WithGlobFilters(globbers ...map[string]string) MonitorOption { + return func(m *Monitor) { + m.globbers = append(m.globbers, globbers...) + } +} + +// Monitor monitors udev events. +type Monitor struct { + r *EventReader + filters []map[string]string + globbers []map[string]string + stopCh chan struct{} +} + +// NewMonitor creates an udev monitor with the given options. +func NewMonitor(options ...MonitorOption) (*Monitor, error) { + r, err := NewEventReader() + if err != nil { + return nil, fmt.Errorf("failed to create udev monitor reader: %w", err) + } + + m := &Monitor{ + r: r, + stopCh: make(chan struct{}), + } + + for _, o := range options { + o(m) + } + + return m, nil +} + +// Start starts event monitoring and delivery. +func (m *Monitor) Start(events chan *Event) { + if len(m.filters) == 0 && len(m.globbers) == 0 { + go m.reader(events) + } else { + filter := make(chan *Event, 64) + go m.filterer(filter, events) + go m.reader(filter) + } +} + +// Stop stops event monitoring. +func (m *Monitor) Stop() error { + return m.r.Close() +} + +func (m *Monitor) reader(events chan<- *Event) { + for { + evt, err := m.r.Read() + if err != nil { + log.Errorf("failed to read udev event: %v", err) + m.r.Close() + close(events) + return + } + + events <- evt + } +} + +func (m *Monitor) filterer(events <-chan *Event, filtered chan<- *Event) { + var stuck bool + + for evt := range events { + if !m.filter(evt) { + continue + } + + select { + case filtered <- evt: + stuck = false + default: + if !stuck { + log.Warnf("dropped udev %s %s event", evt.Subsystem, evt.Action) + stuck = true + } + } + } +} + +func (m *Monitor) filter(evt *Event) bool { + if len(m.filters) == 0 && len(m.globbers) == 0 { + return true + } + + for _, filter := range m.filters { + match := true + for k, v := range filter { + if evt.Properties[k] != v { + match = false + break + } + } + if match { + return true + } + } + + for _, glob := range m.globbers { + match := true + for k, p := range glob { + m, err := path.Match(p, evt.Properties[k]) + if err != nil { + log.Errorf("failed to match udev event property %q=%q by pattern %q: %v", + k, evt.Properties[k], p, err) + delete(glob, k) + continue + } + if !m { + match = false + break + } + } + if match { + return true + } + } + + return false +} diff --git a/pkg/udev/reader.go b/pkg/udev/reader.go new file mode 100644 index 000000000..229c061a1 --- /dev/null +++ b/pkg/udev/reader.go @@ -0,0 +1,183 @@ +// Copyright The NRI Plugins Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package udev + +import ( + "bufio" + "fmt" + "io" + "os" + "strings" + "syscall" + + logger "github.com/containers/nri-plugins/pkg/log" +) + +// Event represents a udev event. +type Event struct { + Header string + Subsystem string + Action string + Devpath string + Seqnum string + Properties map[string]string +} + +const ( + // PropertyAction is the key for the ACTION property. + PropertyAction = "ACTION" + // PropertyDevpath is the key for the DEVPATH property. + PropertyDevpath = "DEVPATH" + // PropertySubsystem is the key for the SUBSYSTEM property. + PropertySubsystem = "SUBSYSTEM" + // PropertySeqnum is the key for the SEQNUM property. + PropertySeqnum = "SEQNUM" +) + +var ( + log = logger.Get("udev") +) + +// Reader implements an io.ReadCloser for reading raw event data from +// the udev netlink socket. +type Reader struct { + sock int + closed bool +} + +// NewReader creates a new io.ReadCloser for reading raw udev event data. +func NewReader() (*Reader, error) { + fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_KOBJECT_UEVENT) + if err != nil { + return nil, fmt.Errorf("failed to create udev reader: %w", err) + } + + addr := syscall.SockaddrNetlink{ + Family: syscall.AF_NETLINK, + Pid: uint32(os.Getpid()), + Groups: 1, + } + + if err := syscall.Bind(fd, &addr); err != nil { + syscall.Close(fd) + return nil, fmt.Errorf("failed to bind udev reader: %w", err) + } + + return &Reader{sock: fd}, nil +} + +// Read implements the io.Reader interface. +func (r *Reader) Read(p []byte) (int, error) { + n, err := syscall.Read(r.sock, p) + if r.closed { + return 0, io.EOF + } + + // allow wrapping Reader in a bufio.Reader, which would panic on n < 0 + if n == -1 { + n = 0 + } + + // TODO(klihub): make this controllable using an option ? + if err == syscall.ENOBUFS { + log.Warn("udev ran out of buffer space (was dropping events)") + err = nil + } + + return n, err +} + +// Close implements the io.Closer interface. +func (r *Reader) Close() error { + if r.closed { + return nil + } + + r.closed = true + return syscall.Close(r.sock) +} + +// EventReader reads udev events. +type EventReader struct { + r io.ReadCloser + b *bufio.Reader +} + +// NewEventReader creates a new udev event reader. +func NewEventReader() (*EventReader, error) { + r, err := NewReader() + if err != nil { + return nil, err + } + + return &EventReader{ + r: r, + b: bufio.NewReader(r), + }, nil +} + +// NewEventReaderFromReader creates a new udev event reader from an existing +// io.ReadCloser. This can be used to generate synthetic events for testing. +func NewEventReaderFromReader(r io.ReadCloser) *EventReader { + return &EventReader{ + r: r, + b: bufio.NewReader(r), + } +} + +// Read reads a udev event, blocking until one is available. +func (r *EventReader) Read() (*Event, error) { + e := &Event{ + Properties: map[string]string{}, + } + + hdr, err := r.b.ReadString(0) + if err != nil { + return nil, err + } + + e.Header = hdr + for { + next, err := r.b.ReadString(0) + if err != nil { + return nil, err + } + + kv := strings.SplitN(next, "=", 2) + if len(kv) != 2 { + return nil, fmt.Errorf("failed to read udev event: unknown format") + } + + k, v := kv[0], kv[1][:len(kv[1])-1] + e.Properties[k] = v + + switch k { + case PropertyAction: + e.Action = v + case PropertyDevpath: + e.Devpath = v + case PropertySubsystem: + e.Subsystem = v + case PropertySeqnum: + e.Seqnum = v + return e, nil + } + } +} + +// Close closes the reader. +func (r *EventReader) Close() error { + return r.r.Close() +}