diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 84c72044ef47..78c3b6789576 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/featuregate" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" @@ -1396,15 +1397,15 @@ func TestDelayedDisambiguation(t *testing.T) { filetest.WriteString(t, file2, newContent2+"\n") operator.poll(context.Background()) - var sameTokenOtherFile emittest.Call + var sameTokenOtherFile emit.Token if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) { - sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} + sameTokenOtherFile = emit.Token{Body: []byte(sameContent), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} } else { - sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} + sameTokenOtherFile = emit.Token{Body: []byte(sameContent), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} } - newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} - newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} - sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2) + newFromFile1 := emit.Token{Body: []byte(newContent1), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}} + newFromFile2 := emit.Token{Body: []byte(newContent2), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}} + sink.ExpectCalls(t, sameTokenOtherFile, newFromFile1, newFromFile2) } func TestNoLostPartial(t *testing.T) { diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink.go b/pkg/stanza/fileconsumer/internal/emittest/sink.go index 6c7d9954b3fc..34d0f7f95219 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink.go @@ -22,13 +22,8 @@ type sinkCfg struct { type SinkOpt func(*sinkCfg) -type Call struct { - Token []byte - Attrs map[string]any -} - type Sink struct { - emitChan chan *Call + emitChan chan emit.Token timeout time.Duration emit.Callback } @@ -53,17 +48,15 @@ func NewSink(opts ...SinkOpt) *Sink { for _, opt := range opts { opt(cfg) } - emitChan := make(chan *Call, cfg.emitChanLen) + emitChan := make(chan emit.Token, cfg.emitChanLen) return &Sink{ emitChan: emitChan, timeout: cfg.timeout, Callback: func(ctx context.Context, token emit.Token) error { - copied := make([]byte, len(token.Body)) - copy(copied, token.Body) select { case <-ctx.Done(): return ctx.Err() - case emitChan <- &Call{copied, token.Attributes}: + case emitChan <- token: } return nil }, @@ -79,8 +72,8 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte { emitChan := make([][]byte, 0, n) for i := 0; i < n; i++ { select { - case call := <-s.emitChan: - emitChan = append(emitChan, call.Token) + case token := <-s.emitChan: + emitChan = append(emitChan, token.Body) case <-time.After(s.timeout): assert.Fail(t, "Timed out waiting for message") return nil @@ -91,8 +84,8 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte { func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) { select { - case c := <-s.emitChan: - return c.Token, c.Attrs + case token := <-s.emitChan: + return token.Body, token.Attributes case <-time.After(s.timeout): assert.Fail(t, "Timed out waiting for message") return nil, nil @@ -101,8 +94,8 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) { func (s *Sink) ExpectToken(t *testing.T, expected []byte) { select { - case call := <-s.emitChan: - assert.Equal(t, expected, call.Token) + case token := <-s.emitChan: + assert.Equal(t, expected, token.Body) case <-time.After(s.timeout): assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected)) } @@ -112,8 +105,8 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) { actual := make([][]byte, 0, len(expected)) for i := 0; i < len(expected); i++ { select { - case call := <-s.emitChan: - actual = append(actual, call.Token) + case token := <-s.emitChan: + actual = append(actual, token.Body) case <-time.After(s.timeout): assert.Fail(t, fmt.Sprintf("timeout: expected: %d, actual: %d", len(expected), i)) return @@ -124,16 +117,16 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) { func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) { select { - case c := <-s.emitChan: - assert.Equal(t, expected, c.Token) - assert.Equal(t, attrs, c.Attrs) + case token := <-s.emitChan: + assert.Equal(t, expected, token.Body) + assert.Equal(t, attrs, token.Attributes) case <-time.After(s.timeout): assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected)) } } -func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) { - actual := make([]*Call, 0, len(expected)) +func (s *Sink) ExpectCalls(t *testing.T, expected ...emit.Token) { + actual := make([]emit.Token, 0, len(expected)) for i := 0; i < len(expected); i++ { select { case call := <-s.emitChan: @@ -153,7 +146,7 @@ func (s *Sink) ExpectNoCalls(t *testing.T) { func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) { select { case c := <-s.emitChan: - assert.Fail(t, "Received unexpected message", "Message: %s", c.Token) + assert.Fail(t, "Received unexpected message", "Message: %s", c.Body) case <-time.After(d): } } diff --git a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go index 7d340b4a79fd..a246b16b5820 100644 --- a/pkg/stanza/fileconsumer/internal/emittest/sink_test.go +++ b/pkg/stanza/fileconsumer/internal/emittest/sink_test.go @@ -19,7 +19,7 @@ func TestNextToken(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { token := s.NextToken(t) - assert.Equal(t, c.Token, token) + assert.Equal(t, c.Body, token) } } @@ -27,7 +27,7 @@ func TestNextTokenTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { token := s.NextToken(t) - assert.Equal(t, c.Token, token) + assert.Equal(t, c.Body, token) } // Create a new T so we can expect it to fail without failing the overall test. @@ -40,8 +40,8 @@ func TestNextTokens(t *testing.T) { s, testCalls := sinkTest(t) for i := 0; i < 5; i++ { tokens := s.NextTokens(t, 2) - assert.Equal(t, testCalls[2*i].Token, tokens[0]) - assert.Equal(t, testCalls[2*i+1].Token, tokens[1]) + assert.Equal(t, testCalls[2*i].Body, tokens[0]) + assert.Equal(t, testCalls[2*i+1].Body, tokens[1]) } } @@ -49,8 +49,8 @@ func TestNextTokensTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for i := 0; i < 5; i++ { tokens := s.NextTokens(t, 2) - assert.Equal(t, testCalls[2*i].Token, tokens[0]) - assert.Equal(t, testCalls[2*i+1].Token, tokens[1]) + assert.Equal(t, testCalls[2*i].Body, tokens[0]) + assert.Equal(t, testCalls[2*i+1].Body, tokens[1]) } // Create a new T so we can expect it to fail without failing the overall test. @@ -63,8 +63,8 @@ func TestNextCall(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { token, attributes := s.NextCall(t) - require.Equal(t, c.Token, token) - require.Equal(t, c.Attrs, attributes) + require.Equal(t, c.Body, token) + require.Equal(t, c.Attributes, attributes) } } @@ -72,8 +72,8 @@ func TestNextCallTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { token, attributes := s.NextCall(t) - require.Equal(t, c.Token, token) - require.Equal(t, c.Attrs, attributes) + require.Equal(t, c.Body, token) + require.Equal(t, c.Attributes, attributes) } // Create a new T so we can expect it to fail without failing the overall test. @@ -85,14 +85,14 @@ func TestNextCallTimeout(t *testing.T) { func TestExpectToken(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { - s.ExpectToken(t, c.Token) + s.ExpectToken(t, c.Body) } } func TestExpectTokenTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { - s.ExpectToken(t, c.Token) + s.ExpectToken(t, c.Body) } // Create a new T so we can expect it to fail without failing the overall test. @@ -104,14 +104,14 @@ func TestExpectTokenTimeout(t *testing.T) { func TestExpectTokens(t *testing.T) { s, testCalls := sinkTest(t) for i := 0; i < 5; i++ { - s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token) + s.ExpectTokens(t, testCalls[2*i].Body, testCalls[2*i+1].Body) } } func TestExpectTokensTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for i := 0; i < 5; i++ { - s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token) + s.ExpectTokens(t, testCalls[2*i].Body, testCalls[2*i+1].Body) } // Create a new T so we can expect it to fail without failing the overall test. @@ -123,14 +123,14 @@ func TestExpectTokensTimeout(t *testing.T) { func TestExpectCall(t *testing.T) { s, testCalls := sinkTest(t) for _, c := range testCalls { - s.ExpectCall(t, c.Token, c.Attrs) + s.ExpectCall(t, c.Body, c.Attributes) } } func TestExpectCallTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) for _, c := range testCalls { - s.ExpectCall(t, c.Token, c.Attrs) + s.ExpectCall(t, c.Body, c.Attributes) } // Create a new T so we can expect it to fail without failing the overall test. @@ -141,7 +141,7 @@ func TestExpectCallTimeout(t *testing.T) { func TestExpectCalls(t *testing.T) { s, testCalls := sinkTest(t) - testCallsOutOfOrder := make([]*Call, 0, 10) + testCallsOutOfOrder := make([]emit.Token, 0, 10) for i := 0; i < len(testCalls); i += 2 { testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) } @@ -153,7 +153,7 @@ func TestExpectCalls(t *testing.T) { func TestExpectCallsTimeout(t *testing.T) { s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond)) - testCallsOutOfOrder := make([]*Call, 0, 10) + testCallsOutOfOrder := make([]emit.Token, 0, 10) for i := 0; i < len(testCalls); i += 2 { testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i]) } @@ -164,7 +164,7 @@ func TestExpectCallsTimeout(t *testing.T) { // Create a new T so we can expect it to fail without failing the overall test. tt := new(testing.T) - s.ExpectCalls(tt, new(Call)) + s.ExpectCalls(tt, emit.Token{}) assert.True(t, tt.Failed()) } @@ -187,24 +187,24 @@ func TestExpectNoCallsFailure(t *testing.T) { func TestWithCallBuffer(t *testing.T) { s, testCalls := sinkTest(t, WithCallBuffer(5)) for i := 0; i < 10; i++ { - s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs) + s.ExpectCall(t, testCalls[i].Body, testCalls[i].Attributes) } } -func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) { +func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []emit.Token) { s := NewSink(opts...) - testCalls := make([]*Call, 0, 10) + testCalls := make([]emit.Token, 0, 10) for i := 0; i < 10; i++ { - testCalls = append(testCalls, &Call{ - Token: []byte(fmt.Sprintf("token-%d", i)), - Attrs: map[string]any{ + testCalls = append(testCalls, emit.Token{ + Body: []byte(fmt.Sprintf("token-%d", i)), + Attributes: map[string]any{ "key": fmt.Sprintf("value-%d", i), }, }) } go func() { for _, c := range testCalls { - assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs))) + assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Body, c.Attributes))) } }() return s, testCalls