Multiple upstream source ingestion support on Pinot #13780

Open
lnbest0707-uber opened this issue Aug 8, 2024 · 8 comments


@lnbest0707-uber
Contributor

Pinot currently supports real-time tables that ingest from only a single source stream, e.g. one Kafka topic in one Kafka cluster. Inside the table manager, the internal segment partition concept is tightly coupled with the stream's partitions. For example, if a Kafka topic has 8 partitions, the Pinot table's segments are also split into 8 partitions, and each segment consumes from the Kafka topic partition with the exact same partition id.
This is a simple, workable design that fits most straightforward use cases, but it also limits ingestion flexibility.
In practice, users may produce data on the same subject to different Kafka topics and want to ingest it into a single Pinot table (with the same schema) for centralized analysis. An existing Pinot issue, #5647, asks for this feature. Other OLAP technologies, e.g. ClickHouse and Druid, are developing or have developed similar features; see https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka and apache/druid#14424.
Based on the current Pinot architecture, it is possible to add the feature with the following capabilities and constraints:

  • Ingest from multiple stream topics into the same Pinot table.
  • Different stream topics may have different numbers of partitions and even different data formats (JSON, Avro, Protobuf, etc.), so the Pinot table should be able to use a different decoder for each topic.
  • The same transformation and indexing strategy is applied to the decoded data from all topics. This limitation comes from the current TableConfig structure and could be lifted with a major TableConfig refactor. Even with this limitation, transformation can be handled with existing dynamic transformation features such as SchemaConformingTransformer (see #12788, Add SchemaConformingTransformerV2 to enhance text search abilities).
  • Support starts with LLC (low-level consumer).
  • Table schema evolution, stream partition expansion with auto catch-up, and instance assignment strategies must keep working as they do today, without regressions.
  • In the short term, adding or removing topics from the table's topic list is out of scope.
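
To make the decoder and transformation constraints above concrete, here is a minimal Python sketch. It is not Pinot code: the topic names, the `DECODERS` registry, and the functions are all hypothetical, and a trivial CSV parser stands in for Avro or Protobuf decoding so the example has no dependencies. The point is only the shape: one decoder per topic, then one shared transformation for the whole table.

```python
import json
from typing import Callable, Dict


def decode_json(payload: bytes) -> dict:
    """Decode a JSON-encoded message into a row."""
    return json.loads(payload.decode("utf-8"))


def decode_csv(payload: bytes) -> dict:
    """Decode a 'user,clicks' CSV message (stand-in for Avro/Protobuf)."""
    user, clicks = payload.decode("utf-8").split(",")
    return {"user": user, "clicks": int(clicks)}


# Hypothetical registry: each topic gets its own decoder.
DECODERS: Dict[str, Callable[[bytes], dict]] = {
    "events_json": decode_json,
    "events_csv": decode_csv,
}


def transform(row: dict) -> dict:
    """Single transformation shared by every topic of the table."""
    return {"user": row["user"].lower(), "clicks": int(row["clicks"])}


def ingest(topic: str, payload: bytes) -> dict:
    """Pick the decoder by topic, then apply the shared transformation."""
    return transform(DECODERS[topic](payload))
```

Under these assumptions, `ingest("events_json", b'{"user": "Alice", "clicks": 3}')` and `ingest("events_csv", b"Alice,3")` produce the same row, which is what lets both topics land in one table.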

The implementation should decouple the partition concept between the stream and Pinot. In principle, the stream and the OLAP database are two independent pieces of infrastructure and storage. Each should have its own partitioning strategy instead of a hard dependency on the other. A Pinot segment partition is only directly used for segment management, so the data consumed by each segment partition should not be tightly coupled to a stream partition. An abstraction layer could be built in between to manage the mapping.
This feature could also improve ingestion performance and address issues like #13319 by allowing multiple segment partitions to consume from the same topic partition.
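
One possible shape for such a mapping layer, sketched in Python under stated assumptions: each topic in the table's topic list gets a reserved range of Pinot partition ids, so a Pinot partition id encodes both the topic index and the stream partition. The `STRIDE` constant and all function names are hypothetical and chosen for illustration; this is not taken from the Pinot codebase or from the design doc.

```python
from typing import List, Tuple

# Hypothetical: number of Pinot partition ids reserved per topic.
STRIDE = 10_000


def to_pinot_partition(topic_index: int, stream_partition: int) -> int:
    """Map (topic index, stream partition) to a Pinot partition id."""
    if topic_index < 0 or not 0 <= stream_partition < STRIDE:
        raise ValueError("topic index or stream partition out of range")
    return topic_index * STRIDE + stream_partition


def to_stream_partition(pinot_partition: int) -> Tuple[int, int]:
    """Inverse mapping: recover (topic index, stream partition)."""
    return divmod(pinot_partition, STRIDE)


def all_pinot_partitions(partition_counts: List[int]) -> List[int]:
    """List Pinot partition ids for topics with the given partition counts."""
    return [
        to_pinot_partition(topic, partition)
        for topic, count in enumerate(partition_counts)
        for partition in range(count)
    ]
```

A reserved range per topic keeps existing ids stable when one topic later gains partitions, which matters for the partition expansion requirement listed earlier. A plain sequential numbering would shift the ids of every topic that follows the expanded one.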

@mcvsubbu
Contributor

mcvsubbu commented Aug 8, 2024

Duplicate of #8806 ?
Duplicate of #5647 (as you mention in description)

@kishoreg
Member

kishoreg commented Aug 8, 2024

+100 on this.. it's coming up multiple times.

Do you have any initial thoughts?

@lnbest0707-uber
Contributor Author

> +100 on this.. it's coming up multiple times.
>
> Do you have any initial thoughts?

I already have a working prototype and will contribute it soon.

@mcvsubbu
Contributor

mcvsubbu commented Aug 8, 2024

At a high level, we could treat each topic-partition pair as a single consuming partition in Pinot.

We will need some more abstractions to be able to support different encoding or other changes amongst the topics.

I will assume that the table has only one schema. So, we need some set of intersecting columns in each topic that map into the table's columns.
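
A rough Python sketch of that column mapping, with assumptions labeled: the table schema `TABLE_COLUMNS`, the topic field sets, and both helper functions are hypothetical. Records from any topic are projected onto the table's columns, and a separate helper reports which table columns every topic can actually populate.

```python
from typing import Any, Dict, Iterable, Set

# Hypothetical table schema shared by all topics.
TABLE_COLUMNS = ("user_id", "event_type", "ts")


def project(record: Dict[str, Any],
            columns: Iterable[str] = TABLE_COLUMNS) -> Dict[str, Any]:
    """Keep only table columns; fields missing from the record become None."""
    return {column: record.get(column) for column in columns}


def shared_columns(topic_fields: Dict[str, Set[str]],
                   columns: Iterable[str] = TABLE_COLUMNS) -> Set[str]:
    """Table columns that are present in every topic's field set."""
    common = set(columns)
    for fields in topic_fields.values():
        common &= fields
    return common
```

In this sketch a column that one topic lacks is filled with `None`; whether a real table should use a null, a default value, or reject the record is a design decision the thread leaves open.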

@lnbest0707-uber
Contributor Author

Add a design doc

@sajjad-moradi
Contributor

sajjad-moradi commented Aug 13, 2024

A few years ago, I had a working POC for multi-topic consumption. I also wrote a design doc for it, but we ended up not using it at LinkedIn, so I did not publish it. The idea is similar to what's proposed here, but IMO it's a bit simpler. Please take a look and let me know what you think:
https://docs.google.com/document/d/1gz3oQLdIfL_Iniu0XvIjKw5We_mo4LlroEDysoBhpHw/edit#heading=h.kvi1qo69zoqe

@hpvd

hpvd commented Sep 18, 2024

+1 on this!

As described:

> The implementation strategy should consider decoupling the partition concept between stream and Pinot. Theoretically, stream and OLAP db are two independent infra and storages. They should have their own partition strategies instead of having hard dependencies on the other.

This decoupling is especially important because stream ingestion is not limited to Kafka; it also covers other systems such as Pulsar and Kinesis.

@rajat-sr1704

> A few years ago, I had a working POC for multi-topic consumption. I also wrote a design doc for it, but we ended up not using it in linkedin, so I did not publish it. The idea is similar to what's proposed here, but IMO it's a bit simpler. Please take a look, and let me know what you think: https://docs.google.com/document/d/1gz3oQLdIfL_Iniu0XvIjKw5We_mo4LlroEDysoBhpHw/edit#heading=h.kvi1qo69zoqe

@sajjad-moradi, which version of Pinot are you using? I'm pulling the latest image from apachepinot/pinot with Docker, but I get this error every time I run the AddTable command:
2024/10/16 07:04:33.529 INFO [AddTableCommand] [main] {"code":400,"error":"Invalid TableConfigs: transportSchedule. Only 1 stream is supported in REALTIME table"}
