Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AppendBinary to append avro data without getting binary from native #251

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,71 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {
return err
}


// AppendBinary appends one or more binary data items to an OCF file in a block.
func (ocfw *OCFWriter) AppendBinary(data [][]byte) error {
// Chunk data so no block has more than MaxBlockCount items.
for int64(len(data)) > MaxBlockCount {
if err := ocfw.appendBinaryDataIntoBlock(data[:MaxBlockCount]); err != nil {
return err
}
data = data[MaxBlockCount:]
}
return ocfw.appendBinaryDataIntoBlock(data)
}

func (ocfw *OCFWriter) appendBinaryDataIntoBlock(data [][]byte) error {
var block []byte // working buffer for encoding data values
var err error

// Encode and concatenate each data item into the block
for _, datum := range data {
block = append(block, datum...)
}

switch ocfw.header.compressionID {
case compressionNull:
// no-op

case compressionDeflate:
// compress into new bytes buffer.
bb := bytes.NewBuffer(make([]byte, 0, len(block)))

cw, _ := flate.NewWriter(bb, flate.DefaultCompression)
// writing bytes to cw will compress bytes and send to bb.
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

case compressionSnappy:
compressed := snappy.Encode(nil, block)

// OCF requires snappy to have CRC32 checksum after each snappy block
compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit
binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(block)) // checksum of decompressed block

block = compressed

default:
return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)

}

// create file data block
buf := make([]byte, 0, len(block)+ocfBlockConst) // pre-allocate block bytes
buf, _ = longBinaryFromNative(buf, len(data)) // block count (number of data items)
buf, _ = longBinaryFromNative(buf, len(block)) // block size (number of bytes in block)
buf = append(buf, block...) // serialized objects
buf = append(buf, ocfw.header.syncMarker[:]...) // sync marker

_, err = ocfw.iow.Write(buf)
return err
}

// Codec returns the codec used by OCFWriter. This function provided because
// upstream may be appending to existing OCF which uses a different schema than
// requested during instantiation.
Expand Down