Skip to content

Commit

Permalink
add failover
Browse files Browse the repository at this point in the history
  • Loading branch information
busgo committed Jul 31, 2019
1 parent 16fed4e commit 7814c82
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 59 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,8 @@ such as /forest/client/execute/snapshot/trade/192.168.1.1/201901011111111323

## 待完善

1. 任务故障转移
2. 手动执行任务
3. 任务报警



1. 手动执行任务
2. 任务配置同步
## 联系方式

如有问题请联系 QQ:248434199 Email:248434199@qq.com
Expand Down
30 changes: 30 additions & 0 deletions etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,33 @@ func (etcd *Etcd) TxKeepaliveWithTTL(key, value string, ttl int64) (txResponse *
}
return
}

// transfer from to with value
func (etcd *Etcd) transfer(from string, to string, value string) (success bool, err error) {

var (
txnResponse *clientv3.TxnResponse
)

ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout)
defer cancelFunc()

txn := etcd.client.Txn(ctx)

txnResponse, err = txn.If(
clientv3.Compare(clientv3.Value(from), "=", value)).
Then(
clientv3.OpDelete(from),
clientv3.OpPut(to, value),

).Commit()

if err != nil {
return
}

success = txnResponse.Succeeded

return

}
8 changes: 4 additions & 4 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
const (
JobSnapshotPath = "/forest/client/snapshot/"
JobSnapshotGroupPath = "/forest/client/snapshot/%s/"
JobClientSnapshotPath = "/forest/client/snapshot/%s/%s"
JobClientSnapshotPath = "/forest/client/snapshot/%s/%s/"
)

type JobExecutor struct {
Expand Down Expand Up @@ -50,10 +50,10 @@ func (exec *JobExecutor) handleJobSnapshot(snapshot *JobSnapshot) {
clientName := client.name
snapshot.Ip = clientName

log.Printf("clientNmae:%#v",clientName)
snapshotPath := fmt.Sprintf(JobClientSnapshotPath+"/", group, clientName)
log.Printf("clientName:%#v", clientName)
snapshotPath := fmt.Sprintf(JobClientSnapshotPath, group, clientName)

log.Printf("snapshotPath:%#v",snapshotPath)
log.Printf("snapshotPath:%#v", snapshotPath)
value, err := ParkJobSnapshot(snapshot)
if err != nil {
log.Warnf("uPark the snapshot error:%#v", group, err)
Expand Down
82 changes: 82 additions & 0 deletions failover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package forest

import (
"fmt"
"github.com/labstack/gommon/log"
"time"
)

// fail over the job snapshot when the task client

type JobSnapshotFailOver struct {
node *JobNode
deleteClientEventChans chan *JobClientDeleteEvent
}

// new job snapshot fail over
func NewJobSnapshotFailOver(node *JobNode) (f *JobSnapshotFailOver) {

f = &JobSnapshotFailOver{
node: node,
deleteClientEventChans: make(chan *JobClientDeleteEvent, 50),
}

f.loop()

return
}

// loop
func (f *JobSnapshotFailOver) loop() {

go func() {

for ch := range f.deleteClientEventChans {
f.handleJobClientDeleteEvent(ch)
}
}()
}

// handle job client delete event
func (f *JobSnapshotFailOver) handleJobClientDeleteEvent(event *JobClientDeleteEvent) {

var (
keys [][]byte
values [][]byte
err error
client *Client
success bool
)

RETRY:
prefixKey := fmt.Sprintf(JobClientSnapshotPath, event.Group.name, event.Client.name)
if keys, values, err = f.node.etcd.GetWithPrefixKey(prefixKey); err != nil {
log.Errorf("the fail client:%v for path:%s,error must retry", event.Client, prefixKey)
time.Sleep(time.Second)
goto RETRY
}

if len(keys) == 0 || len(values) == 0 {
log.Warnf("the fail client:%v for path:%s is empty", event.Client, prefixKey)
return
}

for pos := 0; pos < len(keys); pos++ {

if client, err = event.Group.selectClient(); err != nil {
log.Warnf("%v", err)
continue
}

to := fmt.Sprintf(JobClientSnapshotPath, event.Group.name, client.name)

from := string(keys[pos])
value := string(values[pos])
// transfer the kv
if success, _ = f.node.etcd.transfer(from, to, value); success {
log.Infof("the fail client:%v for path:%s success transfer form %s to %s", event.Client, prefixKey, from, to)
}

}

}
12 changes: 11 additions & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,26 @@ func (group *Group) addClient(name, path string) {

// delete a client for path
func (group *Group) deleteClient(path string) {

var (
client *Client
ok bool
)
group.lk.Lock()
defer group.lk.Unlock()
if _, ok := group.clients[path]; !ok {
if client, ok = group.clients[path]; !ok {
log.Warnf("path:%s,the client not exist", path)
return
}

delete(group.clients, path)
log.Printf("delete a client for path:%s", path)

// fail over
if group.node.state == NodeLeaderState {
group.node.failOver.deleteClientEventChans <- &JobClientDeleteEvent{Group: group, Client: client}
}

}

func (group *Group) selectClient() (client *Client, err error) {
Expand Down
3 changes: 3 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type JobNode struct {
exec *JobExecutor
engine *xorm.Engine
collection *JobCollection
failOver *JobSnapshotFailOver
close chan bool
}

Expand All @@ -50,6 +51,8 @@ func NewJobNode(id string, etcd *Etcd, httpAddress, dbUrl string) (node *JobNode
engine: engine,
}

node.failOver = NewJobSnapshotFailOver(node)

node.collection = NewJobCollection(node)

node.initNode()
Expand Down
43 changes: 0 additions & 43 deletions node_test.go

This file was deleted.

6 changes: 5 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
JobExecuteSnapshotSuccessStatus = 2
JobExecuteSnapshotUnkonwStatus = 3
JobExecuteSnapshotErrorStatus = -1

)

// key 变化事件
Expand All @@ -59,6 +58,11 @@ type TxResponse struct {
Value string
}

type JobClientDeleteEvent struct {
Client *Client
Group *Group
}

// job
type JobConf struct {
Id string `json:"id"`
Expand Down
2 changes: 1 addition & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (sch *JobScheduler) trySchedule() time.Duration {
log.Infof("schedule execute the plan:%#v", plan)

snapshot := &JobSnapshot{
Id: plan.Id + GenerateSerialNo(),
Id: GenerateSerialNo() + plan.Id,
JobId: plan.Id,
Name: plan.Name,
Group: plan.Group,
Expand Down
6 changes: 3 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func GenerateSerialNo() string {

now := time.Now()

format := now.Format("20060101150405")
format := now.Format("20060102150405")

suffer := fmt.Sprintf("%06v", rand.New(rand.NewSource(time.Now().UnixNano())).Int31n(1000000))

Expand All @@ -23,7 +23,7 @@ func GenerateSerialNo() string {

func ToDateString(date time.Time) string {

return date.Format("2006-01-01 15:04:05")
return date.Format("2006-01-02 15:04:05")
}
func ParkJobConf(jobConf *JobConf) (value []byte, err error) {

Expand Down Expand Up @@ -133,6 +133,6 @@ func TimeSubDays(t1, t2 time.Time) int {

func ParseInLocation(value string) (dateTime time.Time, err error) {

dateTime, err = time.Parse("2006-01-01 15:04:05", value)
dateTime, err = time.Parse("2006-01-02 15:04:05", value)
return
}

0 comments on commit 7814c82

Please sign in to comment.