Physical sharding for NoSQL-based persistence #5187

Merged
4 commits merged into uber:master on Apr 21, 2023

Conversation

@emrahs (Contributor) commented on Apr 4, 2023

What changed?
Added a sharding layer to the NoSQL persistence stack so that Cadence can use multiple Cassandra clusters at once in a physically sharded manner.

Cadence is a heavily storage-bound system, so the load a single Cadence cluster can handle is strictly limited by the underlying storage system. Given the massive adoption of Cadence at Uber, this scale limitation forces us to create more Cadence clusters than we want to operate. This capability will let us run Cadence clusters one to two orders of magnitude larger than we have today.

Note that this feature only enables bootstrapping a brand-new cluster with multiple databases behind it. Resharding is designed but not implemented yet.

Why?
So that a Cadence cluster can be bootstrapped with multiple Cassandra clusters powering it.

How did you test it?
Added unit tests. Ran the samples and the bench tests in a staging environment.

Potential risks
Since this change significantly alters the low-level persistence logic, it could cause data loss if something goes terribly wrong.

Release notes
The change is backward compatible. Existing Cadence cluster configurations can be updated, if desired, to use the sharded NoSQL config format. However, they must keep a single shard, since Cadence does not yet have the ability to reshard data.

Documentation Changes
A sample config file included in this PR shows how to use the feature in a new cluster.
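For readers new to the feature, here is a rough sketch of how the sharded config described above might hang together. Only the names that appear in the diff excerpts and discussion below (ShardedNoSQL, ShardingPolicy, HistoryShardMapping, TaskListHashing.ShardOrder, and the per-range Shard/Start fields) come from this PR; everything else (DefaultShard, Connections, the End field, the yaml keys, and the NoSQL stub) is an assumption for illustration, not the shipped schema.

type (
	// ShardedNoSQL wires multiple NoSQL connections behind one persistence layer (sketch).
	ShardedNoSQL struct {
		// DefaultShard names the connection used when no sharding rule applies (assumed field).
		DefaultShard string `yaml:"defaultShard"`
		// Connections maps a logical DB shard name to its connection config (assumed field).
		Connections map[string]NoSQL `yaml:"connections"`
		// ShardingPolicy decides which DB shard owns a given record.
		ShardingPolicy ShardingPolicy `yaml:"shardingPolicy"`
	}

	// ShardingPolicy contains configuration for physical DB sharding.
	ShardingPolicy struct {
		// HistoryShardMapping lists contiguous, non-overlapping history shard ranges per DB shard.
		HistoryShardMapping []HistoryShardRange `yaml:"historyShardMapping"`
		// TaskListHashing assigns task lists to DB shards by hashing domain ID and task list name.
		TaskListHashing TaskListHashing `yaml:"taskListHashing"`
	}

	// HistoryShardRange maps history shards [Start, End) to the named DB shard (assumed shape).
	HistoryShardRange struct {
		Start int    `yaml:"start"`
		End   int    `yaml:"end"`
		Shard string `yaml:"shard"`
	}

	// TaskListHashing holds the ordered list of DB shard names used for task list hashing.
	TaskListHashing struct {
		ShardOrder []string `yaml:"shardOrder"`
	}

	// NoSQL stands in for Cadence's existing single-cluster NoSQL connection config (illustrative fields).
	NoSQL struct {
		PluginName string `yaml:"pluginName"`
		Hosts      string `yaml:"hosts"`
		Keyspace   string `yaml:"keyspace"`
	}
)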

@coveralls commented on Apr 6, 2023

Pull Request Test Coverage Report for Build 0187a4e6-dd82-410d-9812-538d5193e1d3

  • 260 of 384 (67.71%) changed or added relevant lines in 18 files are covered.
  • 116 unchanged lines in 25 files lost coverage.
  • Overall coverage increased (+0.004%) to 57.124%

Changes Missing Coverage | Covered Lines | Changed/Added Lines | %
common/persistence/client/factory.go | 4 | 6 | 66.67%
common/persistence/nosql/factory.go | 8 | 10 | 80.0%
common/persistence/nosql/shardingPolicy.go | 52 | 54 | 96.3%
common/persistence/persistence-tests/matchingPersistenceTest.go | 0 | 4 | 0.0%
common/dynamicconfig/configstore/config_store_client.go | 1 | 11 | 9.09%
tools/cassandra/handler.go | 0 | 14 | 0.0%
common/persistence/nosql/nosqlShardStore.go | 12 | 27 | 44.44%
common/persistence/nosql/shardedNosqlStore.go | 67 | 90 | 74.44%
common/persistence/nosql/nosqlHistoryStore.go | 19 | 43 | 44.19%
common/persistence/nosql/nosqlTaskStore.go | 23 | 51 | 45.1%

Files with Coverage Reduction | New Missed Lines | %
common/dynamicconfig/configstore/config_store_client.go | 1 | 70.02%
common/task/weightedRoundRobinTaskScheduler.go | 1 | 89.64%
service/history/queue/transfer_queue_processor_base.go | 1 | 77.62%
common/persistence/nosql/nosqlHistoryStore.go | 2 | 69.93%
common/persistence/nosql/nosqlplugin/cassandra/tasks.go | 2 | 75.58%
common/persistence/nosql/nosqlTaskStore.go | 2 | 60.79%
common/task/parallelTaskProcessor.go | 2 | 92.75%
service/history/execution/mutable_state_util.go | 2 | 36.04%
service/history/task/transfer_standby_task_executor.go | 2 | 85.98%
service/matching/matcher.go | 2 | 91.46%

Totals
Change from base Build 0187963e-799b-4034-9890-800458f7961d: 0.004%
Covered Lines: 85665
Relevant Lines: 149962

💛 - Coveralls

if _, found := connections[shardRange.Shard]; !found {
return fmt.Errorf("ShardedNosql config: Unknown history shard name: %v", shardRange.Shard)
}
if shardRange.Start != currentShardID+1 {
Contributor:

I prefer to make the start inclusive but the end exclusive, because that's a pretty common convention across different areas.

Contributor Author (@emrahs):

Yeah, I also debated the two options when modeling this part of the config. In the usual scenario, I agree that the start-inclusive & end-exclusive pattern works best. However, when I was thinking about more arbitrary shard distributions (e.g. creating a mapping for a single shard), it felt like using an inclusive start & end made the config a little more readable.

Thinking about it again though, there are other ways to cleanly handle single shard pinning if it ever becomes a need. So I'll change this to the standard range pattern.
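As a concrete illustration of the convention settled on here (start inclusive, end exclusive), a validation loop over the mapping might look like the sketch below. It reuses the illustrative HistoryShardRange and NoSQL types from the config sketch earlier on this page; it is not the exact code that landed in the PR.

// validateHistoryShardMapping is an illustrative sketch, not the PR's code.
// It checks that the configured ranges reference known connections and cover
// history shards 0..totalHistoryShards-1 contiguously, with Start inclusive
// and End exclusive.
func validateHistoryShardMapping(ranges []HistoryShardRange, connections map[string]NoSQL, totalHistoryShards int) error {
	nextStart := 0 // the first range must begin at history shard 0
	for _, shardRange := range ranges {
		if _, found := connections[shardRange.Shard]; !found {
			return fmt.Errorf("ShardedNosql config: Unknown history shard name: %v", shardRange.Shard)
		}
		if shardRange.Start != nextStart {
			return fmt.Errorf("ShardedNosql config: expected range to start at %v but it starts at %v", nextStart, shardRange.Start)
		}
		if shardRange.End <= shardRange.Start {
			return fmt.Errorf("ShardedNosql config: range for %v must end after it starts", shardRange.Shard)
		}
		nextStart = shardRange.End // End is exclusive, so the next range starts exactly here
	}
	if nextStart != totalHistoryShards {
		return fmt.Errorf("ShardedNosql config: ranges cover %v history shards but the cluster has %v", nextStart, totalHistoryShards)
	}
	return nil
}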

)

type (
	// Factory vends datastore implementations backed by cassandra
	Factory struct {
		sync.RWMutex
-		cfg config.Cassandra
+		cfg config.ShardedNoSQL
Contributor:

is this strictly a feature superset?

Contributor Author (@emrahs):

Cassandra is the same as NoSQL, and ShardedNoSQL is strictly a superset of it, so a NoSQL configuration can be parsed into a ShardedNoSQL config object.
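To make the superset point concrete: a single-cluster NoSQL config can always be expressed as a sharded config with exactly one shard. The helper below is hypothetical (its name, the defaultShardName constant, and the field set are assumptions building on the earlier config sketch), shown only to illustrate the mapping.

// toShardedNoSQL is a hypothetical helper showing how a plain single-cluster
// NoSQL config maps onto the sharded shape: one connection, one history shard
// range covering everything, and a single entry in the task list shard order.
func toShardedNoSQL(plain NoSQL, numHistoryShards int) ShardedNoSQL {
	const defaultShardName = "shard-0" // illustrative name
	return ShardedNoSQL{
		DefaultShard: defaultShardName,
		Connections:  map[string]NoSQL{defaultShardName: plain},
		ShardingPolicy: ShardingPolicy{
			HistoryShardMapping: []HistoryShardRange{
				{Start: 0, End: numHistoryShards, Shard: defaultShardName},
			},
			TaskListHashing: TaskListHashing{
				ShardOrder: []string{defaultShardName},
			},
		},
	}
}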

return &shard, nil
}

s, err := sn.connectToShard(cfg.NoSQLPlugin, shardName)
@davidporter-id-au (Contributor) commented on Apr 18, 2023:

If we don't have any already (and I may have missed them), it'd be great to get some metrics around the number of connections occurring.

Ignore this, I think I was somehow wildly overstating the number of shards in my head for some reason.

}
tlShards := sp.config.ShardingPolicy.TaskListHashing.ShardOrder
tlShardCount := len(tlShards)
hash := farm.Hash32([]byte(domainID+"_"+taskListName)) % uint32(tlShardCount)
Contributor:

Having just made a silly mistake with this myself, just double-checking: are there any conditions under which tlShardCount could be zero? https://go.dev/play/p/Pv73y1KpbOZ

Contributor Author (@emrahs):

It should be impossible with the if !sp.hasShardedTasklist check at the top of the func.
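For context, a minimal sketch of the hashing path being discussed, with the guard mentioned above. The receiver type, method name, and defaultShard field are assumptions; sp.hasShardedTasklist and the config fields come from the snippets in this thread, the config type is the illustrative ShardedNoSQL sketched earlier, and farm refers to github.com/dgryski/go-farm.

// Illustrative sketch, not the PR's code. With an empty ShardOrder the modulo
// below would panic with a divide-by-zero, so the guard falls back to the
// default shard before the hashing is ever reached.
type shardingPolicy struct {
	config             ShardedNoSQL // illustrative type from the earlier config sketch
	defaultShard       string       // assumed field naming the single/default DB shard
	hasShardedTasklist bool
}

func (sp *shardingPolicy) getTaskListShardName(domainID string, taskListName string) string {
	if !sp.hasShardedTasklist {
		return sp.defaultShard // no task list sharding configured, so tlShardCount is never zero below
	}
	tlShards := sp.config.ShardingPolicy.TaskListHashing.ShardOrder
	tlShardCount := len(tlShards)
	hash := farm.Hash32([]byte(domainID+"_"+taskListName)) % uint32(tlShardCount)
	return tlShards[hash]
}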

// ShardingPolicy contains configuration for physical DB sharding
ShardingPolicy struct {
// HistoryShardMapping defines the ranges of history shards stored by each DB shard. Ranges listed here *MUST*
// be continuous and non-overlapping, such that the first range in the list starts from Shard 0, each following
Contributor:

Non-blocking question: I might just be too lazy to look for this, but I didn't see it. Do we have any runtime checks to ensure this is true?

Contributor:

It's in persistence.go

@@ -198,6 +198,8 @@ type (
SQL *SQL `yaml:"sql"`
// NoSQL contains the config for a NoSQL based datastore
NoSQL *NoSQL `yaml:"nosql"`
Contributor:

Are we deprecating the NoSQL field?
Should we add a comment to announce the deprecation?

Contributor Author (@emrahs):

We should not deprecate it right away. Most users outside Uber have very small Cadence deployments and the ShardedNoSQL config will just be unnecessarily more complex for them to maintain.
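A sketch of how the two formats can coexist without deprecating anything: the datastore config keeps both fields, and the persistence factory only takes the sharded path when the sharded block is present, otherwise it can wrap the plain config into a one-shard equivalent (for example with the toShardedNoSQL helper sketched earlier). The surrounding struct shape, the yaml key, the SQL stub, and the resolveNoSQL helper are assumptions, not the PR's code.

// Sketch of a datastore config that keeps both formats side by side.
type DataStore struct {
	SQL          *SQL          `yaml:"sql"`
	NoSQL        *NoSQL        `yaml:"nosql"`
	ShardedNoSQL *ShardedNoSQL `yaml:"shardedNosql"` // assumed yaml key
}

// SQL stands in for the existing SQL datastore config.
type SQL struct{}

// resolveNoSQL is a hypothetical helper: prefer the sharded block when it is
// set, otherwise normalize the plain NoSQL block into a single-shard config.
func (ds *DataStore) resolveNoSQL(numHistoryShards int) (*ShardedNoSQL, error) {
	switch {
	case ds.ShardedNoSQL != nil:
		return ds.ShardedNoSQL, nil
	case ds.NoSQL != nil:
		sharded := toShardedNoSQL(*ds.NoSQL, numHistoryShards)
		return &sharded, nil
	default:
		return nil, errors.New("persistence config: neither nosql nor shardedNosql is set")
	}
}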

@Shaddoll (Contributor) left a comment:

Are you adding new tests to the Buildkite pipeline?
I think we need to build an environment with shardedNoSQL to run all the persistence and integration tests.

@Shaddoll (Contributor) left a comment:

Another question: it seems that the sharding policy doesn't have any dependency on NoSQL, so the logic probably should not be put into the nosql package. The logic could be reused for the SQL case; any plans for shardedSQL support?

@emrahs (Contributor Author) commented on Apr 20, 2023:

> Are you adding new tests to the Buildkite pipeline? I think we need to build an environment with shardedNoSQL to run all the persistence and integration tests.

Good idea but I'll leave that out as a separate task.

@emrahs (Contributor Author) commented on Apr 20, 2023:

> Another question: it seems that the sharding policy doesn't have any dependency on NoSQL, so the logic probably should not be put into the nosql package. The logic could be reused for the SQL case; any plans for shardedSQL support?

There is already a partitioning implementation for SQL, and I think it has various weaknesses: it needs to be refactored so that it can support resharding and do a better job of colocating related records across the executions and history tables. Unifying the two sharding approaches by refactoring the SQL implementation is a good next step, and reusing some of the pieces in this PR might make sense at that time. I'll keep shardingPolicy in the NoSQL implementation for now, until it's needed outside the package.

@shijiesheng shijiesheng merged commit 45129f1 into uber:master Apr 21, 2023
shijiesheng added a commit that referenced this pull request Apr 21, 2023
Co-authored-by: emrahs <emrah@uber.com>