Skip to content

Commit

Permalink
Infer type of schema from JSON and RDF mutations. (#4328)
Browse files Browse the repository at this point in the history
If the same subject-predicate pair appears multiple times in a mutation
and the predicate needs to be created, create it as a list to avoid
losing data.
  • Loading branch information
martinmr authored Dec 19, 2019
1 parent acd7f3d commit 4da3614
Show file tree
Hide file tree
Showing 11 changed files with 899 additions and 337 deletions.
3 changes: 1 addition & 2 deletions chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ func (jc *jsonChunker) Parse(chunkBuf *bytes.Buffer) error {
return nil
}

err := jc.nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
return err
return jc.nqs.ParseJSON(chunkBuf.Bytes(), SetNquads)
}

func slurpSpace(r *bufio.Reader) error {
Expand Down
35 changes: 31 additions & 4 deletions chunker/json_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"unicode"

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -233,6 +234,7 @@ type NQuadBuffer struct {
batchSize int
nquads []*api.NQuad
nqCh chan []*api.NQuad
predHints map[string]pb.Metadata_HintType
}

// NewNQuadBuffer returns a new NQuadBuffer instance with the specified batch size.
Expand All @@ -244,6 +246,7 @@ func NewNQuadBuffer(batchSize int) *NQuadBuffer {
if buf.batchSize > 0 {
buf.nquads = make([]*api.NQuad, 0, batchSize)
}
buf.predHints = make(map[string]pb.Metadata_HintType)
return buf
}

Expand All @@ -263,6 +266,23 @@ func (buf *NQuadBuffer) Push(nqs ...*api.NQuad) {
}
}

// Metadata returns the parse metadata that has been aggregated so far..
func (buf *NQuadBuffer) Metadata() *pb.Metadata {
return &pb.Metadata{
PredHints: buf.predHints,
}
}

// PushPredHint pushes and aggregates hints about the type of the predicate derived
// during the parsing. This metadata is expected to be a lot smaller than the set of
// NQuads so it's not necessary to send them in batches.
func (buf *NQuadBuffer) PushPredHint(pred string, hint pb.Metadata_HintType) {
if oldHint, ok := buf.predHints[pred]; ok && hint != oldHint {
hint = pb.Metadata_LIST
}
buf.predHints[pred] = hint
}

// Flush must be called at the end to push out all the buffered NQuads to the channel. Once Flush is
// called, this instance of NQuadBuffer should no longer be used.
func (buf *NQuadBuffer) Flush() {
Expand Down Expand Up @@ -387,6 +407,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
return mr, err
}
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
case map[string]interface{}:
if len(v) == 0 {
continue
Expand All @@ -398,6 +419,7 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
}
if ok {
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
continue
}

