Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

large workflow hot shard detection #5166

Merged
merged 33 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
50cb929
large workflow hot shard detection
allenchen2244 Mar 17, 2023
d4c8cd4
fix lint
allenchen2244 Mar 17, 2023
5f2d7e7
Merge branch 'master' into large-history-shards
allenchen2244 Mar 17, 2023
e38bb2a
fix bad keyname
allenchen2244 Mar 17, 2023
7b021c6
fix test
allenchen2244 Mar 17, 2023
f0b5ada
fix test
allenchen2244 Mar 17, 2023
55fccb5
turn off to test
allenchen2244 Mar 20, 2023
1c684fb
change to overall shard
allenchen2244 Mar 21, 2023
737d830
add operation too
allenchen2244 Mar 21, 2023
7cb379e
test
allenchen2244 Mar 21, 2023
d72cff0
more logs
allenchen2244 Mar 21, 2023
2399e31
try again and refactor
allenchen2244 Mar 21, 2023
63e1fbf
fix ssome shit
allenchen2244 Mar 21, 2023
94df73e
fix blob size
allenchen2244 Mar 21, 2023
316ace6
test more
allenchen2244 Mar 21, 2023
1d64236
remove testing code
allenchen2244 Mar 21, 2023
15550d5
Merge branch 'master' into large-history-shards
allenchen2244 Mar 21, 2023
3dfcab0
Merge branch 'master' into large-history-shards
Groxx Mar 27, 2023
cb02506
move stuff to context and change default value
allenchen2244 Mar 28, 2023
1c87d11
add defaults
allenchen2244 Mar 28, 2023
f3ae86c
add check again domain value
allenchen2244 Mar 29, 2023
52ce735
Update common/dynamicconfig/constants.go
allenchen2244 Mar 29, 2023
5c4671d
Update common/dynamicconfig/constants.go
allenchen2244 Mar 29, 2023
189be4d
call flipr only once
allenchen2244 Mar 29, 2023
1f1d2a3
move out of persistence obj to fn arg
allenchen2244 Mar 29, 2023
2b86428
Update service/history/execution/context.go
allenchen2244 Mar 29, 2023
c5ae571
found the common minint64
allenchen2244 Mar 29, 2023
4f58e05
fix lint
allenchen2244 Mar 29, 2023
280903a
testing something
allenchen2244 Mar 29, 2023
13522c6
add tag
allenchen2244 Mar 29, 2023
8b9c5bf
add 1 more field
allenchen2244 Mar 29, 2023
edd9766
remove testing code
allenchen2244 Mar 29, 2023
5dec082
Merge branch 'master' into large-history-shards
allenchen2244 Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,21 @@ const (
// Value type: Int
// Default value: 100
SampleLoggingRate

// LargeShardHistorySizeMetricThreshold defines the threshold for what consititutes a large history storage size to alert on
// KeyName: system.largeShardHistorySizeMetricThreshold
// Value type: Int
// Default value: 10485760 (10mb)
LargeShardHistorySizeMetricThreshold
// LargeShardHistoryEventMetricThreshold defines the threshold for what consititutes a large history event size to alert on
// KeyName: system.largeShardHistoryEventMetricThreshold
// Value type: Int
// Default value: 50 * 1024
LargeShardHistoryEventMetricThreshold
// LargeShardHistoryBlobMetricThreshold defines the threshold for what consititutes a large history blob size to alert on
// KeyName: system.largeShardHistoryBlobMetricThreshold
// Value type: Int
// Default value: 262144 (1/4mb)
LargeShardHistoryBlobMetricThreshold
// LastIntKey must be the last one in this const group
LastIntKey
)
Expand Down Expand Up @@ -3454,6 +3468,21 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "The rate for which sampled logs are logged at. 100 means 1/100 is logged",
DefaultValue: 100,
},
LargeShardHistorySizeMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistorySizeMetricThreshold",
Description: "defines the threshold for what consititutes a large history size to alert on, default is 10mb",
DefaultValue: 10485760,
},
LargeShardHistoryEventMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryEventMetricThreshold",
Description: "defines the threshold for what consititutes a large history event length to alert on, default is 50k",
DefaultValue: 50 * 1024,
},
LargeShardHistoryBlobMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryBlobMetricThreshold",
Description: "defines the threshold for what consititutes a large history blob write to alert on, default is 1/4mb",
DefaultValue: 262144,
},
}

