Skip to content

Commit

Permalink
Add resend context timeout for ndc resender (uber#3247)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored May 8, 2020
1 parent 9313614 commit 1ed8702
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 5 deletions.
25 changes: 22 additions & 3 deletions common/xdc/nDCHistoryResender.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
)

const (
Expand Down Expand Up @@ -67,6 +68,7 @@ type (
adminClient adminClient.Client
historyReplicationFn nDCHistoryReplicationFn
serializer persistence.PayloadSerializer
rereplicationTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
logger log.Logger
}

Expand All @@ -82,6 +84,7 @@ func NewNDCHistoryResender(
adminClient adminClient.Client,
historyReplicationFn nDCHistoryReplicationFn,
serializer persistence.PayloadSerializer,
rereplicationTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter,
logger log.Logger,
) *NDCHistoryResenderImpl {

Expand All @@ -90,6 +93,7 @@ func NewNDCHistoryResender(
adminClient: adminClient,
historyReplicationFn: historyReplicationFn,
serializer: serializer,
rereplicationTimeout: rereplicationTimeout,
logger: logger,
}
}
Expand All @@ -105,7 +109,18 @@ func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(
endEventVersion *int64,
) error {

ctx := context.Background()
var cancel context.CancelFunc
if n.rereplicationTimeout != nil {
resendContextTimeout := n.rereplicationTimeout(domainID)
if resendContextTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, resendContextTimeout)
defer cancel()
}
}

historyIterator := collection.NewPagingIterator(n.getPaginationFn(
ctx,
domainID,
workflowID,
runID,
Expand Down Expand Up @@ -133,7 +148,7 @@ func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(
historyBatch.rawEventBatch,
historyBatch.versionHistory.GetItems())

err = n.sendReplicationRawRequest(replicationRequest)
err = n.sendReplicationRawRequest(ctx, replicationRequest)
if err != nil {
n.logger.Error("failed to replicate events",
tag.WorkflowDomainID(domainID),
Expand All @@ -147,6 +162,7 @@ func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(
}

func (n *NDCHistoryResenderImpl) getPaginationFn(
ctx context.Context,
domainID string,
workflowID string,
runID string,
Expand All @@ -159,6 +175,7 @@ func (n *NDCHistoryResenderImpl) getPaginationFn(
return func(paginationToken []byte) ([]interface{}, []byte, error) {

response, err := n.getHistory(
ctx,
domainID,
workflowID,
runID,
Expand Down Expand Up @@ -207,15 +224,17 @@ func (n *NDCHistoryResenderImpl) createReplicationRawRequest(
}

func (n *NDCHistoryResenderImpl) sendReplicationRawRequest(
ctx context.Context,
request *history.ReplicateEventsV2Request,
) error {

ctx, cancel := context.WithTimeout(context.Background(), resendContextTimeout)
ctx, cancel := context.WithTimeout(ctx, resendContextTimeout)
defer cancel()
return n.historyReplicationFn(ctx, request)
}

func (n *NDCHistoryResenderImpl) getHistory(
ctx context.Context,
domainID string,
workflowID string,
runID string,
Expand All @@ -236,7 +255,7 @@ func (n *NDCHistoryResenderImpl) getHistory(
}
domainName := domainEntry.GetInfo().Name

ctx, cancel := context.WithTimeout(context.Background(), resendContextTimeout)
ctx, cancel := context.WithTimeout(ctx, resendContextTimeout)
defer cancel()
response, err := n.adminClient.GetWorkflowExecutionRawHistoryV2(ctx, &admin.GetWorkflowExecutionRawHistoryV2Request{
Domain: common.StringPtr(domainName),
Expand Down
6 changes: 4 additions & 2 deletions common/xdc/nDCHistoryResender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (s *nDCHistoryResenderSuite) SetupTest() {
return s.mockHistoryClient.ReplicateEventsV2(ctx, request)
},
persistence.NewPayloadSerializer(),
nil,
s.logger,
)
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func (s *nDCHistoryResenderSuite) TestSendReplicationRawRequest() {
}

s.mockHistoryClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil).Times(1)
err := s.rereplicator.sendReplicationRawRequest(request)
err := s.rereplicator.sendReplicationRawRequest(context.Background(), request)
s.Nil(err)
}

Expand Down Expand Up @@ -299,7 +300,7 @@ func (s *nDCHistoryResenderSuite) TestSendReplicationRawRequest_Err() {
}

s.mockHistoryClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(retryErr).Times(1)
err := s.rereplicator.sendReplicationRawRequest(request)
err := s.rereplicator.sendReplicationRawRequest(context.Background(), request)
s.Equal(retryErr, err)
}

Expand Down Expand Up @@ -336,6 +337,7 @@ func (s *nDCHistoryResenderSuite) TestGetHistory() {
}).Return(response, nil).Times(1)

out, err := s.rereplicator.getHistory(
context.Background(),
s.domainID,
workflowID,
runID,
Expand Down
1 change: 1 addition & 0 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ func (adh *AdminHandler) ResendReplicationTasks(
return adh.GetHistoryClient().ReplicateEventsV2(ctx, request)
},
adh.eventSerializder,
nil,
adh.GetLogger(),
)
return resender.SendSingleWorkflowHistory(
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func NewEngineWithShardContext(
return shard.GetService().GetHistoryClient().ReplicateEventsV2(ctx, request)
},
shard.GetService().GetPayloadSerializer(),
nil,
shard.GetLogger(),
)
historyRereplicator := xdc.NewHistoryRereplicator(
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func newTimerQueueProcessor(
return historyService.ReplicateEventsV2(ctx, request)
},
shard.GetService().GetPayloadSerializer(),
nil,
logger,
)
standbyTimerProcessors[clusterName] = newTimerQueueStandbyProcessor(
Expand Down
1 change: 1 addition & 0 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func newTransferQueueProcessor(
return historyService.ReplicateEventsV2(ctx, request)
},
shard.GetService().GetPayloadSerializer(),
nil,
logger,
)
standbyTaskProcessors[clusterName] = newTransferQueueStandbyProcessor(
Expand Down
1 change: 1 addition & 0 deletions service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (r *Replicator) createKafkaProcessors(currentClusterName string, clusterNam
return historyClient.ReplicateEventsV2(ctx, request)
},
r.historySerializer,
r.config.ReReplicationContextTimeout,
logger,
)
r.processors = append(r.processors, newReplicationTaskProcessor(
Expand Down

0 comments on commit 1ed8702

Please sign in to comment.