
feat: Add support for Kafka clusters sharding #1454


Merged · 18 commits into master · Sep 14, 2022

Conversation

@olksdr (author) commented Sep 6, 2022

This allows configuring many different Kafka clusters per topic. The basic idea is to provide the number of shards and, for each range, its lower bound together with the Kafka config name and the topic name on that cluster:

```yaml
metrics:
    shards: 65000
    mapping:
        0:
            name: "ingest-metrics-1"
            config: "metric_1"
        25000:
            name: "ingest-metrics-2"
            config: "metrics_2"
        45000:
            name: "ingest-metrics-3"
            config: "metrics_3"
```

This also makes it possible to configure a single Kafka cluster and use different topic names on it.

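For illustration, here is a minimal sketch (not the actual implementation) of how a shard index would resolve to a topic and config through such a lower-bound mapping, using a `BTreeMap`:

```rust
use std::collections::BTreeMap;

fn main() {
    // Mirrors the YAML mapping above: lower bound -> (topic name, config name).
    let mut mapping = BTreeMap::new();
    mapping.insert(0u64, ("ingest-metrics-1", "metric_1"));
    mapping.insert(25000, ("ingest-metrics-2", "metrics_2"));
    mapping.insert(45000, ("ingest-metrics-3", "metrics_3"));

    let shards: u64 = 65000;
    let shard = 31337 % shards; // e.g. a hashed organization id reduced to a shard index

    // The owning entry is the one with the greatest lower bound <= shard.
    let (lower, (topic, config)) = mapping.range(..=shard).next_back().unwrap();
    println!("shard {shard} is in the range starting at {lower}: topic={topic}, config={config}");
}
```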

INGEST-1592
@olksdr olksdr self-assigned this Sep 6, 2022
@olksdr olksdr marked this pull request as ready for review September 9, 2022 06:19
@olksdr olksdr requested a review from a team September 9, 2022 06:19
```rust
pub struct Sharded {
    /// The number of shards used for this topic.
    shards: u64,
    /// The Kafka configuration assigned to the specific shard range.
```
Contributor:

Could you describe that the first u64 is the start index and the next u64 ends the range? Explicitly call out what's inclusive (I'm assuming the start u64 is inclusive and the end u64 is excluded, being part of the next range). Maybe even write out an example in the struct doc comment like you did in the PR description.
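For illustration, the requested doc comment might read roughly like this (hypothetical wording; the map's value type is simplified here):

```rust
use std::collections::BTreeMap;

/// Configuration for a topic spread over multiple Kafka shards.
///
/// Each key of `configs` is the *inclusive* lower bound of a shard range;
/// a range ends (exclusively) where the next key begins, or at `shards`
/// for the last entry. With `shards: 65000` and keys 0, 25000 and 45000,
/// the ranges are 0..25000, 25000..45000 and 45000..65000.
pub struct Sharded {
    /// The number of shards used for this topic.
    pub shards: u64,
    /// The Kafka configuration assigned to each shard range, keyed by the
    /// range's inclusive lower bound. (Value type simplified for the sketch.)
    pub configs: BTreeMap<u64, String>,
}
```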

olksdr (author):

@flub, please have a look at f967933 and check whether this is what you had in mind.

Contributor:

yep, 💯

```rust
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;

// should be ok to unwrap since we MUST have at least one range defined
```
Contributor:

This is reasonable, but because the invariant and the code relying on it are so far apart I'm a bit uncomfortable about it. One way would be to make a custom type for the producers: the invariant is then enforced in that type, relying on it becomes a lot nicer, and the code interacting with the invariant stays close together. I guess this would be a newtype over the BTreeMap with a few small methods.
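A minimal sketch of such a newtype, with hypothetical names (the real value type would be the per-shard producer or config):

```rust
use std::collections::BTreeMap;

/// Hypothetical newtype enforcing "at least one range, starting at 0".
struct ShardRanges<T>(BTreeMap<u64, T>);

impl<T> ShardRanges<T> {
    /// Validates the invariant once, at construction time.
    fn new(map: BTreeMap<u64, T>) -> Result<Self, &'static str> {
        if !map.contains_key(&0) {
            return Err("shard mapping must contain a range starting at 0");
        }
        Ok(Self(map))
    }

    /// Looks up the range owning `shard`. The unwrap now lives next to the
    /// invariant that justifies it instead of far away in a caller.
    fn get(&self, shard: u64) -> &T {
        self.0.range(..=shard).next_back().map(|(_, v)| v).unwrap()
    }
}
```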

olksdr (author):

That's a very good point, I will look into it.
It might also simplify the code and remove some duplication.

olksdr (author):

So, I moved the same functionality under the Sharded producer in the store actor: d4a05c2.
I don't know if creating a newtype over BTreeMap is a good idea, since it can be a bit more complicated to understand.
But I might have to pick your brain on this and on how to implement it better.

Comment on lines 160 to 163
```rust
let mut hasher = FnvHasher::default();
std::hash::Hash::hash(&organization_id, &mut hasher);
let org_id_hash = hasher.finish();
let shard = org_id_hash % shards;
```
Contributor:

this code already exists in two places, and given its nature I suspect both would need updating together all the time, so it might be better to create a method somewhere. Maybe also on the newtype around the BTreeMap I already suggested.
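As a sketch, the shared helper could look like this (hypothetical name, assuming the fnv crate already used in the snippet above):

```rust
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

/// Maps an organization id to its shard index, so that all call sites
/// share one implementation and stay in sync.
pub fn shard_index(organization_id: u64, shards: u64) -> u64 {
    let mut hasher = FnvHasher::default();
    organization_id.hash(&mut hasher);
    hasher.finish() % shards
}
```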

olksdr (author):

moved this into the impl for the Sharded producer in d4a05c2

```rust
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
```
Member:

We have this mapping in three different places now; I wonder if it would be possible to have it only once.

olksdr (author):

@jjbayer I've tried to remove one of the levels of indirection to eliminate one of the repeated BTreeMaps, see 0db194e.
Please have another look and see whether this approach is better.

Member:

Yes, I think it makes sense this way.

The Kafka config with topic name and parameters is now produced from the TopicAssignment enum.
@olksdr olksdr requested review from jjbayer and a team September 13, 2022 11:19
```rust
assert!(matches!(
    kafka_config_profiles,
    KafkaConfigName::Single { .. }
));
```
Member:

Any way we could keep this assertion?

olksdr (author):

done in 1c3d5e0.

```rust
    topic_name: topic.as_str(),
    kafka_config_name: None,
},
fn kafka_config_name<'a>(
```
Member:

Should we update the doc comment and the function name, now that this function returns a full config rather than a name?

olksdr (author), Sep 13, 2022:

Good point, done in 1c3d5e0


```rust
Self::Single {
    topic_name,
    producer,
} => (topic_name.as_str(), Arc::clone(producer)),
```
Member:

nit: You can probably change the signature of get_producer to return (&str, &ThreadedProducer) to save on Arc::clones.
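For reference, a minimal sketch of the suggested signature (with `ThreadedProducer` stubbed out so the sketch stands alone):

```rust
use std::sync::Arc;

/// Stand-in for rdkafka's ThreadedProducer, only to keep the sketch compiling.
struct ThreadedProducer;

enum Producer {
    Single {
        topic_name: String,
        producer: Arc<ThreadedProducer>,
    },
}

impl Producer {
    /// Returns borrows instead of cloning the Arc; callers that only send a
    /// message do not need ownership of the producer.
    fn get_producer(&self) -> (&str, &ThreadedProducer) {
        match self {
            Self::Single { topic_name, producer } => (topic_name.as_str(), producer.as_ref()),
        }
    }
}
```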

olksdr (author):

done in 1c3d5e0

```diff
@@ -133,7 +134,7 @@ def inner(
         default_opts.setdefault(key, {}).update(options[key])

     dir = config_dir("relay")
-    dir.join("config.yml").write(json.dumps(default_opts))
+    dir.join("config.yml").write(yaml.dump(default_opts))
```
Member:

we don't have to change this now, but I believe that at some point the configuration will have to become JSON-serializable; then we will have to revisit the schema. But that's fine.

@olksdr olksdr merged commit 6802bd1 into master Sep 14, 2022
@olksdr olksdr deleted the feat/kafka-sharding-config branch September 14, 2022 05:27
jan-auer added a commit that referenced this pull request Sep 14, 2022
* master:
  feat: Add support for Kafka clusters sharding (#1454)
  feat(replays): Fix typo in payload deserializer (#1467)
olksdr added a commit that referenced this pull request Sep 15, 2022
As a followup to #1454 we decided that we do not have to hash the org
id to get the shard number; a simple modulo by the number of shards is sufficient.

As part of this small refactoring the `unwrap` was also removed and a
Result is returned instead, to propagate the error to the caller if it
occurs.
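A sketch of what that followup shape could look like (hypothetical function, not the actual commit):

```rust
use std::collections::BTreeMap;

/// No hashing, plain modulo, and a Result instead of an unwrap so the
/// caller decides how to handle a missing range.
fn lookup_config<T>(
    configs: &BTreeMap<u64, T>,
    organization_id: u64,
    shards: u64,
) -> Result<&T, &'static str> {
    let shard = organization_id % shards;
    configs
        .range(..=shard)
        .next_back()
        .map(|(_, config)| config)
        .ok_or("no shard range defined for this shard index")
}
```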