feat: Add support for Kafka clusters sharding #1454
Conversation
This allows configuring many different Kafka clusters per topic. The basic idea is to provide the number of shards and, for each range, the lower bound together with the Kafka config name and the topic name on that cluster:

```yaml
metrics:
  shards: 65000
  mapping:
    0:
      name: "ingest-metrics-1"
      config: "metric_1"
    25000:
      name: "ingest-metrics-2"
      config: "metrics_2"
    45000:
      name: "ingest-metrics-3"
      config: "metrics_3"
```

This also makes it possible to configure a single Kafka cluster and use different topic names on it.

INGEST-1592
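As a rough sketch of the lookup this mapping implies (names here are illustrative, not the PR's actual API): each key is the lower bound of a range, so resolving a shard means taking the greatest key that does not exceed it, which a `BTreeMap` range query does directly.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: resolve a shard index to the config whose
/// lower bound is the greatest key not exceeding the shard.
fn lookup_config<'a>(mapping: &'a BTreeMap<u64, &'a str>, shard: u64) -> Option<&'a str> {
    mapping.range(..=shard).next_back().map(|(_, name)| *name)
}
```

With the example mapping above, shard `100` resolves to `ingest-metrics-1`, shard `25000` to `ingest-metrics-2`, and shard `64999` to `ingest-metrics-3`.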
```rust
pub struct Sharded {
    /// The number of shards used for this topic.
    shards: u64,
    /// The Kafka configuration assigned to the specific shard range.
```
Could you describe that the u64 is the start index and the next u64 describes the range? Explicitly call out what's inclusive (I'm assuming the start u64 is inclusive, and the end u64 is excluded and part of the next range). Maybe even write an example in the struct doc comment like you did in the PR description.
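A doc-comment along the lines the reviewer asks for might look like this (a sketch, the field names and layout are assumed from the diff, not taken from the final code):

```rust
use std::collections::BTreeMap;

/// Sharded Kafka configuration for a topic.
///
/// Each key in `mapping` is the *inclusive* start index of a shard
/// range; the range extends up to (but *excluding*) the next key, or
/// up to `shards` for the last entry. With `shards: 65000` and keys
/// `0`, `25000`, `45000`:
///
/// * `0..25000`     -> "ingest-metrics-1"
/// * `25000..45000` -> "ingest-metrics-2"
/// * `45000..65000` -> "ingest-metrics-3"
pub struct Sharded {
    /// The number of logical shards for this topic.
    pub shards: u64,
    /// Kafka config names keyed by the inclusive start of their range.
    pub mapping: BTreeMap<u64, String>,
}
```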
yep, 💯
relay-server/src/actors/outcome.rs
Outdated
```rust
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;

// should be ok to unwrap since we MUST have at least one range defined
```
This is reasonable, but because the two pieces are so far apart in the code I'm a bit uncomfortable with it. One way would be to make a custom type for the producers; then the invariant is enforced in that type, relying on it becomes a lot nicer, and the code interacting with the invariant stays close together. I guess this would be a newtype over the BTreeMap with a few small methods.
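A minimal sketch of the suggested newtype, assuming hypothetical names: the "at least one range defined" invariant is checked once at construction, so lookups no longer need an `unwrap` far away from where the invariant is established.

```rust
use std::collections::BTreeMap;

/// Hypothetical newtype over the shard mapping. Construction enforces
/// the non-empty invariant; lookups rely on it safely.
pub struct ShardMapping(BTreeMap<u64, String>);

impl ShardMapping {
    /// Fails at construction time instead of at lookup time.
    pub fn new(inner: BTreeMap<u64, String>) -> Result<Self, &'static str> {
        if inner.is_empty() {
            Err("at least one shard range must be defined")
        } else {
            Ok(Self(inner))
        }
    }

    /// Greatest-lower-bound lookup; returns `None` only when the shard
    /// falls below the smallest key (which a `0` entry rules out).
    pub fn get(&self, shard: u64) -> Option<&str> {
        self.0.range(..=shard).next_back().map(|(_, v)| v.as_str())
    }
}
```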
That's a very good point, I will look into it. It might also simplify the code and remove some duplication.
So, I moved the same functionality under the Sharded producer in the store actor in d4a05c2. I do not know if it's a good idea to create a new type over BTreeMap, which can be a bit more complicated to understand. But I might have to pick your brain on this, and on how to implement it better.
relay-server/src/actors/store.rs
Outdated
```rust
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(&organization_id, &mut hasher);
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;
```
This code exists in two places already; given its nature, I suspect both would need updating together all the time, so it might be better to create a method somewhere. Maybe also on the newtype around the BTreeMap I already suggested.
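Consolidating the duplicated snippet could look like this. Note the PR uses `FnvHasher`; `DefaultHasher` stands in here only to keep the sketch dependency-free, and the function name is assumed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical shared helper for the shard computation that currently
/// appears in both the outcome and store actors.
fn shard_for(organization_id: u64, shards: u64) -> u64 {
    let mut hasher = DefaultHasher::default();
    organization_id.hash(&mut hasher);
    hasher.finish() % shards
}
```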
Moved this into the impl for the Sharded producer in d4a05c2.
```rust
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
```
We have this mapping in three different places now, I wonder if it would be possible to have it only once.
Yes, I think it makes sense this way. The Kafka config with the topic name and parameters is now produced from the TopicAssignment enum.
relay-config/src/config.rs
Outdated
```rust
assert!(matches!(
    kafka_config_profiles,
    KafkaConfigName::Single { .. }
));
```
Any way we could keep this assertion?
done in 1c3d5e0.
relay-config/src/config.rs
Outdated
```rust
    topic_name: topic.as_str(),
    kafka_config_name: None,
},

fn kafka_config_name<'a>(
```
Should we update the doc comment and the function name, now that this function returns a full config rather than a name?
Good point, done in 1c3d5e0
```rust
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
```
Yes, I think it makes sense this way.
relay-server/src/actors/store.rs
Outdated
```rust
Self::Single {
    topic_name,
    producer,
} => (topic_name.as_str(), Arc::clone(producer)),
```
nit: You can probably change the signature of get_producer to return (&str, &ThreadedProducer) to save on Arc::clones.
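The suggestion amounts to borrowing through the enum instead of cloning the `Arc` on every call. A self-contained sketch (the `ThreadedProducer` stand-in and the single-variant enum are simplifications of the actual rdkafka-backed types):

```rust
use std::sync::Arc;

// Stand-in for rdkafka's ThreadedProducer in this sketch.
struct ThreadedProducer;

enum Producer {
    Single {
        topic_name: String,
        producer: Arc<ThreadedProducer>,
    },
}

impl Producer {
    /// Returning references avoids an `Arc::clone` per message; the
    /// caller just borrows for the duration of the send.
    fn get_producer(&self) -> (&str, &ThreadedProducer) {
        match self {
            Self::Single {
                topic_name,
                producer,
            } => (topic_name.as_str(), producer.as_ref()),
        }
    }
}
```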
done in 1c3d5e0
```diff
@@ -133,7 +134,7 @@ def inner(
         default_opts.setdefault(key, {}).update(options[key])

     dir = config_dir("relay")
-    dir.join("config.yml").write(json.dumps(default_opts))
+    dir.join("config.yml").write(yaml.dump(default_opts))
```
We don't have to change this now, but I believe that at some point the configuration will have to become JSON-serializable; then we will have to revisit the schema. But that's fine.
As a followup for #1454 we decided that we do not have to hash the org id to get the shard number; a simple modulo by the number of shards will be sufficient. As part of this small refactoring the `unwrap` was also removed and a Result is returned instead, to propagate the error to the caller if it happens.
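Sketching that follow-up under assumed names: no hashing, just `org_id % shards`, and a `Result` in place of the `unwrap` so a missing range surfaces as an error at the caller.

```rust
use std::collections::BTreeMap;

/// Hypothetical post-refactor lookup: modulo instead of hashing, and
/// an error instead of unwrapping when no range matches the shard.
fn config_for(
    configs: &BTreeMap<u64, String>,
    shards: u64,
    org_id: u64,
) -> Result<&str, String> {
    let shard = org_id % shards;
    configs
        .range(..=shard)
        .next_back()
        .map(|(_, name)| name.as_str())
        .ok_or_else(|| format!("no Kafka config found for shard {shard}"))
}
```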