diff --git a/internal/pkg/service/common/etcdop/watch.go b/internal/pkg/service/common/etcdop/watch.go index f693a3233c..e077ab66e0 100644 --- a/internal/pkg/service/common/etcdop/watch.go +++ b/internal/pkg/service/common/etcdop/watch.go @@ -2,7 +2,6 @@ package etcdop import ( "context" - "fmt" "time" "go.etcd.io/etcd/api/v3/mvccpb" @@ -158,7 +157,6 @@ func (v Prefix) WatchWithoutRestart(ctx context.Context, client etcd.Watcher, op // The rawCh channel is closed by the context, so the context does not have to be checked here again. rawCh := client.Watch(ctx, v.Prefix(), append([]etcd.OpOption{etcd.WithPrefix(), etcd.WithCreatedNotify()}, opts...)...) for rawResp := range rawCh { - fmt.Println("rawResp from WatchWithoutRestart", rawResp.Header.Revision) header := rawResp.Header resp := WatchResponseRaw{} resp.Header = &header @@ -197,7 +195,6 @@ func (v Prefix) WatchWithoutRestart(ctx context.Context, client etcd.Watcher, op // Map event type for _, rawEvent := range rawResp.Events { - fmt.Println("rawEvent from WatchWithoutRestart", string(rawEvent.Kv.Key), rawEvent.Type) var typ EventType switch { case rawEvent.IsCreate(): diff --git a/internal/pkg/service/common/etcdop/watch_mirror_map.go b/internal/pkg/service/common/etcdop/watch_mirror_map.go index 7c71c82be2..3c5b819c72 100644 --- a/internal/pkg/service/common/etcdop/watch_mirror_map.go +++ b/internal/pkg/service/common/etcdop/watch_mirror_map.go @@ -2,7 +2,6 @@ package etcdop import ( "context" - "fmt" "maps" "sync" @@ -89,7 +88,6 @@ func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGr // Atomically process all events for _, event := range events { - fmt.Println("event from StartMirroring", event.Key, event.Type) if m.filter != nil && !m.filter(event) { continue } @@ -152,7 +150,6 @@ func (m *MirrorMap[T, K, V]) StartMirroring(ctx context.Context, wg *sync.WaitGr m.updated = make(chan struct{}) m.updatedLock.Unlock() - fmt.Println("calling callbacks from mirror map", update, "restart", restart) // Call callbacks for _, fn := range m.onUpdate { go fn(update) diff --git a/internal/pkg/service/stream/sink/router/pipeline.go b/internal/pkg/service/stream/sink/router/pipeline.go index 6a71023e6b..f3da489992 100644 --- a/internal/pkg/service/stream/sink/router/pipeline.go +++ b/internal/pkg/service/stream/sink/router/pipeline.go @@ -65,6 +65,7 @@ func (r *pipelineRef) ensureOpened(c recordctx.Context) error { } // Try open, if needed, and there is no retry backoff delay active + r.lock.Lock() defer r.lock.Unlock() diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go index ed413e8c22..341a6c0f20 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go @@ -93,9 +93,14 @@ func (p *SinkPipeline) ReopenOnSinkModification() bool { } func (p *SinkPipeline) WriteRecord(c recordctx.Context) (pipelinePkg.WriteResult, error) { + + // Make a local copy of pipelines while holding the read lock p.writeLock.RLock() - defer p.writeLock.RUnlock() - return p.balancer.WriteRecord(c, p.pipelines) + pipelines := make([]balancer.SlicePipeline, len(p.pipelines)) + copy(pipelines, p.pipelines) + p.writeLock.RUnlock() + + return p.balancer.WriteRecord(c, pipelines) } // UpdateSlicePipelines reacts on slices changes - closes old pipelines and open new pipelines. diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go index 7e7effe95d..62f41d0689 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go @@ -50,7 +50,16 @@ type SliceData struct { LocalStorage localModel.Slice } -func NewSlicePipeline(ctx context.Context, logger log.Logger, telemetry telemetry.Telemetry, connections *connection.Manager, encoding *encoding.Manager, ready *readyNotifier, slice *SliceData, onClose func(ctx context.Context, cause string)) *SlicePipeline { +func NewSlicePipeline( + ctx context.Context, + logger log.Logger, + telemetry telemetry.Telemetry, + connections *connection.Manager, + encoding *encoding.Manager, + ready *readyNotifier, + slice *SliceData, + onClose func(ctx context.Context, cause string), +) *SlicePipeline { p := &SlicePipeline{ logger: logger.With(slice.SliceKey.Telemetry()...), telemetry: telemetry, diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go index 80cb4b38b9..b54ecaf53b 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go @@ -192,7 +192,8 @@ func (r *Router) onSlicesModification(ctx context.Context, modifiedSinks []key.S wg.Add(1) go func() { defer wg.Done() - if err := p.UpdateSlicePipelines(ctx, r.assignSinkSlices(sinkKey)); err != nil { + slices := r.assignSinkSlices(sinkKey) + if err := p.UpdateSlicePipelines(ctx, slices); err != nil { r.logger.Errorf(ctx, `cannot update sink slices pipelines: %s, sink %q`, err, sinkKey) } }() diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go index c37acf00ed..ce4d9b65a0 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go @@ -151,13 +151,15 @@ func OpenNetworkFile( ctx := context.Background() // It is expected to receive only one message, `io.EOF` or `message` that the termination is done if _, err := termStream.Recv(); err != nil { - if errors.Is(err, io.EOF) { + fmt.Println("err", err, "unwrap", errors.Unwrap(err)) + if strings.HasSuffix(err.Error(), io.EOF.Error()) { onServerTermination(ctx, "remote server shutdown") return } if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled { - if s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) { + fmt.Println("err", s.Err(), "unwrap", errors.Unwrap(s.Err())) + if s.Code() == codes.Unavailable && strings.HasSuffix(s.Err().Error(), io.EOF.Error()) { onServerTermination(ctx, "remote server shutdown") return } diff --git a/internal/pkg/service/stream/storage/level/local/encoding/manager.go b/internal/pkg/service/stream/storage/level/local/encoding/manager.go index cb9e067326..c95406e2fc 100644 --- a/internal/pkg/service/stream/storage/level/local/encoding/manager.go +++ b/internal/pkg/service/stream/storage/level/local/encoding/manager.go @@ -114,6 +114,7 @@ func (m *Manager) OpenPipeline( closeFunc, ) if err != nil { + m.removePipeline(sliceKey) return nil, err }