var BoolKeys = map[BoolKey]DynamicBool{
Expand Down
15 changes: 15 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,12 @@ const (
HistoryReplicationV2TaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
SyncActivityTaskScope
// LargeExecutionSizeShardScope is the scope to track large history size for hotshard detection
LargeExecutionSizeShardScope
// LargeExecutionCountShardScope is the scope to track large history count for hotshard detection
LargeExecutionCountShardScope
// LargeExecutionBlobShardScope is the scope to track large blobs for hotshard detection
LargeExecutionBlobShardScope

NumHistoryScopes
)
Expand Down Expand Up @@ -1750,6 +1756,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FailoverMarkerScope: {operation: "FailoverMarker"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
LargeExecutionSizeShardScope: {operation: "LargeExecutionSizeShard"},
LargeExecutionCountShardScope: {operation: "LargeExecutionCountShard"},
LargeExecutionBlobShardScope: {operation: "LargeExecutionBlobShard"},
},
// Matching Scope Names
Matching: {
Expand Down Expand Up @@ -2247,6 +2256,9 @@ const (
HistoryFailoverCallbackCount
WorkflowVersionCount
WorkflowTypeCount
LargeHistoryBlobCount
LargeHistoryEventCount
LargeHistorySizeCount

NumHistoryMetrics
)
Expand Down Expand Up @@ -2844,6 +2856,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksCount: {metricName: "replication_tasks_count", metricType: Timer},
WorkflowVersionCount: {metricName: "workflow_version_count", metricType: Gauge},
WorkflowTypeCount: {metricName: "workflow_type_count", metricType: Gauge},
LargeHistoryBlobCount: {metricName: "large_history_blob_count", metricType: Counter},
LargeHistoryEventCount: {metricName: "large_history_event_count", metricType: Counter},
LargeHistorySizeCount: {metricName: "large_history_size_count", metricType: Counter},
},
Matching: {
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op

domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)

if p.enableLatencyHistogramMetrics {
domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
Expand Down
15 changes: 11 additions & 4 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ type Config struct {
EnableDebugMode bool // note that this value is initialized once on service start
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
// Hotshard stuff
SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
LargeShardHistorySizeMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn
}

// New returns new service config with default values
Expand Down Expand Up @@ -556,8 +560,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID),

SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
LargeShardHistorySizeMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold),
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),
}

return cfg
Expand Down
7 changes: 4 additions & 3 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
currentWorkflowTransactionPolicy TransactionPolicy,
newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {

defer func() {
if retError != nil {
c.Clear()
Expand All @@ -720,11 +719,12 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err != nil {
return err
}

var persistedBlobs events.PersistedBlobs
currentWorkflowSize := c.GetHistorySize()
currentWorkflowSize, oldWorkflowSize := c.GetHistorySize(), c.GetHistorySize()
currentWorkflowHistoryCount, oldWorkflowHistoryCount := c.mutableState.GetNextEventID()-1, c.mutableState.GetNextEventID()-1
allenchen2244 marked this conversation as resolved.
Show resolved Hide resolved
for _, workflowEvents := range currentWorkflowEventsSeq {
blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
if err != nil {
return err
}
Expand Down Expand Up @@ -852,6 +852,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
domainName,
resp.MutableStateUpdateSessionStats,
)
c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I'm exploring the code in here:
would any of these be more-correct if they came from newWorkflow? and/or would using newWorkflow mean we wouldn't need to maintain as much separately?

I'm not quite sure what the difference is tbh. I'd have to read more carefully. I don't think it'll be dangerous to use the wrong one, just possibly misleading (due to subtly-wrong values, or due to unnecessary duplicate calculations that could drift)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it matters. From what I can tell new workflow kind of derives the numbers the same way I am. Maybe it would be less duplicated code but at most it's saving 5 lines and makes it a bit more confusing to read imo.

Copy link
Contributor

@Groxx Groxx Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main reason to prefer using the new-workflow's data is that it'd make it clear what the source of truth is, and get rid of any risk of the calculations drifting away from that source of truth due to future changes.

which is an "if". tbh I'm not sure if it's more-correct or not.

Copy link
Contributor Author

@allenchen2244 allenchen2244 Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could try testing it later? The other 2 metric emission fns is kinda calculated the same way so i wanted to make it consistent. I don't wanna just change part of it then it's super confusing if we never change the rest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which ones are those?
but yea, later's fine (if ever)

// emit workflow completion stats if any
if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
Expand Down
53 changes: 53 additions & 0 deletions service/history/execution/context_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package execution

import (
"strconv"
"time"

"github.com/uber/cadence/common/log"
Expand All @@ -30,6 +31,58 @@ import (
"github.com/uber/cadence/common/types"
)

func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64) {
if c.shard.GetConfig().EnableShardIDMetrics() {
shardIDStr := strconv.Itoa(c.shard.GetShardID())

var blobSizeWarn int64
blobSizeGlobalWarn := c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()
blobSizeDomainWarn := c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())
if blobSizeGlobalWarn < blobSizeDomainWarn {
blobSizeWarn = int64(blobSizeGlobalWarn)
} else {
blobSizeWarn = int64(blobSizeDomainWarn)
}
allenchen2244 marked this conversation as resolved.
Show resolved Hide resolved
// check if blob size is larger than threshold in Dynamic config if so alert on it every time
if blobSize > blobSizeWarn {
c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()))
c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda odd at a glance that this takes a string, but meh. maybe there's a reason for it.

}

var historyCountWarn int64
historyCountGlobalWarn := c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()
historyCountDomainWarn := c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())
if historyCountGlobalWarn < historyCountDomainWarn {
historyCountWarn = int64(historyCountGlobalWarn)
} else {
historyCountWarn = int64(historyCountDomainWarn)
}
// check if the new history count is greater than our threshold and only count/log it once when it passes it
// this might sometimes double count if the workflow is extremely fast but should be ok to get a rough idea and identify bad actors
if oldHistoryCount < historyCountWarn && (c.mutableState.GetNextEventID()-1) >= historyCountWarn {
c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()))
c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount)
}

var historySizeWarn int64
historySizeGlobalWarn := c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()
historySizeDomainWarn := c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())
if historySizeGlobalWarn < historySizeDomainWarn {
historySizeWarn = int64(historySizeGlobalWarn)
} else {
historySizeWarn = int64(historySizeDomainWarn)
}
// check if the new history size is greater than our threshold and only count/log it once when it passes it
if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn {
c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()))
c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount)
}
}
}

func emitWorkflowHistoryStats(
metricsClient metrics.Client,
domainName string,
Expand Down