Expand All @@ -410,7 +432,9 @@ func (buf *NQuadBuffer) mapToNquads(m map[string]interface{}, op int, parentPred
nq.ObjectId = cr.uid
nq.Facets = cr.fcts
buf.Push(&nq)
buf.PushPredHint(pred, pb.Metadata_SINGLE)
case []interface{}:
buf.PushPredHint(pred, pb.Metadata_LIST)
for _, item := range v {
nq := api.NQuad{
Subject: mr.uid,
Expand Down Expand Up @@ -509,12 +533,15 @@ func (buf *NQuadBuffer) ParseJSON(b []byte, op int) error {

// ParseJSON is a convenience wrapper function to get all NQuads in one call. This can however, lead
// to high memory usage. So be careful using this.
func ParseJSON(b []byte, op int) ([]*api.NQuad, error) {
func ParseJSON(b []byte, op int) ([]*api.NQuad, *pb.Metadata, error) {
buf := NewNQuadBuffer(-1)
if err := buf.ParseJSON(b, op); err != nil {
return nil, err
err := buf.ParseJSON(b, op)
if err != nil {
return nil, nil, err
}

buf.Flush()
nqs := <-buf.Ch()
return nqs, nil
metadata := buf.Metadata()
return nqs, metadata, nil
}
30 changes: 27 additions & 3 deletions chunker/rdf_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/dgraph-io/dgo/v2/protos/api"
"github.com/dgraph-io/dgraph/lex"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -54,7 +55,7 @@ func sane(s string) bool {

// ParseRDFs is a convenience wrapper function to get all NQuads in one call. This can however, lead
// to high memory usage. So, be careful using this.
func ParseRDFs(b []byte) ([]*api.NQuad, error) {
func ParseRDFs(b []byte) ([]*api.NQuad, *pb.Metadata, error) {
var nqs []*api.NQuad
var l lex.Lexer
for _, line := range bytes.Split(b, []byte{'\n'}) {
Expand All @@ -63,11 +64,12 @@ func ParseRDFs(b []byte) ([]*api.NQuad, error) {
continue
}
if err != nil {
return nil, err
return nil, nil, err
}
nqs = append(nqs, &nq)
}
return nqs, nil

return nqs, calculateTypeHints(nqs), nil
}

// ParseRDF parses a mutation string and returns the N-Quad representation for it.
Expand Down Expand Up @@ -320,6 +322,28 @@ func parseFacetsRDF(it *lex.ItemIterator, rnq *api.NQuad) error {
return nil
}

// subjectPred is a type to store the count for each <subject, pred> in the mutations.
type subjectPred struct {
subject string
pred string
}

func calculateTypeHints(nqs []*api.NQuad) *pb.Metadata {
// Stores the count of <subject, pred> pairs to help figure out whether
// schemas should be created as scalars or lists of scalars.
schemaCountMap := make(map[subjectPred]int)
predHints := make(map[string]pb.Metadata_HintType)

for _, nq := range nqs {
subPredPair := subjectPred{subject: nq.Subject, pred: nq.Predicate}
schemaCountMap[subPredPair]++
if count := schemaCountMap[subPredPair]; count > 1 {
predHints[nq.Predicate] = pb.Metadata_LIST
}
}
return &pb.Metadata{PredHints: predHints}
}

var typeMap = map[string]types.TypeID{
"xs:password": types.PasswordID,
"xs:string": types.StringID,
Expand Down
29 changes: 24 additions & 5 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,23 @@ func (s *Server) doMutate(ctx context.Context, qc *queryContext, resp *api.Respo
return err
}

m := &pb.Mutations{Edges: edges, StartTs: qc.req.StartTs}
predHints := make(map[string]pb.Metadata_HintType)
for _, gmu := range qc.gmuList {
for pred, hint := range gmu.Metadata.GetPredHints() {
if oldHint := predHints[pred]; oldHint == pb.Metadata_LIST {
continue
}
predHints[pred] = hint
}
}
m := &pb.Mutations{
Edges: edges,
StartTs: qc.req.StartTs,
Metadata: &pb.Metadata{
PredHints: predHints,
},
}

qc.span.Annotatef(nil, "Applying mutations: %+v", m)
resp.Txn, err = query.ApplyMutations(ctx, m)
qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)
Expand Down Expand Up @@ -957,28 +973,31 @@ func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) {
res := &gql.Mutation{Cond: mu.Cond}

if len(mu.SetJson) > 0 {
nqs, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads)
nqs, md, err := chunker.ParseJSON(mu.SetJson, chunker.SetNquads)
if err != nil {
return nil, err
}
res.Set = append(res.Set, nqs...)
res.Metadata = md
}
if len(mu.DeleteJson) > 0 {
nqs, err := chunker.ParseJSON(mu.DeleteJson, chunker.DeleteNquads)
// The metadata is not currently needed for delete operations so it can be safely ignored.
nqs, _, err := chunker.ParseJSON(mu.DeleteJson, chunker.DeleteNquads)
if err != nil {
return nil, err
}
res.Del = append(res.Del, nqs...)
}
if len(mu.SetNquads) > 0 {
nqs, err := chunker.ParseRDFs(mu.SetNquads)
nqs, md, err := chunker.ParseRDFs(mu.SetNquads)
if err != nil {
return nil, err
}
res.Set = append(res.Set, nqs...)
res.Metadata = md
}
if len(mu.DelNquads) > 0 {
nqs, err := chunker.ParseRDFs(mu.DelNquads)
nqs, _, err := chunker.ParseRDFs(mu.DelNquads)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions edgraph/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestParseNQuads(t *testing.T) {
# this line is a comment
_:a <join> _:b .
`
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", "predA", &api.Value{Val: &api.Value_DefaultVal{DefaultVal: "A"}}),
Expand All @@ -59,13 +59,13 @@ func TestParseNQuads(t *testing.T) {

func TestValNquads(t *testing.T) {
nquads := `uid(m) <name> val(f) .`
_, err := chunker.ParseRDFs([]byte(nquads))
_, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
}

func TestParseNQuadsWindowsNewline(t *testing.T) {
nquads := "_:a <predA> \"A\" .\r\n_:b <predB> \"B\" ."
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", "predA", &api.Value{Val: &api.Value_DefaultVal{DefaultVal: "A"}}),
Expand All @@ -75,7 +75,7 @@ func TestParseNQuadsWindowsNewline(t *testing.T) {

func TestParseNQuadsDelete(t *testing.T) {
nquads := `_:a * * .`
nqs, err := chunker.ParseRDFs([]byte(nquads))
nqs, _, err := chunker.ParseRDFs([]byte(nquads))
require.NoError(t, err)
require.Equal(t, []*api.NQuad{
makeNquad("_:a", x.Star, &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}}),
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestValidateKeys(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
nq, err := chunker.ParseRDFs([]byte(tc.nquad))
nq, _, err := chunker.ParseRDFs([]byte(tc.nquad))
require.NoError(t, err)

err = validateKeys(nq[0])
Expand Down
2 changes: 2 additions & 0 deletions gql/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Mutation struct {
Cond string
Set []*api.NQuad
Del []*api.NQuad

Metadata *pb.Metadata
}

// ParseUid parses the given string into an UID. This method returns with an error
Expand Down
18 changes: 18 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ message Mutations {
}
DropOp drop_op = 7;
string drop_value = 8;

Metadata metadata = 9;
}

message Metadata {
// HintType represents a hint that will be passed along the mutation and used
// to add the predicate to the schema if it's not already there.
enum HintType {
// DEFAULT means no hint is provided and Dgraph will follow the default behavior.
DEFAULT = 0;
// SINGLE signals that the predicate should be created as a single type (e.g string, uid).
SINGLE = 1;
// LIST signals that the predicate should be created as a list (e.g [string], [uid]).
LIST = 2;
}

// Map of predicates to their hints.
map<string, HintType> pred_hints = 1;
}

message Snapshot {
Expand Down
Loading

0 comments on commit 4da3614

Please sign in to comment.