Skip to content

Commit

Permalink
large workflow hot shard detection (uber#5166)
Browse files Browse the repository at this point in the history
Metrics for large workflows
  • Loading branch information
allenchen2244 authored Mar 30, 2023
1 parent dd51c53 commit 9d01035
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 7 deletions.
31 changes: 30 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,21 @@ const (
// Value type: Int
// Default value: 100
SampleLoggingRate

// LargeShardHistorySizeMetricThreshold defines the threshold for what constitutes a large history storage size to alert on
// KeyName: system.largeShardHistorySizeMetricThreshold
// Value type: Int
// Default value: 10485760 (10mb)
LargeShardHistorySizeMetricThreshold
// LargeShardHistoryEventMetricThreshold defines the threshold for what constitutes a large history event count to alert on
// KeyName: system.largeShardHistoryEventMetricThreshold
// Value type: Int
// Default value: 50 * 1024
LargeShardHistoryEventMetricThreshold
// LargeShardHistoryBlobMetricThreshold defines the threshold for what constitutes a large history blob size to alert on
// KeyName: system.largeShardHistoryBlobMetricThreshold
// Value type: Int
// Default value: 262144 (1/4mb)
LargeShardHistoryBlobMetricThreshold
// LastIntKey must be the last one in this const group
LastIntKey
)
Expand Down Expand Up @@ -3454,6 +3468,21 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "The rate for which sampled logs are logged at. 100 means 1/100 is logged",
DefaultValue: 100,
},
LargeShardHistorySizeMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistorySizeMetricThreshold",
Description: "defines the threshold for what consititutes a large history size to alert on, default is 10mb",
DefaultValue: 10485760,
},
LargeShardHistoryEventMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryEventMetricThreshold",
Description: "defines the threshold for what consititutes a large history event length to alert on, default is 50k",
DefaultValue: 50 * 1024,
},
LargeShardHistoryBlobMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryBlobMetricThreshold",
Description: "defines the threshold for what consititutes a large history blob write to alert on, default is 1/4mb",
DefaultValue: 262144,
},
}

var BoolKeys = map[BoolKey]DynamicBool{
Expand Down
15 changes: 15 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,12 @@ const (
HistoryReplicationV2TaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
SyncActivityTaskScope
// LargeExecutionSizeShardScope is the scope to track large history size for hotshard detection
LargeExecutionSizeShardScope
// LargeExecutionCountShardScope is the scope to track large history count for hotshard detection
LargeExecutionCountShardScope
// LargeExecutionBlobShardScope is the scope to track large blobs for hotshard detection
LargeExecutionBlobShardScope

NumHistoryScopes
)
Expand Down Expand Up @@ -1750,6 +1756,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FailoverMarkerScope: {operation: "FailoverMarker"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
LargeExecutionSizeShardScope: {operation: "LargeExecutionSizeShard"},
LargeExecutionCountShardScope: {operation: "LargeExecutionCountShard"},
LargeExecutionBlobShardScope: {operation: "LargeExecutionBlobShard"},
},
// Matching Scope Names
Matching: {
Expand Down Expand Up @@ -2247,6 +2256,9 @@ const (
HistoryFailoverCallbackCount
WorkflowVersionCount
WorkflowTypeCount
LargeHistoryBlobCount
LargeHistoryEventCount
LargeHistorySizeCount

NumHistoryMetrics
)
Expand Down Expand Up @@ -2844,6 +2856,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksCount: {metricName: "replication_tasks_count", metricType: Timer},
WorkflowVersionCount: {metricName: "workflow_version_count", metricType: Gauge},
WorkflowTypeCount: {metricName: "workflow_type_count", metricType: Gauge},
LargeHistoryBlobCount: {metricName: "large_history_blob_count", metricType: Counter},
LargeHistoryEventCount: {metricName: "large_history_event_count", metricType: Counter},
LargeHistorySizeCount: {metricName: "large_history_size_count", metricType: Counter},
},
Matching: {
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op

domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)

if p.enableLatencyHistogramMetrics {
domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
Expand Down
15 changes: 11 additions & 4 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ type Config struct {
EnableDebugMode bool // note that this value is initialized once on service start
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
// Hot shard detection: sampling rate and per-shard "large workflow" thresholds
SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
LargeShardHistorySizeMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn
}

// New returns new service config with default values
Expand Down Expand Up @@ -556,8 +560,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID),

SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
LargeShardHistorySizeMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold),
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),
}

return cfg
Expand Down
7 changes: 5 additions & 2 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
currentWorkflowTransactionPolicy TransactionPolicy,
newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {

defer func() {
if retError != nil {
c.Clear()
Expand All @@ -720,11 +719,14 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err != nil {
return err
}

var persistedBlobs events.PersistedBlobs
currentWorkflowSize := c.GetHistorySize()
oldWorkflowSize := currentWorkflowSize
currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1
oldWorkflowHistoryCount := currentWorkflowHistoryCount
for _, workflowEvents := range currentWorkflowEventsSeq {
blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
if err != nil {
return err
}
Expand Down Expand Up @@ -852,6 +854,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
domainName,
resp.MutableStateUpdateSessionStats,
)
c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount)
// emit workflow completion stats if any
if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
Expand Down
34 changes: 34 additions & 0 deletions service/history/execution/context_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,49 @@
package execution

import (
"strconv"
"time"

"github.com/uber/cadence/common"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

// emitLargeWorkflowShardIDStats emits per-shard hot-shard detection metrics and logs
// when a single workflow execution crosses the configured "large workflow" thresholds:
// a large history blob write, a large history event count, or a large history size.
// It is a no-op unless EnableShardIDMetrics is turned on in dynamic config.
//
// blobSize is the size of the history blob written in this update; oldHistoryCount /
// newHistoryCount and oldHistorySize (vs. c.stats.HistorySize) are the before/after
// values used to detect the first crossing of the count/size thresholds.
func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64, newHistoryCount int64) {
	config := c.shard.GetConfig()
	if !config.EnableShardIDMetrics() {
		return
	}

	shardID := c.shard.GetShardID()
	shardIDStr := strconv.Itoa(shardID)
	domainName := c.GetDomainName()

	// Effective threshold is the lower of the hot-shard metric threshold and the
	// per-domain warn limit, so we never alert later than the existing warn path.
	blobSizeWarn := common.MinInt64(int64(config.LargeShardHistoryBlobMetricThreshold()), int64(config.BlobSizeLimitWarn(domainName)))
	// Blob writes are alerted on every occurrence (sampled at the configured log rate).
	if blobSize > blobSizeWarn {
		c.logger.SampleInfo("Workflow writing a large blob", config.SampleLoggingRate(), tag.WorkflowDomainName(domainName),
			tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(shardID), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
		c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount)
	}

	historyCountWarn := common.MinInt64(int64(config.LargeShardHistoryEventMetricThreshold()), int64(config.HistoryCountLimitWarn(domainName)))
	// Count/log only on the update where the event count first crosses the threshold.
	// NOTE(review): this seems to double count and the cause is unclear, but it is
	// good enough to get a rough idea and identify bad actors.
	if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn {
		c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(domainName),
			tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(shardID), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
		c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount)
	}

	historySizeWarn := common.MinInt64(int64(config.LargeShardHistorySizeMetricThreshold()), int64(config.HistorySizeLimitWarn(domainName)))
	// Count/log only on the update where the history size first crosses the threshold.
	if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn {
		c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(domainName),
			tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(shardID), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
		c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount)
	}
}

func emitWorkflowHistoryStats(
metricsClient metrics.Client,
domainName string,
Expand Down

0 comments on commit 9d01035

Please sign in to comment.