diff --git a/ee/audit/interceptor_ee.go b/ee/audit/interceptor_ee.go index 2e59b705957..e9de9a1b777 100644 --- a/ee/audit/interceptor_ee.go +++ b/ee/audit/interceptor_ee.go @@ -63,8 +63,9 @@ var skipApis = map[string]bool{ var skipEPs = map[string]bool{ // list of endpoints that needs to be skipped - "/health": true, - "/state": true, + "/health": true, + "/state": true, + "/probe/graphql": true, } func AuditRequestGRPC(ctx context.Context, req interface{}, diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 5297a4d1eb1..53f09508797 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -18,6 +18,7 @@ package admin import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -582,14 +583,6 @@ func (g *GraphQLHealthStore) updatingSchema() { g.v.Store(GraphQLHealth{Healthy: true, StatusMsg: "updating schema"}) } -type gqlSchema struct { - ID string `json:"id,omitempty"` - Schema string `json:"schema,omitempty"` - Version uint64 - GeneratedSchema string - loaded bool // This indicate whether the schema has been loaded into graphql server or not -} - type adminServer struct { rf resolve.ResolverFactory resolver *resolve.RequestResolver @@ -600,8 +593,7 @@ type adminServer struct { // The GraphQL server that's being admin'd gqlServer IServeGraphQL - schema map[uint64]*gqlSchema - + gqlSchemas *worker.GQLSchemaStore // When the schema changes, we use these to create a new RequestResolver for // the main graphql endpoint (gqlServer) and thus refresh the API. fns *resolve.ResolverFns @@ -659,7 +651,7 @@ func newAdminResolver( fns: fns, withIntrospection: withIntrospection, globalEpoch: epoch, - schema: make(map[uint64]*gqlSchema), + gqlSchemas: worker.NewGQLSchemaStore(), gqlServer: defaultGqlServer, } adminServerVar = server // store the admin server in package variable @@ -695,14 +687,14 @@ func newAdminResolver( } ns, _ := x.ParseNamespaceAttr(pk.Attr) - newSchema := &gqlSchema{ + newSchema := &worker.GqlSchema{ ID: query.UidToHex(pk.Uid), Version: kv.GetVersion(), Schema: string(pl.Postings[0].Value), } server.mux.RLock() - currentSchema, ok := server.schema[ns] + currentSchema, ok := server.gqlSchemas.GetCurrent(ns) if ok { schemaChanged := newSchema.Schema == currentSchema.Schema if newSchema.Version <= currentSchema.Version || schemaChanged { @@ -730,18 +722,18 @@ func newAdminResolver( server.incrementSchemaUpdateCounter(ns) // if the schema hasn't been loaded yet, then we don't need to load it here - currentSchema, ok = server.schema[ns] - if !(ok && currentSchema.loaded) { + currentSchema, ok = server.gqlSchemas.GetCurrent(ns) + if !(ok && currentSchema.Loaded) { // this just set schema in admin server, so that next invalid badger subscription update gets rejected upfront - server.schema[ns] = newSchema + server.gqlSchemas.Set(ns, newSchema) glog.Infof("namespace: %d. Skipping in-memory GraphQL schema update, "+ "it will be lazy-loaded later.", ns) return } // update this schema in both admin and graphql server - newSchema.loaded = true - server.schema[ns] = newSchema + newSchema.Loaded = true + server.gqlSchemas.Set(ns, newSchema) server.resetSchema(ns, gqlSchema) glog.Infof("namespace: %d. Successfully updated GraphQL schema. "+ @@ -815,16 +807,16 @@ func newAdminResolverFactory() resolve.ResolverFactory { return rf.WithSchemaIntrospection() } -func getCurrentGraphQLSchema(namespace uint64) (*gqlSchema, error) { +func getCurrentGraphQLSchema(namespace uint64) (*worker.GqlSchema, error) { uid, graphQLSchema, err := edgraph.GetGQLSchema(namespace) if err != nil { return nil, err } - return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil + return &worker.GqlSchema{ID: uid, Schema: graphQLSchema}, nil } -func generateGQLSchema(sch *gqlSchema, ns uint64) (schema.Schema, error) { +func generateGQLSchema(sch *worker.GqlSchema, ns uint64) (schema.Schema, error) { schHandler, err := schema.NewHandler(sch.Schema, false) if err != nil { return nil, err @@ -862,8 +854,8 @@ func (as *adminServer) initServer() { glog.Errorf("namespace: %d. Error reading GraphQL schema: %s.", x.GalaxyNamespace, err) continue } - sch.loaded = true - as.schema[x.GalaxyNamespace] = sch + sch.Loaded = true + as.gqlSchemas.Set(x.GalaxyNamespace, sch) // adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has // current GraphQL schema, if there was any. as.addConnectedAdminResolvers() @@ -1003,8 +995,12 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) { return resolve.QueryResolverFunc(func(ctx context.Context, query schema.Query) *resolve.Resolved { as.mux.RLock() defer as.mux.RUnlock() - sch := as.schema[ns].Schema - handler, err := schema.NewHandler(sch, true) + sch, ok := as.gqlSchemas.GetCurrent(ns) + if !ok { + return resolve.EmptyResult(query, + fmt.Errorf("error while getting the schema for ns %d", ns)) + } + handler, err := schema.NewHandler(sch.Schema, true) if err != nil { return resolve.EmptyResult(query, err) } @@ -1031,7 +1027,7 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) { func (as *adminServer) lazyLoadSchema(namespace uint64) error { // if the schema is already in memory, no need to fetch it from disk as.mux.RLock() - if currentSchema, ok := as.schema[namespace]; ok && currentSchema.loaded { + if currentSchema, ok := as.gqlSchemas.GetCurrent(namespace); ok && currentSchema.Loaded { as.mux.RUnlock() return nil } @@ -1061,8 +1057,8 @@ func (as *adminServer) lazyLoadSchema(namespace uint64) error { as.mux.Lock() defer as.mux.Unlock() - sch.loaded = true - as.schema[namespace] = sch + sch.Loaded = true + as.gqlSchemas.Set(namespace, sch) as.resetSchema(namespace, generatedSchema) glog.Infof("namespace: %d. Successfully lazy-loaded GraphQL schema.", namespace) diff --git a/graphql/admin/schema.go b/graphql/admin/schema.go index c4f329f4814..29a1e7360f8 100644 --- a/graphql/admin/schema.go +++ b/graphql/admin/schema.go @@ -19,6 +19,7 @@ package admin import ( "context" "encoding/json" + "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/edgraph" "github.com/dgraph-io/dgraph/graphql/resolve" @@ -33,7 +34,7 @@ type getSchemaResolver struct { } type updateGQLSchemaInput struct { - Set gqlSchema `json:"set,omitempty"` + Set worker.GqlSchema `json:"set,omitempty"` } type updateSchemaResolver struct { @@ -88,7 +89,7 @@ func (gsr *getSchemaResolver) Resolve(ctx context.Context, q schema.Query) *reso return resolve.EmptyResult(q, err) } - cs := gsr.admin.schema[ns] + cs, _ := gsr.admin.gqlSchemas.GetCurrent(ns) if cs == nil || cs.ID == "" { data = map[string]interface{}{q.Name(): nil} } else { diff --git a/worker/graphql_schema.go b/worker/graphql_schema.go index 4c7fb79ed03..3e8db33de22 100644 --- a/worker/graphql_schema.go +++ b/worker/graphql_schema.go @@ -48,8 +48,55 @@ var ( errUpdatingGraphQLSchemaOnNonGroupOneLeader = errors.New( "while updating GraphQL schema: this server isn't group-1 leader, please retry") ErrMultipleGraphQLSchemaNodes = errors.New("found multiple nodes for GraphQL schema") + gqlSchemaStore *GQLSchemaStore ) +type GqlSchema struct { + ID string `json:"id,omitempty"` + Schema string `json:"schema,omitempty"` + Version uint64 + GeneratedSchema string + Loaded bool // This indicate whether the schema has been loaded into graphql server + // or not +} + +type GQLSchemaStore struct { + mux sync.RWMutex + schema map[uint64]*GqlSchema +} + +func NewGQLSchemaStore() *GQLSchemaStore { + gqlSchemaStore = &GQLSchemaStore{ + mux: sync.RWMutex{}, + schema: make(map[uint64]*GqlSchema), + } + return gqlSchemaStore +} + +func (gs *GQLSchemaStore) Set(ns uint64, sch *GqlSchema) { + gs.mux.Lock() + defer gs.mux.Unlock() + gs.schema[ns] = sch +} + +func (gs *GQLSchemaStore) GetCurrent(ns uint64) (*GqlSchema, bool) { + gs.mux.RLock() + defer gs.mux.RUnlock() + sch, ok := gs.schema[ns] + return sch, ok +} + +func (gs *GQLSchemaStore) resetGQLSchema() { + gs.mux.Lock() + defer gs.mux.Unlock() + + gs.schema = make(map[uint64]*GqlSchema) +} + +func ResetGQLSchemaStore() { + gqlSchemaStore.resetGQLSchema() +} + // UpdateGQLSchemaOverNetwork sends the request to the group one leader for execution. func UpdateGQLSchemaOverNetwork(ctx context.Context, req *pb.UpdateGraphQLSchemaRequest) (*pb. UpdateGraphQLSchemaResponse, error) { diff --git a/worker/online_restore.go b/worker/online_restore.go index bc09abd50b3..1caefe3fe7b 100644 --- a/worker/online_restore.go +++ b/worker/online_restore.go @@ -338,6 +338,10 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin } ResetAclCache() + // reset gql schema + glog.Info("reseting local gql schema store") + ResetGQLSchemaStore() + // Propose a snapshot immediately after all the work is done to prevent the restore // from being replayed. go func(idx uint64) {