Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New option to reuse buffers when appending records #223

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,62 @@ func BenchmarkNativeFromTextualUsingV2(b *testing.B) {
_ = nativeFromTextUsingV2(b, codec, textData)
}
}

func benchWriteSimilarSizeRecords(b *testing.B, reuseBuffers, deflate bool) {
const schema = `{
"namespace": "my.namespace.com",
"type": "record",
"name": "indentity",
"fields": [
{ "name": "Name", "type": "string"},
{ "name": "Picture", "type": "bytes"}
]
}`

record := map[string]interface{}{
"Name": "MyName",
"Picture": make([]byte, 2048),
}

var records []map[string]interface{}
for i := 0; i < 1000; i++ {
records = append(records, record)
}

var compressionName string
if deflate {
compressionName = CompressionDeflateLabel
} else {
compressionName = CompressionNullLabel
}

w, _ := NewOCFWriter(OCFConfig{
W: ioutil.Discard,
Schema: schema,
ReuseBuffers: reuseBuffers,
CompressionName: compressionName,
})

b.ResetTimer()

for n := 0; n < b.N; n++ {
for j := 0; j < 10; j++ {
w.Append(records)
}
}
}

func BenchmarkWriteSimilarSizeRecords(b *testing.B) {
b.Run("ReuseBuffers=false Deflate=false", func(b *testing.B) {
benchWriteSimilarSizeRecords(b, false, false)
})
b.Run("ReuseBuffers=true Deflate=false", func(b *testing.B) {
benchWriteSimilarSizeRecords(b, true, false)
})
b.Run("ReuseBuffers=false Deflate=true", func(b *testing.B) {
benchWriteSimilarSizeRecords(b, false, true)
})
b.Run("ReuseBuffers=true Deflate=true", func(b *testing.B) {
benchWriteSimilarSizeRecords(b, true, true)
})
}
54 changes: 43 additions & 11 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,29 @@ type OCFConfig struct {
//the OCF file. When appending to an existing OCF, this field
//is ignored
MetaData map[string][]byte

// Advanced option that allows to reuse internal buffers when
// appending records. May save memory allocations and may improve
// performances. Do benchmarks before activate this option.
ReuseBuffers bool
}

// OCFWriter is used to create a new or append to an existing Avro Object
// Container File (OCF).
type OCFWriter struct {
header *ocfHeader
iow io.Writer
header *ocfHeader
iow io.Writer
reuseBuffers bool
expandedBlock []byte
deflateBlock []byte
}

// NewOCFWriter returns a new OCFWriter instance that may be used for appending
// binary Avro data, either by appending to an existing OCF file or creating a
// new OCF file.
func NewOCFWriter(config OCFConfig) (*OCFWriter, error) {
var err error
ocf := &OCFWriter{iow: config.W}
ocf := &OCFWriter{iow: config.W, reuseBuffers: config.ReuseBuffers}

switch config.W.(type) {
case nil:
Expand Down Expand Up @@ -178,20 +186,40 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {
var block []byte // working buffer for encoding data values
var err error

if ocfw.reuseBuffers {
block = ocfw.expandedBlock
}

// Encode and concatenate each data item into the block
for _, datum := range data {
if block, err = ocfw.header.codec.BinaryFromNative(block, datum); err != nil {
return fmt.Errorf("cannot translate datum to binary: %v; %s", datum, err)
}
}

if ocfw.reuseBuffers {
ocfw.expandedBlock = block[:0]
}

var block2 []byte
switch ocfw.header.compressionID {
case compressionNull:
// no-op
block2 = block

case compressionDeflate:
// compress into new bytes buffer.
bb := bytes.NewBuffer(make([]byte, 0, len(block)))
var deflateBlock []byte
if ocfw.reuseBuffers {
deflateBlock = ocfw.deflateBlock
if cap(deflateBlock) < len(block) {
deflateBlock = make([]byte, 0, len(block))
}
} else {
deflateBlock = make([]byte, 0, len(block))
}

bb := bytes.NewBuffer(deflateBlock)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be in one style ?
var bb = bytes.NewBuffer(deflateBlock)


cw, _ := flate.NewWriter(bb, flate.DefaultCompression)
// writing bytes to cw will compress bytes and send to bb.
Expand All @@ -201,7 +229,11 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()
block2 = bb.Bytes()

if ocfw.reuseBuffers {
ocfw.deflateBlock = deflateBlock[:0]
}

case compressionSnappy:
compressed := snappy.Encode(nil, block)
Expand All @@ -210,19 +242,19 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {
compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit
binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(block)) // checksum of decompressed block

block = compressed
block2 = compressed

default:
return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)

}

// create file data block
buf := make([]byte, 0, len(block)+ocfBlockConst) // pre-allocate block bytes
buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items)
buf, _ = longBinaryFromNative(buf, len(block)) // block size (number of bytes in block)
buf = append(buf, block...) // serialized objects
buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker
buf := make([]byte, 0, len(block2)+ocfBlockConst) // pre-allocate block bytes
buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items)
buf, _ = longBinaryFromNative(buf, len(block2)) // block size (number of bytes in block)
buf = append(buf, block2...) // serialized objects
buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker

_, err = ocfw.iow.Write(buf)
return err
Expand Down