fix: Add network output for test override.
In production, use `nil`, as we want to create the actual network output.
Matovidlo committed Feb 17, 2025
1 parent 03064e0 commit 1bdd23f
Showing 7 changed files with 119 additions and 30 deletions.
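
The change, in short: encoding.Manager.OpenPipeline and newPipeline now take an rpc.NetworkOutput parameter; production callers pass nil so the pipeline opens the real network file via rpc.OpenNetworkFile, while tests inject a fake output and never touch the network. Below is a minimal, self-contained sketch of that nil-fallback injection pattern. It is an illustration only: NetworkOutput, realOutput, fakeOutput, and newPipeline here are simplified stand-ins, not the repository's actual types or signatures.

package main

import (
	"context"
	"fmt"
)

// NetworkOutput stands in for the repository's rpc.NetworkOutput interface
// (simplified to a single method for this sketch).
type NetworkOutput interface {
	Write(ctx context.Context, data []byte) error
}

// realOutput plays the role of the production output that the pipeline would
// normally open itself (the real code calls rpc.OpenNetworkFile).
type realOutput struct{}

func (realOutput) Write(_ context.Context, data []byte) error {
	fmt.Printf("real network write: %d bytes\n", len(data))
	return nil
}

// fakeOutput is what a test injects instead of opening a real connection.
type fakeOutput struct{ written [][]byte }

func (f *fakeOutput) Write(_ context.Context, data []byte) error {
	f.written = append(f.written, data)
	return nil
}

// pipeline keeps whichever output was chosen at construction time.
type pipeline struct{ network NetworkOutput }

// newPipeline mirrors the commit's pattern: nil means "open the real output",
// a non-nil value overrides it (used by tests).
func newPipeline(_ context.Context, network NetworkOutput) (*pipeline, error) {
	if network == nil {
		network = realOutput{} // production path
	}
	return &pipeline{network: network}, nil
}

func main() {
	ctx := context.Background()

	// Production: pass nil, the real output is created internally.
	prod, _ := newPipeline(ctx, nil)
	_ = prod.network.Write(ctx, []byte("record"))

	// Test: inject a fake and assert on what was captured.
	fake := &fakeOutput{}
	testPipe, _ := newPipeline(ctx, fake)
	_ = testPipe.network.Write(ctx, []byte("record"))
	fmt.Println("fake captured writes:", len(fake.written))
}

The benefit of this shape is visible in the diffs below: production call sites only gain a trailing nil argument, while the tests hand in discardOutput{} or newDummyOutput() and never open an RPC connection.
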
File 1 of 7
@@ -178,6 +178,7 @@ func (p *SlicePipeline) tryOpen() error {
p.slice.Encoding,
p.slice.LocalStorage,
p.Close,
nil, // Do not override network output
)
if err != nil {
return errors.PrefixErrorf(err, "cannot open slice pipeline")
File 2 of 7
@@ -18,7 +18,6 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
volume "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/volume/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
@@ -115,7 +114,14 @@ func startDiskWriterNode(t *testing.T, ctx context.Context, etcdCfg etcdclient.C
return m
}

func openNetworkFile(t *testing.T, ctx context.Context, etcdCfg etcdclient.Config, sourceNodeID string, sliceKey model.SliceKey, slice localModel.Slice) (encoding.NetworkOutput, dependencies.Mocked) {
func openNetworkFile(
t *testing.T,
ctx context.Context,
etcdCfg etcdclient.Config,
sourceNodeID string,
sliceKey model.SliceKey,
slice localModel.Slice,
) (rpc.NetworkOutput, dependencies.Mocked) {
t.Helper()

d, m := dependencies.NewMockedSourceScopeWithConfig(
File 3 of 7
@@ -42,7 +42,17 @@ func TestDefaultFactory_FileTypeCSV(t *testing.T) {
slice := test.NewSlice()
slice.Encoding.Encoder.Type = encoder.TypeCSV

w, err := d.EncodingManager().OpenPipeline(ctx, slice.SliceKey, slice.Mapping, slice.Encoding, discardOutput{})
w, err := d.EncodingManager().OpenPipeline(
ctx,
slice.SliceKey,
d.Telemetry(),
d.ConnectionManager(),
slice.Mapping,
slice.Encoding,
slice.LocalStorage,
func(ctx context.Context, cause string) {},
discardOutput{},
)
require.NoError(t, err)
assert.NotNil(t, w)
}
@@ -57,7 +67,17 @@ func TestDefaultFactory_FileTypeInvalid(t *testing.T) {
slice := test.NewSlice()
slice.Encoding.Encoder.Type = "invalid"

_, err := d.EncodingManager().OpenPipeline(ctx, slice.SliceKey, slice.Mapping, slice.Encoding, discardOutput{})
_, err := d.EncodingManager().OpenPipeline(
ctx,
slice.SliceKey,
d.Telemetry(),
d.ConnectionManager(),
slice.Mapping,
slice.Encoding,
slice.LocalStorage,
func(ctx context.Context, cause string) {},
discardOutput{},
)
if assert.Error(t, err) {
assert.Equal(t, `unexpected encoder type "invalid"`, err.Error())
}
File 4 of 7
@@ -11,6 +11,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
encoding "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
localModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/model"
@@ -92,6 +93,7 @@ func (m *Manager) OpenPipeline(
encodingCfg encoding.Config,
localStorage localModel.Slice,
closeFunc func(ctx context.Context, cause string),
network rpc.NetworkOutput,
) (w Pipeline, err error) {
// Check if the pipeline already exists, if not, register an empty reference to unlock immediately
ref, exists := m.addPipeline(sliceKey)
@@ -112,6 +114,7 @@
localStorage,
m.events,
closeFunc,
network,
)
if err != nil {
m.removePipeline(sliceKey)
File 5 of 7
@@ -45,6 +45,8 @@ type Pipeline interface {

// IsReady is used for load balancing, to detect healthy pipelines.
IsReady() bool
// NetworkOutput returns the network output of the pipeline.
NetworkOutput() rpc.NetworkOutput
// WriteRecord blocks until the record is written and synced to the local storage, if the wait is enabled.
WriteRecord(record recordctx.Context) (int, error)
// Events provides listening to the writer lifecycle.
@@ -116,6 +118,7 @@ func newPipeline(
localStorage localModel.Slice,
events *events.Events[Pipeline],
closeFunc func(ctx context.Context, cause string),
network rpc.NetworkOutput,
) (out Pipeline, err error) {
p := &pipeline{
logger: logger.WithComponent("encoding.pipeline"),
@@ -131,9 +134,12 @@

// Open remote RPC file
// The disk writer node can notify us of its termination. In that case, we have to gracefully close the pipeline, see Close method.
p.network, err = rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.sliceKey, localStorage, p.closeFunc)
if err != nil {
return nil, errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
p.network = network
if network == nil {
p.network, err = rpc.OpenNetworkFile(ctx, p.logger, p.telemetry, p.connections, p.sliceKey, p.localStorage, p.closeFunc)
if err != nil {
return nil, errors.PrefixErrorf(err, "cannot open network file for new slice pipeline")
}
}

ctx = context.WithoutCancel(ctx)
@@ -282,6 +288,10 @@ func (p *pipeline) IsReady() bool {
return p.ready
}

func (p *pipeline) NetworkOutput() rpc.NetworkOutput {
return p.network
}

func (p *pipeline) WriteRecord(record recordctx.Context) (int, error) {
timestamp := record.Timestamp()

File 6 of 7
@@ -18,6 +18,7 @@ import (
commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/connection"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/encoder"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding/encoder/result"
@@ -26,6 +27,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/validator"
)
@@ -41,9 +43,18 @@ func TestEncodingPipeline_Basic(t *testing.T) {

output := newDummyOutput()

w, err := d.EncodingManager().OpenPipeline(ctx, slice.SliceKey, slice.Mapping, slice.Encoding, output)
w, err := d.EncodingManager().OpenPipeline(
ctx,
slice.SliceKey,
d.Telemetry(),
d.ConnectionManager(),
slice.Mapping,
slice.Encoding,
slice.LocalStorage,
func(ctx context.Context, cause string) {},
output,
)
require.NoError(t, err)

// Test getters
assert.Equal(t, slice.SliceKey, w.SliceKey())

@@ -81,9 +92,18 @@ func TestEncodingPipeline_FlushError(t *testing.T) {
return w, nil
})

w, err := d.EncodingManager().OpenPipeline(ctx, slice.SliceKey, slice.Mapping, slice.Encoding, newDummyOutput())
w, err := d.EncodingManager().OpenPipeline(
ctx,
slice.SliceKey,
d.Telemetry(),
d.ConnectionManager(),
slice.Mapping,
slice.Encoding,
slice.LocalStorage,
func(ctx context.Context, cause string) {},
nil,
)
require.NoError(t, err)

// Test Close method
err = w.Close(ctx)
if assert.Error(t, err) {
@@ -104,7 +124,17 @@ func TestEncodingPipeline_CloseError(t *testing.T) {
return w, nil
})

w, err := d.EncodingManager().OpenPipeline(ctx, slice.SliceKey, slice.Mapping, slice.Encoding, newDummyOutput())
w, err := d.EncodingManager().OpenPipeline(
ctx,
slice.SliceKey,
d.Telemetry(),
d.ConnectionManager(),
slice.Mapping,
slice.Encoding,
slice.LocalStorage,
func(ctx context.Context, cause string) {},
nil,
)
require.NoError(t, err)

// Test Close method
@@ -508,14 +538,16 @@
foo4
// encodingTestCase is a helper to open encoding pipeline in tests.
type encodingTestCase struct {
*writerSyncHelper
T *testing.T
Ctx context.Context
Logger log.DebugLogger
Clock *clockwork.FakeClock
Events *events.Events[encoding.Pipeline]
Output *dummyOutput
Manager *encoding.Manager
Slice *model.Slice
T *testing.T
Ctx context.Context
Logger log.DebugLogger
Clock *clockwork.FakeClock
Telemetry telemetry.Telemetry
ConnectionManager *connection.Manager
Output *dummyOutput
Events *events.Events[encoding.Pipeline]
Manager *encoding.Manager
Slice *model.Slice
}

type writerSyncHelper struct {
@@ -542,15 +574,17 @@ func newEncodingTestCase(t *testing.T) *encodingTestCase {
slice.Encoding.Sync.OverrideSyncerFactory = helper

tc := &encodingTestCase{
T: t,
writerSyncHelper: helper,
Ctx: ctx,
Logger: mock.DebugLogger(),
Clock: clockwork.NewFakeClock(),
Events: events.New[encoding.Pipeline](),
Output: newDummyOutput(),
Slice: slice,
Manager: d.EncodingManager(),
T: t,
writerSyncHelper: helper,
Ctx: ctx,
Logger: mock.DebugLogger(),
Clock: clockwork.NewFakeClock(),
Telemetry: d.Telemetry(),
ConnectionManager: d.ConnectionManager(),
Output: newDummyOutput(),
Events: events.New[encoding.Pipeline](),
Slice: slice,
Manager: d.EncodingManager(),
}
return tc
}
@@ -560,7 +594,17 @@ func (tc *encodingTestCase) OpenPipeline() (encoding.Pipeline, error) {
val := validator.New()
require.NoError(tc.T, val.Validate(context.Background(), tc.Slice))

w, err := tc.Manager.OpenPipeline(tc.Ctx, tc.Slice.SliceKey, tc.Slice.Mapping, tc.Slice.Encoding, tc.Output)
w, err := tc.Manager.OpenPipeline(
tc.Ctx,
tc.Slice.SliceKey,
tc.Telemetry,
tc.ConnectionManager,
tc.Slice.Mapping,
tc.Slice.Encoding,
tc.Slice.LocalStorage,
func(ctx context.Context, cause string) {},
tc.Output,
)
if err != nil {
return nil, err
}
File 7 of 7
@@ -18,6 +18,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/diskwriter/network/rpc"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/encoding"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/level/local/events"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
@@ -177,6 +178,10 @@ type testWriter struct {
UncompressedSizeValue datasize.ByteSize
}

func (w *testWriter) NetworkOutput() rpc.NetworkOutput {
return nil
}

func (w *testWriter) SliceKey() model.SliceKey {
return w.SliceKeyValue
}
