Skip to content

Commit

Permalink
Fix mixture (milvus-io#17332)
Browse files Browse the repository at this point in the history
Signed-off-by: shaoyue.chen <shaoyue.chen@zilliz.com>
  • Loading branch information
haorenfsa authored Jun 2, 2022
1 parent e0cbacb commit 7527285
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
2 changes: 2 additions & 0 deletions cmd/milvus/milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func RunMilvus(args []string) {
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.HasMultipleRoles = true
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
Expand All @@ -294,6 +295,7 @@ func RunMilvus(args []string) {
role.EnableIndexNode = true
local = true
case roleMixture:
role.HasMultipleRoles = true
role.EnableRootCoord = enableRootCoord
role.EnableQueryCoord = enableQueryCoord
role.EnableDataCoord = enableDataCoord
Expand Down
65 changes: 38 additions & 27 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func stopRocksmq() {

// MilvusRoles decides which components are brought up with Milvus.
type MilvusRoles struct {
HasMultipleRoles bool
EnableRootCoord bool `env:"ENABLE_ROOT_COORD"`
EnableProxy bool `env:"ENABLE_PROXY"`
EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"`
Expand Down Expand Up @@ -107,7 +108,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: rc})
}
wg.Done()
Expand Down Expand Up @@ -139,7 +140,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: pn})
}
wg.Done()
Expand Down Expand Up @@ -170,7 +171,7 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qs})
}
wg.Done()
Expand Down Expand Up @@ -202,7 +203,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qn})
}
wg.Done()
Expand Down Expand Up @@ -235,7 +236,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: ds})
}
wg.Done()
Expand Down Expand Up @@ -267,7 +268,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: dn})
}
wg.Done()
Expand Down Expand Up @@ -299,7 +300,7 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: is})
}
wg.Done()
Expand Down Expand Up @@ -332,7 +333,7 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st
if err != nil {
panic(err)
}
if !localMsg {
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: in})
}
wg.Done()
Expand Down Expand Up @@ -448,35 +449,45 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
}
}

if local {
standaloneHealthzHandler := func(w http.ResponseWriter, r *http.Request) {
if mr.HasMultipleRoles {
multiRoleHealthzHandler := func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
req := &internalpb.GetComponentStatesRequest{}
validateResp := func(resp *internalpb.ComponentStates, err error) bool {
return err == nil && resp != nil && resp.GetState().GetStateCode() == internalpb.StateCode_Healthy
}
if rc == nil || !validateResp(rc.GetComponentStates(ctx, req)) {
rootCoordNotServingHandler(w, r)
return
if mr.EnableRootCoord {
if rc == nil || !validateResp(rc.GetComponentStates(ctx, req)) {
rootCoordNotServingHandler(w, r)
return
}
}
if qs == nil || !validateResp(qs.GetComponentStates(ctx, req)) {
queryCoordNotServingHandler(w, r)
return

if mr.EnableQueryCoord {
if qs == nil || !validateResp(qs.GetComponentStates(ctx, req)) {
queryCoordNotServingHandler(w, r)
return
}
}

if ds == nil || !validateResp(ds.GetComponentStates(ctx, req)) {
dataCoordNotServingHandler(w, r)
return
if mr.EnableDataCoord {
if ds == nil || !validateResp(ds.GetComponentStates(ctx, req)) {
dataCoordNotServingHandler(w, r)
return
}
}
if is == nil || !validateResp(is.GetComponentStates(ctx, req)) {
indexCoordNotServingHandler(w, r)
return
if mr.EnableIndexCoord {
if is == nil || !validateResp(is.GetComponentStates(ctx, req)) {
indexCoordNotServingHandler(w, r)
return
}
}

if pn == nil || !validateResp(pn.GetComponentStates(ctx, req)) {
proxyNotServingHandler(w, r)
return
if mr.EnableProxy {
if pn == nil || !validateResp(pn.GetComponentStates(ctx, req)) {
proxyNotServingHandler(w, r)
return
}
}
// TODO(dragondriver): need to check node state?

Expand All @@ -490,7 +501,7 @@ func (mr *MilvusRoles) Run(local bool, alias string) {

// TODO(dragondriver): handle component states
}
http.HandleFunc(healthz.HealthzRouterPath, standaloneHealthzHandler)
http.HandleFunc(healthz.HealthzRouterPath, multiRoleHealthzHandler)
}

metrics.ServeHTTP(Registry)
Expand Down

0 comments on commit 7527285

Please sign in to comment.