POC byte based batching
sfc-gh-sili committed Jan 8, 2025
1 parent 306c939 commit d156151
Showing 62 changed files with 524 additions and 6 deletions.
15 changes: 15 additions & 0 deletions exporter/exporterbatcher/config.go
@@ -31,6 +31,7 @@ type MinSizeConfig struct {
// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
// This option requires the Request to implement the RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
// MinSizeBytes is the byte-based counterpart of MinSizeItems: the batch is flushed once its
// serialized size reaches this value. Mutually exclusive with the item-based limits.
MinSizeBytes int `mapstructure:"min_size_bytes"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
@@ -41,18 +42,32 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
// MaxSizeBytes is the byte-based counterpart of MaxSizeItems: batches whose serialized size
// exceeds this value are split into smaller batches if possible. Mutually exclusive with the item-based limits.
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
if (c.MinSizeItems != 0 || c.MaxSizeItems != 0) && (c.MinSizeBytes != 0 || c.MaxSizeBytes != 0) {
return errors.New("item-based and byte-based size limits cannot be specified at the same time")
}

if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MinSizeBytes < 0 {
return errors.New("min_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeBytes < 0 {
return errors.New("max_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
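For orientation, a minimal sketch (not part of this commit) of how the new fields interact with Validate. It assumes Config embeds MinSizeConfig and MaxSizeConfig as shown above and that the package's existing NewDefaultConfig constructor is available:

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

func main() {
	cfg := exporterbatcher.NewDefaultConfig() // assumed existing constructor
	cfg.MinSizeItems = 0                      // clear the item-based default
	cfg.MinSizeBytes = 512 * 1024             // flush once a batch reaches ~512 KiB
	cfg.MaxSizeBytes = 1024 * 1024            // split anything above 1 MiB
	fmt.Println(cfg.Validate())               // nil: only byte-based limits are set

	cfg.MaxSizeItems = 1000     // mix in an item-based limit...
	fmt.Println(cfg.Validate()) // ...and validation now fails
}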
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/request.go
@@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

// ByteSize mirrors ItemsCount; the fake treats each item as one byte.
func (r *fakeRequest) ByteSize() int {
return r.items
}

func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/retry_sender_test.go
@@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
return 7
}

func (mer *mockErrorRequest) ByteSize() int {
return 7
}

func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
return nil, nil
}
@@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int {
return m.cnt
}

func (m *mockRequest) ByteSize() int {
return m.cnt
}

func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
return nil, nil
}
4 changes: 4 additions & 0 deletions exporter/exporterhelper/logs.go
@@ -67,6 +67,10 @@ func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}

func (req *logsRequest) ByteSize() int {
return req.ld.ByteSize()
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
122 changes: 120 additions & 2 deletions exporter/exporterhelper/logs_batch.go
@@ -23,11 +23,129 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}
}

-if cfg.MaxSizeItems == 0 {
-req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
+if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
+if req2 != nil {
+req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
+}
return []Request{req}, nil
}
if cfg.MaxSizeBytes > 0 {
return req.mergeSplitBasedOnByteSize(cfg, req2)
}
return req.mergeSplitBasedOnItemCount(cfg, req2)
}

func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
var (
res []Request
destReq *logsRequest
capacityLeft = cfg.MaxSizeBytes
)
for _, srcReq := range []*logsRequest{req, req2} {
if srcReq == nil {
continue
}

byteSize := srcReq.ld.ByteSize()
if byteSize <= capacityLeft {
if destReq == nil {
destReq = srcReq
} else {
srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
}
capacityLeft -= byteSize
continue
}

for {
extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)
if extractedLogs.LogRecordCount() == 0 {
// Nothing fit into the remaining capacity; in particular, a single log
// record larger than cfg.MaxSizeBytes stops the split here.
break
}
if destReq == nil {
destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
} else {
extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
}
// Create new batch once capacity is reached.
if capacityReached {
res = append(res, destReq)
destReq = nil
capacityLeft = cfg.MaxSizeBytes
} else {
capacityLeft = cfg.MaxSizeBytes - destReq.ByteSize()
}
}
}

if destReq != nil {
res = append(res, destReq)
}
return res, nil
}

// extractLogsBasedOnByteSize extracts logs from srcLogs into a new plog.Logs that stays within the given byte capacity. The boolean result reports whether the capacity was reached.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
capacityReached := false
destLogs := plog.NewLogs()
srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool {
if capacityReached {
return false
}
needToExtract := srcRL.Size() > capacity-destLogs.ByteSize()
if needToExtract {
srcRL, capacityReached = extractResourceLogsBasedOnByteSize(srcRL, capacity-destLogs.ByteSize())
if srcRL.ScopeLogs().Len() == 0 {
return false
}
}
srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty())
return !needToExtract
})
return destLogs, capacityReached
}

// extractResourceLogsBasedOnByteSize extracts scope logs from srcRL into a new plog.ResourceLogs that stays within the given byte capacity.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
capacityReached := false
destRL := plog.NewResourceLogs()
destRL.SetSchemaUrl(srcRL.SchemaUrl())
srcRL.Resource().CopyTo(destRL.Resource())
srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool {
if capacityReached {
return false
}
needToExtract := srcSL.Size() > capacity-destRL.Size()
if needToExtract {
srcSL, capacityReached = extractScopeLogsBasedOnByteSize(srcSL, capacity-destRL.Size())
if srcSL.LogRecords().Len() == 0 {
return false
}
}
srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty())
return !needToExtract
})
return destRL, capacityReached
}

// extractScopeLogsBasedOnByteSize extracts log records from srcSL into a new plog.ScopeLogs that stays within the given byte capacity.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
capacityReached := false
destSL := plog.NewScopeLogs()
destSL.SetSchemaUrl(srcSL.SchemaUrl())
srcSL.Scope().CopyTo(destSL.Scope())
srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool {
if capacityReached || srcLR.Size()+destSL.Size() > capacity {
capacityReached = true
return false
}
srcLR.MoveTo(destSL.LogRecords().AppendEmpty())
return true
})
return destSL, capacityReached
}

func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
var (
res []Request
destReq *logsRequest
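As a rough usage sketch (not part of this commit) of the extraction helper above, placed in the same package since the helper is unexported; ByteSize and Size on pdata types are assumed to come from the pdata changes elsewhere in this commit, and testdata is the collector's internal generator:

package exporterhelper

import (
	"fmt"

	"go.opentelemetry.io/collector/internal/testdata"
)

func exampleExtractByBytes() {
	ld := testdata.GenerateLogs(100)
	budget := ld.ByteSize() / 3 // aim for roughly a third of the payload
	part, reached := extractLogsBasedOnByteSize(ld, budget)
	// part holds the leading records whose encoded size fits the budget
	// (modulo per-message envelope overhead), reached reports that the budget
	// rather than the source ended the extraction, and ld keeps the remainder.
	fmt.Println(part.LogRecordCount(), reached, ld.LogRecordCount())
}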
141 changes: 140 additions & 1 deletion exporter/exporterhelper/logs_batch_test.go
@@ -31,7 +31,7 @@ func TestMergeLogsInvalidInput(t *testing.T) {
require.Error(t, err)
}

-func TestMergeSplitLogs(t *testing.T) {
+func TestMergeSplitLogsBasedOnItemCount(t *testing.T) {
tests := []struct {
name string
cfg exporterbatcher.MaxSizeConfig
@@ -152,3 +152,142 @@ func TestExtractLogs(t *testing.T) {
assert.Equal(t, 10-i, ld.LogRecordCount())
}
}

func TestMergeSplitLogsBasedOnByteSize(t *testing.T) {
// The magic numbers below are the byte sizes of testdata.GenerateLogs(n) payloads.
tests := []struct {
name string
cfg exporterbatcher.MaxSizeConfig
lr1 internal.Request
lr2 internal.Request
expected []*logsRequest
}{
{
name: "both_requests_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
lr1: &logsRequest{ld: plog.NewLogs()},
lr2: &logsRequest{ld: plog.NewLogs()},
expected: []*logsRequest{{ld: plog.NewLogs()}},
},
{
name: "first_request_empty",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
lr1: &logsRequest{ld: plog.NewLogs()},
lr2: &logsRequest{ld: testdata.GenerateLogs(5)},
expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}},
},
{
name: "first_empty_second_nil",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(10).ByteSize()},
lr1: &logsRequest{ld: plog.NewLogs()},
lr2: nil,
expected: []*logsRequest{{ld: plog.NewLogs()}},
},
{
name: "merge_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(11).ByteSize()},
lr1: &logsRequest{ld: testdata.GenerateLogs(4)},
lr2: &logsRequest{ld: testdata.GenerateLogs(6)},
expected: []*logsRequest{{ld: func() plog.Logs {
logs := testdata.GenerateLogs(4)
testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
}()}},
},
{
name: "split_only",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
lr1: &logsRequest{ld: plog.NewLogs()},
lr2: &logsRequest{ld: testdata.GenerateLogs(10)},
expected: []*logsRequest{
{ld: testdata.GenerateLogs(4)},
{ld: testdata.GenerateLogs(4)},
{ld: testdata.GenerateLogs(2)},
},
},
{
name: "merge_and_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: (testdata.GenerateLogs(10).ByteSize() + testdata.GenerateLogs(11).ByteSize()) / 2},
lr1: &logsRequest{ld: testdata.GenerateLogs(8)},
lr2: &logsRequest{ld: testdata.GenerateLogs(20)},
expected: []*logsRequest{
{ld: func() plog.Logs {
logs := testdata.GenerateLogs(8)
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
return logs
}()},
{ld: testdata.GenerateLogs(10)},
{ld: testdata.GenerateLogs(8)},
},
},
{
name: "scope_logs_split",
cfg: exporterbatcher.MaxSizeConfig{MaxSizeBytes: testdata.GenerateLogs(4).ByteSize()},
lr1: &logsRequest{ld: func() plog.Logs {
ld := testdata.GenerateLogs(4)
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log")
return ld
}()},
lr2: &logsRequest{ld: testdata.GenerateLogs(2)},
expected: []*logsRequest{
{ld: testdata.GenerateLogs(4)},
{ld: func() plog.Logs {
ld := testdata.GenerateLogs(0)
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log")
testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs())
return ld
}()},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2)
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
assert.Equal(t, tt.expected[i], r.(*logsRequest))
}
})
}
}

func BenchmarkSplittingBasedOnItemCountManyLogs(b *testing.B) {
cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}
for i := 0; i < b.N; i++ {
lr1 := &logsRequest{ld: testdata.GenerateLogs(9)}
for j := 0; j < 1000; j++ {
lr2 := &logsRequest{ld: testdata.GenerateLogs(9)}
lr1.MergeSplit(context.Background(), cfg, lr2)
}
}
}

func BenchmarkSplittingBasedOnByteSizeManyLogs(b *testing.B) {
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010}
for i := 0; i < b.N; i++ {
lr1 := &logsRequest{ld: testdata.GenerateLogs(9)}
for j := 0; j < 1000; j++ {
lr2 := &logsRequest{ld: testdata.GenerateLogs(9)}
lr1.MergeSplit(context.Background(), cfg, lr2)
}
}
}

func BenchmarkSplittingBasedOnItemCountHugeLog(b *testing.B) {
cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}
for i := 0; i < b.N; i++ {
lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)}
lr1.MergeSplit(context.Background(), cfg, lr2)
}
}

func BenchmarkSplittingBasedOnByteSizeHugeLog(b *testing.B) {
cfg := exporterbatcher.MaxSizeConfig{MaxSizeBytes: 1010}
for i := 0; i < b.N; i++ {
lr1 := &logsRequest{ld: testdata.GenerateLogs(1)}
lr2 := &logsRequest{ld: testdata.GenerateLogs(1000)}
lr1.MergeSplit(context.Background(), cfg, lr2)
}
}
4 changes: 4 additions & 0 deletions exporter/exporterhelper/metrics.go
@@ -67,6 +67,10 @@ func (req *metricsRequest) ItemsCount() int {
return req.md.DataPointCount()
}

func (req *metricsRequest) ByteSize() int {
return req.md.ByteSize()
}

type metricsExporter struct {
*internal.BaseExporter
consumer.Metrics
4 changes: 4 additions & 0 deletions exporter/exporterhelper/traces.go
@@ -67,6 +67,10 @@ func (req *tracesRequest) ItemsCount() int {
return req.td.SpanCount()
}

func (req *tracesRequest) ByteSize() int {
return req.td.ByteSize()
}

type tracesExporter struct {
*internal.BaseExporter
consumer.Traces
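Taken together, the logs, metrics, and traces hunks imply the interface change that drives this commit. A hedged reconstruction (the real Request interface lives in exporter/exporterhelper/internal; its exact shape is assumed here):

// Request is the unit the batcher merges and splits. ByteSize is the method
// this commit adds next to ItemsCount; each signal delegates it to the new
// pdata ByteSize helper (req.ld.ByteSize(), req.md.ByteSize(), req.td.ByteSize()).
type Request interface {
	ItemsCount() int
	ByteSize() int
	// Export, MergeSplit, and the other existing methods are elided.
}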