async rpc in put message to cluster #24

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions consistence/common.go
@@ -185,6 +185,7 @@ var (
ErrLocalInitTopicFailed = NewCoordErr("local topic init failed", CoordLocalErr)
ErrLocalInitTopicCoordFailed = NewCoordErr("topic coordinator init failed", CoordLocalErr)
ErrLocalTopicDataCorrupt = NewCoordErr("local topic data corrupt", CoordLocalErr)
ErrClusterSyncTimeout = NewCoordErr("cluster sync timeout", CoordNetErr)
)

func GenNsqdNodeID(n *NsqdNodeInfo, extra string) string {
34 changes: 25 additions & 9 deletions consistence/coordinator_rpc.go
@@ -43,20 +43,36 @@ const (
RpcErrWriteOnNonISR
)

type SlaveAsyncWriteResult struct {
ret gorpc.AsyncResult
type SlaveWriteResult struct {
Ret *gorpc.AsyncResult
SyncCoordErr *CoordErr
}

func (self *SlaveAsyncWriteResult) Wait() {
<-self.ret.Done
func (self *SlaveWriteResult) wait(ticker *time.Ticker) *CoordErr {
select {
case <-ticker.C:
coordLog.Infof("sync operation timeout")
return ErrClusterSyncTimeout
case <-self.Ret.Done:
return nil
}
}

func (self *SlaveAsyncWriteResult) GetResult() *CoordErr {
coordErr, ok := self.ret.Response.(*CoordErr)
if ok {
return convertRpcError(self.ret.Error, coordErr)
func (self *SlaveWriteResult) GetResult(ticker *time.Ticker) *CoordErr {
if self.Ret != nil {
var coordErr *CoordErr
if coordErr = self.wait(ticker); coordErr != nil {
return convertRpcError(nil, coordErr)
}

coordErr, ok := self.Ret.Response.(*CoordErr)
if ok {
return convertRpcError(self.Ret.Error, coordErr)
} else {
return convertRpcError(self.Ret.Error, nil)
}
} else {
return convertRpcError(self.ret.Error, nil)
return convertRpcError(nil, self.SyncCoordErr)
}
}

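For context, a minimal usage sketch of the new result type (not part of this diff; collectSlaveResults is a hypothetical helper, shown bare in the style of the excerpts above): an async RPC path fills Ret with the gorpc future, a synchronous path leaves Ret nil and reports through SyncCoordErr, and every result is drained through GetResult against one shared ticker so the total wait stays bounded.

// Sketch only: assumes the SlaveWriteResult/CoordErr types above and a
// *gorpc.AsyncResult future whose Done channel is signalled on completion.
func collectSlaveResults(results []*SlaveWriteResult, perReplica time.Duration) []*CoordErr {
	// one ticker bounds the combined wait for all pending futures
	tick := time.NewTicker(perReplica * time.Duration(len(results)+1))
	defer tick.Stop()

	errs := make([]*CoordErr, 0, len(results))
	for _, r := range results {
		// async case: blocks on Ret.Done or the ticker; sync case: returns SyncCoordErr
		errs = append(errs, r.GetResult(tick))
	}
	return errs
}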
80 changes: 45 additions & 35 deletions consistence/nsqd_coordinator_cluster_write.go
@@ -13,8 +13,9 @@ type localExitFunc func(*CoordErr)
type localCommitFunc func() error
type localRollbackFunc func()
type refreshCoordFunc func(*coordData) *CoordErr
type slaveSyncFunc func(*NsqdRpcClient, string, *coordData) *CoordErr
type slaveAsyncFunc func(*NsqdRpcClient, string, *coordData) *SlaveAsyncWriteResult
// slaveSyncFunc returns a *SlaveWriteResult: when the RPC issued inside is asynchronous, the future result is
// carried in Ret; otherwise Ret should always be nil and any error is reported through SyncCoordErr.
type slaveSyncFunc func(*NsqdRpcClient, string, *coordData) *SlaveWriteResult

type handleSyncResultFunc func(int, *coordData) bool

@@ -94,14 +95,9 @@ func (self *NsqdCoordinator) PutMessageToCluster(topic *nsqd.Topic,
self.requestNotifyNewTopicInfo(d.topicInfo.Name, d.topicInfo.Partition)
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveAsync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) (*SlaveWriteResult) {
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
putErr := c.PutMessage(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msg)
if putErr != nil {
coordLog.Infof("sync write to replica %v failed: %v. put offset:%v, logmgr: %v, %v",
nodeID, putErr, commitLog, logMgr.pLogID, logMgr.nLogID)
}
return putErr
return c.PutMessageAsync(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msg)
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -117,7 +113,7 @@ }
}

clusterErr := self.doSyncOpToCluster(true, coord, doLocalWrite, doLocalExit, doLocalCommit, doLocalRollback,
doRefresh, doSlaveSync, handleSyncResult)
doRefresh, doSlaveAsync, handleSyncResult)

var err error
if clusterErr != nil {
@@ -201,14 +197,9 @@ func (self *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
self.requestNotifyNewTopicInfo(d.topicInfo.Name, d.topicInfo.Partition)
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
putErr := c.PutMessages(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
if putErr != nil {
coordLog.Infof("sync write to replica %v failed: %v, put offset: %v, logmgr: %v, %v",
nodeID, putErr, commitLog, logMgr.pLogID, logMgr.nLogID)
}
return putErr
return c.PutMessagesAsync(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -225,6 +216,7 @@ func (self *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
clusterErr := self.doSyncOpToCluster(true, coord, doLocalWrite, doLocalExit, doLocalCommit, doLocalRollback,
doRefresh, doSlaveSync, handleSyncResult)


var err error
if clusterErr != nil {
err = clusterErr.ToErrorType()
@@ -237,8 +229,8 @@ }
}

func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordinator, doLocalWrite localWriteFunc,
doLocalExit localExitFunc, doLocalCommit localCommitFunc, doLocalRollback localRollbackFunc,
doRefresh refreshCoordFunc, doSlaveSync slaveSyncFunc, handleSyncResult handleSyncResultFunc) *CoordErr {
doLocalExit localExitFunc, doLocalCommit localCommitFunc, doLocalRollback localRollbackFunc,
doRefresh refreshCoordFunc, doSlaveSync slaveSyncFunc, handleSyncResult handleSyncResultFunc) *CoordErr {

if isWrite {
coord.writeHold.Lock()
@@ -279,6 +271,16 @@ func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordin
failedNodes := make(map[string]struct{})
retryCnt := uint32(0)
exitErr := 0
//totalTimeout grows by syncTimeout for each replica written (roughly syncTimeout * #replica)
totalTimeout := time.Millisecond * 1
//per-replica synchronization timeout
syncTimeout := time.Second * 3
var timeoutTick *time.Ticker
type RpcResp struct {
nodeId string
ret *SlaveWriteResult
}
var rpcResps []RpcResp

localErr := doLocalWrite(tcData)
if localErr != nil {
@@ -287,7 +289,7 @@ func (self *NsqdCoordinator) doSyncOpToCluster(isWrite bool, coord *TopicCoordin
}
needLeaveISR = true

retrysync:
retrysync:
if retryCnt > MAX_WRITE_RETRY {
coordLog.Warningf("retrying times is large: %v", retryCnt)
needRefreshISR = true
@@ -318,15 +320,12 @@ }
}
}
success = 0
failedNodes = make(map[string]struct{})
Owner: Why removing this? While we retry, we need to reset the failed info for all nodes.
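One way to address this, sketched here rather than taken from the PR, is to keep the reset together with the other per-attempt state at the top of each retrysync pass:

	// at the start of every retrysync pass, clear state from the previous attempt
	success = 0
	failedNodes = make(map[string]struct{}) // without this reset, stale failures carry over between retries
	retryCnt++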

retryCnt++

// send message to slaves with current topic epoch
// replica should check if offset matching. If not matched the replica should leave the ISR list.
// also, the coordinator should retry on fail until all nodes in ISR success.
// If failed, should update ISR and retry.
// write epoch should keep the same (ignore epoch change during write)
// TODO: optimize send all requests first and then wait all responses
exitErr = 0
for _, nodeID := range tcData.topicInfo.ISR {
if nodeID == self.myNode.GetID() {
Expand All @@ -341,12 +340,15 @@ retrysync:
failedNodes[nodeID] = struct{}{}
continue
}

var start time.Time
if checkCost {
start = time.Now()
}
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
rpcErr = doSlaveSync(c, nodeID, tcData)
retSlave := doSlaveSync(c, nodeID, tcData)
rpcResps = append(rpcResps, RpcResp{nodeID, retSlave})
Owner: rpcResps and totalTimeout should be reset when retrying.
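A sketch of the reviewer's suggestion (not code from this PR): clear the collected futures and the accumulated timeout before each retry pass, and stop any ticker left over from the previous attempt:

	// before re-sending to the ISR on a retry
	rpcResps = rpcResps[:0]             // drop futures gathered by the previous attempt
	totalTimeout = time.Millisecond * 1 // re-accumulates one syncTimeout per replica below
	if timeoutTick != nil {
		timeoutTick.Stop() // avoid leaking the ticker created last time around
		timeoutTick = nil
	}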

totalTimeout = totalTimeout + syncTimeout
if checkCost {
cost := time.Since(start)
if cost > time.Millisecond*3 {
@@ -356,6 +358,14 @@ }
coordLog.Warningf("slave(%v) sync cost: %v, start: %v, end: %v", nodeID, cost, start, time.Now())
}
}
}
timeoutTick = time.NewTicker(totalTimeout)
for _, rpcResp := range rpcResps {
nodeID := rpcResp.nodeId
retSlave := rpcResp.ret

rpcErr := retSlave.GetResult(timeoutTick)

if rpcErr == nil {
success++
} else {
@@ -365,15 +375,15 @@ retrysync:
if !rpcErr.CanRetryWrite(int(retryCnt)) {
exitErr++
coordLog.Infof("operation failed and no retry type: %v, %v", rpcErr.ErrType, exitErr)
if exitErr > len(tcData.topicInfo.ISR)/2 {
if int(exitErr) > len(tcData.topicInfo.ISR)/2 {
needLeaveISR = true
goto exitsync
}
}
}
}

if handleSyncResult(success, tcData) {
if handleSyncResult(int(success), tcData) {
localErr := doLocalCommit()
if localErr != nil {
coordLog.Errorf("topic : %v failed commit operation: %v", topicFullName, localErr)
@@ -385,7 +395,7 @@ }
}
} else {
coordLog.Warningf("topic %v sync operation failed since no enough success: %v", topicFullName, success)
if success > tcData.topicInfo.Replica/2 {
if int(success) > tcData.topicInfo.Replica/2 {
needLeaveISR = false
if retryCnt > MAX_WRITE_RETRY {
// request lookup to remove the failed nodes from isr and keep the quorum alive.
@@ -738,16 +748,16 @@ func (self *NsqdCoordinator) SetChannelConsumeOffsetToCluster(ch *nsqd.Channel,
doRefresh := func(d *coordData) *CoordErr {
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) (*SlaveWriteResult) {
if ch.IsEphemeral() {
return nil
return &SlaveWriteResult{nil, nil}
}
rpcErr := c.UpdateChannelOffset(&tcData.topicLeaderSession, &tcData.topicInfo, ch.GetName(), syncOffset)
if rpcErr != nil {
coordLog.Infof("sync channel(%v) offset to replica %v failed: %v, offset: %v", ch.GetName(),
nodeID, rpcErr, syncOffset)
}
return rpcErr
return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
if successNum == len(tcData.topicInfo.ISR) {
@@ -812,9 +822,9 @@ func (self *NsqdCoordinator) FinishMessageToCluster(channel *nsqd.Channel, clien
doRefresh := func(d *coordData) *CoordErr {
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
if !changed || channel.IsEphemeral() {
return nil
return &SlaveWriteResult{nil, nil}
}
var rpcErr *CoordErr
if channel.IsOrdered() {
@@ -827,7 +837,7 @@
coordLog.Infof("sync channel(%v) offset to replica %v failed: %v, offset: %v", channel.GetName(),
nodeID, rpcErr, syncOffset)
}
return rpcErr
return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
// we can ignore the error if this channel is not ordered. (just sync next time)
@@ -940,13 +950,13 @@ func (self *NsqdCoordinator) DeleteChannel(topic *nsqd.Topic, channelName string
doRefresh := func(d *coordData) *CoordErr {
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *SlaveWriteResult {
rpcErr := c.DeleteChannel(&tcData.topicLeaderSession, &tcData.topicInfo, channelName)
if rpcErr != nil {
coordLog.Infof("delete channel(%v) to replica %v failed: %v", channelName,
nodeID, rpcErr)
}
return rpcErr
return &SlaveWriteResult{nil, rpcErr}
}
handleSyncResult := func(successNum int, tcData *coordData) bool {
// we can ignore the error if this channel is not ordered. (just sync next time)
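Overall, the write path changes from calling each replica's RPC synchronously in turn to a fan-out/gather pattern: every slave request is issued asynchronously first, and all futures are then collected under a single deadline that scales with the ISR size. A self-contained sketch of that pattern in plain Go (generic names, not this repository's API):

package main

import (
	"fmt"
	"time"
)

// fanOutGather issues the call to every replica without waiting, then gathers
// all results under one overall deadline that grows with the number of replicas.
func fanOutGather(replicas []string, perReplica time.Duration, call func(string) error) map[string]error {
	type future struct {
		node string
		done chan error
	}
	futures := make([]future, 0, len(replicas))
	for _, node := range replicas {
		f := future{node: node, done: make(chan error, 1)}
		go func(n string, ch chan<- error) { ch <- call(n) }(node, f.done)
		futures = append(futures, f)
	}

	deadline := time.After(perReplica * time.Duration(len(replicas)+1))
	results := make(map[string]error, len(replicas))
	for _, f := range futures {
		select {
		case err := <-f.done:
			results[f.node] = err
		case <-deadline:
			results[f.node] = fmt.Errorf("sync timeout waiting for %s", f.node)
		}
	}
	return results
}

func main() {
	// stand-in for PutMessageAsync against two replicas; real replication is RPC-based
	res := fanOutGather([]string{"nsqd-1", "nsqd-2"}, 3*time.Second, func(node string) error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	fmt.Println(res)
}

The buffered result channels let slow replicas finish after the deadline fires instead of blocking the send loop, which mirrors why the PR keeps per-call futures and waits on them in a second pass.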
59 changes: 59 additions & 0 deletions consistence/nsqd_coordinator_test.go
@@ -1403,6 +1403,8 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {

var nsqdCoord2 *NsqdCoordinator
var nsqdCoord3 *NsqdCoordinator
var nsqdCoord4 *NsqdCoordinator
var nsqdCoord5 *NsqdCoordinator
if replica >= 2 {
nsqd2, randPort2, _, data2 := newNsqdNode(nil, "id2")

@@ -1422,6 +1424,27 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
nsqdCoord3.Start()
defer nsqdCoord3.Stop()

}
if replica >= 4 {
nsqd4, randPort4, _, data4 := newNsqdNode(nil, "id4")
defer os.RemoveAll(data4)
defer nsqd4.Exit()
nsqdCoord4 = startNsqdCoord(nil, strconv.Itoa(randPort4), data4, "id4", nsqd4, true)
nsqdCoord4.enableBenchCost = true
nsqdCoord4.Start()
defer nsqdCoord4.Stop()

}

if replica >= 5 {
nsqd5, randPort5, _, data5 := newNsqdNode(nil, "id5")
defer os.RemoveAll(data5)
defer nsqd5.Exit()
nsqdCoord5 = startNsqdCoord(nil, strconv.Itoa(randPort5), data5, "id5", nsqd5, true)
nsqdCoord5.enableBenchCost = true
nsqdCoord5.Start()
defer nsqdCoord5.Stop()

}

topicDataList := make([]*nsqdNs.Topic, 0)
@@ -1439,6 +1462,12 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
if replica >= 3 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord3.myNode.GetID())
}
if replica >= 4 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord4.myNode.GetID())
}
if replica >= 5 {
topicInitInfo.ISR = append(topicInitInfo.ISR, nsqdCoord5.myNode.GetID())
}
topicInitInfo.Leader = nsqdCoord1.myNode.GetID()
topicInitInfo.Replica = replica
ensureTopicOnNsqdCoord(nsqdCoord1, topicInitInfo)
@@ -1448,6 +1477,12 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
if replica >= 3 {
ensureTopicOnNsqdCoord(nsqdCoord3, topicInitInfo)
}
if replica >= 4 {
ensureTopicOnNsqdCoord(nsqdCoord4, topicInitInfo)
}
if replica >= 5 {
ensureTopicOnNsqdCoord(nsqdCoord5, topicInitInfo)
}
leaderSession := &TopicLeaderSession{
LeaderNode: nodeInfo1,
LeaderEpoch: 1,
@@ -1464,6 +1499,14 @@ func benchmarkNsqdCoordPubWithArg(b *testing.B, replica int, size int) {
ensureTopicLeaderSession(nsqdCoord3, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord3, topic, partition, false)
}
if replica >= 4 {
ensureTopicLeaderSession(nsqdCoord4, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord4, topic, partition, false)
}
if replica >= 5 {
ensureTopicLeaderSession(nsqdCoord5, topic, partition, leaderSession)
ensureTopicDisableWrite(nsqdCoord5, topic, partition, false)
}
msg := make([]byte, size)
// check if write ok
topicData := nsqd1.GetTopic(topic, partition)
@@ -1524,3 +1567,19 @@ func BenchmarkNsqdCoordPub2Replicator1024(b *testing.B) {
func BenchmarkNsqdCoordPub3Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 3, 1024)
}

func BenchmarkNsqdCoordPub5Replicator128(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 5, 128)
}

func BenchmarkNsqdCoordPub5Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 5, 1024)
}

func BenchmarkNsqdCoordPub4Replicator128(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 4, 128)
}

func BenchmarkNsqdCoordPub4Replicator1024(b *testing.B) {
benchmarkNsqdCoordPubWithArg(b, 4, 1024)
}