
Kafka Connect: Add mechanisms for routing records by topic name #11623

Open · wants to merge 3 commits into main

Conversation


@mun1r0b0t mun1r0b0t commented Nov 22, 2024

Add a new routing mechanism for records that uses Kafka topic name for routing and update configuration for how to route records.

The changes move the routing logic to a separate class with different implementations that can be selected via the configuration. It preserves backwards compatibility of current behavior, while adding a way to route records to tables based on the Kafka topic and also allows custom implementations of how records are routed.

Closes #11163

@jbonofre
Member

I wonder if it makes sense to do the routing in the Iceberg Kafka Connect sink.
Isn't it the purpose of integration frameworks (Camel, NiFi, ...) to do the routing and implement EIPs?

@bryanck what do you think ?

@mun1r0b0t just a note: your PR includes a lot of unnecessary changes (especially in the .md file). Please avoid unnecessary reformatting changes, as they make the PR harder to review.

@mun1r0b0t
Author

Sorry about that. It was my overzealous auto-formatter. I've reverted any changes that aren't relevant to the PR.

@bryanck
Contributor

bryanck commented Nov 22, 2024

I feel like we can take a much simpler approach for this. For more complex routing needs, an SMT makes more sense to me.

@mun1r0b0t
Author

I am certainly open to simplifying this. However, as a user and developer of other connectors, I strongly feel that the connector should support consuming from multiple topics and writing to multiple destination tables without the use of other services.

The Kafka Connect API supports consuming from multiple topics (using a list of topic names or a regex), but this is just not possible with the current Iceberg connector. So it is essentially preventing me from using Kafka Connect to its full potential and forcing me to create multiple connectors where one would work. Other Kafka connectors that I've worked with all support topic-to-destination mapping out of the box and do not require any additional services. This is the first connector I've come across that does not work with multiple topics.

The complexity is largely due to the existing setup of using a field from the data, rather than configuration, for routing. With the current routing approach, we have to add the destination table name as part of the data and write data with entirely different schemas and use cases all to one topic. I did not want to break existing behavior and hence had to work it into the PR, but in my humble opinion, the existing approach is contradictory to how Kafka should be used.

I'd much rather keep only the topic-based routing and do away with the field-based routing, if it comes down to supporting only one of these options. It aligns much better with how Kafka and Kafka Connect are used and how other connectors behave.

How do you feel about making a breaking change like this to the configuration: keep iceberg.table.table_name.topics and iceberg.table.table_name.topic-regex, remove the current routing setup, and rely on other services for more complex routing patterns? I would also prefer to keep the option to plug in a custom routing solution for those who want to do their own routing without additional services, but I can drop that as well.

@jbonofre
Member

I think that "basic routing", like consuming from multiple topics and mapping topic -> table, could make sense. We should avoid complex routing in the kafka-connect component, as we would overlap with SMTs.
I think that we can keep the iceberg.table.table_name.topics and iceberg.table.table_name.topic_regex properties and have a pluggable layer for the actual routing.
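For illustration, a connector configuration using the properties discussed above might look like the sketch below. The iceberg.table.*.topics and iceberg.table.*.topic-regex keys are the names proposed in this thread; the connector name, topic names, and table identifiers are hypothetical.

```properties
# Hypothetical sketch only; property names follow the proposal in this thread.
name=iceberg-sink
connector.class=org.apache.iceberg.connect.IcebergSinkConnector
# Standard Kafka Connect multi-topic subscription
topics=orders,shipments,events-click,events-view
# Explicit topic -> table mapping, one table per topic
iceberg.table.db.orders.topics=orders
iceberg.table.db.shipments.topics=shipments
# Regex mapping: several topics routed into one table
iceberg.table.db.events.topic-regex=events-.*
```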

Add 2 new routing mechanisms for records that use Kafka topic name for
routing and update configuration for how to route records.

The changes move the routing logic to a separate class with different
implementations that can be selected via the configuration. It preserves
backwards compatibility of current behavior, while adding 2 new ways to
route records to tables based on the Kafka topic and also allows
custom implementations of how records are routed.

The two new routing mechanisms allow the connector to consume from
multiple topics, eliminating the need to create separate connectors
for each topic to consume from.
@mun1r0b0t mun1r0b0t force-pushed the feature/topic-based-routing-for-connect branch from 2a994a2 to 137439d on December 3, 2024 01:49
@mun1r0b0t
Author

I made the changes as we discussed on Slack. Instead of using new keys for the topic based configuration, I am overloading the existing route-regex key to match against the topic.

@bryanck
Contributor

bryanck commented Dec 3, 2024

I don't feel we need a new RecordRouter abstraction to implement this feature; it introduces complexity. SinkWriter.extractRouteValue() could be enhanced to extract the source topic name, if configured, in addition to using a field value.
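As a rough sketch of the simpler approach described above: a single route-value extractor that returns either the source topic name or a payload field, depending on configuration. All class and method names here are hypothetical stand-ins, not the connector's actual code.

```java
import java.util.Map;

public class RouteValueSketch {
    /** Simplified stand-in for Kafka Connect's SinkRecord (hypothetical). */
    record SinkRecordStub(String topic, Map<String, Object> value) {}

    /** When routeByTopic is true, route by source topic; otherwise by a payload field. */
    static String extractRouteValue(SinkRecordStub record, boolean routeByTopic, String routeField) {
        if (routeByTopic) {
            return record.topic();
        }
        Object field = record.value().get(routeField);
        return field == null ? null : field.toString();
    }

    public static void main(String[] args) {
        SinkRecordStub rec = new SinkRecordStub("orders", Map.of("dest", "db.orders"));
        System.out.println(extractRouteValue(rec, true, "dest"));  // prints "orders"
        System.out.println(extractRouteValue(rec, false, "dest")); // prints "db.orders"
    }
}
```

The trade-off raised later in the thread is that this flag check runs for every record and keeps all routing state in the writer.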

@mun1r0b0t
Author

mun1r0b0t commented Dec 3, 2024

With the abstraction, it doesn't need to check the configuration for each record. I want to use the connector with ~30 tables, and not having to parse all the configuration for each record will help performance. It also allows saving routing-specific state within the router instead of having it all in the SinkWriter, which I think is easier to manage than doing it all in one method.

I also like that it allows users to plug in their own routing solution easily. I can think of many other ways someone might determine the destination table, and it is good functionality to have in the connector. I intend to use the plugin feature myself as well.

So, overall, I think the abstraction is worth the performance and usability benefits, and it is not that much more code. Could you explain your concerns around complexity? Perhaps I can address them in other ways.
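The abstraction argued for above can be sketched roughly as follows: routing is resolved and memoized per topic, so the configuration scan runs at most once per distinct topic rather than per record. Names are illustrative, not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical routing abstraction: resolves a destination table per topic. */
interface RecordRouter {
    /** Returns the destination table name for records from the given topic, or null. */
    String route(String topic);
}

/** Routes by an explicit topic -> table map, falling back to regex patterns. */
class TopicRecordRouter implements RecordRouter {
    private final Map<String, String> topicToTable;
    private final Map<Pattern, String> regexToTable;
    // Memoizes successful lookups so the regex scan runs once per matched topic.
    private final Map<String, String> cache = new HashMap<>();

    TopicRecordRouter(Map<String, String> topicToTable, Map<Pattern, String> regexToTable) {
        this.topicToTable = topicToTable;
        this.regexToTable = regexToTable;
    }

    @Override
    public String route(String topic) {
        return cache.computeIfAbsent(topic, t -> {
            String table = topicToTable.get(t);
            if (table != null) {
                return table;
            }
            for (Map.Entry<Pattern, String> e : regexToTable.entrySet()) {
                if (e.getKey().matcher(t).matches()) {
                    return e.getValue();
                }
            }
            return null; // unmatched topics are not cached
        });
    }
}
```

A custom implementation of the interface could then be loaded via configuration, which is the pluggability being discussed.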

@bryanck
Contributor

bryanck commented Dec 3, 2024

I was thinking something like this

@mun1r0b0t
Author

But that's very limiting in terms of what it can do. The table name has to be the topic name, which is extremely restrictive, and it uses a magic string to distinguish topic from field. It also does not allow reading from multiple topics into the same table, or from one topic into multiple tables.

For my use case, I cannot use the topic name as the table name, since the topic names have . in them, which does not work with AWS Glue. So I absolutely need a way to specify the source topic per table. I'm sure there are plenty of other cases where people would want to explicitly specify the mapping from topic to table and simply cannot use the topic name as the table name.

@bryanck
Contributor

bryanck commented Dec 3, 2024

The topic name can be mapped to a table via static routing, isn't that what your TopicRecordRouter is doing?

@mun1r0b0t
Author

Yeah, but the code in your link does not do that. To do that in your version, it would have to loop through all the table configurations for each record. The abstraction around routing optimizes this by doing it once per topic. It also makes static routing faster using the same optimization.

I still don't understand the issue with the RecordRouter abstraction. To me, it is a good thing both performance and usability wise.

@bryanck
Contributor

bryanck commented Dec 3, 2024

I feel your solution is reasonable, though I'm trying to reconcile this with the need for a more flexible, pluggable way to route records. For example, one case we had was to support dynamic routing based on the topic name but with flexible table names (as you pointed out, that's limited), so for that we used an SMT to populate the route field. I'm interested in other opinions on this.

@mun1r0b0t
Author

IMO making it a plugin gives the user the flexibility to choose. Users can add their own routing plugin to the runtime, or use an SMT or some other means. I think the connector shouldn't dictate how the user routes records. That is best left to the user.

But this is just one user's opinion. Definitely open to hearing from others.

@igorvoltaic

> I think that "basic routing" like consuming from multiple topics and "map" topic -> table could make sense.

That's something I've suggested here #10422


github-actions bot commented Jan 9, 2025

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 9, 2025
@mun1r0b0t
Author

@bryanck Any further thoughts on this?

@github-actions github-actions bot removed the stale label Jan 10, 2025

github-actions bot commented Feb 9, 2025

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Feb 9, 2025
@mun1r0b0t
Author

Still waiting on a response from a maintainer.

@github-actions github-actions bot removed the stale label Feb 10, 2025
@bb-dikshantsharma17012025

Hi @jbonofre @bryanck, any update on this PR? For our use case, we want to route all tables from a source MySQL into S3 (we are using Debezium as the source connector, so we need to map each topic to a table in Iceberg).


Successfully merging this pull request may close these issues.

Kafka Connect: route to table using topic name
5 participants