Skip to content

Commit

Permalink
Wire Kafka with NDC (uber#2642)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Oct 7, 2019
1 parent dccfa24 commit 376e029
Show file tree
Hide file tree
Showing 19 changed files with 452 additions and 153 deletions.
64 changes: 60 additions & 4 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

52 changes: 48 additions & 4 deletions .gen/go/replicator/replicator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,8 @@ const (
HistoryReplicationTaskScope
// HistoryMetadataReplicationTaskScope is the scope used by history metadata task replication processing
HistoryMetadataReplicationTaskScope
// HistoryReplicationTaskV2Scope is the scope used by history task replication processing
HistoryReplicationTaskV2Scope
// HistoryReplicationV2TaskScope is the scope used by history task replication processing
HistoryReplicationV2TaskScope
// SyncShardTaskScope is the scope used by sync shrad information processing
SyncShardTaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
Expand Down Expand Up @@ -1304,7 +1304,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
DomainReplicationTaskScope: {operation: "DomainReplicationTask"},
HistoryReplicationTaskScope: {operation: "HistoryReplicationTask"},
HistoryMetadataReplicationTaskScope: {operation: "HistoryMetadataReplicationTask"},
HistoryReplicationTaskV2Scope: {operation: "HistoryReplicationTaskV2"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncShardTaskScope: {operation: "SyncShardTask"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
ESProcessorScope: {operation: "ESProcessor"},
Expand Down
1 change: 1 addition & 0 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//+build esintegration

// to run locally, make sure kafka and es is running,
Expand Down
4 changes: 2 additions & 2 deletions host/xdc/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ GetHistoryLoop:
var historyResponse *workflow.GetWorkflowExecutionHistoryResponse
eventsReplicated := false
GetHistoryLoop2:
for i := 0; i < 15; i++ {
for i := 0; i < numOfRetry; i++ {
historyResponse, err = client2.GetWorkflowExecutionHistory(createContext(), getHistoryReq)
if err == nil {
history := historyResponse.History
Expand All @@ -365,7 +365,7 @@ GetHistoryLoop2:
break GetHistoryLoop2
}
}
time.Sleep(1 * time.Second)
time.Sleep(waitTimeInMs * time.Millisecond)
}
s.Nil(err)
s.True(eventsReplicated)
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ service HistoryService {
4: ShardOwnershipLostError shardOwnershipLostError,
5: shared.ServiceBusyError serviceBusyError,
6: shared.RetryTaskError retryTaskError,
7: shared.RetryTaskV2Error retryTaskV2Error,
)

/**
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/replicator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct SyncActicvityTaskAttributes {
}

struct HistoryTaskV2Attributes {
05: optional i64 (js.type = "Long") taskId
10: optional string domainId
20: optional string workflowId
30: optional string runId
Expand Down
2 changes: 2 additions & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,8 @@ func (h *Handler) updateErrorMetric(
h.metricsClient.IncCounter(scope, metrics.CadenceErrLimitExceededCounter)
case *gen.RetryTaskError:
h.metricsClient.IncCounter(scope, metrics.CadenceErrRetryTaskCounter)
case *gen.RetryTaskV2Error:
h.metricsClient.IncCounter(scope, metrics.CadenceErrRetryTaskCounter)
case *gen.ServiceBusyError:
h.metricsClient.IncCounter(scope, metrics.CadenceErrServiceBusyCounter)
case *yarpcerrors.Status:
Expand Down
16 changes: 13 additions & 3 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,19 @@ func (r *historyReplicator) SyncActivity(
return nil
}

// version >= last write version
// this can happen if out of order delivery heppens
return newRetryTaskErrorWithHint(ErrRetrySyncActivityMsg, domainID, execution.GetWorkflowId(), execution.GetRunId(), msBuilder.GetNextEventID())
// TODO when 2DC is deprecated, remove this block
if msBuilder.GetReplicationState() != nil {
// version >= last write version
// this can happen if out of order delivery happens
return newRetryTaskErrorWithHint(
ErrRetrySyncActivityMsg,
domainID,
execution.GetWorkflowId(),
execution.GetRunId(),
msBuilder.GetNextEventID(),
)
}
return newNDCRetryTaskErrorWithHint()
}

ai, ok := msBuilder.GetActivityInfo(scheduleID)
Expand Down
1 change: 1 addition & 0 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func (s *historyReplicatorSuite) TestSyncActivity_IncomingScheduleIDLarger_Incom
msBuilder.On("IsWorkflowExecutionRunning").Return(true)
msBuilder.On("GetNextEventID").Return(nextEventID)
msBuilder.On("GetLastWriteVersion").Return(lastWriteVersion, nil)
msBuilder.On("GetReplicationState").Return(&persistence.ReplicationState{})
s.mockDomainCache.On("GetDomainByID", domainID).Return(
cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: domainName},
Expand Down
1 change: 1 addition & 0 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,5 +543,6 @@ func (r *nDCHistoryReplicatorImpl) notify(
}

func newNDCRetryTaskErrorWithHint() error {
// TODO add detail info here
return &shared.RetryTaskV2Error{}
}
Loading

0 comments on commit 376e029

Please sign in to comment.