Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zstd and bzip2 compression support #204

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ module github.com/linkedin/goavro/v2

go 1.12

require github.com/golang/snappy v0.0.1
require (
github.com/dsnet/compress v0.0.1
github.com/golang/snappy v0.0.1
github.com/klauspost/compress v1.10.3
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
github.com/dsnet/compress v0.0.1 h1:PlZu0n3Tuv04TzpfPbrnI0HW/YwodEXDS+oPKahKF0Q=
github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo=
github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/klauspost/compress v1.4.1 h1:8VMb5+0wMgdBykOV96DwNwKFQ+WTI4pzYURP99CcB9E=
github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8=
22 changes: 22 additions & 0 deletions ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ const (
// CompressionSnappyLabel is used when OCF blocks are compressed using the
// snappy algorithm.
CompressionSnappyLabel = "snappy"

// CompressionBzip2Label is used when OCF blocks are compressed using the
// bzip2 algorithm.
CompressionBzip2Label = "bzip2"

// CompressionZstdLabel is used when OCF blocks are compressed using the
// ZStandart algorithm.
CompressionZstdLabel = "zstandard"
)

// compressionID are values used to specify compression algorithm used to compress
Expand All @@ -38,6 +46,8 @@ const (
compressionNull compressionID = iota
compressionDeflate
compressionSnappy
compressionBzip2
compressionZstd
)

const (
Expand Down Expand Up @@ -81,6 +91,10 @@ func newOCFHeader(config OCFConfig) (*ocfHeader, error) {
header.compressionID = compressionDeflate
case CompressionSnappyLabel:
header.compressionID = compressionSnappy
case CompressionBzip2Label:
header.compressionID = compressionBzip2
case CompressionZstdLabel:
header.compressionID = compressionZstd
default:
return nil, fmt.Errorf("cannot create OCF header using unrecognized compression algorithm: %q", config.CompressionName)
}
Expand Down Expand Up @@ -153,6 +167,10 @@ func readOCFHeader(ior io.Reader) (*ocfHeader, error) {
cID = compressionDeflate
case CompressionSnappyLabel:
cID = compressionSnappy
case CompressionBzip2Label:
cID = compressionBzip2
case CompressionZstdLabel:
cID = compressionZstd
default:
return nil, fmt.Errorf("cannot read OCF header using unrecognized compression algorithm from avro.codec: %q", avroCodec)
}
Expand Down Expand Up @@ -197,6 +215,10 @@ func writeOCFHeader(header *ocfHeader, iow io.Writer) (err error) {
avroCodec = CompressionDeflateLabel
case compressionSnappy:
avroCodec = CompressionSnappyLabel
case compressionBzip2:
avroCodec = CompressionBzip2Label
case compressionZstd:
avroCodec = CompressionZstdLabel
default:
return fmt.Errorf("should not get here: cannot write OCF header using unrecognized compression algorithm: %d", header.compressionID)
}
Expand Down
35 changes: 35 additions & 0 deletions ocf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"io"
"io/ioutil"

"github.com/dsnet/compress/bzip2"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

// OCFReader structure is used to read Object Container Files (OCF).
Expand Down Expand Up @@ -80,6 +82,10 @@ func (ocfr *OCFReader) CompressionName() string {
return CompressionDeflateLabel
case compressionSnappy:
return CompressionSnappyLabel
case compressionBzip2:
return CompressionBzip2Label
case compressionZstd:
return CompressionZstdLabel
default:
return "should not get here: unrecognized compression algorithm"
}
Expand Down Expand Up @@ -220,6 +226,35 @@ func (ocfr *OCFReader) Scan() bool {
}
ocfr.block = decoded

case compressionBzip2:
rc, err := bzip2.NewReader(bytes.NewBuffer(ocfr.block), nil)
if err != nil {
ocfr.rerr = fmt.Errorf("bzip2 error: %s", err)
return false
}
ocfr.block, ocfr.rerr = ioutil.ReadAll(rc)
if ocfr.rerr != nil {
_ = rc.Close()
return false
}
if ocfr.rerr = rc.Close(); ocfr.rerr != nil {
return false
}

case compressionZstd:
rc, err := zstd.NewReader(bytes.NewBuffer(ocfr.block))
if err != nil {
ocfr.rerr = fmt.Errorf("zstd error: %s", err)
return false
}
ocfr.block, ocfr.rerr = ioutil.ReadAll(rc)
if cerr := rc.Close(); ocfr.rerr == nil {
ocfr.rerr = cerr
}
if ocfr.rerr != nil {
return false
}

default:
ocfr.rerr = fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfr.header.compressionID)
return false
Expand Down
4 changes: 4 additions & 0 deletions ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func TestOCFWriterCompressionSnappy(t *testing.T) {
testOCFRoundTrip(t, CompressionSnappyLabel)
}

func TestOCFWriterCompressionBzip2(t *testing.T) {
testOCFRoundTrip(t, CompressionBzip2Label)
}

func TestOCFWriterWithApplicationMetaData(t *testing.T) {
testOCFRoundTripWithHeaders(t, CompressionNullLabel, map[string][]byte{"foo": []byte("BOING"), "goo": []byte("zoo")})
}
24 changes: 24 additions & 0 deletions ocf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"io/ioutil"
"os"

"github.com/dsnet/compress/bzip2"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

// OCFConfig is used to specify creation parameters for OCFWriter.
Expand Down Expand Up @@ -212,6 +214,28 @@ func (ocfw *OCFWriter) appendDataIntoBlock(data []interface{}) error {

block = compressed

case compressionBzip2:
bb := bytes.NewBuffer(make([]byte, 0, len(block)))
cw, _ := bzip2.NewWriter(bb, &bzip2.WriterConfig{Level: 9})
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

case compressionZstd:
bb := bytes.NewBuffer(make([]byte, 0, len(block)))
cw, _ := zstd.NewWriter(bb)
if _, err := cw.Write(block); err != nil {
return err
}
if err := cw.Close(); err != nil {
return err
}
block = bb.Bytes()

default:
return fmt.Errorf("should not get here: cannot compress block using unrecognized compression: %d", ocfw.header.compressionID)

Expand Down