Skip to content

Commit

Permalink
Add a stop hook to clean session (#27565)
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 authored Oct 13, 2023
1 parent 350fcc0 commit 6fee34f
Show file tree
Hide file tree
Showing 18 changed files with 386 additions and 132 deletions.
47 changes: 46 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,56 @@
package main

import (
"log"
"os"
"os/exec"

"golang.org/x/exp/slices"

"github.com/milvus-io/milvus/cmd/milvus"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
)

func main() {
milvus.RunMilvus(os.Args)
idx := slices.Index(os.Args, "--run-with-subprocess")

// execute command as a subprocess if the command contains "--run-with-subprocess"
if idx > 0 {
args := slices.Delete(os.Args, idx, idx+1)
log.Println("run subprocess with cmd:", args)

/* #nosec G204 */
cmd := exec.Command(args[0], args[1:]...)

cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

// No need to extra wait for the process
err := cmd.Run()

var params paramtable.ComponentParam
params.Init()

if len(args) >= 3 {
metaPath := params.EtcdCfg.MetaRootPath
endpoints := params.EtcdCfg.Endpoints

sessionSuffix := sessionutil.GetSessions(cmd.Process.Pid)
defer sessionutil.RemoveServerInfoFile(cmd.Process.Pid)

// clean session
if err := milvus.CleanSession(metaPath, endpoints, sessionSuffix); err != nil {
log.Println("clean session failed", err.Error())
}
}

if err != nil {
log.Println("subprocess exit, ", err.Error())
} else {
log.Println("exit code:", cmd.ProcessState.ExitCode())
}
} else {
milvus.RunMilvus(os.Args)
}
}
5 changes: 5 additions & 0 deletions cmd/milvus/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
RunCmd = "run"
RoleMixture = "mixture"
)

var (
usageLine = fmt.Sprintf("Usage:\n"+
"%s\n%s\n%s\n%s\n", runLine, stopLine, mckLine, serverTypeLine)
Expand Down
105 changes: 9 additions & 96 deletions cmd/milvus/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,102 +10,37 @@ import (

"go.uber.org/zap"

"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
RunCmd = "run"
roleMixture = "mixture"
)

type run struct {
serverType string
// flags
svrAlias string
enableRootCoord bool
enableQueryCoord bool
enableIndexCoord bool
enableDataCoord bool
enableQueryNode bool
enableDataNode bool
enableIndexNode bool
enableProxy bool
}

func (c *run) getHelp() string {
return runLine + "\n" + serverTypeLine
}

func (c *run) execute(args []string, flags *flag.FlagSet) {
if len(args) < 3 {
fmt.Fprintln(os.Stderr, c.getHelp())
fmt.Fprintln(os.Stderr, getHelp())
return
}
flags.Usage = func() {
fmt.Fprintln(os.Stderr, c.getHelp())
fmt.Fprintln(os.Stderr, getHelp())
}
c.serverType = args[2]
c.formatFlags(args, flags)

// make go ignore SIGPIPE when all cgo thread set mask SIGPIPE
signal.Ignore(syscall.SIGPIPE)

var local = false
role := roles.NewMilvusRoles()
switch c.serverType {
case typeutil.RootCoordRole:
role.EnableRootCoord = true
case typeutil.ProxyRole:
role.EnableProxy = true
case typeutil.QueryCoordRole:
role.EnableQueryCoord = true
case typeutil.QueryNodeRole:
role.EnableQueryNode = true
case typeutil.DataCoordRole:
role.EnableDataCoord = true
case typeutil.DataNodeRole:
role.EnableDataNode = true
case typeutil.IndexCoordRole:
role.EnableIndexCoord = true
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
role.EnableQueryNode = true
role.EnableDataCoord = true
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
local = true
case roleMixture:
role.EnableRootCoord = c.enableRootCoord
role.EnableQueryCoord = c.enableQueryCoord
role.EnableDataCoord = c.enableDataCoord
role.EnableIndexCoord = c.enableIndexCoord
role.EnableQueryNode = c.enableQueryNode
role.EnableDataNode = c.enableDataNode
role.EnableIndexNode = c.enableIndexNode
role.EnableProxy = c.enableProxy
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", c.serverType, c.getHelp())
os.Exit(-1)
}

serverType := args[2]
roles := GetMilvusRoles(args, flags)
// setup config for embedded milvus
if c.serverType == typeutil.EmbeddedRole {

if serverType == typeutil.EmbeddedRole {
var params paramtable.BaseTable
params.GlobalInitWithYaml("embedded-milvus.yaml")
}

runtimeDir := createRuntimeDir(c.serverType)
filename := getPidFileName(c.serverType, c.svrAlias)
runtimeDir := createRuntimeDir(serverType)
filename := getPidFileName(serverType, roles.Alias)

c.printBanner(flags.Output())
c.injectVariablesToEnv()
Expand All @@ -114,29 +49,7 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
panic(err)
}
defer removePidFile(lock)
role.Run(local, c.svrAlias)
}

func (c *run) formatFlags(args []string, flags *flag.FlagSet) {
flags.StringVar(&c.svrAlias, "alias", "", "set alias")

flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")

flags.BoolVar(&c.enableQueryNode, typeutil.QueryNodeRole, false, "enable query node")
flags.BoolVar(&c.enableDataNode, typeutil.DataNodeRole, false, "enable data node")
flags.BoolVar(&c.enableIndexNode, typeutil.IndexNodeRole, false, "enable index node")
flags.BoolVar(&c.enableProxy, typeutil.ProxyRole, false, "enable proxy node")

if c.serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
}
hardware.InitMaxprocs(c.serverType, flags)
if err := flags.Parse(args[3:]); err != nil {
os.Exit(-1)
}
roles.Run()
}

