From fa5f6a91a96b91a5c236544ac138d4b4ce0b8cbf Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 16 Mar 2021 14:24:04 -0700 Subject: [PATCH] opt(bulk): Use the faster snappy instead of gzip compression. --- dgraph/cmd/bulk/mapper.go | 10 +++------- dgraph/cmd/bulk/reduce.go | 7 +++---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index d0f1a9bae07..c731b2e903a 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -17,9 +17,7 @@ package bulk import ( - "bufio" "bytes" - "compress/gzip" "encoding/binary" "fmt" "log" @@ -42,6 +40,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" farm "github.com/dgryski/go-farm" + "github.com/golang/snappy" ) type mapper struct { @@ -167,12 +166,9 @@ func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) { x.Check(f.Close()) }() - gzWriter := gzip.NewWriter(f) - w := bufio.NewWriterSize(gzWriter, 4<<20) + w := snappy.NewBufferedWriter(f) defer func() { - x.Check(w.Flush()) - x.Check(gzWriter.Flush()) - x.Check(gzWriter.Close()) + x.Check(w.Close()) }() // Create partition keys for the map file. diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 8e34220edf2..a28d3d83635 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -19,7 +19,6 @@ package bulk import ( "bufio" "bytes" - "compress/gzip" "context" "encoding/binary" "fmt" @@ -45,6 +44,7 @@ import ( "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" "github.com/dustin/go-humanize" + "github.com/golang/snappy" ) type reducer struct { @@ -221,11 +221,10 @@ func (mi *mapIterator) Close() error { func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) { fd, err := os.Open(filename) x.Check(err) - gzReader, err := gzip.NewReader(fd) - x.Check(err) + r := snappy.NewReader(fd) // Read the header size. - reader := bufio.NewReaderSize(gzReader, 16<<10) + reader := bufio.NewReaderSize(r, 16<<10) headerLenBuf := make([]byte, 4) x.Check2(io.ReadFull(reader, headerLenBuf)) headerLen := binary.BigEndian.Uint32(headerLenBuf)