async rpc in put message to cluster
Signed-off-by: 元守 <lulin@youzan.com>

async in put messages to cluster

Signed-off-by: 元守 <lulin@youzan.com>

remove debug log

Signed-off-by: 元守 <lulin@youzan.com>

one doSyncOpToCluster

Signed-off-by: 元守 <lulin@youzan.com>
元守 committed May 17, 2017
1 parent 656a436 commit 32a31e0
Showing 5 changed files with 199 additions and 50 deletions.
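
The core of the change: instead of issuing each slave RPC synchronously and waiting for it before moving to the next ISR member, the leader now dispatches the RPC to every replica first, growing a shared timeout budget as each request goes out, and only then waits for all of the outstanding replies under a single ticker. The sketch below shows that two-phase shape in isolation; the channel-based future, the simulated network delay, and all names are stand-ins for illustration only, not the repository's API.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// future is a stand-in for an async RPC handle (the real code uses gorpc's AsyncResult).
type future struct {
	err  error
	done chan struct{}
}

// dispatch simulates sending one slave write and returns immediately with a future.
func dispatch(replica string) *future {
	f := &future{done: make(chan struct{})}
	go func() {
		time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond) // pretend network round trip
		if rand.Intn(10) == 0 {
			f.err = errors.New("replica " + replica + " rejected the write")
		}
		close(f.done)
	}()
	return f
}

func main() {
	replicas := []string{"nsqd-1", "nsqd-2", "nsqd-3"}
	syncTimeout := 3 * time.Second // per-replica budget, mirroring syncTimeout in the diff
	totalTimeout := time.Millisecond
	futures := make(map[string]*future, len(replicas))

	// Phase 1: fan out all slave RPCs without waiting on any of them.
	for _, r := range replicas {
		futures[r] = dispatch(r)
		totalTimeout += syncTimeout
	}

	// Phase 2: collect every reply under one shared deadline.
	ticker := time.NewTicker(totalTimeout)
	defer ticker.Stop()
	success := 0
	for r, f := range futures {
		select {
		case <-f.done:
			if f.err == nil {
				success++
			} else {
				fmt.Println("write failed on", r, ":", f.err)
			}
		case <-ticker.C:
			fmt.Println("timed out waiting for", r)
		}
	}
	fmt.Printf("%d/%d replicas acknowledged the write\n", success, len(replicas))
}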
1 change: 1 addition & 0 deletions consistence/common.go
@@ -185,6 +185,7 @@ var (
ErrLocalInitTopicFailed = NewCoordErr("local topic init failed", CoordLocalErr)
ErrLocalInitTopicCoordFailed = NewCoordErr("topic coordinator init failed", CoordLocalErr)
ErrLocalTopicDataCorrupt = NewCoordErr("local topic data corrupt", CoordLocalErr)
+ ErrClusterSyncTimeout = NewCoordErr("cluster sync timeout ", CoordNetErr)
)

func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string {
34 changes: 25 additions & 9 deletions consistence/coordinator_rpc.go
@@ -43,20 +43,36 @@ const (
RpcErrWriteOnNonISR
)

- type SlaveAsyncWriteResult struct {
- ret gorpc.AsyncResult
+ type SlaveWriteResult struct {
+ Ret *gorpc.AsyncResult
+ SyncCoordErr *CoordErr
}

- func (self *SlaveAsyncWriteResult) Wait() {
- <-self.ret.Done
+ func (self *SlaveWriteResult) wait(ticker *time.Ticker) *CoordErr {
+ select {
+ case <-ticker.C:
+ coordLog.Infof("sync operation timeout")
+ return ErrClusterSyncTimeout
+ case <-self.Ret.Done:
+ return nil
+ }
}

- func (self *SlaveAsyncWriteResult) GetResult() *CoordErr {
- coordErr, ok := self.ret.Response.(*CoordErr)
- if ok {
- return convertRpcError(self.ret.Error, coordErr)
+ func (self *SlaveWriteResult) GetResult(ticker *time.Ticker) *CoordErr {
+ if self.Ret != nil {
+ var coordErr *CoordErr
+ if coordErr = self.wait(ticker); coordErr != nil {
+ return convertRpcError(nil, coordErr)
+ }
+
+ coordErr, ok := self.Ret.Response.(*CoordErr)
+ if ok {
+ return convertRpcError(self.Ret.Error, coordErr)
+ } else {
+ return convertRpcError(self.Ret.Error, nil)
+ }
} else {
- return convertRpcError(self.ret.Error, nil)
+ return convertRpcError(nil, self.SyncCoordErr)
}
}

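A SlaveWriteResult is built in one of two ways: with Ret holding a still-pending async RPC (SyncCoordErr unused), or with Ret nil and SyncCoordErr carrying the outcome of a call that already completed synchronously; GetResult hides the difference and charges any async wait against the shared timeout ticker. Below is a compact, stand-alone illustration of that contract; asyncResult, writeResult, and the plain error values are local stand-ins, not the gorpc.AsyncResult and CoordErr types used above.

package main

import (
	"errors"
	"fmt"
	"time"
)

// asyncResult stands in for gorpc.AsyncResult in this sketch.
type asyncResult struct {
	Response interface{}
	Error    error
	Done     chan struct{}
}

// writeResult mirrors the shape of SlaveWriteResult: a pending future or a finished sync call.
type writeResult struct {
	Ret          *asyncResult
	SyncCoordErr error
}

var errClusterSyncTimeout = errors.New("cluster sync timeout")

// GetResult waits for an async reply under the shared ticker; sync outcomes are returned directly.
func (w *writeResult) GetResult(ticker *time.Ticker) error {
	if w.Ret == nil {
		return w.SyncCoordErr
	}
	select {
	case <-ticker.C:
		return errClusterSyncTimeout
	case <-w.Ret.Done:
		if respErr, ok := w.Ret.Response.(error); ok {
			return respErr
		}
		return w.Ret.Error
	}
}

func main() {
	// Async mode: the RPC completes later on another goroutine; Ret carries the future.
	asyncCall := &writeResult{Ret: &asyncResult{Done: make(chan struct{})}}
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(asyncCall.Ret.Done) // success: Response and Error stay nil
	}()

	// Sync mode: the call already ran and failed; wrap its error with Ret == nil.
	syncCall := &writeResult{SyncCoordErr: errors.New("offset mismatch")}

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	fmt.Println("async result:", asyncCall.GetResult(ticker)) // <nil>
	fmt.Println("sync result: ", syncCall.GetResult(ticker))  // offset mismatch
}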
80 changes: 45 additions & 35 deletions consistence/nsqd_coordinator_cluster_write.go
@@ -13,8 +13,9 @@ type localExitFunc func(*CoordErr)
type localCommitFunc func() error
type localRollbackFunc func()
type refreshCoordFunc func(*coordData) *CoordErr
- type slaveSyncFunc func(*NsqdRpcClient, string, *coordData) *CoordErr
- type slaveAsyncFunc func(*NsqdRpcClient, string, *coordData) *SlaveAsyncWriteResult
+ // returns *SlaveWriteResult; if the rpc call inside the sync function is an async rpc, the future result is
+ // carried in SlaveWriteResult.Ret, otherwise Ret is nil and SyncCoordErr holds the synchronous outcome
+ type slaveSyncFunc func(*NsqdRpcClient, string, *coordData) *SlaveWriteResult

type handleSyncResultFunc func(int, *coordData) bool

@@ -94,14 +95,9 @@ func (self *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic,
self.requestNotifyNewTopicInfo(d.topicInfo.Name, d.topicInfo.Partition)
return nil
}
- doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
+ doSlaveAsync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) (*SlaveWriteResult) {
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
- putErr := c.PutMessage(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msg)
- if putErr != nil {
- coordLog.Infof("sync write to replica %v failed: %v. put offset:%v, logmgr: %v, %v",
- nodeID, putErr, commitLog, logMgr.pLogID, logMgr.nLogID)
- }
- return putErr
+ return c.PutMessageAsync(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msg)
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -117,7 +113,7 @@
}

clusterErr := self.doSyncOpToCluster(true, coord, doLocalWrite, doLocalExit, doLocalCommit, doLocalRollback,
- doRefresh, doSlaveSync, handleSyncResult)
+ doRefresh, doSlaveAsync, handleSyncResult)

var err error
if clusterErr != nil {
@@ -201,14 +197,9 @@ func (self *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
self.requestNotifyNewTopicInfo(d.topicInfo.Name, d.topicInfo.Partition)
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
putErr := c.PutMessages(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
if putErr != nil {
coordLog.Infof("sync write to replica %v failed: %v, put offset: %v, logmgr: %v, %v",
nodeID, putErr, commitLog, logMgr.pLogID, logMgr.nLogID)
}
return putErr
return c.PutMessagesAsync(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -225,6 +216,7 @@
clusterErr := self.doSyncOpToCluster(true, coord, doLocalWrite, doLocalExit, doLocalCommit, doLocalRollback,
doRefresh, doSlaveSync, handleSyncResult)


var err error
if clusterErr != nil {
err = clusterErr.ToErrorType()
@@ -237,8 +229,8 @@
}

func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordinator, doLocalWrite localWriteFunc,
- doLocalExit localExitFunc, doLocalCommit localCommitFunc, doLocalRollback localRollbackFunc,
- doRefresh refreshCoordFunc, doSlaveSync slaveSyncFunc, handleSyncResult handleSyncResultFunc) *CoordErr {
+ doLocalExit localExitFunc, doLocalCommit localCommitFunc, doLocalRollback localRollbackFunc,
+ doRefresh refreshCoordFunc, doSlaveSync slaveSyncFunc, handleSyncResult handleSyncResultFunc) *CoordErr {

if isWrite {
coord.writeHold.Lock()
@@ -279,6 +271,16 @@ func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordin
failedNodes := make(map[string]struct{})
retryCnt := uint32(0)
exitErr := 0
+ //totalTimeout = syncTimeout * #replica
+ totalTimeout := time.Millisecond * 1
+ //replica synchronization timeout per replica
+ syncTimeout := time.Second * 3
+ var timeoutTick *time.Ticker
+ type RpcResp struct {
+ nodeId string
+ ret *SlaveWriteResult
+ }
+ var rpcResps []RpcResp

localErr := doLocalWrite(tcData)
if localErr != nil {
@@ -287,7 +289,7 @@ func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordin
}
needLeaveISR = true

- retrysync:
+ retrysync:
if retryCnt > MAX_WRITE_RETRY {
coordLog.Warningf("retrying times is large: %v", retryCnt)
needRefreshISR = true
@@ -318,15 +320,12 @@
}
}
success = 0
failedNodes = make(map[string]struct{})
retryCnt++

// send message to slaves with current topic epoch
// replica should check if offset matching. If not matched the replica should leave the ISR list.
// also, the coordinator should retry on fail until all nodes in ISR success.
// If failed, should update ISR and retry.
// write epoch should keep the same (ignore epoch change during write)
// TODO: optimize send all requests first and then wait all responses
exitErr = 0
for _, nodeID := range tcData.topicInfo.ISR {
if nodeID == self.myNode.GetID() {
@@ -341,12 +340,15 @@
failedNodes[nodeID] = struct{}{}
continue
}

var start time.Time
if checkCost {
start = time.Now()
}
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
- rpcErr = doSlaveSync(c, nodeID, tcData)
+ retSlave := doSlaveSync(c, nodeID, tcData)
+ rpcResps = append(rpcResps, RpcResp{nodeID, retSlave})
+ totalTimeout = totalTimeout + syncTimeout
if checkCost {
cost := time.Since(start)
if cost > time.Millisecond*3 {
@@ -356,6 +358,14 @@
coordLog.Warningf("slave(%v) sync cost: %v, start: %v, end: %v", nodeID, cost, start, time.Now())
}
}
+ }
+ timeoutTick = time.NewTicker(totalTimeout)
+ for _, rpcResp := range rpcResps {
+ nodeID := rpcResp.nodeId
+ retSlave := rpcResp.ret
+
+ rpcErr := retSlave.GetResult(timeoutTick)
+
if rpcErr == nil {
success++
} else {
@@ -365,15 +375,15 @@
if !rpcErr.CanRetryWrite(int(retryCnt)) {
exitErr++
coordLog.Infof("operation failed and no retry type: %v, %v", rpcErr.ErrType, exitErr)
- if exitErr > len(tcData.topicInfo.ISR)/2 {
+ if int(exitErr) > len(tcData.topicInfo.ISR)/2 {
needLeaveISR = true
goto exitsync
}
}
}
}

- if handleSyncResult(success, tcData) {
+ if handleSyncResult(int(success), tcData) {
localErr := doLocalCommit()
if localErr != nil {
coordLog.Errorf("topic : %v failed commit operation: %v", topicFullName, localErr)
@@ -385,7 +395,7 @@
}
} else {
coordLog.Warningf("topic %v sync operation failed since no enough success: %v", topicFullName, success)
- if success > tcData.topicInfo.Replica/2 {
+ if int(success) > tcData.topicInfo.Replica/2 {
needLeaveISR = false
if retryCnt > MAX_WRITE_RETRY {
// request lookup to remove the failed nodes from isr and keep the quorum alive.
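
On the timeout budget used above: totalTimeout starts at 1 ms and grows by syncTimeout (3 s) for each slave RPC actually dispatched (the leader itself, and nodes whose RPC client cannot be acquired, are skipped), and the ticker created after the fan-out then guards the whole collection loop. A quick check of that arithmetic with example ISR sizes; the sizes are illustrative only:

package main

import (
	"fmt"
	"time"
)

func main() {
	syncTimeout := 3 * time.Second
	for _, isrSize := range []int{2, 3, 5} {
		slaves := isrSize - 1 // the leader writes locally and is skipped in the dispatch loop
		totalTimeout := time.Millisecond + time.Duration(slaves)*syncTimeout
		fmt.Printf("ISR of %d -> %d slave RPCs -> ticker of %v\n", isrSize, slaves, totalTimeout)
	}
}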
@@ -738,16 +748,16 @@ func (self *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel,
doRefresh := func(d *coordData) *CoordErr {
return nil
}
- doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
+ doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) (*SlaveWriteResult) {
if ch.IsEphemeral() {
- return nil
+ return &SlaveWriteResult{nil, nil}
}
rpcErr := c.UpdateChannelOffset(&tcData.topicLeaderSession, &tcData.topicInfo, ch.GetName(), syncOffset)
if rpcErr != nil {
coordLog.Infof("sync channel(%v) offset to replica %v failed: %v, offset: %v", ch.GetName(),
nodeID, rpcErr, syncOffset)
}
- return rpcErr
+ return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -812,9 +822,9 @@ func (self *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clien
doRefresh := func(d *coordData) *CoordErr {
return nil
}
- doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
+ doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
if !changed || channel.IsEphemeral() {
- return nil
+ return &SlaveWriteResult{nil, nil}
}
var rpcErr *CoordErr
if channel.IsOrdered() {
@@ -827,7 +837,7 @@
coordLog.Infof("sync channel(%v) offset to replica %v failed: %v, offset: %v", channel.GetName(),
nodeID, rpcErr, syncOffset)
}
- return rpcErr
+ return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
// we can ignore the error if this channel is not ordered. (just sync next time)
@@ -940,13 +950,13 @@ func (self *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string
doRefresh := func(d *coordData) *CoordErr {
return nil
}
- doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
+ doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
rpcErr := c.DeleteChannel(&tcData.topicLeaderSession, &tcData.topicInfo, channelName)
if rpcErr != nil {
coordLog.Infof("delete channel(%v) to replica %v failed: %v", channelName,
nodeID, rpcErr)
}
- return rpcErr
+ return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
// we can ignore the error if this channel is not ordered. (just sync next time)
59 changes: 59 additions & 0 deletions consistence/nsqd_coordinator_test.go
@@ -1403,6 +1403,8 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {

var nsqdCoord2 *NsqdCoordinator
var nsqdCoord3 *NsqdCoordinator
var nsqdCoord4 *NsqdCoordinator
var nsqdCoord5 *NsqdCoordinator
if replica >= 2 {
nsqd2, randPort2, _, data2 := newNsqdNode(nil, "id2")

@@ -1422,6 +1424,27 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
nsqdCoord3.Start()
defer nsqdCoord3.Stop()

}
if replica >= 4 {
nsqd4, randPort4, _, data4 := newNsqdNode(nil, "id4")
defer os.RemoveAll(data4)
defer nsqd4.Exit()
nsqdCoord4 = startNsqdCoord(nil, strconv.Itoa(randPort4), data4, "id4", nsqd4, true)
nsqdCoord4.enableBenchCost = true
nsqdCoord4.Start()
defer nsqdCoord4.Stop()

}

if replica >= 5 {
nsqd5, randPort5, _, data5 := newNsqdNode(nil, "id5")
defer os.RemoveAll(data5)
defer nsqd5.Exit()
nsqdCoord5 = startNsqdCoord(nil, strconv.Itoa(randPort5), data5, "id5", nsqd5, true)
nsqdCoord5.enableBenchCost = true
nsqdCoord5.Start()
defer nsqdCoord5.Stop()

}

topicDataList := make([]*nsqdNs.Topic, 0)
@@ -1439,6 +1462,12 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
if replica >= 3 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord3.myNode.GetID())
}
if replica >= 4 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord4.myNode.GetID())
}
if replica >= 5 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord5.myNode.GetID())
}
topicInitInfo.Leader = nsqdCoord1.myNode.GetID()
topicInitInfo.Replica = replica
ensureTopicOnNsqdCoord(nsqdCoord1, topicInitInfo)
@@ -1448,6 +1477,12 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
if replica >= 3 {
ensureTopicOnNsqdCoord(nsqdCoord3, topicInitInfo)
}
if replica >= 4 {
ensureTopicOnNsqdCoord(nsqdCoord4, topicInitInfo)
}
if replica >= 5 {
ensureTopicOnNsqdCoord(nsqdCoord5, topicInitInfo)
}
leaderSession := &TopicLeaderSession{
LeaderNode: nodeInfo1,
LeaderEpoch: 1,
@@ -1464,6 +1499,14 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
ensureTopicLeaderSession(nsqdCoord3, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord3, topic, partition, false)
}
if replica >= 4 {
ensureTopicLeaderSession(nsqdCoord4, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord4, topic, partition, false)
}
if replica >= 5 {
ensureTopicLeaderSession(nsqdCoord5, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord5, topic, partition, false)
}
msg := make([]byte, size)
// check if write ok
topicData := nsqd1.GetTopic(topic, partition)
@@ -1524,3 +1567,19 @@ func BenchmarkNsqdCoordPub2Replicator1024(b *testing.B) {
func BenchmarkNsqdCoordPub3Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 3, 1024)
}

func BenchmarkNsqdCoordPub5Replicator128(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 5, 128)
}

func BenchmarkNsqdCoordPub5Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 5, 1024)
}

func BenchmarkNsqdCoordPub4Replicator128(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 4, 128)
}

func BenchmarkNsqdCoordPub4Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 4, 1024)
}