Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(GraphQL): GraphQL schema is preserved after drop_data (#5790) #5840

Merged
merged 2 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ var (
// Server implements protos.DgraphServer
type Server struct{}

// graphQLSchemaNode represents the node which contains GraphQL schema
type graphQLSchemaNode struct {
Uid string `json:"uid"`
Schema string `json:"dgraph.graphql.schema"`
}

type existingGQLSchemaQryResp struct {
ExistingGQLSchema []graphQLSchemaNode `json:"ExistingGQLSchema"`
}

// PeriodicallyPostTelemetry periodically reports telemetry data for alpha.
func PeriodicallyPostTelemetry() {
glog.V(2).Infof("Starting telemetry data collection for alpha...")
Expand Down Expand Up @@ -127,6 +137,40 @@ func PeriodicallyPostTelemetry() {
}
}

// GetGQLSchema queries for the GraphQL schema node, and returns the uid and the GraphQL schema.
// If multiple schema nodes were found, it returns an error.
func GetGQLSchema() (uid, graphQLSchema string, err error) {
resp, err := (&Server{}).Query(context.WithValue(context.Background(), Authorize, false),
&api.Request{
Query: `
query {
ExistingGQLSchema(func: has(dgraph.graphql.schema)) {
uid
dgraph.graphql.schema
}
}`})
if err != nil {
return "", "", err
}

var result existingGQLSchemaQryResp
if err := json.Unmarshal(resp.GetJson(), &result); err != nil {
return "", "", errors.Wrap(err, "Couldn't unmarshal response from Dgraph query")
}

if len(result.ExistingGQLSchema) == 0 {
// no schema has been stored yet in Dgraph
return "", "", nil
} else if len(result.ExistingGQLSchema) == 1 {
// we found an existing GraphQL schema
gqlSchemaNode := result.ExistingGQLSchema[0]
return gqlSchemaNode.Uid, gqlSchemaNode.Schema, nil
}

// found multiple GraphQL schema nodes, this should never happen
return "", "", worker.ErrMultipleGraphQLSchemaNodes
}

// UpdateGQLSchema updates the GraphQL and Dgraph schemas using the given inputs.
// It first validates and parses the dgraphSchema given in input. If that fails,
// it returns an error. All this is done on the alpha on which the update request is received.
Expand Down Expand Up @@ -271,7 +315,13 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er

m.DropOp = pb.Mutations_ALL
_, err := query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// insert empty GraphQL schema, so all alphas get notified to
// reset their in-memory GraphQL schema
_, err = UpdateGQLSchema(ctx, "", "")
// recreate the admin account after a drop all operation
ResetAcl()
return empty, err
Expand All @@ -282,9 +332,20 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
return empty, errors.Errorf("If DropOp is set to DATA, DropValue must be empty")
}

// query the GraphQL schema and keep it in memory, so it can be inserted again
_, graphQLSchema, err := GetGQLSchema()
if err != nil {
return empty, err
}

m.DropOp = pb.Mutations_DATA
_, err := query.ApplyMutations(ctx, m)
_, err = query.ApplyMutations(ctx, m)
if err != nil {
return empty, err
}

// just reinsert the GraphQL schema, no need to alter dgraph schema as this was drop_data
_, err = UpdateGQLSchema(ctx, graphQLSchema, "")
// recreate the admin account after a drop data operation
ResetAcl()
return empty, err
Expand Down
67 changes: 22 additions & 45 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ package admin

import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/edgraph"

dgoapi "github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/dgraph/query"

"github.com/golang/glog"
"github.com/pkg/errors"
Expand All @@ -49,8 +48,6 @@ const (
"this indicates a resolver or validation bug " +
"(Please let us know : https://github.com/dgraph-io/dgraph/issues)"

gqlSchemaPred = "dgraph.graphql.schema"

// GraphQL schema for /admin endpoint.
graphqlAdminSchema = `
"""
Expand Down Expand Up @@ -405,7 +402,7 @@ func newAdminResolver(
globalEpoch: epoch,
}

prefix := x.DataKey(gqlSchemaPred, 0)
prefix := x.DataKey(worker.GqlSchemaPred, 0)
// Remove uid from the key, to get the correct prefix
prefix = prefix[:len(prefix)-8]
// Listen for graphql schema changes in group 1.
Expand Down Expand Up @@ -442,10 +439,14 @@ func newAdminResolver(
Schema: string(pl.Postings[0].Value),
}

gqlSchema, err := generateGQLSchema(newSchema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
var gqlSchema schema.Schema
// on drop_all, we will receive an empty string as the schema update
if newSchema.Schema != "" {
gqlSchema, err = generateGQLSchema(newSchema)
if err != nil {
glog.Errorf("Error processing GraphQL schema: %s. ", err)
return
}
}

glog.Infof("Successfully updated GraphQL schema. Serving New GraphQL API.")
Expand All @@ -454,7 +455,7 @@ func newAdminResolver(
defer server.mux.Unlock()

server.schema = newSchema
server.resetSchema(*gqlSchema)
server.resetSchema(gqlSchema)
}, 1, closer)

go server.initServer()
Expand Down Expand Up @@ -520,42 +521,15 @@ func newAdminResolverFactory() resolve.ResolverFactory {
}

func getCurrentGraphQLSchema() (*gqlSchema, error) {
resp, err := resolve.NewAdminExecutor().Execute(context.Background(),
&dgoapi.Request{
Query: `
query {
ExistingGQLSchema(func: has(dgraph.graphql.schema)) {
uid
dgraph.graphql.schema
}
}`})
uid, graphQLSchema, err := edgraph.GetGQLSchema()
if err != nil {
return nil, err
}

result := make(map[string]interface{})
if err := json.Unmarshal(resp.GetJson(), &result); err != nil {
return nil, schema.GQLWrapf(err, "Couldn't unmarshal response from Dgraph query")
}

existingGQLSchema := result["ExistingGQLSchema"].([]interface{})
if len(existingGQLSchema) == 0 {
// no schema has been stored yet in Dgraph
return &gqlSchema{}, nil
} else if len(existingGQLSchema) == 1 {
// we found an existing GraphQL schema
gqlSchemaNode := existingGQLSchema[0].(map[string]interface{})
return &gqlSchema{
ID: gqlSchemaNode["uid"].(string),
Schema: gqlSchemaNode[gqlSchemaPred].(string),
}, nil
}

// found multiple GraphQL schema nodes, this should never happen
return nil, worker.ErrMultipleGraphQLSchemaNodes
return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil
}

func generateGQLSchema(sch *gqlSchema) (*schema.Schema, error) {
func generateGQLSchema(sch *gqlSchema) (schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema)
if err != nil {
return nil, err
Expand All @@ -566,7 +540,7 @@ func generateGQLSchema(sch *gqlSchema) (*schema.Schema, error) {
return nil, err
}

return &generatedSchema, nil
return generatedSchema, nil
}

func (as *adminServer) initServer() {
Expand Down Expand Up @@ -611,7 +585,7 @@ func (as *adminServer) initServer() {

glog.Infof("Successfully loaded GraphQL schema. Serving GraphQL API.")

as.resetSchema(*generatedSchema)
as.resetSchema(generatedSchema)

break
}
Expand Down Expand Up @@ -737,9 +711,12 @@ func resolverFactoryWithErrorMsg(msg string) resolve.ResolverFactory {
}

func (as *adminServer) resetSchema(gqlSchema schema.Schema) {
resolverFactory := resolverFactoryWithErrorMsg(errResolverNotFound)

resolverFactory := resolverFactoryWithErrorMsg(errResolverNotFound).
WithConventionResolvers(gqlSchema, as.fns)
// it is nil after drop_all
if gqlSchema != nil {
resolverFactory = resolverFactory.WithConventionResolvers(gqlSchema, as.fns)
}
if as.withIntrospection {
resolverFactory.WithSchemaIntrospection()
}
Expand Down
54 changes: 45 additions & 9 deletions graphql/e2e/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"github.com/dgraph-io/dgo/v200/protos/api"

"github.com/dgraph-io/dgraph/worker"

"github.com/dgraph-io/dgo/v200"
Expand Down Expand Up @@ -252,7 +254,7 @@ func TestConcurrentSchemaUpdates(t *testing.T) {
finalGraphQLSchema := tcases[lastSuccessTcaseIdx].graphQLSchema

// now check that both the final GraphQL schema and Dgraph schema are the ones we expect
require.Equal(t, finalGraphQLSchema, getGQLSchema(t, groupOneAdminServer))
require.Equal(t, finalGraphQLSchema, getGQLSchemaRequireId(t, groupOneAdminServer))
require.JSONEq(t, finalDgraphSchema, getDgraphSchema(t, dg))

// now check that there is exactly one node for GraphQL schema in Dgraph,
Expand Down Expand Up @@ -289,14 +291,41 @@ func TestUpdateGQLSchemaAfterDropAll(t *testing.T) {
require.NoError(t, err)
testutil.DropAll(t, dg)

// need to wait a bit, because the update notification takes time to reach the alpha
time.Sleep(time.Second)
// now retrieving the GraphQL schema should report no schema
gqlSchema := getGQLSchemaRequireId(t, groupOneAdminServer)
require.Empty(t, gqlSchema)

// updating the schema now should work
schema := `
type A {
b: String! @id
}`
updateGQLSchemaRequireNoErrors(t, schema, groupOneAdminServer)
// we should get the schema we expect
require.Equal(t, schema, getGQLSchema(t, groupOneAdminServer))
require.Equal(t, schema, getGQLSchemaRequireId(t, groupOneAdminServer))
}

// TestGQLSchemaAfterDropData checks whether if the schema still exists after drop_data
func TestGQLSchemaAfterDropData(t *testing.T) {
schema := `
type A {
b: String!
}`
updateGQLSchemaRequireNoErrors(t, schema, groupOneAdminServer)

// now do drop_data
dg, err := testutil.DgraphClient(groupOnegRPC)
require.NoError(t, err)
require.NoError(t, dg.Alter(context.Background(), &api.Operation{DropOp: api.Operation_DATA}))

// lets wait a bit to be sure that the update notification has reached the alpha,
// otherwise we are anyways gonna get the previous schema from the in-memory schema
time.Sleep(time.Second)
// we should still get the schema we inserted earlier
require.Equal(t, schema, getGQLSchemaRequireId(t, groupOneAdminServer))

}

func updateGQLSchema(t *testing.T, schema, url string) *common.GraphQLResponse {
Expand Down Expand Up @@ -328,7 +357,12 @@ func updateGQLSchemaConcurrent(t *testing.T, schema, url string) bool {
return res.Errors == nil
}

func getGQLSchema(t *testing.T, url string) string {
type gqlSchema struct {
Id string
Schema string
}

func getGQLSchema(t *testing.T, url string) gqlSchema {
get := &common.GraphQLParams{
Query: `query {
getGQLSchema {
Expand All @@ -341,15 +375,17 @@ func getGQLSchema(t *testing.T, url string) string {
require.Nil(t, getResult.Errors)

var resp struct {
GetGQLSchema struct {
Id string
Schema string
}
GetGQLSchema gqlSchema
}
require.NoError(t, json.Unmarshal(getResult.Data, &resp))
require.NotEmpty(t, resp.GetGQLSchema.Id, "Got empty ID in getGQLSchema")

return resp.GetGQLSchema.Schema
return resp.GetGQLSchema
}

func getGQLSchemaRequireId(t *testing.T, url string) string {
schema := getGQLSchema(t, url)
require.NotEmpty(t, schema.Id, "Got empty ID in getGQLSchema")
return schema.Schema
}

func getDgraphSchema(t *testing.T, dg *dgo.Dgraph) string {
Expand Down
12 changes: 9 additions & 3 deletions graphql/resolve/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ const (
"probably a mismatch between the GraphQL and Dgraph/remote schemas. " +
"The value was resolved as null (which may trigger GraphQL error propagation) " +
"and as much other data as possible returned."

errInternal = "Internal error"
)

// A ResolverFactory finds the right resolver for a query/mutation.
Expand Down Expand Up @@ -375,12 +377,12 @@ func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) *

if r == nil {
glog.Errorf("Call to Resolve with nil RequestResolver")
return schema.ErrorResponse(errors.New("Internal error"))
return schema.ErrorResponse(errors.New(errInternal))
}

if r.schema == nil {
glog.Errorf("Call to Resolve with no schema")
return schema.ErrorResponse(errors.New("Internal error"))
return schema.ErrorResponse(errors.New(errInternal))
}

startTime := time.Now()
Expand Down Expand Up @@ -451,7 +453,6 @@ func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) *
// Errors and data in the same response is valid. Both WithError and
// AddData handle nil cases.
addResult(resp, res)

}
}
// A single request can contain either queries or mutations - not both.
Expand Down Expand Up @@ -500,6 +501,11 @@ func (r *RequestResolver) Resolve(ctx context.Context, gqlReq *schema.Request) *

// ValidateSubscription will check the given subscription query is valid or not.
func (r *RequestResolver) ValidateSubscription(req *schema.Request) error {
if r.schema == nil {
glog.Errorf("Call to ValidateSubscription with no schema")
return errors.New(errInternal)
}

op, err := r.schema.Operation(req)
if err != nil {
return err
Expand Down
Loading