Skip to content

Commit

Permalink
opt(bulk): Use the faster snappy instead of gzip compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
manishrjain authored and aman-bansal committed Apr 7, 2021
1 parent eeab3fb commit fa5f6a9
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
10 changes: 3 additions & 7 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package bulk

import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"log"
Expand All @@ -42,6 +40,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
farm "github.com/dgryski/go-farm"
"github.com/golang/snappy"
)

type mapper struct {
Expand Down Expand Up @@ -167,12 +166,9 @@ func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) {
x.Check(f.Close())
}()

gzWriter := gzip.NewWriter(f)
w := bufio.NewWriterSize(gzWriter, 4<<20)
w := snappy.NewBufferedWriter(f)
defer func() {
x.Check(w.Flush())
x.Check(gzWriter.Flush())
x.Check(gzWriter.Close())
x.Check(w.Close())
}()

// Create partition keys for the map file.
Expand Down
7 changes: 3 additions & 4 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package bulk
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
Expand All @@ -45,6 +44,7 @@ import (
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"
"github.com/dustin/go-humanize"
"github.com/golang/snappy"
)

type reducer struct {
Expand Down Expand Up @@ -221,11 +221,10 @@ func (mi *mapIterator) Close() error {
func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
fd, err := os.Open(filename)
x.Check(err)
gzReader, err := gzip.NewReader(fd)
x.Check(err)
r := snappy.NewReader(fd)

// Read the header size.
reader := bufio.NewReaderSize(gzReader, 16<<10)
reader := bufio.NewReaderSize(r, 16<<10)
headerLenBuf := make([]byte, 4)
x.Check2(io.ReadFull(reader, headerLenBuf))
headerLen := binary.BigEndian.Uint32(headerLenBuf)
Expand Down

0 comments on commit fa5f6a9

Please sign in to comment.