Skip to content

Commit

Permalink
fix: fixing graphql schema update when the data is restored + skippin…
Browse files Browse the repository at this point in the history
…g /probe/graphql from audit (#7925)

* fix: fixing grapgql schema update when the data is restored

* making audit to skip /probe/graphql endpoint as this is health endpoint for kube

(cherry picked from commit b85036e)
  • Loading branch information
aman-bansal authored and all-seeing-code committed Dec 14, 2022
1 parent 600e649 commit 0f935a0
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 32 deletions.
5 changes: 3 additions & 2 deletions ee/audit/interceptor_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ var skipApis = map[string]bool{

var skipEPs = map[string]bool{
// list of endpoints that needs to be skipped
"/health": true,
"/state": true,
"/health": true,
"/state": true,
"/probe/graphql": true,
}

func AuditRequestGRPC(ctx context.Context, req interface{},
Expand Down
52 changes: 24 additions & 28 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package admin

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -582,14 +583,6 @@ func (g *GraphQLHealthStore) updatingSchema() {
g.v.Store(GraphQLHealth{Healthy: true, StatusMsg: "updating schema"})
}

type gqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
loaded bool // This indicate whether the schema has been loaded into graphql server or not
}

type adminServer struct {
rf resolve.ResolverFactory
resolver *resolve.RequestResolver
Expand All @@ -600,8 +593,7 @@ type adminServer struct {
// The GraphQL server that's being admin'd
gqlServer IServeGraphQL

schema map[uint64]*gqlSchema

gqlSchemas *worker.GQLSchemaStore
// When the schema changes, we use these to create a new RequestResolver for
// the main graphql endpoint (gqlServer) and thus refresh the API.
fns *resolve.ResolverFns
Expand Down Expand Up @@ -659,7 +651,7 @@ func newAdminResolver(
fns: fns,
withIntrospection: withIntrospection,
globalEpoch: epoch,
schema: make(map[uint64]*gqlSchema),
gqlSchemas: worker.NewGQLSchemaStore(),
gqlServer: defaultGqlServer,
}
adminServerVar = server // store the admin server in package variable
Expand Down Expand Up @@ -695,14 +687,14 @@ func newAdminResolver(
}
ns, _ := x.ParseNamespaceAttr(pk.Attr)

newSchema := &gqlSchema{
newSchema := &worker.GqlSchema{
ID: query.UidToHex(pk.Uid),
Version: kv.GetVersion(),
Schema: string(pl.Postings[0].Value),
}

server.mux.RLock()
currentSchema, ok := server.schema[ns]
currentSchema, ok := server.gqlSchemas.GetCurrent(ns)
if ok {
schemaChanged := newSchema.Schema == currentSchema.Schema
if newSchema.Version <= currentSchema.Version || schemaChanged {
Expand Down Expand Up @@ -730,18 +722,18 @@ func newAdminResolver(

server.incrementSchemaUpdateCounter(ns)
// if the schema hasn't been loaded yet, then we don't need to load it here
currentSchema, ok = server.schema[ns]
if !(ok && currentSchema.loaded) {
currentSchema, ok = server.gqlSchemas.GetCurrent(ns)
if !(ok && currentSchema.Loaded) {
// this just set schema in admin server, so that next invalid badger subscription update gets rejected upfront
server.schema[ns] = newSchema
server.gqlSchemas.Set(ns, newSchema)
glog.Infof("namespace: %d. Skipping in-memory GraphQL schema update, "+
"it will be lazy-loaded later.", ns)
return
}

// update this schema in both admin and graphql server
newSchema.loaded = true
server.schema[ns] = newSchema
newSchema.Loaded = true
server.gqlSchemas.Set(ns, newSchema)
server.resetSchema(ns, gqlSchema)

glog.Infof("namespace: %d. Successfully updated GraphQL schema. "+
Expand Down Expand Up @@ -815,16 +807,16 @@ func newAdminResolverFactory() resolve.ResolverFactory {
return rf.WithSchemaIntrospection()
}

func getCurrentGraphQLSchema(namespace uint64) (*gqlSchema, error) {
func getCurrentGraphQLSchema(namespace uint64) (*worker.GqlSchema, error) {
uid, graphQLSchema, err := edgraph.GetGQLSchema(namespace)
if err != nil {
return nil, err
}

return &gqlSchema{ID: uid, Schema: graphQLSchema}, nil
return &worker.GqlSchema{ID: uid, Schema: graphQLSchema}, nil
}

func generateGQLSchema(sch *gqlSchema, ns uint64) (schema.Schema, error) {
func generateGQLSchema(sch *worker.GqlSchema, ns uint64) (schema.Schema, error) {
schHandler, err := schema.NewHandler(sch.Schema, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -862,8 +854,8 @@ func (as *adminServer) initServer() {
glog.Errorf("namespace: %d. Error reading GraphQL schema: %s.", x.GalaxyNamespace, err)
continue
}
sch.loaded = true
as.schema[x.GalaxyNamespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(x.GalaxyNamespace, sch)
// adding the actual resolvers for updateGQLSchema and getGQLSchema only after server has
// current GraphQL schema, if there was any.
as.addConnectedAdminResolvers()
Expand Down Expand Up @@ -1003,8 +995,12 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
return resolve.QueryResolverFunc(func(ctx context.Context, query schema.Query) *resolve.Resolved {
as.mux.RLock()
defer as.mux.RUnlock()
sch := as.schema[ns].Schema
handler, err := schema.NewHandler(sch, true)
sch, ok := as.gqlSchemas.GetCurrent(ns)
if !ok {
return resolve.EmptyResult(query,
fmt.Errorf("error while getting the schema for ns %d", ns))
}
handler, err := schema.NewHandler(sch.Schema, true)
if err != nil {
return resolve.EmptyResult(query, err)
}
Expand All @@ -1031,7 +1027,7 @@ func (as *adminServer) resetSchema(ns uint64, gqlSchema schema.Schema) {
func (as *adminServer) lazyLoadSchema(namespace uint64) error {
// if the schema is already in memory, no need to fetch it from disk
as.mux.RLock()
if currentSchema, ok := as.schema[namespace]; ok && currentSchema.loaded {
if currentSchema, ok := as.gqlSchemas.GetCurrent(namespace); ok && currentSchema.Loaded {
as.mux.RUnlock()
return nil
}
Expand Down Expand Up @@ -1061,8 +1057,8 @@ func (as *adminServer) lazyLoadSchema(namespace uint64) error {

as.mux.Lock()
defer as.mux.Unlock()
sch.loaded = true
as.schema[namespace] = sch
sch.Loaded = true
as.gqlSchemas.Set(namespace, sch)
as.resetSchema(namespace, generatedSchema)

glog.Infof("namespace: %d. Successfully lazy-loaded GraphQL schema.", namespace)
Expand Down
5 changes: 3 additions & 2 deletions graphql/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"github.com/dgraph-io/dgraph/worker"

"github.com/dgraph-io/dgraph/edgraph"
"github.com/dgraph-io/dgraph/graphql/resolve"
Expand All @@ -33,7 +34,7 @@ type getSchemaResolver struct {
}

type updateGQLSchemaInput struct {
Set gqlSchema `json:"set,omitempty"`
Set worker.GqlSchema `json:"set,omitempty"`
}

type updateSchemaResolver struct {
Expand Down Expand Up @@ -88,7 +89,7 @@ func (gsr *getSchemaResolver) Resolve(ctx context.Context, q schema.Query) *reso
return resolve.EmptyResult(q, err)
}

cs := gsr.admin.schema[ns]
cs, _ := gsr.admin.gqlSchemas.GetCurrent(ns)
if cs == nil || cs.ID == "" {
data = map[string]interface{}{q.Name(): nil}
} else {
Expand Down
47 changes: 47 additions & 0 deletions worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,55 @@ var (
errUpdatingGraphQLSchemaOnNonGroupOneLeader = errors.New(
"while updating GraphQL schema: this server isn't group-1 leader, please retry")
ErrMultipleGraphQLSchemaNodes = errors.New("found multiple nodes for GraphQL schema")
gqlSchemaStore *GQLSchemaStore
)

type GqlSchema struct {
ID string `json:"id,omitempty"`
Schema string `json:"schema,omitempty"`
Version uint64
GeneratedSchema string
Loaded bool // This indicate whether the schema has been loaded into graphql server
// or not
}

type GQLSchemaStore struct {
mux sync.RWMutex
schema map[uint64]*GqlSchema
}

func NewGQLSchemaStore() *GQLSchemaStore {
gqlSchemaStore = &GQLSchemaStore{
mux: sync.RWMutex{},
schema: make(map[uint64]*GqlSchema),
}
return gqlSchemaStore
}

func (gs *GQLSchemaStore) Set(ns uint64, sch *GqlSchema) {
gs.mux.Lock()
defer gs.mux.Unlock()
gs.schema[ns] = sch
}

func (gs *GQLSchemaStore) GetCurrent(ns uint64) (*GqlSchema, bool) {
gs.mux.RLock()
defer gs.mux.RUnlock()
sch, ok := gs.schema[ns]
return sch, ok
}

func (gs *GQLSchemaStore) resetGQLSchema() {
gs.mux.Lock()
defer gs.mux.Unlock()

gs.schema = make(map[uint64]*GqlSchema)
}

func ResetGQLSchemaStore() {
gqlSchemaStore.resetGQLSchema()
}

// UpdateGQLSchemaOverNetwork sends the request to the group one leader for execution.
func UpdateGQLSchemaOverNetwork(ctx context.Context, req *pb.UpdateGraphQLSchemaRequest) (*pb.
UpdateGraphQLSchemaResponse, error) {
Expand Down
4 changes: 4 additions & 0 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin
}

ResetAclCache()
// reset gql schema
glog.Info("reseting local gql schema store")
ResetGQLSchemaStore()

// Propose a snapshot immediately after all the work is done to prevent the restore
// from being replayed.
go func(idx uint64) {
Expand Down

0 comments on commit 0f935a0

Please sign in to comment.