Skip to content

Commit

Permalink
BREAKING: fix json marshal unmarshal for namespace > 127 (#7810)
Browse files Browse the repository at this point in the history
We used to store predicate as <namespace>|<attribute>
(pipe | signifies concatenation). We store this as a string.
<namespace> is 8 bytes uint64, which when marshaled to JSON bytes
mess up the predicate. This is because for the namespace greater
than 127, the UTF-8 encoding might take up several bytes
(also if the mapping does not exist, then it replaces it with
some other rune).

This affects three identified places in Dgraph:
  * Live loader
  * Backup and List Backup
  * Http clients and Ratel

Fix is to have a UTF-8 string when dealing with JSON. A better idea
is to use UTF-8 string even for internal operations. Only when we
read/write to badger we convert it into the format of the byte.
New Format: <anmespace>-<attribute> (- is the hyphen literal)
  • Loading branch information
NamanJain8 authored and mangalaman93 committed Jan 2, 2023
1 parent a7a4a5d commit f2f728c
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 134 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, 2, len(mr.preds))
var parsedPreds []string
for _, pred := range mr.preds {
p := strings.Split(pred, "-")[1]
p := strings.SplitN(pred, "-", 2)[1]
parsedPreds = append(parsedPreds, x.ParseAttr(p))
}
sort.Strings(parsedPreds)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type QueryResult struct {

func splitPreds(ps []string) []string {
for i, p := range ps {
ps[i] = x.ParseAttr(strings.Split(p, "-")[1])
ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1])
}

return ps
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
checkPreds := func() error {
// Check if any of these tablets is being moved. If so, abort the transaction.
for _, pkey := range src.Preds {
splits := strings.Split(pkey, "-")
splits := strings.SplitN(pkey, "-", 2)
if len(splits) < 2 {
return errors.Errorf("Unable to find group id in %s", pkey)
}
gid, err := strconv.Atoi(splits[0])
if err != nil {
return errors.Wrapf(err, "unable to parse group id from %s", pkey)
}
pred := strings.Join(splits[1:], "-")
pred := splits[1]
tablet := s.ServingTablet(pred)
if tablet == nil {
return errors.Errorf("Tablet for %s is nil", pred)
Expand Down
11 changes: 1 addition & 10 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,16 +1061,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
return errors.Errorf("Namespace not found in JWT.")
}
if namespace == x.GalaxyNamespace {
// For galaxy namespace, we don't want to filter out the predicates. We only format the
// namespace to human readable form.
for _, group := range ms.Groups {
tablets := make(map[string]*pb.Tablet)
for tabletName, tablet := range group.Tablets {
tablet.Predicate = x.FormatNsAttr(tablet.Predicate)
tablets[x.FormatNsAttr(tabletName)] = tablet
}
group.Tablets = tablets
}
// For galaxy namespace, we don't want to filter out the predicates.
return nil
}
for _, group := range ms.GetGroups() {
Expand Down
18 changes: 7 additions & 11 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,16 +592,15 @@ func (r *rebuilder) Run(ctx context.Context) error {

glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix))
r.attr, r.startTs, hex.Dump(r.prefix))

// Counter is used here to ensure that all keys are committed at different timestamp.
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):",
x.FormatNsAttr(r.attr))
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand Down Expand Up @@ -663,21 +662,19 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))

// Now we write all the created posting lists to disk.
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr)
start = time.Now()
defer func() {
glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))
}()

writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):",
x.FormatNsAttr(r.attr))
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := ReadPostingList(key, itr)
if err != nil {
Expand Down Expand Up @@ -720,8 +717,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := tmpStream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
}

Expand Down
16 changes: 8 additions & 8 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
}

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestAddMutation_gru(t *testing.T) {
}

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -639,7 +639,7 @@ func TestAddMutation_gru2(t *testing.T) {
func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -878,7 +878,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 5000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -926,7 +926,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestBinSplit(t *testing.T) {
defer func() {
maxListSize = originalListSize
}()
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
maxListSize = 5000

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1407,7 +1407,7 @@ func TestRecursiveSplits(t *testing.T) {

// Create a list that should be split recursively.
size := int(1e5)
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down
4 changes: 2 additions & 2 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (g *groupi) sendTablet(tablet *pb.Tablet) (*pb.Tablet, error) {
}

if out.GroupId == groups().groupId() {
glog.Infof("Serving tablet for: %v\n", x.FormatNsAttr(tablet.GetPredicate()))
glog.Infof("Serving tablet for: %v\n", tablet.GetPredicate())
}
return out, nil
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func (g *groupi) Inform(preds []string) ([]*pb.Tablet, error) {
}

if t.GroupId == groups().groupId() {
glog.Infof("Serving tablet for: %v\n", x.FormatNsAttr(t.GetPredicate()))
glog.Infof("Serving tablet for: %v\n", t.GetPredicate())
}
}
g.Unlock()
Expand Down
Loading

0 comments on commit f2f728c

Please sign in to comment.