Skip to content

Refactor quickstart data source#8567

Merged
richardstartin merged 20 commits intoapache:masterfrom
dongxiaoman:xd-stream-source
Apr 26, 2022
Merged

Refactor quickstart data source#8567
richardstartin merged 20 commits intoapache:masterfrom
dongxiaoman:xd-stream-source

Conversation

@dongxiaoman
Copy link
Contributor

@dongxiaoman dongxiaoman commented Apr 19, 2022

Description

Refactor the quickstart classes so some of the code/logic could be shared and extended easily in future.
This is mostly because the same logic of created a thread, pull from source, writes into Steaming sinks, and shutdown are written in many places.

  1. Created one PinotRealtimeSource class as the driver with threaded running loop. It pulls from sources and writes intoStreamDataProducer like Kafka;
  2. Abstracted the AvroFilePinotSourceGenerator as one source that pulls from Avro files and generate row bytes.
  3. Refactored AirlineDataStream to use the driver (PinotRealtimeSource) as an example
  4. Refactored PullRequestMergedEventsStream to use the driver too.
  5. Refactored MeetupRsvpStream, MeetupRsvpJsonStream to use the driver too.

Tests Done

  1. Given that the quickstarts are in many places in tests already, did not add extra integration test
  2. Added a few unit tests to verify basic class

Extra Note

Based on discussion in #8180 (comment) , the MeetupRSVP data needs some tweaking. I will work on a follow up PR to make it run.

@codecov-commenter
Copy link

codecov-commenter commented Apr 20, 2022

Codecov Report

Merging #8567 (0f1f3ba) into master (7e060fd) will increase coverage by 0.29%.
The diff coverage is 44.00%.

@@             Coverage Diff              @@
##             master    #8567      +/-   ##
============================================
+ Coverage     63.68%   63.98%   +0.29%     
- Complexity     4224     4315      +91     
============================================
  Files          1677     1646      -31     
  Lines         88078    86711    -1367     
  Branches      13354    13233     -121     
============================================
- Hits          56094    55479     -615     
+ Misses        27851    27205     -646     
+ Partials       4133     4027     -106     
Flag Coverage Δ
integration1 ?
integration2 ?
unittests1 66.93% <44.00%> (-0.13%) ⬇️
unittests2 14.15% <0.00%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...rg/apache/pinot/spi/stream/StreamDataProducer.java 44.00% <44.00%> (ø)
...va/org/apache/pinot/core/routing/RoutingTable.java 0.00% <0.00%> (-100.00%) ⬇️
...va/org/apache/pinot/common/config/NettyConfig.java 0.00% <0.00%> (-100.00%) ⬇️
...a/org/apache/pinot/common/metrics/MinionMeter.java 0.00% <0.00%> (-100.00%) ⬇️
...g/apache/pinot/common/metrics/ControllerMeter.java 0.00% <0.00%> (-100.00%) ⬇️
.../apache/pinot/common/metrics/BrokerQueryPhase.java 0.00% <0.00%> (-100.00%) ⬇️
.../apache/pinot/common/metrics/MinionQueryPhase.java 0.00% <0.00%> (-100.00%) ⬇️
...apache/pinot/common/helix/ExtraInstanceConfig.java 0.00% <0.00%> (-100.00%) ⬇️
...ache/pinot/server/access/AccessControlFactory.java 0.00% <0.00%> (-100.00%) ⬇️
...he/pinot/common/messages/SegmentReloadMessage.java 0.00% <0.00%> (-100.00%) ⬇️
... and 549 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 7e060fd...0f1f3ba. Read the comment docs.

@dongxiaoman dongxiaoman marked this pull request as ready for review April 21, 2022 17:36
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Reviewer: this line is the only difference from its parent class. That the partitioning key is using RsvpId() instead of EventId(). So in the refactoring I just make the Generator configurable with partition key, and removed this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Reviewer: Note that this line is the difference with MeetupRsvpJsonStream. getEventId() is the different key

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer: the RsvpStream and RsvpJsonStream is merged to use this class instead, and using the config of _keyColumn to apply different key

@dongxiaoman dongxiaoman changed the title [draft] Refactor quickstart data source Refactor quickstart data source Apr 22, 2022
Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall the approach looks good. I have some suggestions, please kindly take a look.
although this is a refactor PR but there are some new features that were added in, for example the rate limiter interface, it's good to also called out exactly what new things were added and how they were tested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this goes into SPI. we need to be careful explaining what's the intended usage. could you give a bit more in the javadoc?

At the first glance, I thought it is representing a row in a partitioned table with the partition key.

Also this seems to be an interface used by pinot-tools, it might work better to put it in the pinot-tools module as a helper utils (along with produce batch)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I moved the classed to be inside StreamDataProducer so it will not be used in other places. If in future people see the need for such class they can do refactor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we already consolidated all the generator / producer usage all in this class. let's put the batch operation API here as a helper utils.

static produceBatchRowWithKey(_producer, _topicName, rows) {
  // .. the default impl 
}

Copy link
Contributor Author

@dongxiaoman dongxiaoman Apr 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default void produceKeyedBatch() is introduced so we can have a batch interface too. I hope to introduce this method so we can easily batch produce rows to improve throughput. (E.g., if the _producer is behind a microservice endpoint) In that case the implementation in the StreamProducer can override the produceKeyedBatch to provide a more efficient call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. this actually is very helpful. #8537 is likely due to a batch produce instability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SourceGenerator sounds like you are generating a subclass of PinotRealtimeSource. PinotSourceDataGenerator may be a better name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Changed

@richardstartin richardstartin merged commit 4d36f3d into apache:master Apr 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants