Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup types #4514

Merged
merged 6 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
if err != nil {
return false
}

// Backup type keys in every group.
if parsedKey.IsType() {
return true
}

// Only backup schema and data keys for the requested predicates.
_, ok := predMap[parsedKey.Attr]
return ok
}
Expand Down
3 changes: 2 additions & 1 deletion ee/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
}

// loadFromBackup reads the backup, converts the keys and values to the required format,
// and loads them to the given badger DB.
// and loads them to the given badger DB. The set of predicates is used to avoid restoring
// values from predicates no longer assigned to this group.
func loadFromBackup(db *badger.DB, r io.Reader, preds predicateSet) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)
Expand Down
21 changes: 18 additions & 3 deletions ee/backup/tests/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,16 @@ func TestBackupFilesystem(t *testing.T) {
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

// Add initial data.
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`}))

// Add schema and types.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))

// Add initial data.
original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
Expand Down Expand Up @@ -256,10 +262,19 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
require.Equal(t, uint32(i+1), groupId)
}

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
pdir := "./data/restore/p1"
restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)

restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds)

restoredTypes, err := testutil.GetTypeNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"Node"}, restoredTypes)

return restored
}

Expand Down
24 changes: 20 additions & 4 deletions ee/backup/tests/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ func TestBackupMinio(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

mc, err = testutil.NewMinioClient()
require.NoError(t, err)
require.NoError(t, mc.MakeBucket(bucketName, ""))

ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

// Add schema and types.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .
type Node {
movie
}`}))

// Add initial data.
require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`}))
original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
Expand Down Expand Up @@ -264,8 +270,18 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string
require.NoError(t, err)
require.Equal(t, uint32(i+1), groupId)
}
pdir := "./data/restore/p1"
restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs)
require.NoError(t, err)

restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds)

restoredTypes, err := testutil.GetTypeNames(pdir, commitTs)
require.NoError(t, err)
require.ElementsMatch(t, []string{"Node"}, restoredTypes)

restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs)
require.NoError(t, err)
t.Logf("--- Restored values: %+v\n", restored)

Expand Down
108 changes: 76 additions & 32 deletions testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,78 +17,122 @@
package testutil

import (
"context"
"fmt"
"math"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
bpb "github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
)

// GetPValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPValues(pdir, attr string, readTs uint64) (map[string]string, error) {
func openDgraph(pdir string) (*badger.DB, error) {
opt := badger.DefaultOptions(pdir).WithTableLoadingMode(options.MemoryMap).
WithReadOnly(true)
db, err := badger.OpenManaged(opt)
return badger.OpenManaged(opt)
}

// GetPredicateValues reads the specified p directory and returns the values for the given
// attribute in a map.
func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) {
db, err := openDgraph(pdir)
if err != nil {
return nil, err
}
defer db.Close()

values := make(map[string]string)

stream := db.NewStreamAt(math.MaxUint64)
stream.ChooseKey = func(item *badger.Item) bool {
txn := db.NewTransactionAt(readTs, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
pk, err := x.Parse(item.Key())
x.Check(err)
switch {
case pk.Attr != attr:
return false
case pk.IsSchema():
return false
continue
case !pk.IsData():
continue
}
return pk.IsData()
}
stream.KeyToList = func(key []byte, it *badger.Iterator) (*bpb.KVList, error) {
pk, err := x.Parse(key)
x.Check(err)
pl, err := posting.ReadPostingList(key, it)

pl, err := posting.ReadPostingList(item.Key(), itr)
if err != nil {
return nil, err
}
var list bpb.KVList

err = pl.Iterate(readTs, 0, func(p *pb.Posting) error {
vID := types.TypeID(p.ValType)
src := types.ValueForType(vID)
src.Value = p.Value
str, err := types.Convert(src, types.StringID)
if err != nil {
fmt.Println(err)
return err
}
value := str.Value.(string)
list.Kv = append(list.Kv, &bpb.KV{
Key: []byte(fmt.Sprintf("%#x", pk.Uid)),
Value: []byte(value),
})
values[fmt.Sprintf("%#x", pk.Uid)] = value

return nil
})
return &list, err
}
stream.Send = func(list *bpb.KVList) error {
for _, kv := range list.Kv {
values[string(kv.Key)] = string(kv.Value)

if err != nil {
return nil, err
}
return nil
}
if err := stream.Orchestrate(context.Background()); err != nil {

return values, err
}

type dataType int

const (
schemaPredicate dataType = iota
schemaType
)

func readSchema(pdir string, dType dataType) ([]string, error) {
db, err := openDgraph(pdir)
if err != nil {
return nil, err
}
return values, err
defer db.Close()
values := make([]string, 0)

// Predicates and types in the schema are written with timestamp 1.
txn := db.NewTransactionAt(1, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
pk, err := x.Parse(item.Key())
x.Check(err)

switch {
case item.UserMeta() != posting.BitSchemaPosting:
continue
case pk.IsSchema() && dType != schemaPredicate:
continue
case pk.IsType() && dType != schemaType:
continue
}

values = append(values, pk.Attr)
}
return values, nil
}

// GetPredicateNames returns the list of all the predicates stored in the restored pdir.
func GetPredicateNames(pdir string, readTs uint64) ([]string, error) {
return readSchema(pdir, schemaPredicate)
}

// GetTypeNames returns the list of all the types stored in the restored pdir.
func GetTypeNames(pdir string, readTs uint64) ([]string, error) {
return readSchema(pdir, schemaType)
}