Skip to content

Commit 1017c4e

Browse files
opt(txn commits): Optimize txns by passing Skiplists to Badger (#7777)
In this PR, we use concurrent mutations system, introduced in #7694 , to generate Skiplists directly from a transaction, even before it is committed. When a transaction does get committed, we replace the key timestamps with the commit timestamp, in-place within the Skiplist. And in case of multiple transactions being committed, merge them together into one bigger Skiplist during the serial commit. This Skiplist is then handed over to Badger, bypassing its value log and WAL system. Instead, when Badger persists the skiplist into L0 tables, Dgraph gets a callback, which it uses to decide when it is safe to take a Raft snapshot / checkpoint. So, we no longer need Badger's WAL. Furthermore, live loader is also optimized to better deal with the conflicting transactions, and retrying them as frequently as possible, instead of only towards the end. The updated output from live loader shows we're able to deal with stragglers much better, with them causing almost no delays with this PR. There's also a bug fix in this PR. The Raft checkpointing code was not getting run as often as it should. This PR fixes that. ``` Master: [18:48:34-0700] Elapsed: 08m45s Txns: 21222 N-Quads: 21,221,870 N-Quads/s: 9,033 Aborts: 0 Number of TXs run : 21240 Number of N-Quads processed : 21239870 Time spent : 8m49.039284385s N-Quads processed per second : 40150 This PR: Number of TXs run : 21240 Number of N-Quads processed : 21239870 Time spent : 6m56.641399434s N-Quads processed per second : 51057 ``` Notable Changes: * Have each mutation create its own skiplist. Merge them during commit. * Use skiplist Builder * optimize mutations as well * Optimize live loader to better deal with conflicted requests. * Make a callback delete txns from Oracle. * Bug fix: Calculate raft checkpoint frequently. * Clarify snapshot rules. * Incremental rollups use skiplists too. * Stop rollups during drop operations. Co-authored-by: Ahsan Barkati <ahsanbarkati@gmail.com>
1 parent 60bec16 commit 1017c4e

File tree

22 files changed

+591
-241
lines changed

22 files changed

+591
-241
lines changed

dgraph/cmd/alpha/http_test.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -322,12 +322,16 @@ func runRequest(req *http.Request) (*x.QueryResWithData, []byte, *http.Response,
322322
func runWithRetriesForResp(method, contentType, url string, body string) (
323323
*x.QueryResWithData, []byte, *http.Response, error) {
324324

325+
label:
325326
req, err := createRequest(method, contentType, url, body)
326327
if err != nil {
327328
return nil, nil, nil, err
328329
}
329-
330330
qr, respBody, resp, err := runRequest(req)
331+
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
332+
time.Sleep(time.Second)
333+
goto label
334+
}
331335
if err != nil && strings.Contains(err.Error(), "Token is expired") {
332336
err = token.refreshToken()
333337
if err != nil {
@@ -619,7 +623,13 @@ func TestAlterSanity(t *testing.T) {
619623
`{"drop_all":true}`}
620624

621625
for _, op := range ops {
626+
label:
622627
qr, _, err := runWithRetries("PUT", "", addr+"/alter", op)
628+
if err != nil && strings.Contains(err.Error(), "Please retry") {
629+
t.Logf("Got error: %v. Retrying...", err)
630+
time.Sleep(time.Second)
631+
goto label
632+
}
623633
require.NoError(t, err)
624634
require.Len(t, qr.Errors, 0)
625635
}
@@ -886,13 +896,20 @@ func TestDrainingMode(t *testing.T) {
886896
require.NoError(t, err, "Got error while running mutation: %v", err)
887897
}
888898

889-
err = alterSchema(`name: string @index(term) .`)
890-
if expectErr {
891-
require.True(t, err != nil && strings.Contains(err.Error(), "the server is in draining mode"))
892-
} else {
893-
require.NoError(t, err, "Got error while running alter: %v", err)
894-
}
895-
899+
err = x.RetryUntilSuccess(3, time.Second, func() error {
900+
err := alterSchema(`name: string @index(term) .`)
901+
if expectErr {
902+
if err == nil {
903+
return errors.New("expected error")
904+
}
905+
if err != nil && strings.Contains(err.Error(), "server is in draining mode") {
906+
return nil
907+
}
908+
return err
909+
}
910+
return err
911+
})
912+
require.NoError(t, err, "Got error while running alter: %v", err)
896913
}
897914

898915
token := testutil.GrootHttpLogin(addr + "/admin")

dgraph/cmd/alpha/reindex_test.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/dgraph-io/dgraph/x"
2425
"github.com/stretchr/testify/require"
2526
)
2627

@@ -39,7 +40,10 @@ func TestReindexTerm(t *testing.T) {
3940
require.NoError(t, err)
4041

4142
// perform re-indexing
42-
require.NoError(t, alterSchema(`name: string @index(term) .`))
43+
err = x.RetryUntilSuccess(3, time.Second, func() error {
44+
return alterSchema(`name: string @index(term) .`)
45+
})
46+
require.NoError(t, err)
4347

4448
q1 := `{
4549
q(func: anyofterms(name, "bc")) {
@@ -67,8 +71,11 @@ func TestReindexLang(t *testing.T) {
6771
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
6872
require.NoError(t, err)
6973

70-
// reindex
71-
require.NoError(t, alterSchema(`name: string @lang @index(exact) .`))
74+
// perform re-indexing
75+
err = x.RetryUntilSuccess(3, time.Second, func() error {
76+
return alterSchema(`name: string @lang @index(exact) .`)
77+
})
78+
require.NoError(t, err)
7279

7380
q1 := `{
7481
q(func: eq(name@en, "Runtime")) {
@@ -141,8 +148,11 @@ func TestReindexReverseCount(t *testing.T) {
141148
_, err := mutationWithTs(mutationInp{body: m1, typ: "application/rdf", commitNow: true})
142149
require.NoError(t, err)
143150

144-
// reindex
145-
require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))
151+
// perform re-indexing
152+
err = x.RetryUntilSuccess(3, time.Second, func() error {
153+
return alterSchema(`value: [uid] @count @reverse .`)
154+
})
155+
require.NoError(t, err)
146156

147157
q1 := `{
148158
q(func: eq(count(~value), "3")) {

dgraph/cmd/live/batch.go

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,12 @@ type loader struct {
7474
retryRequestsWg sync.WaitGroup
7575

7676
// Miscellaneous information to print counters.
77-
// Num of N-Quads sent
78-
nquads uint64
79-
// Num of txns sent
80-
txns uint64
81-
// Num of aborts
82-
aborts uint64
83-
// To get time elapsed
84-
start time.Time
77+
nquads uint64 // Num of N-Quads sent
78+
txns uint64 // Num of txns sent
79+
aborts uint64 // Num of aborts
80+
start time.Time // To get time elapsed
81+
inflight int32 // Number of inflight requests.
82+
conc int32 // Number of request makers.
8583

8684
conflicts map[uint64]struct{}
8785
uidsLock sync.RWMutex
@@ -165,13 +163,15 @@ func (l *loader) infinitelyRetry(req *request) {
165163
}
166164

167165
func (l *loader) mutate(req *request) error {
166+
atomic.AddInt32(&l.inflight, 1)
168167
txn := l.dc.NewTxn()
169168
req.CommitNow = true
170169
request := &api.Request{
171170
CommitNow: true,
172171
Mutations: []*api.Mutation{req.Mutation},
173172
}
174173
_, err := txn.Do(l.opts.Ctx, request)
174+
atomic.AddInt32(&l.inflight, -1)
175175
return err
176176
}
177177

@@ -383,39 +383,69 @@ func (l *loader) deregister(req *request) {
383383
// makeRequests can receive requests from batchNquads or directly from BatchSetWithMark.
384384
// It doesn't need to batch the requests anymore. Batching is already done for it by the
385385
// caller functions.
386-
func (l *loader) makeRequests() {
386+
func (l *loader) makeRequests(id int) {
387387
defer l.requestsWg.Done()
388+
atomic.AddInt32(&l.conc, 1)
389+
defer atomic.AddInt32(&l.conc, -1)
388390

389391
buffer := make([]*request, 0, l.opts.bufferSize)
390-
drain := func(maxSize int) {
391-
for len(buffer) > maxSize {
392-
i := 0
393-
for _, req := range buffer {
394-
// If there is no conflict in req, we will use it
395-
// and then it would shift all the other reqs in buffer
396-
if !l.addConflictKeys(req) {
397-
buffer[i] = req
398-
i++
399-
continue
400-
}
401-
// Req will no longer be part of a buffer
402-
l.request(req)
392+
var loops int
393+
drain := func() {
394+
i := 0
395+
for _, req := range buffer {
396+
loops++
397+
// If there is no conflict in req, we will use it
398+
// and then it would shift all the other reqs in buffer
399+
if !l.addConflictKeys(req) {
400+
buffer[i] = req
401+
i++
402+
continue
403403
}
404-
buffer = buffer[:i]
404+
// Req will no longer be part of a buffer
405+
l.request(req)
405406
}
407+
buffer = buffer[:i]
406408
}
407409

408-
for req := range l.reqs {
409-
req.conflicts = l.conflictKeysForReq(req)
410-
if l.addConflictKeys(req) {
411-
l.request(req)
412-
} else {
413-
buffer = append(buffer, req)
410+
t := time.NewTicker(5 * time.Second)
411+
defer t.Stop()
412+
413+
outer:
414+
for {
415+
select {
416+
case req, ok := <-l.reqs:
417+
if !ok {
418+
break outer
419+
}
420+
req.conflicts = l.conflictKeysForReq(req)
421+
if l.addConflictKeys(req) {
422+
l.request(req)
423+
} else {
424+
buffer = append(buffer, req)
425+
}
426+
427+
case <-t.C:
428+
for {
429+
drain()
430+
if len(buffer) < l.opts.bufferSize {
431+
break
432+
}
433+
time.Sleep(100 * time.Millisecond)
434+
}
414435
}
415-
drain(l.opts.bufferSize - 1)
416436
}
417437

418-
drain(0)
438+
for len(buffer) > 0 {
439+
select {
440+
case <-t.C:
441+
fmt.Printf("[%2d] Draining. len(buffer): %d\n", id, len(buffer))
442+
default:
443+
}
444+
445+
drain()
446+
time.Sleep(100 * time.Millisecond)
447+
}
448+
fmt.Printf("[%2d] Looped %d times over buffered requests.\n", id, loops)
419449
}
420450

421451
func (l *loader) printCounters() {
@@ -429,9 +459,11 @@ func (l *loader) printCounters() {
429459
r.Capture(c.Nquads)
430460
elapsed := time.Since(start).Round(time.Second)
431461
timestamp := time.Now().Format("15:04:05Z0700")
432-
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s Aborts: %d\n",
462+
fmt.Printf("[%s] Elapsed: %s Txns: %d N-Quads: %s N-Quads/s: %s"+
463+
" Inflight: %2d/%2d Aborts: %d\n",
433464
timestamp, x.FixedDuration(elapsed), c.TxnsDone,
434-
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())), c.Aborts)
465+
humanize.Comma(int64(c.Nquads)), humanize.Comma(int64(r.Rate())),
466+
atomic.LoadInt32(&l.inflight), atomic.LoadInt32(&l.conc), c.Aborts)
435467
}
436468
}
437469

dgraph/cmd/live/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph, conf *viper.Viper) *loader
648648

649649
l.requestsWg.Add(opts.Pending)
650650
for i := 0; i < opts.Pending; i++ {
651-
go l.makeRequests()
651+
go l.makeRequests(i)
652652
}
653653

654654
rand.Seed(time.Now().Unix())

dgraph/main.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,13 @@ func main() {
3434
// benchmark notes are located in badger-bench/randread.
3535
runtime.GOMAXPROCS(128)
3636

37-
absDiff := func(a, b uint64) uint64 {
37+
absU := func(a, b uint64) uint64 {
38+
if a > b {
39+
return a - b
40+
}
41+
return b - a
42+
}
43+
abs := func(a, b int) int {
3844
if a > b {
3945
return a - b
4046
}
@@ -53,11 +59,12 @@ func main() {
5359

5460
var js z.MemStats
5561
var lastAlloc uint64
62+
var numGo int
5663

5764
for range ticker.C {
5865
// Read Jemalloc stats first. Print if there's a big difference.
5966
z.ReadMemStats(&js)
60-
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
67+
if diff := absU(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
6168
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
6269
" Resident: %s Retained: %s\n",
6370
humanize.IBytes(uint64(z.NumAllocBytes())),
@@ -69,7 +76,13 @@ func main() {
6976
}
7077

7178
runtime.ReadMemStats(&ms)
72-
diff := absDiff(ms.HeapAlloc, lastMs.HeapAlloc)
79+
diff := absU(ms.HeapAlloc, lastMs.HeapAlloc)
80+
81+
curGo := runtime.NumGoroutine()
82+
if diff := abs(curGo, numGo); diff >= 64 {
83+
glog.V(2).Infof("Num goroutines: %d\n", curGo)
84+
numGo = curGo
85+
}
7386

7487
switch {
7588
case ms.NumGC > lastNumGC:

edgraph/server.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,9 +538,16 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
538538
// TODO: Maybe add some checks about the schema.
539539
m.Schema = result.Preds
540540
m.Types = result.Types
541-
_, err = query.ApplyMutations(ctx, m)
541+
for i := 0; i < 3; i++ {
542+
_, err = query.ApplyMutations(ctx, m)
543+
if err != nil && strings.Contains(err.Error(), "Please retry operation") {
544+
time.Sleep(time.Second)
545+
continue
546+
}
547+
break
548+
}
542549
if err != nil {
543-
return empty, err
550+
return empty, errors.Wrapf(err, "During ApplyMutations")
544551
}
545552

546553
// wait for indexing to complete or context to be canceled.

ee/acl/acl_curl_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestCurlAuthorization(t *testing.T) {
4040
// test query through curl
4141
token, err := testutil.HttpLogin(&testutil.LoginParams{
4242
Endpoint: adminEndpoint,
43-
UserID: userid,
43+
UserID: commonUserId,
4444
Passwd: userpassword,
4545
Namespace: x.GalaxyNamespace,
4646
})

0 commit comments

Comments
 (0)