diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection.go
index 5aa60651d6..0e120aa2bd 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection/connection.go
@@ -143,9 +143,8 @@ func (m *Manager) ConnectionsCount() int {
 }
 
 func (m *Manager) updateConnections(ctx context.Context) {
-	m.logger.Infof(ctx, `the list of volumes has changed, updating connections`)
-
 	activeNodes := m.writerNodes()
+	m.logger.Infof(ctx, `the list of volumes has changed, updating connections: %v`, activeNodes)
 
 	// Detect new nodes - to open connection
 	var toOpen []*nodeData
diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go
index ed413e8c22..341a6c0f20 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_sink.go
@@ -93,9 +93,14 @@ func (p *SinkPipeline) ReopenOnSinkModification() bool {
 }
 
 func (p *SinkPipeline) WriteRecord(c recordctx.Context) (pipelinePkg.WriteResult, error) {
+	// Make a local copy of pipelines while holding the read lock,
+	// so the write itself does not block writers of p.pipelines.
 	p.writeLock.RLock()
-	defer p.writeLock.RUnlock()
-	return p.balancer.WriteRecord(c, p.pipelines)
+	pipelines := make([]balancer.SlicePipeline, len(p.pipelines))
+	copy(pipelines, p.pipelines)
+	p.writeLock.RUnlock()
+
+	return p.balancer.WriteRecord(c, pipelines)
 }
 
 // UpdateSlicePipelines reacts on slices changes - closes old pipelines and open new pipelines.
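Note on the `SinkPipeline.WriteRecord` change above: the balancer write can block (for example, while a slice pipeline reopens), so holding `writeLock` for its whole duration would presumably let one slow write stall `UpdateSlicePipelines`, which mutates `p.pipelines` under the same lock. Copying the slice under a short-lived read lock avoids that. Below is a minimal, self-contained sketch of the snapshot-under-read-lock pattern; the `registry` type and all names are illustrative, not taken from this codebase:

```go
package main

import (
	"fmt"
	"sync"
)

// registry holds a shared slice guarded by an RWMutex.
type registry struct {
	mu    sync.RWMutex
	items []string
}

// snapshot copies the slice under a short-lived read lock;
// callers then work on the copy without holding the lock.
func (r *registry) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, len(r.items))
	copy(out, r.items)
	return out
}

// replace swaps the whole slice under the write lock.
func (r *registry) replace(items []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items = items
}

func main() {
	r := &registry{items: []string{"slice-1", "slice-2"}}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// A writer can proceed while a reader works on its snapshot.
		r.replace([]string{"slice-3"})
	}()

	for _, item := range r.snapshot() {
		fmt.Println("write record via", item)
	}
	wg.Wait()
}
```

The trade-off of the snapshot is slight staleness: a record may still be routed to a pipeline that was concurrently removed from the set, so the pipelines themselves must tolerate writes after removal.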
diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go
index 61a19eff88..62f41d0689 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/pipeline/pipeline_slice.go
@@ -14,7 +14,6 @@ import (
 	pipelinePkg "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/balancer"
-	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
 	encodingCfg "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config"
 	localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
@@ -51,7 +50,16 @@ type SliceData struct {
 	LocalStorage localModel.Slice
 }
 
-func NewSlicePipeline(ctx context.Context, logger log.Logger, telemetry telemetry.Telemetry, connections *connection.Manager, encoding *encoding.Manager, ready *readyNotifier, slice *SliceData, onClose func(ctx context.Context, cause string)) *SlicePipeline {
+func NewSlicePipeline(
+	ctx context.Context,
+	logger log.Logger,
+	telemetry telemetry.Telemetry,
+	connections *connection.Manager,
+	encoding *encoding.Manager,
+	ready *readyNotifier,
+	slice *SliceData,
+	onClose func(ctx context.Context, cause string),
+) *SlicePipeline {
 	p := &SlicePipeline{
 		logger:    logger.With(slice.SliceKey.Telemetry()...),
 		telemetry: telemetry,
@@ -159,17 +167,19 @@ func (p *SlicePipeline) tryOpen() error {
 
 	ctx := p.ctx
 
-	// Open remote RPC file
-	// The disk writer node can notify us of its termination. In that case, we have to gracefully close the pipeline, see Close method.
-	remoteFile, err := rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.slice.SliceKey, p.slice.LocalStorage, p.Close)
-	if err != nil {
-		return errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
-	}
-
 	// Open pipeline
-	p.pipeline, err = p.encoding.OpenPipeline(ctx, p.slice.SliceKey, p.slice.Mapping, p.slice.Encoding, remoteFile)
+	var err error
+	p.pipeline, err = p.encoding.OpenPipeline(
+		ctx,
+		p.slice.SliceKey,
+		p.telemetry,
+		p.connections,
+		p.slice.Mapping,
+		p.slice.Encoding,
+		p.slice.LocalStorage,
+		p.Close,
+	)
 	if err != nil {
-		_ = remoteFile.Close(ctx)
 		return errors.PrefixErrorf(err, "cannot open slice pipeline")
 	}
diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go
index 80cb4b38b9..b54ecaf53b 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/router.go
@@ -192,7 +192,8 @@ func (r *Router) onSlicesModification(ctx context.Context, modifiedSinks []key.S
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			if err := p.UpdateSlicePipelines(ctx, r.assignSinkSlices(sinkKey)); err != nil {
+			slices := r.assignSinkSlices(sinkKey)
+			if err := p.UpdateSlicePipelines(ctx, slices); err != nil {
 				r.logger.Errorf(ctx, `cannot update sink slices pipelines: %s, sink %q`, err, sinkKey)
 			}
 		}()
diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go b/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go
index b7d1f72135..ce4d9b65a0 100644
--- a/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go
+++ b/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go
@@ -17,7 +17,6 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/pb"
-	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
 	localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
@@ -35,6 +34,20 @@ type networkFile struct {
 	closed <-chan struct{}
 }
 
+// NetworkOutput represents a file on a disk writer node, connected via network.
+type NetworkOutput interface {
+	// IsReady returns true if the underlying network is working.
+	IsReady() bool
+	// Write bytes to a file on a disk writer node.
+	// When the write is aligned, it commits that the already written bytes are safely stored.
+	// The operation is used on Flush of the encoding pipeline.
+	Write(ctx context.Context, aligned bool, p []byte) (n int, err error)
+	// Sync OS disk cache to the physical disk.
+	Sync(ctx context.Context) error
+	// Close the underlying OS file and network connection.
+	Close(ctx context.Context) error
+}
+
 func OpenNetworkFile(
 	ctx context.Context,
 	logger log.Logger,
@@ -43,7 +56,7 @@
 	sliceKey model.SliceKey,
 	slice localModel.Slice,
 	onServerTermination func(ctx context.Context, cause string),
-) (encoding.NetworkOutput, error) {
+) (NetworkOutput, error) {
 	logger = logger.WithComponent("rpc")
 
 	// Use transport layer with multiplexer for connection
@@ -138,13 +151,14 @@ func OpenNetworkFile(
 			ctx := context.Background()
 			// It is expected to receive only one message, `io.EOF` or `message` that the termination is done
 			if _, err := termStream.Recv(); err != nil {
-				if errors.Is(err, io.EOF) {
+				// The error arrives as a gRPC status, so a wrapped io.EOF can be detected only from the message text.
+				if strings.HasSuffix(err.Error(), io.EOF.Error()) {
 					onServerTermination(ctx, "remote server shutdown")
 					return
 				}
 
 				if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
-					if s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) {
+					if s.Code() == codes.Unavailable && strings.HasSuffix(s.Err().Error(), io.EOF.Error()) {
 						onServerTermination(ctx, "remote server shutdown")
 						return
 					}
diff --git a/internal/pkg/service/stream/storage/level/local/encoding/manager.go b/internal/pkg/service/stream/storage/level/local/encoding/manager.go
index 532d50d289..c95406e2fc 100644
--- a/internal/pkg/service/stream/storage/level/local/encoding/manager.go
+++ b/internal/pkg/service/stream/storage/level/local/encoding/manager.go
@@ -10,9 +10,12 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/log"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
 	encoding "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
+	localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
+	"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
 	"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
 )
@@ -80,7 +83,16 @@ func (m *Manager) Pipelines() (out []Pipeline) {
 	return out
 }
 
-func (m *Manager) OpenPipeline(ctx context.Context, sliceKey model.SliceKey, mappingCfg table.Mapping, encodingCfg encoding.Config, out NetworkOutput) (w Pipeline, err error) {
+func (m *Manager) OpenPipeline(
+	ctx context.Context,
+	sliceKey model.SliceKey,
+	telemetry telemetry.Telemetry,
+	connections *connection.Manager,
+	mappingCfg table.Mapping,
+	encodingCfg encoding.Config,
+	localStorage localModel.Slice,
+	closeFunc func(ctx context.Context, cause string),
+) (w Pipeline, err error) {
 	// Check if the pipeline already exists, if not, register an empty reference to unlock immediately
 	ref, exists := m.addPipeline(sliceKey)
 	if exists {
@@ -88,8 +100,20 @@
 	}
 
 	// Create pipeline
-	ref.Pipeline, err = newPipeline(ctx, m.logger, m.clock, sliceKey, mappingCfg, encodingCfg, out, m.events)
+	ref.Pipeline, err = newPipeline(
+		ctx,
+		m.logger,
+		m.clock,
+		sliceKey,
+		telemetry,
+		connections,
+		mappingCfg,
+		encodingCfg,
+		localStorage,
+		m.events,
+		closeFunc,
+	)
 	if err != nil {
 		m.removePipeline(sliceKey)
 		return nil, err
 	}
diff --git a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
index dc43d5a7fd..8be4ac9aa5 100644
--- a/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
+++ b/internal/pkg/service/stream/storage/level/local/encoding/pipeline.go
@@ -2,7 +2,9 @@ package encoding
 
 import (
 	"context"
 	"io"
+	"os"
+	"strings"
 	"sync"
 	"time"
@@ -16,6 +18,8 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
+	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/chunk"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/compression"
 	compressionWriter "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/compression/writer"
@@ -27,7 +31,9 @@ import (
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/writechain"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/writesync"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
+	localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
 	"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
+	"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
 	"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
 )
 
@@ -62,20 +68,6 @@ type StatisticsProvider interface {
 	UncompressedSize() datasize.ByteSize
 }
 
-// NetworkOutput represent a file on a disk writer node, connected via network.
-type NetworkOutput interface {
-	// IsReady returns true if the underlying network is working.
-	IsReady() bool
-	// Write bytes to a file in disk writer node.
-	// When write is aligned, it commits that already writen bytes are safely stored.
-	// The operation is used on Flush of the encoding pipeline.
-	Write(ctx context.Context, aligned bool, p []byte) (n int, err error)
-	// Sync OS disk cache to the physical disk.
-	Sync(ctx context.Context) error
-	// Close the underlying OS file and network connection.
-	Close(ctx context.Context) error
-}
-
 // pipeline implements Pipeline interface, it wraps common logic for all file types.
 // For conversion between record values and bytes, the encoder.Encoder is used.
 type pipeline struct {
@@ -84,11 +76,15 @@ type pipeline struct {
 	events    *events.Events[Pipeline]
 	flushLock sync.RWMutex
 
-	encoder encoder.Encoder
-	chain   *writechain.Chain
-	syncer  *writesync.Syncer
-	chunks  *chunk.Writer
-	network NetworkOutput
+	encoder      encoder.Encoder
+	chain        *writechain.Chain
+	syncer       *writesync.Syncer
+	chunks       *chunk.Writer
+	connections  *connection.Manager
+	telemetry    telemetry.Telemetry
+	localStorage localModel.Slice
+	network      rpc.NetworkOutput
+	closeFunc    func(ctx context.Context, cause string)
 
 	readyLock sync.RWMutex
 	ready     bool
@@ -113,18 +109,31 @@ func newPipeline(
 	logger log.Logger,
 	clk clockwork.Clock,
 	sliceKey model.SliceKey,
+	telemetry telemetry.Telemetry,
+	connections *connection.Manager,
 	mappingCfg table.Mapping,
 	encodingCfg encoding.Config,
-	network NetworkOutput,
+	localStorage localModel.Slice,
 	events *events.Events[Pipeline],
+	closeFunc func(ctx context.Context, cause string),
 ) (out Pipeline, err error) {
 	p := &pipeline{
-		logger:   logger.WithComponent("encoding.pipeline"),
-		sliceKey: sliceKey,
-		network:  network,
-		events:   events.Clone(), // clone passed events, so additional pipeline specific listeners can be added
-		ready:    true,
-		closed:   make(chan struct{}),
+		logger:       logger.WithComponent("encoding.pipeline"),
+		telemetry:    telemetry,
+		connections:  connections,
+		sliceKey:     sliceKey,
+		events:       events.Clone(), // clone passed events, so additional pipeline specific listeners can be added
+		ready:        true,
+		closeFunc:    closeFunc,
+		localStorage: localStorage,
+		closed:       make(chan struct{}),
+	}
+
+	// Open remote RPC file.
+	// The disk writer node can notify us of its termination. In that case, we have to gracefully close the pipeline, see Close method.
+	p.network, err = rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.sliceKey, localStorage, p.closeFunc)
+	if err != nil {
+		return nil, errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
 	}
 
 	ctx = context.WithoutCancel(ctx)
@@ -444,6 +453,24 @@ func (p *pipeline) processChunks(ctx context.Context, clk clockwork.Clock, encod
 		}
 		l := datasize.ByteSize(length)
 		if _, err := p.network.Write(ctx, chunk.Aligned(), chunk.Bytes()); err != nil {
+			// The disk writer may have closed the file; after the RPC transport, the cause is only available as message text.
+			if strings.HasSuffix(err.Error(), os.ErrClosed.Error()) {
+				// Reopen the remote RPC file, so the retried write can use a fresh connection.
+				network, openErr := rpc.OpenNetworkFile(
+					ctx,
+					p.logger,
+					p.telemetry,
+					p.connections,
+					p.sliceKey,
+					p.localStorage,
+					p.closeFunc,
+				)
+				if openErr != nil {
+					return errors.PrefixErrorf(openErr, "cannot open network file for new slice pipeline")
+				}
+				p.network = network
+			}
+
 			p.logger.Debugf(ctx, "chunk write failed, size %q: %s", l.String(), err)
 			return err
 		}
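Why the `errors.Is` checks in `client.go` and the `os.ErrClosed` check in `processChunks` had to become string comparisons: a gRPC status error carries the original server-side error only as text in its description, so the wrap chain that `errors.Is` walks does not survive the transport. A minimal sketch of the failure mode, simulating the lost chain with `%v` instead of `%w` (the error message is illustrative, not a real gRPC response):

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	// Simulate an error crossing an RPC boundary: the original io.EOF is
	// flattened into the message text, as in a gRPC status description.
	overWire := fmt.Errorf("rpc error: code = Unavailable desc = %v", io.EOF)

	// The wrap chain is gone, so errors.Is cannot match the sentinel...
	fmt.Println(errors.Is(overWire, io.EOF)) // false

	// ...but the text survives, which is what the suffix check relies on.
	fmt.Println(strings.HasSuffix(overWire.Error(), io.EOF.Error())) // true
}
```

The suffix match is deliberately loose: any error whose text ends in `EOF` (or `file already closed` for the `os.ErrClosed` case) triggers the branch, so it should stay paired with the surrounding status-code checks rather than used on its own.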