-
Notifications
You must be signed in to change notification settings - Fork 221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zstd and bzip2 compression support #204
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, thank you for your work. The code looks great, with a single correction I would ask to be made.
The one thing I need to check before we merge is potential license conflicts for the libraries being added.
Co-authored-by: Karrick McDermott <[email protected]>
I just took a peek at the LICENSE files for the two added libraries: goavro: Released initially under the Apache License (version 2.0) |
Having a few moments to think about the library dependency issue a bit more, I think I have an idea to eliminate any potential library license concerns, while making it more easy to use other compression libraries in the future without requiring manually adding them. I am going to ask for a tiny bit of patience for this PR, while I add a pluggable compression algorithm feature for processing OCF files. The goal is to make it so a program will call a function to register a compression codec with |
@boiler, Thank you for your patience. I have updated a private branch of code that allows easily adding in new compression methods. It presently looks like this, and I think it's fairly simple to use. However, it has a caveat that concerns me. Adding new compression codecs is an action that mutates global state. This might not be an issue for most, but it could potentially become an issue for users in the future. Because of this caveat, I am looking at a modification to the method of registering new compression codecs for OCF blocks, and while it works, I am not sure I like the API. Just keeping you in the loop. const (
// CompressionSnappyLabel is used when OCF blocks are compressed using the
// snappy algorithm.
CompressionSnappyLabel = "snappy"
)
func init() {
// NOTE: This registration of a compression algorithm serves as an example
// for future compression algorithm additions. However, there is no reason
// to make the compression algorithm name publically available. The various
// compression name labels, including the CompressionSnappyLabel constant
// here, remain publically available for backwards compatibility only. All
// new compression algorithms should be defined without necessarily creating
// a new string constant of their algorithm label.
_ = registerCompression(CompressionSnappyLabel, ocfCompressSnappy, ocfExpandSnappy)
}
// ocfCompressSnappy compresses the expanded byte slice, returning either the
// compressed byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfCompressSnappy(expanded []byte) ([]byte, error) {
compressed := snappy.Encode(nil, expanded)
// OCF requires snappy to have CRC32 checksum after each snappy block
compressed = append(compressed, 0, 0, 0, 0) // expand slice by 4 bytes so checksum will fit
binary.BigEndian.PutUint32(compressed[len(compressed)-4:], crc32.ChecksumIEEE(expanded)) // checksum of expanded block
return compressed, nil
}
// ocfExpandSnappy decompresses the compressed byte slice returning either the
// expanded byte slice, or a non-nil error.
//
// "Each compressed block is followed by the 4-byte, big-endian CRC32 checksum
// of the uncompressed data in the block."
func ocfExpandSnappy(compressed []byte) ([]byte, error) {
index := len(compressed) - 4 // last 4 bytes is crc32 of decoded block
if index <= 0 {
return nil, fmt.Errorf("not enough bytes for CRC32 checksum: %d", len(compressed))
}
expanded, err := snappy.Decode(nil, compressed[:index])
if err != nil {
return nil, err
}
actualCRC := crc32.ChecksumIEEE(expanded)
expectedCRC := binary.BigEndian.Uint32(compressed[index : index+4])
if actualCRC != expectedCRC {
return nil, fmt.Errorf("CRC32 checksum mismatch: %x != %x", actualCRC, expectedCRC)
}
return expanded, nil
} |
I forgot to mention, the above OCF block compression algorithm registration method has another minor downside. It does not allow for more performant buffer handling to minimize memory allocations, and perhaps even re-use of compression structure instances. So the revised API has the ability to allow for that to be optimized in the future. |
Any progress on this? I'd love to get zstd in this upstream tree. (and a way to set zstd.BestCompression) |
tested with avro-tools 1.9.2