Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Replace identical emittest.Call with emit.Token #37765

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
Expand Down Expand Up @@ -1396,15 +1397,15 @@ func TestDelayedDisambiguation(t *testing.T) {
filetest.WriteString(t, file2, newContent2+"\n")
operator.poll(context.Background())

var sameTokenOtherFile emittest.Call
var sameTokenOtherFile emit.Token
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
sameTokenOtherFile = emit.Token{Body: []byte(sameContent), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
} else {
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
sameTokenOtherFile = emit.Token{Body: []byte(sameContent), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
}
newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2)
newFromFile1 := emit.Token{Body: []byte(newContent1), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
newFromFile2 := emit.Token{Body: []byte(newContent2), Attributes: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
sink.ExpectCalls(t, sameTokenOtherFile, newFromFile1, newFromFile2)
}

func TestNoLostPartial(t *testing.T) {
Expand Down
41 changes: 17 additions & 24 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@ type sinkCfg struct {

type SinkOpt func(*sinkCfg)

type Call struct {
Token []byte
Attrs map[string]any
}

type Sink struct {
emitChan chan *Call
emitChan chan emit.Token
timeout time.Duration
emit.Callback
}
Expand All @@ -53,17 +48,15 @@ func NewSink(opts ...SinkOpt) *Sink {
for _, opt := range opts {
opt(cfg)
}
emitChan := make(chan *Call, cfg.emitChanLen)
emitChan := make(chan emit.Token, cfg.emitChanLen)
return &Sink{
emitChan: emitChan,
timeout: cfg.timeout,
Callback: func(ctx context.Context, token emit.Token) error {
copied := make([]byte, len(token.Body))
copy(copied, token.Body)
select {
case <-ctx.Done():
return ctx.Err()
case emitChan <- &Call{copied, token.Attributes}:
case emitChan <- token:
}
return nil
},
Expand All @@ -79,8 +72,8 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
emitChan := make([][]byte, 0, n)
for i := 0; i < n; i++ {
select {
case call := <-s.emitChan:
emitChan = append(emitChan, call.Token)
case token := <-s.emitChan:
emitChan = append(emitChan, token.Body)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil
Expand All @@ -91,8 +84,8 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {

func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
select {
case c := <-s.emitChan:
return c.Token, c.Attrs
case token := <-s.emitChan:
return token.Body, token.Attributes
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
return nil, nil
Expand All @@ -101,8 +94,8 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {

func (s *Sink) ExpectToken(t *testing.T, expected []byte) {
select {
case call := <-s.emitChan:
assert.Equal(t, expected, call.Token)
case token := <-s.emitChan:
assert.Equal(t, expected, token.Body)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
Expand All @@ -112,8 +105,8 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
actual := make([][]byte, 0, len(expected))
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
actual = append(actual, call.Token)
case token := <-s.emitChan:
actual = append(actual, token.Body)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("timeout: expected: %d, actual: %d", len(expected), i))
return
Expand All @@ -124,16 +117,16 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {

func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) {
select {
case c := <-s.emitChan:
assert.Equal(t, expected, c.Token)
assert.Equal(t, attrs, c.Attrs)
case token := <-s.emitChan:
assert.Equal(t, expected, token.Body)
assert.Equal(t, attrs, token.Attributes)
case <-time.After(s.timeout):
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
}
}

func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) {
actual := make([]*Call, 0, len(expected))
func (s *Sink) ExpectCalls(t *testing.T, expected ...emit.Token) {
actual := make([]emit.Token, 0, len(expected))
for i := 0; i < len(expected); i++ {
select {
case call := <-s.emitChan:
Expand All @@ -153,7 +146,7 @@ func (s *Sink) ExpectNoCalls(t *testing.T) {
func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) {
select {
case c := <-s.emitChan:
assert.Fail(t, "Received unexpected message", "Message: %s", c.Token)
assert.Fail(t, "Received unexpected message", "Message: %s", c.Body)
case <-time.After(d):
}
}
52 changes: 26 additions & 26 deletions pkg/stanza/fileconsumer/internal/emittest/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ func TestNextToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.Token, token)
assert.Equal(t, c.Body, token)
}
}

func TestNextTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token := s.NextToken(t)
assert.Equal(t, c.Token, token)
assert.Equal(t, c.Body, token)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -40,17 +40,17 @@ func TestNextTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
assert.Equal(t, testCalls[2*i].Body, tokens[0])
assert.Equal(t, testCalls[2*i+1].Body, tokens[1])
}
}

func TestNextTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
tokens := s.NextTokens(t, 2)
assert.Equal(t, testCalls[2*i].Token, tokens[0])
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
assert.Equal(t, testCalls[2*i].Body, tokens[0])
assert.Equal(t, testCalls[2*i+1].Body, tokens[1])
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -63,17 +63,17 @@ func TestNextCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
require.Equal(t, c.Body, token)
require.Equal(t, c.Attributes, attributes)
}
}

func TestNextCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
token, attributes := s.NextCall(t)
require.Equal(t, c.Token, token)
require.Equal(t, c.Attrs, attributes)
require.Equal(t, c.Body, token)
require.Equal(t, c.Attributes, attributes)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -85,14 +85,14 @@ func TestNextCallTimeout(t *testing.T) {
func TestExpectToken(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectToken(t, c.Token)
s.ExpectToken(t, c.Body)
}
}

func TestExpectTokenTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectToken(t, c.Token)
s.ExpectToken(t, c.Body)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -104,14 +104,14 @@ func TestExpectTokenTimeout(t *testing.T) {
func TestExpectTokens(t *testing.T) {
s, testCalls := sinkTest(t)
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
s.ExpectTokens(t, testCalls[2*i].Body, testCalls[2*i+1].Body)
}
}

func TestExpectTokensTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for i := 0; i < 5; i++ {
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
s.ExpectTokens(t, testCalls[2*i].Body, testCalls[2*i+1].Body)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -123,14 +123,14 @@ func TestExpectTokensTimeout(t *testing.T) {
func TestExpectCall(t *testing.T) {
s, testCalls := sinkTest(t)
for _, c := range testCalls {
s.ExpectCall(t, c.Token, c.Attrs)
s.ExpectCall(t, c.Body, c.Attributes)
}
}

func TestExpectCallTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
for _, c := range testCalls {
s.ExpectCall(t, c.Token, c.Attrs)
s.ExpectCall(t, c.Body, c.Attributes)
}

// Create a new T so we can expect it to fail without failing the overall test.
Expand All @@ -141,7 +141,7 @@ func TestExpectCallTimeout(t *testing.T) {

func TestExpectCalls(t *testing.T) {
s, testCalls := sinkTest(t)
testCallsOutOfOrder := make([]*Call, 0, 10)
testCallsOutOfOrder := make([]emit.Token, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
Expand All @@ -153,7 +153,7 @@ func TestExpectCalls(t *testing.T) {

func TestExpectCallsTimeout(t *testing.T) {
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
testCallsOutOfOrder := make([]*Call, 0, 10)
testCallsOutOfOrder := make([]emit.Token, 0, 10)
for i := 0; i < len(testCalls); i += 2 {
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
}
Expand All @@ -164,7 +164,7 @@ func TestExpectCallsTimeout(t *testing.T) {

// Create a new T so we can expect it to fail without failing the overall test.
tt := new(testing.T)
s.ExpectCalls(tt, new(Call))
s.ExpectCalls(tt, emit.Token{})
assert.True(t, tt.Failed())
}

Expand All @@ -187,24 +187,24 @@ func TestExpectNoCallsFailure(t *testing.T) {
func TestWithCallBuffer(t *testing.T) {
s, testCalls := sinkTest(t, WithCallBuffer(5))
for i := 0; i < 10; i++ {
s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs)
s.ExpectCall(t, testCalls[i].Body, testCalls[i].Attributes)
}
}

func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []emit.Token) {
s := NewSink(opts...)
testCalls := make([]*Call, 0, 10)
testCalls := make([]emit.Token, 0, 10)
for i := 0; i < 10; i++ {
testCalls = append(testCalls, &Call{
Token: []byte(fmt.Sprintf("token-%d", i)),
Attrs: map[string]any{
testCalls = append(testCalls, emit.Token{
Body: []byte(fmt.Sprintf("token-%d", i)),
Attributes: map[string]any{
"key": fmt.Sprintf("value-%d", i),
},
})
}
go func() {
for _, c := range testCalls {
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Token, c.Attrs)))
assert.NoError(t, s.Callback(context.Background(), emit.NewToken(c.Body, c.Attributes)))
}
}()
return s, testCalls
Expand Down
Loading