Skip to content

Commit

Permalink
Merge pull request #2237 from keboola/petr-hosek-PSGO-834_telemetryIn…
Browse files Browse the repository at this point in the history
…terval

Add configurable telemetry interval for data collection
  • Loading branch information
hosekpeter authored Feb 12, 2025
2 parents 3e449dd + b362e3e commit 3931707
Show file tree
Hide file tree
Showing 24 changed files with 91 additions and 49 deletions.
8 changes: 4 additions & 4 deletions internal/pkg/service/common/etcdop/watch_mirror_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (s MirrorMapSetup[T, K, V]) BuildMirror() *MirrorMap[T, K, V] {
// StartMirroring initializes the mirroring process for the MirrorMap by starting a watcher and processing events.
// It locks and updates the internal map on event changes, captures telemetry, and invokes registered callbacks.
// Returns a channel of initialization errors if the consumer fails to start.
func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) (initErr <-chan error) {
func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry, watchTelemetryInterval time.Duration) (initErr <-chan error) {
ctx = ctxattr.ContextWith(ctx, attribute.String("stream.prefix", m.stream.WatchedPrefix()))

// Start telemetry collection in a separate goroutine.
// This routine collects metrics about the memory usage and the state of the MirrorMap.
wg.Add(1)
go m.startTelemetryCollection(ctx, wg, logger, tel)
go m.startTelemetryCollection(ctx, wg, logger, tel, watchTelemetryInterval)

consumer := newConsumerSetup(m.stream).
WithForEach(func(events []WatchEvent[T], header *Header, restart bool) {
Expand Down Expand Up @@ -301,10 +301,10 @@ func (m *MirrorMap[T, K, V]) recordMemoryTelemetry(ctx context.Context, tel tele
}

// Function for periodic telemetry collection (runs as a goroutine).
func (m *MirrorMap[T, K, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) {
func (m *MirrorMap[T, K, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry, watchTelemetryInterval time.Duration) {
defer wg.Done()

ticker := time.NewTicker(30 * time.Second) // Emit telemetry every 5 minutes
ticker := time.NewTicker(watchTelemetryInterval)
defer ticker.Stop()

for {
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/service/common/etcdop/watch_mirror_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestMirrorMap(t *testing.T) {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestMirror_WithOnUpdate(t *testing.T) {
updateCh <- update
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestMirrorMap_WithOnChanges(t *testing.T) {
changesCh <- changes
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/service/common/etcdop/watch_mirror_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (s MirrorTreeSetup[T, V]) BuildMirror() *MirrorTree[T, V] {
}
}

func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry) (initErr <-chan error) {
func (m *MirrorTree[T, V]) StartMirroring(ctx context.Context, wg *sync.WaitGroup, logger log.Logger, tel telemetry.Telemetry, watchTelemetryInterval time.Duration) (initErr <-chan error) {
ctx = ctxattr.ContextWith(ctx, attribute.String("stream.prefix", m.stream.WatchedPrefix()))

wg.Add(1)
// Launching a goroutine to start collecting telemetry data for the MirrorTree.
// This allows asynchronous monitoring of metrics related to the tree's performance and usage.
go m.startTelemetryCollection(ctx, wg, tel, logger)
go m.startTelemetryCollection(ctx, wg, tel, logger, watchTelemetryInterval)

consumer := newConsumerSetup(m.stream).
WithForEach(func(events []WatchEvent[T], header *Header, restart bool) {
Expand Down Expand Up @@ -327,10 +327,10 @@ func (m *MirrorTree[T, V]) recordMemoryTelemetry(ctx context.Context, tel teleme

// startTelemetryCollection begins periodic telemetry reporting for the MirrorTree, including memory usage and key count.
// It runs until the given context is canceled and ensures the provided wait group is marked as done upon completion.
func (m *MirrorTree[T, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, tel telemetry.Telemetry, log log.Logger) {
func (m *MirrorTree[T, V]) startTelemetryCollection(ctx context.Context, wg *sync.WaitGroup, tel telemetry.Telemetry, log log.Logger, watchTelemetryInterval time.Duration) (initErr <-chan error) {
defer wg.Done()

ticker := time.NewTicker(30 * time.Second) // Emit telemetry every 30 seconds
ticker := time.NewTicker(watchTelemetryInterval)
defer ticker.Stop()

for {
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/service/common/etcdop/watch_mirror_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestMirrorTree(t *testing.T) {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestMirrorTree_WithOnUpdate(t *testing.T) {
updateCh <- update
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestMirrorTree_WithOnChanges(t *testing.T) {
changesCh <- changes
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestFullMirrorTree(t *testing.T) {
return !strings.Contains(event.Kv.String(), "/ignore")
}).
BuildMirror()
errCh := mirror.StartMirroring(ctx, wg, logger, tel)
errCh := mirror.StartMirroring(ctx, wg, logger, tel, 5*time.Minute)

// waitForSync: it waits until the memory mirror is synchronized with the revision of the last change
var header *op.Header
Expand Down
45 changes: 25 additions & 20 deletions internal/pkg/service/stream/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"net/url"
"time"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/distribution"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdclient"
Expand All @@ -17,23 +18,26 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/utils/strhelper"
)

const DefaultWatchInterval = time.Minute * 5

// Config of the Stream services.
type Config struct {
DebugLog bool `configKey:"debugLog" configUsage:"Enable logging at DEBUG level."`
DebugHTTPClient bool `configKey:"debugHTTPClient" configUsage:"Log HTTP client requests and responses as debug messages."`
NodeID string `configKey:"nodeID" configUsage:"Unique ID of the node in the cluster." validate:"required"`
Hostname string `configKey:"hostname" configUsage:"Hostname for communication between nodes." validate:"required"`
StorageAPIHost string `configKey:"storageApiHost" configUsage:"Storage API host." validate:"required,hostname"`
PProf pprof.Config `configKey:"pprof"`
Datadog datadog.Config `configKey:"datadog"`
Etcd etcdclient.Config `configKey:"etcd"`
Metrics prometheus.Config `configKey:"metrics"`
API API `configKey:"api"`
Distribution distribution.Config `configKey:"distribution"`
Source source.Config `configKey:"source"`
Sink sink.Config `configKey:"sink"`
Storage storage.Config `configKey:"storage"`
Encryption encryption.Config `configKey:"encryption"`
DebugLog bool `configKey:"debugLog" configUsage:"Enable logging at DEBUG level."`
DebugHTTPClient bool `configKey:"debugHTTPClient" configUsage:"Log HTTP client requests and responses as debug messages."`
NodeID string `configKey:"nodeID" configUsage:"Unique ID of the node in the cluster." validate:"required"`
WatchTelemetryInterval time.Duration `configKey:"watchTelemetryInterval" configUsage:"Interval for sending telemetry data."`
Hostname string `configKey:"hostname" configUsage:"Hostname for communication between nodes." validate:"required"`
StorageAPIHost string `configKey:"storageApiHost" configUsage:"Storage API host." validate:"required,hostname"`
PProf pprof.Config `configKey:"pprof"`
Datadog datadog.Config `configKey:"datadog"`
Etcd etcdclient.Config `configKey:"etcd"`
Metrics prometheus.Config `configKey:"metrics"`
API API `configKey:"api"`
Distribution distribution.Config `configKey:"distribution"`
Source source.Config `configKey:"source"`
Sink sink.Config `configKey:"sink"`
Storage storage.Config `configKey:"storage"`
Encryption encryption.Config `configKey:"encryption"`
}

type Patch struct {
Expand Down Expand Up @@ -63,11 +67,12 @@ func New() Config {
PublicURL: &url.URL{Scheme: "http", Host: "localhost:8000"},
Task: task.NewNodeConfig(),
},
Distribution: distribution.NewConfig(),
Source: source.NewConfig(),
Sink: sink.NewConfig(),
Storage: storage.NewConfig(),
Encryption: encryption.NewConfig(),
Distribution: distribution.NewConfig(),
Source: source.NewConfig(),
Sink: sink.NewConfig(),
Storage: storage.NewConfig(),
Encryption: encryption.NewConfig(),
WatchTelemetryInterval: DefaultWatchInterval,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ debugLog: false
debugHTTPClient: false
# Unique ID of the node in the cluster. Validation rules: required
nodeID: ""
# Interval for sending telemetry data.
watchTelemetryInterval: 5m0s
# Hostname for communication between nodes. Validation rules: required
hostname: ""
# Storage API host. Validation rules: required,hostname
Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/service/stream/dependencies/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ package dependencies

import (
"net/url"
"time"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
aggregationRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/aggregation/repository"
Expand Down Expand Up @@ -88,6 +89,7 @@ type ServiceScope interface {
AggregationRepository() *aggregationRepo.Repository
KeboolaSinkBridge() *keboolaSinkBridge.Bridge
KeboolaBridgeRepository() *keboolaBridgeRepo.Repository
WatchTelemetryInterval() time.Duration
}

type APIScope interface {
Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/service/stream/dependencies/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/keboola/go-client/pkg/keboola"
Expand Down Expand Up @@ -49,6 +50,7 @@ type serviceScope struct {
aggregationRepository *aggregationRepo.Repository
keboolaBridge *keboolaSinkBridge.Bridge
keboolaBridgeRepository *keboolaBridgeRepo.Repository
watchTelemetryInterval time.Duration
}

type parentScopes struct {
Expand All @@ -58,6 +60,7 @@ type parentScopes struct {
dependencies.EncryptionScope
dependencies.DistributionScope
dependencies.DistributedLockScope
WatchTelemetryInterval time.Duration
}

func NewServiceScope(
Expand Down Expand Up @@ -202,6 +205,8 @@ func newServiceScope(

d.EncryptionScope = encryptionScp

d.watchTelemetryInterval = cfg.WatchTelemetryInterval

d.logger = baseScp.Logger().With(attribute.String("nodeId", cfg.NodeID))

d.plugins = plugin.New(d.Logger())
Expand Down Expand Up @@ -270,3 +275,7 @@ func (v *serviceScope) StatisticsRepository() *statsRepo.Repository {
func (v *serviceScope) AggregationRepository() *aggregationRepo.Repository {
return v.aggregationRepository
}

func (v *serviceScope) WatchTelemetryInterval() time.Duration {
return v.watchTelemetryInterval
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type dependencies interface {
DistributedLockProvider() *distlock.Provider
Encryptor() cloudencrypt.Encryptor
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) (*Bridge, error) {
Expand Down Expand Up @@ -159,7 +160,7 @@ func (b *Bridge) MirrorJobs(ctx context.Context, d dependencies) error {
}
},
).BuildMirror()
if err := <-b.jobs.StartMirroring(ctx, wg, b.logger, d.Telemetry()); err != nil {
if err := <-b.jobs.StartMirroring(ctx, wg, b.logger, d.Telemetry(), d.WatchTelemetryInterval()); err != nil {
b.logger.Errorf(ctx, "cannot start mirroring jobs: %s", err)
return err
}
Expand Down
4 changes: 3 additions & 1 deletion internal/pkg/service/stream/source/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"strconv"
"sync"
"time"

"github.com/keboola/go-client/pkg/keboola"
etcd "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -45,6 +46,7 @@ type dependencies interface {
DefinitionRepository() *definitionRepo.Repository
SinkRouter() *sinkRouter.Router
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

func New(d dependencies, logger log.Logger) (*Dispatcher, error) {
Expand Down Expand Up @@ -77,7 +79,7 @@ func New(d dependencies, logger log.Logger) (*Dispatcher, error) {
return event.Value.Type == definition.SourceTypeHTTP
}).
BuildMirror()
if err := <-dp.sources.StartMirroring(ctx, &dp.wg, dp.logger, d.Telemetry()); err != nil {
if err := <-dp.sources.StartMirroring(ctx, &dp.wg, dp.logger, d.Telemetry(), d.WatchTelemetryInterval()); err != nil {
return nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type dependencies interface {
DefinitionRepository() *definitionRepo.Repository
SinkRouter() *sinkRouter.Router
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

func Start(ctx context.Context, d dependencies, cfg Config) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"slices"
"strings"
"sync"
"time"

"github.com/hashicorp/yamux"
etcd "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -57,6 +58,7 @@ type dependencies interface {
Process() *servicectx.Process
StorageRepository() *storageRepo.Repository
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

func NewManager(d dependencies, cfg network.Config, nodeID string) (*Manager, error) {
Expand Down Expand Up @@ -116,7 +118,7 @@ func NewManager(d dependencies, cfg network.Config, nodeID string) (*Manager, er
m.updateConnections(ctx)
}).
BuildMirror()
if err := <-m.volumes.StartMirroring(ctx, wg, m.logger, d.Telemetry()); err != nil {
if err := <-m.volumes.StartMirroring(ctx, wg, m.logger, d.Telemetry(), d.WatchTelemetryInterval()); err != nil {
return nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
package closesync

import (
"time"

etcd "go.etcd.io/etcd/client/v3"

"github.com/keboola/keboola-as-code/internal/pkg/log"
Expand All @@ -18,6 +20,7 @@ type dependencies interface {
EtcdClient() *etcd.Client
EtcdSerde() *serde.Serde
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

type schema struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewCoordinatorNode(d dependencies) (*CoordinatorNode, error) {
n.invokeListeners()
}).
BuildMirror()
if err := <-n.revisions.StartMirroring(ctx, wg, n.logger, d.Telemetry()); err != nil {
if err := <-n.revisions.StartMirroring(ctx, wg, n.logger, d.Telemetry(), d.WatchTelemetryInterval()); err != nil {
return nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"slices"
"strings"
"sync"
"time"

etcd "go.etcd.io/etcd/client/v3"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -57,6 +58,7 @@ type dependencies interface {
StorageRepository() *storageRepo.Repository
ConnectionManager() *connection.Manager
EncodingManager() *encoding.Manager
WatchTelemetryInterval() time.Duration
}

func New(d dependencies, sourceNodeID, sourceType string, config network.Config) (r *Router, err error) {
Expand Down Expand Up @@ -148,7 +150,7 @@ func New(d dependencies, sourceNodeID, sourceType string, config network.Config)
}
}).
BuildMirror()
if err := <-r.slices.StartMirroring(ctx, wg, r.logger, d.Telemetry()); err != nil {
if err := <-r.slices.StartMirroring(ctx, wg, r.logger, d.Telemetry(), d.WatchTelemetryInterval()); err != nil {
return nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package repository

import (
"time"

etcd "go.etcd.io/etcd/client/v3"

"github.com/keboola/keboola-as-code/internal/pkg/log"
Expand All @@ -24,6 +26,7 @@ type dependencies interface {
Plugins() *plugin.Plugins
DefinitionRepository() *definitionRepo.Repository
Telemetry() telemetry.Telemetry
WatchTelemetryInterval() time.Duration
}

// Repository provides database operations with the storage entities.
Expand Down
Loading

0 comments on commit 3931707

Please sign in to comment.