Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][DEPRECATED] exporter batcher - byte size based batching #12017

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MinSizeConfig struct {
// sent regardless of the timeout. There is no guarantee that the batch size is always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
MinSizeBytes int `mapstructure:"min_size_bytes"`
}

// MaxSizeConfig defines the configuration for the maximum size of a batch, in items or in bytes.
Expand All @@ -41,18 +42,32 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 {
return errors.New("size limit and bytes limit cannot be specified at the same time")
}

if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MinSizeBytes < 0 {
return errors.New("min_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeBytes < 0 {
return errors.New("max_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (r *fakeRequest) ItemsCount() int {
return r.items
}

// ByteSize returns the byte size of this fake request. The test fake reuses
// its item count as the byte size so byte-based batching can be exercised
// without encoding real payloads.
func (r *fakeRequest) ByteSize() int {
	return r.items
}

func (r *fakeRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 internal.Request) ([]internal.Request, error) {
if r.mergeErr != nil {
return nil, r.mergeErr
Expand Down
8 changes: 8 additions & 0 deletions exporter/exporterhelper/internal/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ func (mer *mockErrorRequest) ItemsCount() int {
return 7
}

// ByteSize returns a fixed byte size for the mock; it mirrors the constant
// returned by ItemsCount so size-based code paths see a non-zero value.
func (mer *mockErrorRequest) ByteSize() int {
	return 7
}

// MergeSplit is a no-op stub: the mock never merges or splits, and signals
// neither output batches nor an error.
func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}
Expand Down Expand Up @@ -464,6 +468,10 @@ func (m *mockRequest) ItemsCount() int {
return m.cnt
}

// ByteSize returns the byte size of the mock request; the mock reuses its
// item counter as the byte size.
func (m *mockRequest) ByteSize() int {
	return m.cnt
}

// MergeSplit is a no-op stub: the mock never merges or splits, and signals
// neither output batches nor an error.
func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) {
	return nil, nil
}
Expand Down
9 changes: 7 additions & 2 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ var (
)

// logsRequest bundles a plog.Logs payload with the pusher used to export it.
type logsRequest struct {
	// ld is the log payload carried by this request.
	ld     plog.Logs
	// pusher is the callback used to send ld downstream.
	pusher consumer.ConsumeLogsFunc
	// byteSize caches the payload's encoded size in bytes; zero means "not
	// computed yet". NOTE(review): the ByteSize() method recomputes from ld
	// and does not read this field — confirm the cache is kept in sync by
	// every code path that mutates ld.
	byteSize int
}

func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
Expand Down Expand Up @@ -66,6 +67,10 @@ func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}

// ByteSize returns the encoded size of the log payload in bytes, recomputed
// from the underlying plog.Logs on every call.
// NOTE(review): the cached req.byteSize field is deliberately not consulted
// here — confirm whether recomputing on each call is intentional.
func (req *logsRequest) ByteSize() int {
	return req.ld.ByteSize()
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
Expand Down
132 changes: 130 additions & 2 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,139 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
}
}

if cfg.MaxSizeItems == 0 {
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
if cfg.MaxSizeItems == 0 && cfg.MaxSizeBytes == 0 {
if req2 != nil {
req2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs())
}
return []Request{req}, nil
}
if cfg.MaxSizeBytes > 0 {
return req.mergeSplitBasedOnByteSize(cfg, req2)
}
return req.mergeSplitBasedOnItemCount(cfg, req2)
}

// mergeSplitBasedOnByteSize merges req with req2 (which may be nil) and splits
// the result into requests whose encoded payload does not exceed
// cfg.MaxSizeBytes. Source payloads are consumed (moved) in the process.
// It never returns an error.
//
// Fixes over the previous revision: the local `ByteSize` variable shadowed the
// method name and violated Go naming for locals, and the cached byteSize field
// was left stale (zero, or incremented from a wrong base) on several paths;
// the cache is now kept in sync with the payload everywhere.
func (req *logsRequest) mergeSplitBasedOnByteSize(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
	var (
		res          []Request
		destReq      *logsRequest
		capacityLeft = cfg.MaxSizeBytes
	)
	for _, srcReq := range []*logsRequest{req, req2} {
		if srcReq == nil {
			continue
		}

		// Prefer the cached size to avoid re-encoding the payload.
		size := srcReq.byteSize
		if size == 0 {
			size = srcReq.ld.ByteSize()
		}
		// The source request does not fit into the partially filled batch:
		// close the current batch and start a fresh one.
		if size > capacityLeft && capacityLeft < cfg.MaxSizeBytes {
			res = append(res, destReq)
			destReq = nil
			capacityLeft = cfg.MaxSizeBytes
		}

		// Fast path: the whole source request fits into the current batch.
		if size <= capacityLeft {
			if destReq == nil {
				destReq = srcReq
				// The adopted request may carry a zero cache; set it.
				destReq.byteSize = size
			} else {
				srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
				destReq.byteSize += size
			}
			capacityLeft -= size
			continue
		}

		// Slow path: the source request exceeds the remaining capacity and
		// must be split into capacity-sized chunks.
		for {
			extractedLogs, capacityReached := extractLogsBasedOnByteSize(srcReq.ld, capacityLeft)
			if extractedLogs.LogRecordCount() == 0 {
				break
			}
			if destReq == nil {
				destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
			} else {
				extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
			}
			// Re-encode once per iteration so the cache tracks the mutated
			// payload (the sum of chunk sizes is not used because the exact
			// encoded size is needed for the capacity computation below).
			destReq.byteSize = destReq.ld.ByteSize()
			if capacityReached {
				// Capacity reached: close this batch and start a new one.
				res = append(res, destReq)
				destReq = nil
				capacityLeft = cfg.MaxSizeBytes
			} else {
				capacityLeft = cfg.MaxSizeBytes - destReq.byteSize
			}
		}
	}

	if destReq != nil {
		res = append(res, destReq)
	}
	return res, nil
}

