From 7814c823a172852943ce01b582c61fd0a0f938a1 Mon Sep 17 00:00:00 2001 From: busgo Date: Wed, 31 Jul 2019 08:48:28 +0800 Subject: [PATCH] add failover --- README.md | 8 ++--- etcd.go | 30 +++++++++++++++++++ executor.go | 8 ++--- failover.go | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++ group.go | 12 +++++++- node.go | 3 ++ node_test.go | 43 --------------------------- protocol.go | 6 +++- scheduler.go | 2 +- util.go | 6 ++-- 10 files changed, 141 insertions(+), 59 deletions(-) create mode 100644 failover.go delete mode 100644 node_test.go diff --git a/README.md b/README.md index 0104d30..adefbf9 100644 --- a/README.md +++ b/README.md @@ -303,12 +303,8 @@ such as /forest/client/execute/snapshot/trade/192.168.1.1/201901011111111323 ## 待完善 -1. 任务故障转移 -2. 手动执行任务 -3. 任务报警 - - - +1. 手动执行任务 +2. 任务配置同步 ## 联系方式 如有问题请联系 QQ:248434199 Email:248434199@qq.com diff --git a/etcd.go b/etcd.go index 21ba0c1..c3a3a31 100644 --- a/etcd.go +++ b/etcd.go @@ -411,3 +411,33 @@ func (etcd *Etcd) TxKeepaliveWithTTL(key, value string, ttl int64) (txResponse * } return } + +// transfer from to with value +func (etcd *Etcd) transfer(from string, to string, value string) (success bool, err error) { + + var ( + txnResponse *clientv3.TxnResponse + ) + + ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) + defer cancelFunc() + + txn := etcd.client.Txn(ctx) + + txnResponse, err = txn.If( + clientv3.Compare(clientv3.Value(from), "=", value)). + Then( + clientv3.OpDelete(from), + clientv3.OpPut(to, value), + + ).Commit() + + if err != nil { + return + } + + success = txnResponse.Succeeded + + return + +} diff --git a/executor.go b/executor.go index 35806a4..6262920 100644 --- a/executor.go +++ b/executor.go @@ -8,7 +8,7 @@ import ( const ( JobSnapshotPath = "/forest/client/snapshot/" JobSnapshotGroupPath = "/forest/client/snapshot/%s/" - JobClientSnapshotPath = "/forest/client/snapshot/%s/%s" + JobClientSnapshotPath = "/forest/client/snapshot/%s/%s/" ) type JobExecutor struct { @@ -50,10 +50,10 @@ func (exec *JobExecutor) handleJobSnapshot(snapshot *JobSnapshot) { clientName := client.name snapshot.Ip = clientName - log.Printf("clientNmae:%#v",clientName) - snapshotPath := fmt.Sprintf(JobClientSnapshotPath+"/", group, clientName) + log.Printf("clientName:%#v", clientName) + snapshotPath := fmt.Sprintf(JobClientSnapshotPath, group, clientName) - log.Printf("snapshotPath:%#v",snapshotPath) + log.Printf("snapshotPath:%#v", snapshotPath) value, err := ParkJobSnapshot(snapshot) if err != nil { log.Warnf("uPark the snapshot error:%#v", group, err) diff --git a/failover.go b/failover.go new file mode 100644 index 0000000..1618253 --- /dev/null +++ b/failover.go @@ -0,0 +1,82 @@ +package forest + +import ( + "fmt" + "github.com/labstack/gommon/log" + "time" +) + +// fail over the job snapshot when the task client + +type JobSnapshotFailOver struct { + node *JobNode + deleteClientEventChans chan *JobClientDeleteEvent +} + +// new job snapshot fail over +func NewJobSnapshotFailOver(node *JobNode) (f *JobSnapshotFailOver) { + + f = &JobSnapshotFailOver{ + node: node, + deleteClientEventChans: make(chan *JobClientDeleteEvent, 50), + } + + f.loop() + + return +} + +// loop +func (f *JobSnapshotFailOver) loop() { + + go func() { + + for ch := range f.deleteClientEventChans { + f.handleJobClientDeleteEvent(ch) + } + }() +} + +// handle job client delete event +func (f *JobSnapshotFailOver) handleJobClientDeleteEvent(event *JobClientDeleteEvent) { + + var ( + keys [][]byte + values [][]byte + err error + client *Client + success bool + ) + +RETRY: + prefixKey := fmt.Sprintf(JobClientSnapshotPath, event.Group.name, event.Client.name) + if keys, values, err = f.node.etcd.GetWithPrefixKey(prefixKey); err != nil { + log.Errorf("the fail client:%v for path:%s,error must retry", event.Client, prefixKey) + time.Sleep(time.Second) + goto RETRY + } + + if len(keys) == 0 || len(values) == 0 { + log.Warnf("the fail client:%v for path:%s is empty", event.Client, prefixKey) + return + } + + for pos := 0; pos < len(keys); pos++ { + + if client, err = event.Group.selectClient(); err != nil { + log.Warnf("%v", err) + continue + } + + to := fmt.Sprintf(JobClientSnapshotPath, event.Group.name, client.name) + + from := string(keys[pos]) + value := string(values[pos]) + // transfer the kv + if success, _ = f.node.etcd.transfer(from, to, value); success { + log.Infof("the fail client:%v for path:%s success transfer form %s to %s", event.Client, prefixKey, from, to) + } + + } + +} diff --git a/group.go b/group.go index dd3f14b..7236e99 100644 --- a/group.go +++ b/group.go @@ -285,9 +285,14 @@ func (group *Group) addClient(name, path string) { // delete a client for path func (group *Group) deleteClient(path string) { + + var ( + client *Client + ok bool + ) group.lk.Lock() defer group.lk.Unlock() - if _, ok := group.clients[path]; !ok { + if client, ok = group.clients[path]; !ok { log.Warnf("path:%s,the client not exist", path) return } @@ -295,6 +300,11 @@ func (group *Group) deleteClient(path string) { delete(group.clients, path) log.Printf("delete a client for path:%s", path) + // fail over + if group.node.state == NodeLeaderState { + group.node.failOver.deleteClientEventChans <- &JobClientDeleteEvent{Group: group, Client: client} + } + } func (group *Group) selectClient() (client *Client, err error) { diff --git a/node.go b/node.go index 950185f..28de9a2 100644 --- a/node.go +++ b/node.go @@ -29,6 +29,7 @@ type JobNode struct { exec *JobExecutor engine *xorm.Engine collection *JobCollection + failOver *JobSnapshotFailOver close chan bool } @@ -50,6 +51,8 @@ func NewJobNode(id string, etcd *Etcd, httpAddress, dbUrl string) (node *JobNode engine: engine, } + node.failOver = NewJobSnapshotFailOver(node) + node.collection = NewJobCollection(node) node.initNode() diff --git a/node_test.go b/node_test.go deleted file mode 100644 index c50fe71..0000000 --- a/node_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package forest - -import ( - "log" - "testing" - "time" -) - -func TestNewJobNode(t *testing.T) { - - etcd := InitEtcd() - - jobNode, err := NewJobNode("192.168.10.35", etcd, ":8888", time.Second*5) - if err != nil { - t.Error(err) - } - - log.Printf("the job Node:%#v", jobNode) - - go func() { - - time.Sleep(time.Second * 30) - jobNode.Close() - }() - jobNode.Bootstrap() -} -func TestNewJobNode2(t *testing.T) { - - etcd := InitEtcd() - - jobNode, err := NewJobNode("192.168.10.36", etcd, ":8887", time.Second*5) - if err != nil { - t.Error(err) - } - - log.Printf("the job Node:%#v", jobNode) - go func() { - - time.Sleep(time.Second * 30) - jobNode.Close() - }() - jobNode.Bootstrap() -} diff --git a/protocol.go b/protocol.go index e030423..63be5d7 100644 --- a/protocol.go +++ b/protocol.go @@ -34,7 +34,6 @@ const ( JobExecuteSnapshotSuccessStatus = 2 JobExecuteSnapshotUnkonwStatus = 3 JobExecuteSnapshotErrorStatus = -1 - ) // key 变化事件 @@ -59,6 +58,11 @@ type TxResponse struct { Value string } +type JobClientDeleteEvent struct { + Client *Client + Group *Group +} + // job type JobConf struct { Id string `json:"id"` diff --git a/scheduler.go b/scheduler.go index 0057e6e..157b2cb 100644 --- a/scheduler.go +++ b/scheduler.go @@ -218,7 +218,7 @@ func (sch *JobScheduler) trySchedule() time.Duration { log.Infof("schedule execute the plan:%#v", plan) snapshot := &JobSnapshot{ - Id: plan.Id + GenerateSerialNo(), + Id: GenerateSerialNo() + plan.Id, JobId: plan.Id, Name: plan.Name, Group: plan.Group, diff --git a/util.go b/util.go index 9b05766..abb9d46 100644 --- a/util.go +++ b/util.go @@ -13,7 +13,7 @@ func GenerateSerialNo() string { now := time.Now() - format := now.Format("20060101150405") + format := now.Format("20060102150405") suffer := fmt.Sprintf("%06v", rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(1000000)) @@ -23,7 +23,7 @@ func GenerateSerialNo() string { func ToDateString(date time.Time) string { - return date.Format("2006-01-01 15:04:05") + return date.Format("2006-01-02 15:04:05") } func ParkJobConf(jobConf *JobConf) (value []byte, err error) { @@ -133,6 +133,6 @@ func TimeSubDays(t1, t2 time.Time) int { func ParseInLocation(value string) (dateTime time.Time, err error) { - dateTime, err = time.Parse("2006-01-01 15:04:05", value) + dateTime, err = time.Parse("2006-01-02 15:04:05", value) return }