func (c *run) printBanner(w io.Writer) {
Expand Down
126 changes: 126 additions & 0 deletions cmd/milvus/util.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package milvus

import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"runtime"
"time"

"github.com/gofrs/flock"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"

"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)

Expand Down Expand Up @@ -106,3 +116,119 @@ func removePidFile(lock *flock.Flock) {
lock.Close()
os.Remove(filename)
}

func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
alias, enableRootCoord, enableQueryCoord, enableIndexCoord, enableDataCoord, enableQueryNode,
enableDataNode, enableIndexNode, enableProxy := formatFlags(args, flags)

serverType := args[2]
role := roles.NewMilvusRoles()
role.Alias = alias

switch serverType {
case typeutil.RootCoordRole:
role.EnableRootCoord = true
case typeutil.ProxyRole:
role.EnableProxy = true
case typeutil.QueryCoordRole:
role.EnableQueryCoord = true
case typeutil.QueryNodeRole:
role.EnableQueryNode = true
case typeutil.DataCoordRole:
role.EnableDataCoord = true
case typeutil.DataNodeRole:
role.EnableDataNode = true
case typeutil.IndexCoordRole:
role.EnableIndexCoord = true
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
role.EnableQueryNode = true
role.EnableDataCoord = true
role.EnableDataNode = true
role.EnableIndexCoord = true
role.EnableIndexNode = true
role.LocalMode = true
case RoleMixture:
role.EnableRootCoord = enableRootCoord
role.EnableQueryCoord = enableQueryCoord
role.EnableDataCoord = enableDataCoord
role.EnableIndexCoord = enableIndexCoord
role.EnableQueryNode = enableQueryNode
role.EnableDataNode = enableDataNode
role.EnableIndexNode = enableIndexNode
role.EnableProxy = enableProxy
default:
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
os.Exit(-1)
}
return role
}

func formatFlags(args []string, flags *flag.FlagSet) (alias string, enableRootCoord, enableQueryCoord,
enableIndexCoord, enableDataCoord, enableQueryNode, enableDataNode, enableIndexNode, enableProxy bool) {
flags.StringVar(&alias, "alias", "", "set alias")

flags.BoolVar(&enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
flags.BoolVar(&enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
flags.BoolVar(&enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
flags.BoolVar(&enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")

flags.BoolVar(&enableQueryNode, typeutil.QueryNodeRole, false, "enable query node")
flags.BoolVar(&enableDataNode, typeutil.DataNodeRole, false, "enable data node")
flags.BoolVar(&enableIndexNode, typeutil.IndexNodeRole, false, "enable index node")
flags.BoolVar(&enableProxy, typeutil.ProxyRole, false, "enable proxy node")

serverType := args[2]
if serverType == typeutil.EmbeddedRole {
flags.SetOutput(io.Discard)
}
hardware.InitMaxprocs(serverType, flags)
if err := flags.Parse(args[3:]); err != nil {
os.Exit(-1)
}
return
}

func getHelp() string {
return runLine + "\n" + serverTypeLine
}

func CleanSession(metaPath string, etcdEndpoints []string, sessionSuffix []string) error {
if len(sessionSuffix) == 0 {
log.Warn("not found session info , skip to clean sessions")
return nil
}

keys := getSessionPaths(metaPath, sessionSuffix)
if len(keys) == 0 {
return nil
}

etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
if err != nil {
return err
}
defer etcdCli.Close()

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
for _, key := range keys {
_, _ = etcdCli.Delete(ctx, key, clientv3.WithPrefix())
}
log.Info("clean sessions from etcd", zap.Any("keys", keys))
return nil
}

func getSessionPaths(metaPath string, sessionSuffix []string) []string {
sessionKeys := make([]string, 0)
sessionPathPrefix := path.Join(metaPath, sessionutil.DefaultServiceRoot)
for _, suffix := range sessionSuffix {
key := path.Join(sessionPathPrefix, suffix)
sessionKeys = append(sessionKeys, key)
}
return sessionKeys
}
Loading

0 comments on commit 6fee34f

Please sign in to comment.