Conversation
@walterddr please review
Codecov Report
@@ Coverage Diff @@
## master #8233 +/- ##
============================================
- Coverage 70.83% 69.51% -1.33%
Complexity 4245 4245
============================================
Files 1631 1631
Lines 85462 85490 +28
Branches 12877 12878 +1
============================================
- Hits 60539 59427 -1112
- Misses 20746 21923 +1177
+ Partials 4177 4140 -37
This doesn't seem to compile on JDK 8:
Error: Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project flink-pinot-sink: Compilation failure
Error: /home/runner/work/pinot/pinot/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/http/PinotControllerClientTest.java:[124,53] error: cannot infer type arguments for HashMap<K,V>
Error: reason: cannot use '<>' with anonymous inner classes
Error: where K,V are type-variables:
Error: K extends Object declared in class HashMap
Error: V extends Object declared in class HashMap
Error: -> [Help 1]
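For context, JDK 8 rejects the diamond operator on anonymous inner classes (this was only relaxed in JDK 9 by JEP 213). A minimal illustration of the failing pattern and the JDK 8-compatible fix (the map contents here are made up, not from the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class DiamondExample {
    public static void main(String[] args) {
        // Fails on JDK 8 with "cannot use '<>' with anonymous inner classes":
        //   Map<String, Object> params = new HashMap<>() { { put("tableName", "demo"); } };

        // JDK 8-compatible: spell out the type arguments explicitly on the
        // anonymous subclass (or avoid the double-brace initializer entirely).
        Map<String, Object> params = new HashMap<String, Object>() {
            {
                put("tableName", "demo");
            }
        };
        System.out.println(params.get("tableName")); // prints "demo"
    }
}
```

Note that plain `new HashMap<>()` (without an anonymous subclass body) is fine on JDK 8, which is why the diamond usage elsewhere in the codebase compiles.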
I thought we dropped jdk8 support? not yet?
btw, this HashMap<> usage is all over the codebase
nit: new line, same for other files
I feel this is a very helpful client. Just dropping my random thought here: shall we rename this to PinotAdminClient, move it to the pinot-java-client module, and then import it back?
TBH we already have something very similar, but it is only used in tests: ControllerTestUtils. We can probably consider a prime version of it.
yup, I'm fine with consolidating/extracting the client into a more reusable lib. But I think it's better to do this in a later PR, to reduce the scope of this PR, which is already quite large.
Sounds good. Can you create a new issue for consolidating the reusable utils, with potential candidates that can be merged?
Do we really need this abstract class? I feel it is more like static util methods if _httpClient is made static.
Also, we have implementations in AbstractBaseAdminCommand in pinot-tools for the same purpose; you can also consider moving this logic to pinot-common :p
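For illustration, the "static util" shape being suggested might look roughly like this (HttpUtils and sendGetRequest are hypothetical names, not Pinot APIs, and the client field is a stand-in for the real HTTP client type):

```java
// Hypothetical sketch of the static-util alternative: a single shared client in a
// static field plus static helper methods, instead of an abstract base class with
// an instance _httpClient field.
public final class HttpUtils {
    // Stand-in for the real HTTP client; created once and shared by all helpers.
    private static final String HTTP_CLIENT = "shared-client";

    private HttpUtils() {
        // no instances: every helper is static
    }

    public static String sendGetRequest(String url) {
        // real code would issue the request through HTTP_CLIENT here
        return "GET " + url + " via " + HTTP_CLIENT;
    }

    public static void main(String[] args) {
        System.out.println(sendGetRequest("/tables/demo/schema"));
    }
}
```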
yeah, it's just some helper methods that can be reused by subclasses. For example, if we want to add other clients for the broker/server etc.
this looks super great. thanks @yupeng9 for the contribution. I will take a thorough look this weekend
yupeng9
left a comment
addressed comments
walterddr
left a comment
thanks @yupeng9 for the contribution. I briefly looked over most of the implementations and found some issues. please kindly take a look.
Since this is largely based on my previous POC: https://github.com/walterddr/flink-pinot-sink, my review might be biased. it would be great if @npawar can also take another look at the segment write/upload logic.
thanks
Suggested change, to avoid unchecked casts:
- public class PinotMapRecordConverter implements RecordConverter<Map> {
+ public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
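The difference matters at the call sites; a small self-contained sketch (RecordConverter here is a simplified stand-in for the actual Pinot interface, and the field names are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class ConverterExample {
    // Simplified stand-in for the actual converter interface:
    interface RecordConverter<T> {
        Map<String, Object> convertToMap(T record);
    }

    // With the raw type RecordConverter<Map>, every read from the map would need
    // an unchecked cast; parameterizing the type keeps the compiler checks intact.
    static class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
        @Override
        public Map<String, Object> convertToMap(Map<String, Object> record) {
            return new HashMap<>(record); // values are already typed, no cast needed
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("playerID", 42);
        System.out.println(new PinotMapRecordConverter().convertToMap(row).get("playerID"));
    }
}
```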
indexOfSubtask is not used in the uploader. I think it only matters in the segment writer.
this javadoc is outdated.
potential race condition when snapshot and invoke both call flush?
Is it possible to have a race condition? Only invoke can call it, since snapshotState is not used in batch processing.
in that case can we just throw an exception, rather than invoking flush in snapshotState, if it is not used anyway?
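The "fail loudly" option could look roughly like this (a hypothetical sketch; the class and method names are illustrative, not the actual sink function API in the PR):

```java
// Hypothetical sketch: in batch mode snapshotState should never be reached, so
// throwing removes any chance of flush() racing with invoke().
public class BatchOnlySinkSketch {
    private int bufferedRows = 0;

    public void invoke(Object row) {
        bufferedRows++;
        if (bufferedRows >= 1000) {
            flush(); // invoke() stays the only caller of flush()
        }
    }

    public void snapshotState() {
        throw new UnsupportedOperationException(
            "snapshotState is not supported in batch processing");
    }

    private void flush() {
        // real code would seal and upload the segment here
        bufferedRows = 0;
    }
}
```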
nit: refactor common code between FlinkSegmentWriter and FileBasedSegmentWriter, but this can be done in a separate PR
I might be wrong, but I don't see the entire org.apache.pinot.connector.flink.http package used anywhere outside of this package.
Is this module relevant to the flink connector PR at all, or is it simply used as an easy util for integration/e2e tests?
I've deleted this entire module and the tests run just fine, so if it's not relevant I suggest we delete this module.
It's used in the quickstart to show how to use the REST API to fetch the schema and table config to configure the connector. Otherwise the developer needs to manually craft those configs.
Also, you can take a look at PinotConnectionUtils for the useful config management.
yes, but it is not related to the flink connector. for the sake of this PR's scope, I would suggest we drop it and add it in a separate PR, mainly for the following reasons:
- it doesn't relate to the flink connector. at first I thought we were using HTTP to directly ingest into Pinot via some sort of REST API, but if that is not the case it shouldn't be part of the flink-pinot connector but rather in pinot-tools.
- @xiangfu0 also thinks we can refactor this part out later. it doesn't make sense to get this in and then refactor it out if it is not part of the critical path.
- there's already a util in tests called ControllerUtils that can be used for demonstration purposes.
Not saying it is not useful, but it would be much easier to address it separately.
Not really. I think this is an integral part of showing how the connector is used, as in the QuickStart; without it I don't think the connector is complete. It doesn't make sense to expect developers to manually create the Schema as well as the TableConfig, not to mention the several configuration decorations needed in PinotConnectionUtils.
Xiang's point is that in a later PR we can refactor this part of the code into a reusable client lib. However, from the connector user's perspective, it doesn't change the way the connector is used.
Makes sense?
hmm. I must have missed something. here is what I did:
- deleted the entire folder under src/main/java/org/apache/pinot/connector/flink/http
- ran mvn test -pl pinot-connectors/pinot-flink-connector
everything passes. this proves that the http folder has nothing to do with this PR, yes?
did you forget to add a Quickstart? I don't see a quickstart in this PR or any changes outside of the pinot-flink-connector module. (and if so, I suggest we add it in a separate PR)
hmm, I see. This file was filtered out by one of the .gitignore rules. Renamed it and pushed again.
Suggested change:
- <artifactId>flink-pinot-sink</artifactId>
+ <artifactId>pinot-flink-connector</artifactId>
add <scope>test</scope> to these 2 dependencies.
yupeng9
left a comment
@walterddr Thanks for the review. Addressed the comments, PTAL.
why was creating a Flink-specific impl of SegmentWriter needed? Why couldn't we just use FileBasedSegmentWriter? Most of the code looks the same, with some additional metadata like seqId and rowCount. Aren't those concepts generic enough that we could just add them to FileBasedSegmentWriter and enhance it?
We could, with a later refactoring. I think one of the reasons is that we don't have an abstract class of SegmentWriter that allows customization like in this Flink case. Also, initially this connector was planned to be added not to the Pinot repo but to the Flink repo, which means it had to depend on the published pinot library only.
Similar to the consolidation of the client utils, I think we can consolidate the SegmentWriter later, once they are in the same repo.
the Sink should have been in Flink, but the Writer should have always been in Pinot, and it should have been one of the offered impls. If by consolidating later you mean soon enough, I'm fine with starting with this. But this can easily get deprioritized and fall off the plate once it is non-blocking :) And the code really is identical, except for a few tweaks here and there which the default impl can benefit from anyway.
That's fair. Let me address this in a follow-up PR; it shouldn't be hard. But I'm trying not to modify other modules in this PR, so it'll be a pure module addition.
this is the javadoc from FileBasedSegmentWriter. call out here why FlinkSegmentWriter needed to be added, why it is different from the FileBased one, and any todos to unify the two. imo, FlinkSegmentWriter should just extend FileBasedSegmentWriter for the Flink-metrics-related special casing; everything else should get folded into the FileBased impl.
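The proposed inheritance split might look like this in outline (hypothetical, heavily simplified signatures; the real SegmentWriter API differs):

```java
// Hypothetical outline of the suggested refactor: the common buffering/sealing
// logic lives in FileBasedSegmentWriter, and the subclass keeps only the Flink
// metrics special-casing.
public class SegmentWriterSketch {
    static class FileBasedSegmentWriter {
        private long _rowsWritten = 0;

        public void write(Object row) {
            _rowsWritten++; // shared buffering logic for every writer impl
        }

        public long getRowsWritten() {
            return _rowsWritten;
        }
    }

    static class FlinkSegmentWriter extends FileBasedSegmentWriter {
        @Override
        public void write(Object row) {
            super.write(row);
            reportFlinkMetric(getRowsWritten()); // the only Flink-specific part
        }

        private void reportFlinkMetric(long count) {
            // real code would update a Flink Counter here
        }
    }
}
```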
This class seems exactly identical to SegmentUploaderDefault. Can we just use that one?
(optional) this name is not very intuitive.
How about PinotGenericRowConverter as the interface, with MapGenericRowConverter and FlinkRowGenericRowConverter as impls?
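A sketch of the proposed naming (GenericRow below is a simplified stand-in for org.apache.pinot.spi.data.readers.GenericRow, and the field layout is made up):

```java
import java.util.HashMap;
import java.util.Map;

public class NamingSketch {
    // Simplified stand-in for Pinot's GenericRow:
    static class GenericRow {
        final Map<String, Object> _fields = new HashMap<>();
    }

    // One interface named for what it produces...
    interface PinotGenericRowConverter<T> {
        GenericRow convertToRow(T value);
    }

    // ...and one impl per input type. A FlinkRowGenericRowConverter would likewise
    // implement PinotGenericRowConverter<Row> for Flink's Row type.
    static class MapGenericRowConverter implements PinotGenericRowConverter<Map<String, Object>> {
        @Override
        public GenericRow convertToRow(Map<String, Object> value) {
            GenericRow row = new GenericRow();
            row._fields.putAll(value);
            return row;
        }
    }
}
```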
can this just be in pinot-tools with the rest of the Quickstarts?
not sure. pinot-tools doesn't have a dependency on connectors. actually, we do not build connectors for the core.
can the integration test move to pinot-integration-tests? 1. it should ideally be there with all the others; 2. you won't have to write this util class just for the flink connector, as it's already there for use in ITs
pinot-integration-tests does not have a dependency on connectors.
it is fine to add the test dependency for connectors to pinot-integration-tests; we add dependencies on the required plugins/connectors there anyway for the tests. fine if this is addressed in the next PR
hmm, the integration test in the spark connector is also in the spark module. I'm fine with the suggestion, though it'll make the connectors build non-optional in the Pinot project build. What do you think, @xiangfu0?
      "transformFunction": null
    }
  ],
  "dateTimeFieldSpecs": [
how is this not failing with 2 dateTimeFieldSpecs?!
oh, it's just a dummy file to test the RPC calls
Description
Add a Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, including upsert tables. You can read more about the motivation and design in this design proposal.
Upgrade Notes