// extractLogsBasedOnByteSize moves resource logs from srcLogs into a new
// plog.Logs until the destination's encoded size would exceed capacity bytes,
// splitting an oversized resource-logs entry when necessary. The boolean
// result reports whether the capacity limit was hit.
func extractLogsBasedOnByteSize(srcLogs plog.Logs, capacity int) (plog.Logs, bool) {
	dest := plog.NewLogs()
	full := false
	srcLogs.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool {
		if full {
			// Capacity already exhausted: keep the rest in the source.
			return false
		}
		mustSplit := rl.Size() > capacity-dest.ByteSize()
		if mustSplit {
			rl, full = extractResourceLogsBasedOnByteSize(rl, capacity-dest.ByteSize())
			if rl.ScopeLogs().Len() == 0 {
				// Nothing fit; leave the original entry in the source.
				return false
			}
		}
		rl.MoveTo(dest.ResourceLogs().AppendEmpty())
		// A fully moved entry is removed from the source; a split one stays.
		return !mustSplit
	})
	return dest, full
}

// extractResourceLogsBasedOnByteSize moves scope logs from srcRL into a new
// plog.ResourceLogs (carrying over the schema URL and resource) until adding
// more would exceed capacity bytes, splitting an oversized scope-logs entry
// when necessary. The boolean result reports whether the capacity limit was
// hit.
func extractResourceLogsBasedOnByteSize(srcRL plog.ResourceLogs, capacity int) (plog.ResourceLogs, bool) {
	dest := plog.NewResourceLogs()
	dest.SetSchemaUrl(srcRL.SchemaUrl())
	srcRL.Resource().CopyTo(dest.Resource())
	full := false
	srcRL.ScopeLogs().RemoveIf(func(sl plog.ScopeLogs) bool {
		if full {
			// Capacity already exhausted: keep the rest in the source.
			return false
		}
		mustSplit := sl.Size() > capacity-dest.Size()
		if mustSplit {
			sl, full = extractScopeLogsBasedOnByteSize(sl, capacity-dest.Size())
			if sl.LogRecords().Len() == 0 {
				// Nothing fit; leave the original entry in the source.
				return false
			}
		}
		sl.MoveTo(dest.ScopeLogs().AppendEmpty())
		// A fully moved entry is removed from the source; a split one stays.
		return !mustSplit
	})
	return dest, full
}

// extractScopeLogsBasedOnByteSize moves log records from srcSL into a new
// plog.ScopeLogs (carrying over the schema URL and scope) until adding one
// more record would exceed capacity bytes. Records after the first one that
// does not fit are kept in the source to preserve ordering. The boolean
// result reports whether the capacity limit was hit.
func extractScopeLogsBasedOnByteSize(srcSL plog.ScopeLogs, capacity int) (plog.ScopeLogs, bool) {
	dest := plog.NewScopeLogs()
	dest.SetSchemaUrl(srcSL.SchemaUrl())
	srcSL.Scope().CopyTo(dest.Scope())
	full := false
	srcSL.LogRecords().RemoveIf(func(lr plog.LogRecord) bool {
		if full || lr.Size()+dest.Size() > capacity {
			full = true
			return false
		}
		lr.MoveTo(dest.LogRecords().AppendEmpty())
		return true
	})
	return dest, full
}

func (req *logsRequest) mergeSplitBasedOnItemCount(cfg exporterbatcher.MaxSizeConfig, req2 *logsRequest) ([]Request, error) {
var (
res []Request
destReq *logsRequest
Expand Down
Loading
Loading