Skip to content

Commit

Permalink
Add partition key based iterator to the bulk loader (#4841)
Browse files Browse the repository at this point in the history
  • Loading branch information
poonai authored Feb 25, 2020
1 parent 317e02e commit f7d0371
Show file tree
Hide file tree
Showing 9 changed files with 871 additions and 486 deletions.
78 changes: 78 additions & 0 deletions dgraph/cmd/bulk/key.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ func (ld *loader) mapStage() {
// reduceStage switches the progress reporter to the reduce phase, builds a
// reducer that shares the loader's state, and runs it. Any error returned by
// the reducer is passed to x.Check.
func (ld *loader) reduceStage() {
	ld.prog.setPhase(reducePhase)

	// Declare the reducer exactly once; a second `r :=` in the same scope does
	// not compile. streamIds is allocated here because writing to a nil map
	// panics.
	r := reducer{
		state:     ld.state,
		streamIds: make(map[string]uint32),
	}
	x.Check(r.run())
}

Expand Down
25 changes: 25 additions & 0 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
farm "github.com/dgryski/go-farm"
)

// partitionKeyShard is the divisor applied to the number of entries in a map
// file to obtain the stride at which entry keys are recorded as partition keys
// in the file's header. When a file holds fewer entries than this value the
// stride is zero and no partition keys are recorded.
const partitionKeyShard = 10

type mapper struct {
*state
shards []shardState // shard is based on predicate
Expand Down Expand Up @@ -120,6 +122,29 @@ func (m *mapper) writeMapEntriesToFile(entries []*pb.MapEntry, encodedSize uint6
x.Check(gzWriter.Close())
}()

// Create partition keys for the map file.
header := &pb.MapHeader{
PartitionKeys: [][]byte{},
}
shardPartitionNo := len(entries) / partitionKeyShard
for i := range entries {
if shardPartitionNo == 0 {
// We have very few entries, so there is no need for partition keys.
break
}
if (i+1)%shardPartitionNo == 0 {
header.PartitionKeys = append(header.PartitionKeys, entries[i].GetKey())
}
}
// Write the header to the map file.
headerBuf, err := header.Marshal()
x.Check(err)
lenBuf := make([]byte, 4)
binary.BigEndian.PutUint32(lenBuf, uint32(len(headerBuf)))
x.Check2(w.Write(lenBuf))
x.Check2(w.Write(headerBuf))
x.Check(err)

sizeBuf := make([]byte, binary.MaxVarintLen64)
for _, me := range entries {
n := binary.PutUvarint(sizeBuf, uint64(me.Size()))
Expand Down
4 changes: 3 additions & 1 deletion dgraph/cmd/bulk/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type progress struct {
mapEdgeCount int64
reduceEdgeCount int64
reduceKeyCount int64
numEncoding int32

start time.Time
startReduce time.Time
Expand Down Expand Up @@ -107,14 +108,15 @@ func (p *progress) reportOnce() {
pct = fmt.Sprintf("%.2f%% ", 100*float64(reduceEdgeCount)/float64(mapEdgeCount))
}
fmt.Printf("[%s] REDUCE %s %sedge_count:%s edge_speed:%s/sec "+
"plist_count:%s plist_speed:%s/sec\n",
"plist_count:%s plist_speed:%s/sec. Num Encoding: %d\n",
timestamp,
x.FixedDuration(now.Sub(p.start)),
pct,
niceFloat(float64(reduceEdgeCount)),
niceFloat(float64(reduceEdgeCount)/elapsed.Seconds()),
niceFloat(float64(reduceKeyCount)),
niceFloat(float64(reduceKeyCount)/elapsed.Seconds()),
atomic.LoadInt32(&p.numEncoding),
)
default:
x.AssertTruef(false, "invalid phase")
Expand Down
Loading

0 comments on commit f7d0371

Please sign in to comment.