Skip to content

Commit

Permalink
test: Temporary pipeline error
Browse files Browse the repository at this point in the history
  • Loading branch information
jachym-tousek-keboola committed Feb 11, 2025
1 parent 2ea640e commit 93f5bb1
Showing 1 changed file with 116 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,118 @@ foo4
`)
}

func TestEncodingPipeline_TemporaryError(t *testing.T) {
t.Parallel()

ctx := context.Background()
tc := newEncodingTestCase(t)
tc.Slice.Encoding.Sync.Mode = writesync.ModeDisk
tc.Slice.Encoding.Sync.Wait = true

w, err := tc.OpenPipeline()
require.NoError(t, err)

// Writes are BLOCKING, each write is waiting for the next sync

// Write two rows and trigger sync
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
n, err := w.WriteRecord(tc.TestRecord("foo1"))
require.NoError(t, err)
assert.Equal(t, 5, n)
tc.Logger.Infof(ctx, "TEST: write unblocked")
}()
go func() {
defer wg.Done()
n, err := w.WriteRecord(tc.TestRecord("foo1"))
require.NoError(t, err)
assert.Equal(t, 5, n)
tc.Logger.Infof(ctx, "TEST: write unblocked")
}()
tc.ExpectWritesCount(t, 2)
// When trigger sync is executed, it creates new notifier, old notifier is close and unblocks 2 writes above.
// New notifier is handovered into encoder, so the next write is blocked on new notifier until new trigger sync is executed
tc.TriggerSync(t)
wg.Wait()

// Let the next write fail
tc.Output.WriteError = errors.New("some error")

// Write one row and trigger sync
wg.Add(1)
go func() {
defer wg.Done()
n, err := w.WriteRecord(tc.TestRecord("foo2"))
require.NoError(t, err)
assert.Equal(t, 5, n)
tc.Logger.Infof(ctx, "TEST: write unblocked")
}()
tc.ExpectWritesCount(t, 1)
wg.Add(1)
go func() {
defer wg.Done()
tc.TriggerSync(t)
}()

// Wait for error to be logged
assert.EventuallyWithT(t, func(c *assert.CollectT) {
tc.Logger.AssertJSONMessages(c, `
{"level":"warn","message":"chunks write failed: some error, waiting %s, chunks count = %s"}
`)
}, 5*time.Second, 100*time.Millisecond)

// Disable error and trigger retry
tc.Output.WriteError = nil
tc.Clock.Advance(1*time.Second)

Check failure on line 572 in internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go

View workflow job for this annotation

GitHub Actions / Lint / lint

File is not properly formatted (gci)

// Wait for goroutines
wg.Wait()

// Close writer - it triggers the last sync
assert.NoError(t, w.Close(ctx))

Check failure on line 578 in internal/pkg/service/stream/storage/level/local/encoding/pipeline_test.go

View workflow job for this annotation

GitHub Actions / Lint / lint

require-error: for error assertions use require (testifylint)

// Check file content
assert.Equal(t, strings.TrimSpace(`
foo1
foo1
foo2
`), strings.TrimSpace(tc.Output.String()))

// Check logs
tc.AssertLogs(`
{"level":"debug","message":"opening encoding pipeline"}
{"level":"debug","message":"sync is enabled, mode=disk, sync each {count=500 or uncompressed=10MB or compressed=1MB or interval=50ms}, check each 1ms"}
{"level":"debug","message":"opened encoding pipeline"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"10B\""}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"notifier obtained"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"chunk completed, aligned = true, size = \"5B\""}
{"level":"debug","message":"writers flushed"}
{"level":"warn","message":"chunks write failed: some error, waiting %s, chunks count = 1"}
{"level":"debug","message":"sync to disk done"}
{"level":"info","message":"TEST: write unblocked"}
{"level":"debug","message":"closing encoding pipeline"}
{"level":"debug","message":"stopping syncer"}
{"level":"debug","message":"starting sync to disk"}
{"level":"debug","message":"flushing writers"}
{"level":"debug","message":"writers flushed"}
{"level":"debug","message":"sync to disk done"}
{"level":"debug","message":"syncer stopped"}
{"level":"debug","message":"closing chain"}
{"level":"debug","message":"chain closed"}
{"level":"debug","message":"closed encoding pipeline"}
`)
}

// encodingTestCase is a helper to open encoding pipeline in tests.
type encodingTestCase struct {
*writerSyncHelper
Expand Down Expand Up @@ -667,6 +779,7 @@ func (w *dummyEncoder) Close() error {

type dummyOutput struct {
bytes bytes.Buffer
WriteError error
SyncError error
CloseError error
}
Expand All @@ -684,6 +797,9 @@ func (o *dummyOutput) IsReady() bool {
}

func (o *dummyOutput) Write(ctx context.Context, aligned bool, p []byte) (n int, err error) {
if o.WriteError != nil {
return 0, o.WriteError
}
return o.bytes.Write(p)
}

Expand Down

0 comments on commit 93f5bb1

Please sign in to comment.