Quiet down some logging; stop transmission reaper spam (#16239)
* Quiet down some logging; minor fixes

* Add missing break
samsondav authored Feb 5, 2025
1 parent 1b657b6 commit a869c7c
Showing 9 changed files with 58 additions and 36 deletions.
1 change: 1 addition & 0 deletions core/cmd/shell_local.go
@@ -364,6 +364,7 @@ func (s *Shell) runNode(c *cli.Context) error {
if err := ldb.Close(); err != nil {
lggr.Criticalf("Failed to close LockedDB: %v", err)
}
lggr.Debug("Closed DB")
if err := s.CloseLogger(); err != nil {
log.Printf("Failed to close Logger: %v", err)
}
3 changes: 2 additions & 1 deletion core/services/chainlink/application.go
@@ -906,6 +906,7 @@ func (app *ChainlinkApplication) stop() (err error) {
panic("application is already stopped")
}
app.shutdownOnce.Do(func() {
shutdownStart := time.Now()
defer func() {
if app.closeLogger == nil {
return
@@ -936,7 +937,7 @@ err = multierr.Append(err, app.profiler.Stop())
err = multierr.Append(err, app.profiler.Stop())
}

app.logger.Info("Exited all services")
app.logger.Debugf("Closed application in %v", time.Since(shutdownStart))

app.started = false
})
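The net effect of the two application.go hunks above is a timed shutdown: record a start time when stop() begins, then log the elapsed duration at Debug level in place of the old "Exited all services" Info line. A minimal self-contained sketch of that pattern follows; stopAll and the closer functions are illustrative stand-ins, not the real application services.

package main

import (
	"log"
	"time"
)

// stopAll mirrors the shape of ChainlinkApplication.stop() above: time the
// whole shutdown and emit a single duration line at the end.
func stopAll(closers []func() error) {
	shutdownStart := time.Now()
	for _, closeFn := range closers {
		if err := closeFn(); err != nil {
			log.Printf("close failed: %v", err)
		}
	}
	log.Printf("Closed application in %v", time.Since(shutdownStart))
}

func main() {
	stopAll([]func() error{
		func() error { time.Sleep(10 * time.Millisecond); return nil },
		func() error { return nil },
	})
}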
33 changes: 15 additions & 18 deletions core/services/job/spawner.go
@@ -126,25 +126,31 @@ func (js *spawner) HealthReport() map[string]error {

func (js *spawner) startAllServices(ctx context.Context) {
// TODO: rename to find AllJobs
specs, _, err := js.orm.FindJobs(ctx, 0, math.MaxUint32)
jbs, _, err := js.orm.FindJobs(ctx, 0, math.MaxUint32)
if err != nil {
werr := fmt.Errorf("couldn't fetch unclaimed jobs: %v", err)
js.lggr.Critical(werr.Error())
js.SvcErrBuffer.Append(werr)
return
}

jobIDs := make([]int32, len(jbs))
for i, jb := range jbs {
jobIDs[i] = jb.ID
}
js.lggr.Debugw("Starting jobs...", "jobIDs", jobIDs)
wg := sync.WaitGroup{}
wg.Add(len(specs))
for _, spec := range specs {
go func(spec Job) {
wg.Add(len(jbs))
for _, jb := range jbs {
go func(jb Job) {
defer wg.Done()
if err = js.StartService(ctx, spec); err != nil {
js.lggr.Errorf("Couldn't start service %q: %v", spec.Name.ValueOrZero(), err)
if err = js.StartService(ctx, jb); err != nil {
js.lggr.Errorw("Couldn't start job", "jobID", jb.ID, "jobName", jb.Name.ValueOrZero(), "err", err)
}
}(spec)
}(jb)
}
wg.Wait()
js.lggr.Debugw("Started jobs", "jobIDs", jobIDs)
// Log Broadcaster fully starts after all initial Register calls are done from other starting services
// to make sure the initial backfill covers those subscribers.
for _, lbd := range js.lbDependentAwaiters {
@@ -155,6 +161,7 @@ func (js *spawner) startAllServices(ctx context.Context) {
func (js *spawner) stopAllServices() {
jobIDs := js.activeJobIDs()
wg := sync.WaitGroup{}
js.lggr.Debugw("Stopping jobs...", "jobIDs", jobIDs)
wg.Add(len(jobIDs))
for _, jobID := range jobIDs {
go func(jobID int32) {
@@ -163,13 +170,13 @@ func (js *spawner) startAllServices(ctx context.Context) {
}(jobID)
}
wg.Wait()
js.lggr.Debugw("Stopped jobs", "jobIDs", jobIDs)
}

// stopService removes the job from memory and stops the services.
// It will always delete the job from memory even if closing the services fails.
func (js *spawner) stopService(jobID int32) {
lggr := js.lggr.With("jobID", jobID)
lggr.Debug("Stopping services for job")
js.activeJobsMu.Lock()
defer js.activeJobsMu.Unlock()

@@ -186,11 +193,8 @@ func (js *spawner) stopService(jobID int32) {
if err := service.Close(); err != nil {
sLggr.Criticalw("Error stopping job service", "err", err)
js.SvcErrBuffer.Append(pkgerrors.Wrap(err, "error stopping job service"))
} else {
sLggr.Debug("Stopped job service")
}
}
lggr.Debug("Stopped all services for job")

delete(js.activeJobs, jobID)
}
@@ -229,8 +233,6 @@ func (js *spawner) StartService(ctx context.Context, jb Job) error {
return pkgerrors.Wrapf(err, "failed to create services for job: %d", jb.ID)
}

lggr.Debugw("JobSpawner: Starting services for job", "count", len(srvs))

var ms services.MultiStart
for _, srv := range srvs {
err = ms.Start(ctx, srv)
@@ -247,7 +249,6 @@ func (js *spawner) StartService(ctx context.Context, jb Job) error {
}
aj.services = append(aj.services, srv)
}
lggr.Debugw("JobSpawner: Finished starting services for job", "count", len(srvs))
js.activeJobs[jb.ID] = aj
return nil
}
@@ -322,9 +323,7 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i
}
}

lggr.Debugw("Callback: BeforeDeleteJob")
aj.delegate.BeforeJobDeleted(aj.spec)
lggr.Debugw("Callback: BeforeDeleteJob done")

err := sqlutil.Transact(ctx, js.orm.WithDataSource, ds, nil, func(tx ORM) error {
err := tx.DeleteJob(ctx, jobID, aj.spec.Type)
@@ -335,13 +334,11 @@ func (js *spawner) DeleteJob(ctx context.Context, ds sqlutil.DataSource, jobID i
// This comes after calling orm.DeleteJob(), so that any non-db side effects inside it only get executed if
// we know the DELETE will succeed. The DELETE will be finalized only if all db transactions in OnDeleteJob()
// succeed. If either of those fails, the job will not be stopped and everything will be rolled back.
lggr.Debugw("Callback: OnDeleteJob")
err = aj.delegate.OnDeleteJob(ctx, aj.spec)
if err != nil {
return err
}

lggr.Debugw("Callback: OnDeleteJob done")
return nil
})

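Because the spawner.go hunks above interleave removed and added lines without +/- markers, the following self-contained sketch shows the logging shape the spawner moves to: collect all job IDs once, log them before and after the parallel start, and report each failure with structured fields instead of a per-service debug line. Job, startService, and startAll are illustrative stand-ins for the real spawner types, not the repository's code.

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
)

type Job struct {
	ID   int32
	Name string
}

// startService stands in for spawner.StartService; it fails for unnamed jobs
// so the example exercises the error path.
func startService(ctx context.Context, jb Job) error {
	if jb.Name == "" {
		return fmt.Errorf("job %d has no name", jb.ID)
	}
	return nil
}

func startAll(ctx context.Context, jbs []Job) {
	jobIDs := make([]int32, len(jbs))
	for i, jb := range jbs {
		jobIDs[i] = jb.ID
	}
	log.Printf("Starting jobs... jobIDs=%v", jobIDs)

	var wg sync.WaitGroup
	wg.Add(len(jbs))
	for _, jb := range jbs {
		go func(jb Job) {
			defer wg.Done()
			if err := startService(ctx, jb); err != nil {
				// One structured error per failing job, no per-service spam.
				log.Printf("Couldn't start job jobID=%d err=%v", jb.ID, err)
			}
		}(jb)
	}
	wg.Wait()
	log.Printf("Started jobs jobIDs=%v", jobIDs)
}

func main() {
	startAll(context.Background(), []Job{{ID: 1, Name: "stream"}, {ID: 2}})
}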
41 changes: 31 additions & 10 deletions core/services/llo/cleanup.go
@@ -36,6 +36,9 @@ const (
// TransmissionReaperRetryFrequency is the frequency at which the reaper
// will retry if it fails to delete stale transmissions.
TransmissionReaperRetryFrequency = 5 * time.Second
// OvertimeDeleteTimeout is the maximum time we will spend trying to reap
// after exit signal before giving up and logging an error.
OvertimeDeleteTimeout = 2 * time.Second
)

type transmissionReaper struct {
@@ -73,11 +76,25 @@ func (t *transmissionReaper) start(context.Context) error {

func (t *transmissionReaper) runLoop(ctx context.Context) {
t.eng.Debugw("Transmission reaper running", "reapFreq", t.reapFreq, "maxAge", t.maxAge)
ticker := services.NewTicker(t.reapFreq)
ticker := services.TickerConfig{
// Don't reap right away, wait some time for the application to settle
// down first
Initial: services.DefaultJitter.Apply(t.reapFreq),
JitterPct: services.DefaultJitter,
}.NewTicker(t.reapFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// make a final effort to clear the database that goes into
// overtime
overtimeCtx, cancel := context.WithTimeout(context.Background(), OvertimeDeleteTimeout)
if n, err := t.reapStale(overtimeCtx, TransmissionReaperBatchSize); err != nil {
t.lggr.Errorw("Failed to reap stale transmissions on exit", "err", err)
} else if n > 0 {
t.lggr.Infow("Reaped stale transmissions on exit", "nDeleted", n)
}
cancel()
return
case <-ticker.C:
for {
@@ -86,16 +103,20 @@ func (t *transmissionReaper) runLoop(ctx context.Context) {
// deletion)
//
// https://smartcontract-it.atlassian.net/browse/MERC-6807
if n, err := t.reapStale(ctx, TransmissionReaperBatchSize); err != nil {
t.lggr.Errorw("Failed to reap", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(TransmissionReaperRetryFrequency):
continue
n, err := t.reapStale(ctx, TransmissionReaperBatchSize)
if err == nil {
if n > 0 {
t.lggr.Infow("Reaped stale transmissions", "nDeleted", n)
}
} else {
t.lggr.Debugw("Reaped stale transmissions", "nDeleted", n)
break
}

t.lggr.Errorw("Failed to reap", "err", err)
select {
case <-ctx.Done():
return
case <-time.After(TransmissionReaperRetryFrequency):
continue
}
}
}
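The reaper hunks are the hardest to read without diff markers, so here is a self-contained sketch of the loop shape after this change: a delayed first tick, one reap attempt per tick that retries on error and breaks once a reap succeeds (which appears to be the "missing break" from the commit message), and a final bounded "overtime" reap when the context is cancelled. time.Ticker and reapStale are stand-ins for the jittered services ticker and the real batched DELETE; constants mirror the ones declared above.

package main

import (
	"context"
	"log"
	"time"
)

const (
	batchSize       = 1000
	retryFrequency  = 5 * time.Second
	overtimeTimeout = 2 * time.Second
)

// reapStale stands in for the real batched DELETE of stale transmissions;
// here it always succeeds and reports nothing deleted.
func reapStale(ctx context.Context, limit int64) (int64, error) {
	return 0, nil
}

func runLoop(ctx context.Context, reapFreq time.Duration) {
	ticker := time.NewTicker(reapFreq) // the real code uses a jittered services ticker
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Final best-effort reap on shutdown, bounded by its own context.
			overtimeCtx, cancel := context.WithTimeout(context.Background(), overtimeTimeout)
			if n, err := reapStale(overtimeCtx, batchSize); err != nil {
				log.Printf("Failed to reap stale transmissions on exit: %v", err)
			} else if n > 0 {
				log.Printf("Reaped %d stale transmissions on exit", n)
			}
			cancel()
			return
		case <-ticker.C:
			for {
				n, err := reapStale(ctx, batchSize)
				if err == nil {
					if n > 0 {
						log.Printf("Reaped %d stale transmissions", n)
					}
					break // stop retrying once a reap succeeds
				}
				log.Printf("Failed to reap: %v", err)
				select {
				case <-ctx.Done():
					return
				case <-time.After(retryFrequency):
					continue
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	runLoop(ctx, 100*time.Millisecond)
}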
9 changes: 7 additions & 2 deletions core/services/llo/mercurytransmitter/persistence_manager.go
@@ -82,7 +82,7 @@ func NewPersistenceManager(lggr logger.Logger, orm ORM, serverURL string, maxTra
return &persistenceManager{
orm: orm,
donID: orm.DonID(),
lggr: logger.Sugared(lggr).Named("LLOPersistenceManager").With("serverURL", serverURL),
lggr: logger.Sugared(lggr).Named("LLOPersistenceManager"),
serverURL: serverURL,
stopCh: make(services.StopChan),
maxTransmitQueueSize: maxTransmitQueueSize,
@@ -127,7 +127,12 @@ func (pm *persistenceManager) runFlushDeletesLoop() {
ctx, cancel := pm.stopCh.NewCtx()
defer cancel()

ticker := services.NewTicker(pm.flushDeletesFrequency)
ticker := services.TickerConfig{
// Don't prune right away, wait some time for the application to settle
// down first
Initial: services.DefaultJitter.Apply(pm.pruneFrequency),
JitterPct: services.DefaultJitter,
}.NewTicker(pm.pruneFrequency)
defer ticker.Stop()
for {
select {
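Both this loop and the reaper above switch from services.NewTicker to a TickerConfig whose first tick is delayed by roughly one jittered period, so background maintenance does not fire the moment the node starts. A small sketch of that usage, assuming the github.com/smartcontractkit/chainlink-common/pkg/services import path used elsewhere in this repository; only the TickerConfig fields and methods shown in the diff are relied on.

package main

import (
	"fmt"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

func main() {
	period := 2 * time.Second
	ticker := services.TickerConfig{
		// Delay the first tick by roughly one jittered period instead of
		// ticking immediately on startup.
		Initial:   services.DefaultJitter.Apply(period),
		JitterPct: services.DefaultJitter,
	}.NewTicker(period)
	defer ticker.Stop()

	for i := 1; i <= 3; i++ {
		<-ticker.C
		fmt.Println("tick", i)
	}
}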
2 changes: 1 addition & 1 deletion core/services/llo/mercurytransmitter/transmitter.go
@@ -144,7 +144,7 @@ func newTransmitter(opts Opts) *transmitter {
sugared := logger.Sugared(opts.Lggr).Named("LLOMercuryTransmitter")
servers := make(map[string]*server, len(opts.Clients))
for serverURL, client := range opts.Clients {
sLggr := sugared.Named(serverURL).With("serverURL", serverURL)
sLggr := sugared.Named(fmt.Sprintf("%q", serverURL)).With("serverURL", serverURL)
servers[serverURL] = newServer(sLggr, opts.VerboseLogging, opts.Cfg, client, opts.ORM, serverURL)
}
return &transmitter{
1 change: 0 additions & 1 deletion core/services/ocr2/plugins/llo/integration_test.go
@@ -1205,7 +1205,6 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, serverPubKey
err := db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = 'example.invalid'")
require.NoError(t, err)
assert.LessOrEqual(t, cnt, maxQueueSize, "persisted transmit queue size too large for node %d for failing server", i)
assert.Equal(t, maxQueueSize, cnt, "expected persisted transmit queue size to exactly equal maxQueueSize for node %d for failing server", i)

// The succeeding server
err = db.GetContext(tests.Context(t), &cnt, "SELECT count(*) FROM llo_mercury_transmit_queue WHERE server_url = $1", serverURL)
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
@@ -739,7 +739,7 @@ func (r *Relayer) NewLLOProvider(ctx context.Context, rargs commontypes.RelayArg
switch r.mercuryCfg.Transmitter().Protocol() {
case config.MercuryTransmitterProtocolGRPC:
client = grpc.NewClient(grpc.ClientOpts{
Logger: lggr.Named(server.URL),
Logger: lggr.Named(fmt.Sprintf("%q", server.URL)).With("serverURL", server.URL),
ClientPrivKey: privKey.PrivateKey(),
ServerPubKey: ed25519.PublicKey(server.PubKey),
ServerURL: server.URL,
2 changes: 0 additions & 2 deletions core/services/streams/delegate.go
@@ -70,12 +70,10 @@ func (s *StreamService) Start(_ context.Context) error {
if s.jb.PipelineSpec == nil {
return errors.New("pipeline spec unexpectedly missing for stream")
}
s.lggr.Debugw("Registering stream", "jobID", s.jb.ID)
return s.registry.Register(s.jb, s.rrs)
}

func (s *StreamService) Close() error {
s.lggr.Debugw("Unregistering stream", "jobID", s.jb.ID)
s.registry.Unregister(s.jb.ID)
return nil
}
