Skip to content

Commit

Permalink
Enable sanity check fr strong idempotency check (#6031)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored May 21, 2024
1 parent cc75f6f commit bbe87aa
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 53 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,13 @@ const (
// Allowed filters: DomainName
EnableStrongIdempotency

// EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency
// KeyName: history.enableStrongIdempotencySanityCheck
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableStrongIdempotencySanityCheck

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4335,6 +4342,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableStrongIdempotency enables strong idempotency for APIs",
DefaultValue: false,
},
EnableStrongIdempotencySanityCheck: DynamicBool{
KeyName: "history.enableStrongIdempotencySanityCheck",
Filters: []Filter{DomainName},
Description: "EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
6 changes: 4 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ type Config struct {
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn

EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter
EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter
EnableStrongIdempotencySanityCheck dynamicconfig.BoolPropertyFnWithDomainFilter

// HostName for machine running the service
HostName string
Expand Down Expand Up @@ -598,7 +599,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),

EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),
EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),
EnableStrongIdempotencySanityCheck: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotencySanityCheck),

HostName: hostname,
}
Expand Down
72 changes: 43 additions & 29 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,17 @@ func (c *contextImpl) CreateWorkflowExecution(
c.Clear()
}
}()
err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
createRequest := &persistence.CreateWorkflowExecutionRequest{
// workflow create mode & prev run ID & version
Mode: createMode,
Expand Down Expand Up @@ -484,6 +486,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}

domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
var persistedBlobs events.PersistedBlobs
resetHistorySize := c.GetHistorySize()
for _, workflowEvents := range resetWorkflowEventsSeq {
Expand Down Expand Up @@ -512,8 +518,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}
if len(resetWorkflow.WorkflowRequests) != 0 && len(newWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for ConflictResolveWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution"}
}
c.logger.Error("workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution", tag.Number(int64(len(resetWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests))))
}
newWorkflowSizeSize := newContext.GetHistorySize()
startEvents := newWorkflowEventsSeq[0]
Expand Down Expand Up @@ -542,8 +550,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}
if len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution"}
}
c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution", tag.Counter(len(currentWorkflow.WorkflowRequests)))
}
currentWorkflowSize := currentContext.GetHistorySize()
for _, workflowEvents := range currentWorkflowEventsSeq {
Expand All @@ -568,10 +578,6 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
); err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: conflictResolveMode,
Expand Down Expand Up @@ -679,17 +685,19 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks(
Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events",
}
}
domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
if len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domainName) {
return &types.InternalServiceError{Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests"}
}
c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests", tag.Counter(len(currentWorkflow.WorkflowRequests)))
}
currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
HistorySize: c.GetHistorySize(),
}
domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: persistence.UpdateWorkflowModeIgnoreCurrent,
Expand Down Expand Up @@ -729,9 +737,15 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
err = validateWorkflowRequestsAndMode(currentWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
var persistedBlobs events.PersistedBlobs
Expand Down Expand Up @@ -769,13 +783,17 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
return err
}
if len(newWorkflow.WorkflowRequests) != 0 && len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for UpdateWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution"}
}
c.logger.Error("workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution", tag.Number(int64(len(currentWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests))))
}

err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
newWorkflowSizeSize := newContext.GetHistorySize()
Expand Down Expand Up @@ -810,10 +828,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err := c.updateWorkflowExecutionEventReapplyFn(updateMode, currentWorkflowEventsSeq, newWorkflowEventsSeq); err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: updateMode,
Expand Down Expand Up @@ -1412,14 +1426,14 @@ func validateWorkflowRequestsAndMode(requests []*persistence.WorkflowRequest, mo
return nil
}
if len(requests) > 2 {
return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."}
return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"}
} else if len(requests) == 2 {
// SignalWithStartWorkflow API can generate 2 workflow requests
if (requests[0].RequestType == persistence.WorkflowRequestTypeStart && requests[1].RequestType == persistence.WorkflowRequestTypeSignal) ||
(requests[1].RequestType == persistence.WorkflowRequestTypeStart && requests[0].RequestType == persistence.WorkflowRequestTypeSignal) {
return nil
}
return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."}
return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"}
}
return nil
}
Loading

0 comments on commit bbe87aa

Please sign in to comment.