Skip to content

Commit

Permalink
feat: Open file when closed during processing chunks.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Feb 13, 2025
1 parent 2257506 commit e40bfb8
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 48 deletions.
1 change: 1 addition & 0 deletions internal/pkg/service/common/distribution/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (n *GroupNode) updateNodesFrom(ctx context.Context, events []etcdop.WatchEv

var out Events
for _, rawEvent := range events {
fmt.Println("updateNodesFrom", rawEvent.Key, rawEvent.Type)
switch rawEvent.Type {
case etcdop.CreateEvent, etcdop.UpdateEvent:
nodeID := string(rawEvent.Kv.Value)
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/service/stream/sink/router/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (r *pipelineRef) ensureOpened(c recordctx.Context) error {
}

// Try open, if needed, and there is no retry backoff delay active

r.lock.Lock()
defer r.lock.Unlock()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ func (m *Manager) ConnectionsCount() int {
}

func (m *Manager) updateConnections(ctx context.Context) {
m.logger.Infof(ctx, `the list of volumes has changed, updating connections`)

activeNodes := m.writerNodes()
m.logger.Infof(ctx, `the list of volumes has changed, updating connections: %v`, activeNodes)

// Detect new nodes - to open connection
var toOpen []*nodeData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ func (p *SinkPipeline) ReopenOnSinkModification() bool {
}

func (p *SinkPipeline) WriteRecord(c recordctx.Context) (pipelinePkg.WriteResult, error) {

// Make a local copy of pipelines while holding the read lock
p.writeLock.RLock()
defer p.writeLock.RUnlock()
return p.balancer.WriteRecord(c, p.pipelines)
pipelines := make([]balancer.SlicePipeline, len(p.pipelines))
copy(pipelines, p.pipelines)
p.writeLock.RUnlock()

return p.balancer.WriteRecord(c, pipelines)
}

// UpdateSlicePipelines reacts on slices changes - closes old pipelines and open new pipelines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
pipelinePkg "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/pipeline"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/router/balancer"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
encodingCfg "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
Expand Down Expand Up @@ -51,7 +50,16 @@ type SliceData struct {
LocalStorage localModel.Slice
}

func NewSlicePipeline(ctx context.Context, logger log.Logger, telemetry telemetry.Telemetry, connections *connection.Manager, encoding *encoding.Manager, ready *readyNotifier, slice *SliceData, onClose func(ctx context.Context, cause string)) *SlicePipeline {
func NewSlicePipeline(
ctx context.Context,
logger log.Logger,
telemetry telemetry.Telemetry,
connections *connection.Manager,
encoding *encoding.Manager,
ready *readyNotifier,
slice *SliceData,
onClose func(ctx context.Context, cause string),
) *SlicePipeline {
p := &SlicePipeline{
logger: logger.With(slice.SliceKey.Telemetry()...),
telemetry: telemetry,
Expand Down Expand Up @@ -159,17 +167,19 @@ func (p *SlicePipeline) tryOpen() error {

ctx := p.ctx

// Open remote RPC file
// The disk writer node can notify us of its termination. In that case, we have to gracefully close the pipeline, see Close method.
remoteFile, err := rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.slice.SliceKey, p.slice.LocalStorage, p.Close)
if err != nil {
return errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
}

// Open pipeline
p.pipeline, err = p.encoding.OpenPipeline(ctx, p.slice.SliceKey, p.slice.Mapping, p.slice.Encoding, remoteFile)
var err error
p.pipeline, err = p.encoding.OpenPipeline(
ctx,
p.slice.SliceKey,
p.telemetry,
p.connections,
p.slice.Mapping,
p.slice.Encoding,
p.slice.LocalStorage,
p.Close,
)
if err != nil {
_ = remoteFile.Close(ctx)
return errors.PrefixErrorf(err, "cannot open slice pipeline")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func (r *Router) onSlicesModification(ctx context.Context, modifiedSinks []key.S
wg.Add(1)
go func() {
defer wg.Done()
if err := p.UpdateSlicePipelines(ctx, r.assignSinkSlices(sinkKey)); err != nil {
slices := r.assignSinkSlices(sinkKey)
if err := p.UpdateSlicePipelines(ctx, slices); err != nil {
r.logger.Errorf(ctx, `cannot update sink slices pipelines: %s, sink %q`, err, sinkKey)
}
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/pb"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
Expand All @@ -35,6 +34,20 @@ type networkFile struct {
closed <-chan struct{}
}

// NetworkOutput represent a file on a disk writer node, connected via network.
type NetworkOutput interface {
// IsReady returns true if the underlying network is working.
IsReady() bool
// Write bytes to a file in disk writer node.
// When write is aligned, it commits that already writen bytes are safely stored.
// The operation is used on Flush of the encoding pipeline.
Write(ctx context.Context, aligned bool, p []byte) (n int, err error)
// Sync OS disk cache to the physical disk.
Sync(ctx context.Context) error
// Close the underlying OS file and network connection.
Close(ctx context.Context) error
}

func OpenNetworkFile(
ctx context.Context,
logger log.Logger,
Expand All @@ -43,7 +56,7 @@ func OpenNetworkFile(
sliceKey model.SliceKey,
slice localModel.Slice,
onServerTermination func(ctx context.Context, cause string),
) (encoding.NetworkOutput, error) {
) (NetworkOutput, error) {
logger = logger.WithComponent("rpc")

// Use transport layer with multiplexer for connection
Expand Down Expand Up @@ -138,13 +151,15 @@ func OpenNetworkFile(
ctx := context.Background()
// It is expected to receive only one message, `io.EOF` or `message` that the termination is done
if _, err := termStream.Recv(); err != nil {
if errors.Is(err, io.EOF) {
fmt.Println("err", err, "unwrap", errors.Unwrap(err))

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 154 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt
if strings.HasSuffix(err.Error(), io.EOF.Error()) {

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 155 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings
onServerTermination(ctx, "remote server shutdown")
return
}

if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
if s.Code() == codes.Unavailable && errors.Is(s.Err(), io.EOF) {
fmt.Println("err", s.Err(), "unwrap", errors.Unwrap(s.Err()))

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt

Check failure on line 161 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: fmt
if s.Code() == codes.Unavailable && strings.HasSuffix(s.Err().Error(), io.EOF.Error()) {

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / E2E: Stream / test

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings (typecheck)

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Lint / lint

undefined: strings) (typecheck)

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / windows

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / linux

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings

Check failure on line 162 in internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc/client.go

View workflow job for this annotation

GitHub Actions / Unit Tests / mac-os

undefined: strings
onServerTermination(ctx, "remote server shutdown")
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
encoding "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

Expand Down Expand Up @@ -80,16 +83,38 @@ func (m *Manager) Pipelines() (out []Pipeline) {
return out
}

func (m *Manager) OpenPipeline(ctx context.Context, sliceKey model.SliceKey, mappingCfg table.Mapping, encodingCfg encoding.Config, out NetworkOutput) (w Pipeline, err error) {
func (m *Manager) OpenPipeline(
ctx context.Context,
sliceKey model.SliceKey,
telemetry telemetry.Telemetry,
connections *connection.Manager,
mappingCfg table.Mapping,
encodingCfg encoding.Config,
localStorage localModel.Slice,
closeFunc func(ctx context.Context, cause string),
) (w Pipeline, err error) {
// Check if the pipeline already exists, if not, register an empty reference to unlock immediately
ref, exists := m.addPipeline(sliceKey)
if exists {
return nil, errors.Errorf(`encoding pipeline for slice "%s" already exists`, sliceKey.String())
}

// Create pipeline
ref.Pipeline, err = newPipeline(ctx, m.logger, m.clock, sliceKey, mappingCfg, encodingCfg, out, m.events)
ref.Pipeline, err = newPipeline(
ctx,
m.logger,
m.clock,
sliceKey,
telemetry,
connections,
mappingCfg,
encodingCfg,
localStorage,
m.events,
closeFunc,
)
if err != nil {
m.removePipeline(sliceKey)
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package encoding

import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"

Expand All @@ -16,6 +19,8 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"

Check failure on line 23 in internal/pkg/service/stream/storage/level/local/encoding/pipeline.go

View workflow job for this annotation

GitHub Actions / Lint / lint

could not import github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc (-: # github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/chunk"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/compression"
compressionWriter "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/compression/writer"
Expand All @@ -27,7 +32,9 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/writechain"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/writesync"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

Expand Down Expand Up @@ -62,20 +69,6 @@ type StatisticsProvider interface {
UncompressedSize() datasize.ByteSize
}

// NetworkOutput represent a file on a disk writer node, connected via network.
type NetworkOutput interface {
// IsReady returns true if the underlying network is working.
IsReady() bool
// Write bytes to a file in disk writer node.
// When write is aligned, it commits that already writen bytes are safely stored.
// The operation is used on Flush of the encoding pipeline.
Write(ctx context.Context, aligned bool, p []byte) (n int, err error)
// Sync OS disk cache to the physical disk.
Sync(ctx context.Context) error
// Close the underlying OS file and network connection.
Close(ctx context.Context) error
}

// pipeline implements Pipeline interface, it wraps common logic for all file types.
// For conversion between record values and bytes, the encoder.Encoder is used.
type pipeline struct {
Expand All @@ -84,11 +77,15 @@ type pipeline struct {
events *events.Events[Pipeline]
flushLock sync.RWMutex

encoder encoder.Encoder
chain *writechain.Chain
syncer *writesync.Syncer
chunks *chunk.Writer
network NetworkOutput
encoder encoder.Encoder
chain *writechain.Chain
syncer *writesync.Syncer
chunks *chunk.Writer
connections *connection.Manager
telemetry telemetry.Telemetry
localStorage localModel.Slice
network rpc.NetworkOutput
closeFunc func(ctx context.Context, cause string)

readyLock sync.RWMutex
ready bool
Expand All @@ -113,18 +110,31 @@ func newPipeline(
logger log.Logger,
clk clockwork.Clock,
sliceKey model.SliceKey,
telemetry telemetry.Telemetry,
connections *connection.Manager,
mappingCfg table.Mapping,
encodingCfg encoding.Config,
network NetworkOutput,
localStorage localModel.Slice,
events *events.Events[Pipeline],
closeFunc func(ctx context.Context, cause string),
) (out Pipeline, err error) {
p := &pipeline{
logger: logger.WithComponent("encoding.pipeline"),
sliceKey: sliceKey,
network: network,
events: events.Clone(), // clone passed events, so additional pipeline specific listeners can be added
ready: true,
closed: make(chan struct{}),
logger: logger.WithComponent("encoding.pipeline"),
telemetry: telemetry,
connections: connections,
sliceKey: sliceKey,
events: events.Clone(), // clone passed events, so additional pipeline specific listeners can be added
ready: true,
closeFunc: closeFunc,
localStorage: localStorage,
closed: make(chan struct{}),
}

// Open remote RPC file
// The disk writer node can notify us of its termination. In that case, we have to gracefully close the pipeline, see Close method.
p.network, err = rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.sliceKey, localStorage, p.closeFunc)
if err != nil {
return nil, errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
}

ctx = context.WithoutCancel(ctx)
Expand Down Expand Up @@ -444,6 +454,23 @@ func (p *pipeline) processChunks(ctx context.Context, clk clockwork.Clock, encod
}
l := datasize.ByteSize(length)
if _, err := p.network.Write(ctx, chunk.Aligned(), chunk.Bytes()); err != nil {
fmt.Println("err", errors.Is(errors.Unwrap(err), os.ErrClosed), errors.Unwrap(errors.Unwrap(err)))
if strings.HasSuffix(err.Error(), os.ErrClosed.Error()) {
// Open remote RPC file
p.network, err = rpc.OpenNetworkFile(
ctx,
p.logger,
p.telemetry,
p.connections,
p.sliceKey,
p.localStorage,
p.closeFunc,
)
if err != nil {
return errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
}
}

p.logger.Debugf(ctx, "chunk write failed, size %q: %s", l.String(), err)
return err
}
Expand Down

0 comments on commit e40bfb8

Please sign in to comment.