Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature flag to not fail in flight decision #3167

Merged
merged 10 commits into from
Apr 7, 2020
4 changes: 4 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ var keys = map[Key]string{
MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability",
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",
ReplicationEventsFromCurrentCluster: "history.ReplicationEventsFromCurrentCluster",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency",
Expand Down Expand Up @@ -709,6 +710,9 @@ const (
// MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded
MutableStateChecksumInvalidateBefore

//ReplicationEventsFromCurrentCluster is a feature flag to allow cross DC replicate events that generated from the current cluster
ReplicationEventsFromCurrentCluster

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
)
Expand Down
28 changes: 21 additions & 7 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,22 @@ func (r *historyReplicator) ApplyEvents(
default:
// apply events, other than simple start workflow execution
// the continue as new + start workflow execution combination will also be processed here
msBuilder, err := context.loadWorkflowExecution()
var mutableState mutableState
var err error
domainEntry, err := r.domainCache.GetDomainByID(context.getDomainID())
if err != nil {
return err
}

if r.shard.GetConfig().ReplicationEventsFromCurrentCluster(domainEntry.GetInfo().Name) {
// this branch is used when replicating events (generated from current cluster)from remote cluster to current cluster.
// this could happen when the events are lost in current cluster and plan to recover them from remote cluster.
// if the incoming version equals last write version, skip to fail in-flight decision.
mutableState, err = context.loadWorkflowExecutionForReplication(request.GetVersion())
} else {
mutableState, err = context.loadWorkflowExecution()
}

if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return err
Expand All @@ -285,16 +300,16 @@ func (r *historyReplicator) ApplyEvents(
}

// Sanity check to make only 2DC mutable state here
if msBuilder.GetReplicationState() == nil {
if mutableState.GetReplicationState() == nil {
return &workflow.InternalServiceError{Message: "The mutable state does not support 2DC."}
}

logger.WithTags(tag.CurrentVersion(msBuilder.GetReplicationState().LastWriteVersion))
msBuilder, err = r.ApplyOtherEventsVersionChecking(ctx, context, msBuilder, request, logger)
if err != nil || msBuilder == nil {
logger.WithTags(tag.CurrentVersion(mutableState.GetReplicationState().LastWriteVersion))
mutableState, err = r.ApplyOtherEventsVersionChecking(ctx, context, mutableState, request, logger)
if err != nil || mutableState == nil {
return err
}
return r.ApplyOtherEvents(ctx, context, msBuilder, request, logger)
return r.ApplyOtherEvents(ctx, context, mutableState, request, logger)
}
}

Expand Down Expand Up @@ -864,7 +879,6 @@ func (r *historyReplicator) conflictResolutionTerminateCurrentRunningIfNotSelf(
return currentRunID, currentLastWriteVetsion, persistence.WorkflowStateCompleted, nil
}

// func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) {
func (r *historyReplicator) getCurrentWorkflowMutableState(
ctx ctx.Context,
domainID string,
Expand Down
1 change: 1 addition & 0 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type (
GetUpdateCondition() int64

StartTransaction(entry *cache.DomainCacheEntry) (bool, error)
StartTransactionSkipDecisionFail(entry *cache.DomainCacheEntry) error
CloseTransactionAsMutation(now time.Time, transactionPolicy transactionPolicy) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error)
CloseTransactionAsSnapshot(now time.Time, transactionPolicy transactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
}
Expand Down
25 changes: 22 additions & 3 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3874,14 +3874,27 @@ func (e *mutableStateBuilder) StartTransaction(
return false, err
}

flushBeforeReady, err := e.startTransactionHandleDecisionFailover()
flushBeforeReady, err := e.startTransactionHandleDecisionFailover(false)
if err != nil {
return false, err
}

return flushBeforeReady, nil
}

func (e *mutableStateBuilder) StartTransactionSkipDecisionFail(
domainEntry *cache.DomainCacheEntry,
) error {

e.domainEntry = domainEntry
if err := e.UpdateCurrentVersion(domainEntry.GetFailoverVersion(), false); err != nil {
return err
}

_, err := e.startTransactionHandleDecisionFailover(true)
return err
}

func (e *mutableStateBuilder) CloseTransactionAsMutation(
now time.Time,
transactionPolicy transactionPolicy,
Expand Down Expand Up @@ -4351,7 +4364,9 @@ func (e *mutableStateBuilder) validateNoEventsAfterWorkflowFinish(
}
}

func (e *mutableStateBuilder) startTransactionHandleDecisionFailover() (bool, error) {
func (e *mutableStateBuilder) startTransactionHandleDecisionFailover(
skipDecisionTaskFailed bool,
) (bool, error) {

if !e.IsWorkflowExecutionRunning() ||
!e.canReplicateEvents() {
Expand Down Expand Up @@ -4428,7 +4443,11 @@ func (e *mutableStateBuilder) startTransactionHandleDecisionFailover() (bool, er
return false, err
}

// we have a decision on the fly with a lower version, fail it
if skipDecisionTaskFailed {
return false, nil
}

// we have a decision with buffered events on the fly with a lower version, fail it
if err := failDecision(
e,
decision,
Expand Down
14 changes: 14 additions & 0 deletions service/history/mutableState_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,20 @@ func (r *nDCHistoryReplicatorImpl) applyEvents(
default:
// apply events, other than simple start workflow execution
// the continue as new + start workflow execution combination will also be processed here
mutableState, err := context.loadWorkflowExecution()
var mutableState mutableState
var err error
domainEntry, err := r.domainCache.GetDomainByID(context.getDomainID())
if err != nil {
return err
}

if r.shard.GetConfig().ReplicationEventsFromCurrentCluster(domainEntry.GetInfo().Name) {
// this branch is used when replicating events (generated from current cluster)from remote cluster to current cluster.
// this could happen when the events are lost in current cluster and plan to recover them from remote cluster.
mutableState, err = context.loadWorkflowExecutionForReplication(task.getVersion())
} else {
mutableState, err = context.loadWorkflowExecution()
}
switch err.(type) {
case nil:
// Sanity check to make only 3DC mutable state here
Expand Down
5 changes: 5 additions & 0 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ type Config struct {
MutableStateChecksumGenProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumVerifyProbability dynamicconfig.IntPropertyFnWithDomainFilter
MutableStateChecksumInvalidateBefore dynamicconfig.FloatPropertyFn

//Crocess DC Replication configuration
ReplicationEventsFromCurrentCluster dynamicconfig.BoolPropertyFnWithDomainFilter
}

const (
Expand Down Expand Up @@ -359,6 +362,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
MutableStateChecksumGenProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumGenProbability, 0),
MutableStateChecksumVerifyProbability: dc.GetIntPropertyFilteredByDomain(dynamicconfig.MutableStateChecksumVerifyProbability, 0),
MutableStateChecksumInvalidateBefore: dc.GetFloat64Property(dynamicconfig.MutableStateChecksumInvalidateBefore, 0),

ReplicationEventsFromCurrentCluster: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.ReplicationEventsFromCurrentCluster, false),
}

return cfg
Expand Down
78 changes: 78 additions & 0 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
getExecution() *workflow.WorkflowExecution

loadWorkflowExecution() (mutableState, error)
loadWorkflowExecutionForReplication(incomingVersion int64) (mutableState, error)
loadExecutionStats() (*persistence.ExecutionStats, error)
clear()

Expand Down Expand Up @@ -225,6 +226,83 @@ func (c *workflowExecutionContextImpl) loadExecutionStats() (*persistence.Execut
return c.stats, nil
}

func (c *workflowExecutionContextImpl) loadWorkflowExecutionForReplication(
incomingVersion int64,
) (mutableState, error) {

domainEntry, err := c.shard.GetDomainCache().GetDomainByID(c.domainID)
if err != nil {
return nil, err
}

if c.mutableState == nil {
response, err := c.getWorkflowExecutionWithRetry(&persistence.GetWorkflowExecutionRequest{
DomainID: c.domainID,
Execution: c.workflowExecution,
})
if err != nil {
return nil, err
}

c.mutableState = newMutableStateBuilder(
c.shard,
c.shard.GetEventsCache(),
c.logger,
domainEntry,
)

c.mutableState.Load(response.State)

c.stats = response.State.ExecutionStats
c.updateCondition = response.State.ExecutionInfo.NextEventID

// finally emit execution and session stats
emitWorkflowExecutionStats(
c.metricsClient,
c.getDomainName(),
response.MutableStateStats,
c.stats.HistorySize,
)
}

lastWriteVersion, err := c.mutableState.GetLastWriteVersion()
if err != nil {
return nil, err
}

if lastWriteVersion == incomingVersion {
err = c.mutableState.StartTransactionSkipDecisionFail(domainEntry)
if err != nil {
return nil, err
}
} else {
flushBeforeReady, err := c.mutableState.StartTransaction(domainEntry)
if err != nil {
return nil, err
}
if !flushBeforeReady {
return c.mutableState, nil
}

if err = c.updateWorkflowExecutionAsActive(
c.shard.GetTimeSource().Now(),
); err != nil {
return nil, err
}

flushBeforeReady, err = c.mutableState.StartTransaction(domainEntry)
if err != nil {
return nil, err
}
if flushBeforeReady {
return nil, &workflow.InternalServiceError{
Message: "workflowExecutionContext counter flushBeforeReady status after loading mutable state from DB",
}
}
}
return c.mutableState, nil
}

func (c *workflowExecutionContextImpl) loadWorkflowExecution() (mutableState, error) {

domainEntry, err := c.shard.GetDomainCache().GetDomainByID(c.domainID)
Expand Down
15 changes: 15 additions & 0 deletions service/history/workflowExecutionContext_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.