Skip to content

Commit

Permalink
Skip afterUid after decoder seek (#3149)
Browse files Browse the repository at this point in the history
* codec/codec.go: add whence to Seek

The whence value indicates how Seek uses the uid offset.
Whence can be SeekStart (0) or SeekCurrent (1). SeekStart returns
the list including the offset uid. SeekCurrent returns only the
values after the offset.
  • Loading branch information
srfrog authored Apr 12, 2019
1 parent ebfacb7 commit 501e1a6
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 22 deletions.
4 changes: 2 additions & 2 deletions algo/uidlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) {
return
}
dec := codec.Decoder{Pack: pack}
dec.Seek(afterUID)
dec.Seek(afterUID, codec.SeekStart)
n := dec.ApproxLen()
m := len(v.Uids)

Expand Down Expand Up @@ -118,7 +118,7 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) {
}

for _, u := range q {
uids := dec.Seek(u)
uids := dec.Seek(u, codec.SeekStart)
if len(uids) == 0 {
return
}
Expand Down
2 changes: 1 addition & 1 deletion codec/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func benchmarkUnpack(trials int, chunks *chunks) int {
start := time.Now()
for _, p := range packed {
dec := codec.Decoder{Pack: p}
for uids := dec.Seek(0); len(uids) > 0; uids = dec.Next() {
for uids := dec.Seek(0, 0); len(uids) > 0; uids = dec.Next() {
}
}
times[i] = int(time.Since(start).Nanoseconds())
Expand Down
51 changes: 43 additions & 8 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"github.com/dgraph-io/dgraph/x"
)

// seekPos selects how Decoder.Seek treats the uid it is given.
type seekPos int

const (
// SeekStart is used with Seek() to search relative to the Uid, returning it in the results.
// Seek looks for the first entry that is >= the Uid.
SeekStart seekPos = iota
// SeekCurrent is used with Seek() to treat the Uid as an offset only, not as part of the
// results. Seek looks for the first entry that is > the Uid.
SeekCurrent
)

type Encoder struct {
BlockSize int
pack *pb.UidPack
Expand Down Expand Up @@ -106,7 +115,15 @@ func (d *Decoder) ApproxLen() int {
return int(d.Pack.BlockSize) * (len(d.Pack.Blocks) - d.blockIdx)
}

func (d *Decoder) Seek(uid uint64) []uint64 {
// searchFunc is the predicate signature handed to sort.Search; Seek builds one for the
// block search and one for the uid search according to the whence value.
type searchFunc func(int) bool

// Seek searches for uid in a packed block using the specified whence position.
// The value of whence must be one of the predefined values SeekStart or SeekCurrent.
// SeekStart searches for uid and includes it as part of the results.
// SeekCurrent uses uid only as an offset; uid itself is not included in the results.
//
// Returns a slice of uids starting at the position selected by whence, or an empty
// slice if none are found.
func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 {
if d.Pack == nil {
return []uint64{}
}
Expand All @@ -116,9 +133,18 @@ func (d *Decoder) Seek(uid uint64) []uint64 {
}

pack := d.Pack
idx := sort.Search(len(pack.Blocks), func(i int) bool {
return pack.Blocks[i].Base >= uid
})
blocksFunc := func() searchFunc {
var f searchFunc
switch whence {
case SeekStart:
f = func(i int) bool { return pack.Blocks[i].Base >= uid }
case SeekCurrent:
f = func(i int) bool { return pack.Blocks[i].Base > uid }
}
return f
}

idx := sort.Search(len(pack.Blocks), blocksFunc())
// The first block.Base >= uid (or > uid when whence is SeekCurrent).
if idx == 0 {
return d.unpackBlock()
Expand All @@ -135,10 +161,19 @@ func (d *Decoder) Seek(uid uint64) []uint64 {
d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it.
d.unpackBlock() // And get all their uids.

uidsFunc := func() searchFunc {
var f searchFunc
switch whence {
case SeekStart:
f = func(i int) bool { return d.uids[i] >= uid }
case SeekCurrent:
f = func(i int) bool { return d.uids[i] > uid }
}
return f
}

// uidx points to the first uid in the uid list, which is >= uid (or > uid when whence is SeekCurrent).
uidx := sort.Search(len(d.uids), func(i int) bool {
return d.uids[i] >= uid
})
uidx := sort.Search(len(d.uids), uidsFunc())
if uidx < len(d.uids) { // Found an entry in uids, which >= uid.
d.uids = d.uids[uidx:]
return d.uids
Expand Down Expand Up @@ -241,7 +276,7 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
uids := make([]uint64, 0, ApproxLen(pack))
dec := Decoder{Pack: pack}

for block := dec.Seek(seek); len(block) > 0; block = dec.Next() {
for block := dec.Seek(seek, SeekStart); len(block) > 0; block = dec.Next() {
uids = append(uids, block...)
}
return uids
Expand Down
49 changes: 46 additions & 3 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"compress/gzip"
"encoding/binary"
"math"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -67,6 +68,48 @@ func TestUidPack(t *testing.T) {
}
}

// TestSeek checks the first uid returned by Decoder.Seek for both whence
// values. The pack holds every tenth uid in [0, N), encoded with BlockSize 10.
// With SeekStart the expected first result is the smallest stored uid >= in;
// with SeekCurrent (for non-zero in) it is the smallest stored uid > in.
// Cases marked empty expect Seek to return no uids at all.
func TestSeek(t *testing.T) {
	N := 10001
	enc := Encoder{BlockSize: 10}
	for i := 0; i < N; i += 10 {
		enc.Add(uint64(i))
	}
	pack := enc.Done()
	// A single decoder is reused for every case, so each Seek call must not
	// depend on the position left behind by the previous one.
	dec := Decoder{Pack: pack}

	tests := []struct {
		in, out uint64
		whence  seekPos
		empty   bool
	}{
		{in: 0, out: 0, whence: SeekStart},
		// uid 0 is expected back even with SeekCurrent.
		{in: 0, out: 0, whence: SeekCurrent},
		{in: 100, out: 100, whence: SeekStart},
		{in: 100, out: 110, whence: SeekCurrent},
		{in: 1000, out: 1000, whence: SeekStart},
		{in: 1000, out: 1010, whence: SeekCurrent},
		{in: 1999, out: 2000, whence: SeekStart},
		{in: 1999, out: 2000, whence: SeekCurrent},
		{in: 1101, out: 1110, whence: SeekStart},
		{in: 1101, out: 1110, whence: SeekCurrent},
		{in: 10000, out: 10000, whence: SeekStart},
		{in: 9999, out: 10000, whence: SeekCurrent},
		{in: uint64(N), empty: true, whence: SeekStart},
		{in: uint64(N), empty: true, whence: SeekCurrent},
		{in: math.MaxUint64, empty: true, whence: SeekStart},
		{in: math.MaxUint64, empty: true, whence: SeekCurrent},
	}

	for _, tc := range tests {
		uids := dec.Seek(tc.in, tc.whence)
		if tc.empty {
			require.Empty(t, uids, "Seek(%d, %d)", tc.in, tc.whence)
			continue
		}
		// Guard the index below: an unexpectedly empty result should fail the
		// test with a clear message instead of panicking on uids[0].
		require.NotEmpty(t, uids, "Seek(%d, %d)", tc.in, tc.whence)
		require.Equal(t, tc.out, uids[0], "Seek(%d, %d)", tc.in, tc.whence)
	}
}

func TestDecoder(t *testing.T) {
N := 10001
var expected []uint64
Expand All @@ -79,13 +122,13 @@ func TestDecoder(t *testing.T) {

dec := Decoder{Pack: pack}
for i := 3; i < N; i += 3 {
uids := dec.Seek(uint64(i))
uids := dec.Seek(uint64(i), SeekStart)
require.Equal(t, uint64(i), uids[0])

uids = dec.Seek(uint64(i - 1))
uids = dec.Seek(uint64(i-1), SeekStart)
require.Equal(t, uint64(i), uids[0])

uids = dec.Seek(uint64(i - 2))
uids = dec.Seek(uint64(i-2), SeekStart)
require.Equal(t, uint64(i), uids[0])

start := i/3 - 1
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func history(lookup []byte, itr *badger.Iterator) {
fmt.Fprintf(&buf, " Num uids = %d. Size = %d\n",
codec.ExactLen(plist.Pack), plist.Pack.Size())
dec := codec.Decoder{Pack: plist.Pack}
for uids := dec.Seek(0); len(uids) > 0; uids = dec.Next() {
for uids := dec.Seek(0, codec.SeekStart); len(uids) > 0; uids = dec.Next() {
for _, uid := range uids {
fmt.Fprintf(&buf, " Uid = %d\n", uid)
}
Expand Down
13 changes: 8 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func (it *PIterator) Init(pl *pb.PostingList, afterUid uint64) {
it.uidPosting = &pb.Posting{}

it.dec = &codec.Decoder{Pack: pl.Pack}
it.uids = it.dec.Seek(afterUid)
// codec.SeekCurrent makes sure we skip returning afterUid during seek.
it.uids = it.dec.Seek(afterUid, codec.SeekCurrent)
it.uidx = 0

it.plen = len(pl.Postings)
Expand Down Expand Up @@ -526,11 +527,13 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
})
}

var mp, pp *pb.Posting
var pitr PIterator
var (
mp, pp *pb.Posting
pitr PIterator
prevUid uint64
err error
)
pitr.Init(plist, afterUid)
prevUid := uint64(0)
var err error
for err == nil {
if midx < mlen {
mp = mposts[midx]
Expand Down
2 changes: 1 addition & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func (args *params) fill(gq *gql.GraphQuery) error {
if err != nil {
return err
}
args.AfterUID = uint64(after)
args.AfterUID = after
}

if args.Alias == "shortest" {
Expand Down
2 changes: 1 addition & 1 deletion worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (qs *queryState) helpProcessTask(

opts := posting.ListOptions{
ReadTs: q.ReadTs,
AfterUID: uint64(q.AfterUid),
AfterUID: q.AfterUid,
}
// If we have srcFunc and Uids, it means its a filter. So we intersect.
if srcFn.fnType != NotAFunction && q.UidList != nil && len(q.UidList.Uids) > 0 {
Expand Down

0 comments on commit 501e1a6

Please sign in to comment.