-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Adding KafkaToGCS #57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks for your pull request. It looks like this may be your first contribution to a Google open source project (if not, look below for help). Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). 📝 Please visit https://cla.developers.google.com/ to sign. Once you've signed (or fixed any issues), please reply here with What to do if you already signed the CLAIndividual signers
Corporate signers
ℹ️ Googlers: Go here for more info. |
|
@YadnikiPawar - Can you sign the CLA? |
|
@googlebot I signed it! |
|
CLAs look good, thanks! ℹ️ Googlers: Go here for more info. |
586a115 to
7375a66
Compare
sabhyankar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@YadnikiPawar - Thanks for opening this PR. Can you rebase and take a look at the minor nits for now? I want to spend a little more time looking at the main pipeline code.
v2/common/src/main/java/com/google/cloud/teleport/v2/coders/FailsafeElementCoder.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/coders/package-info.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/values/FailsafeElement.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/resources/kafka-to-gcs-command-spec.json
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
3ef5f58 to
401ab72
Compare
sabhyankar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to review this a little more as I disagree with a few of the things.
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCS.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCS.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCS.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGCS.java
Outdated
Show resolved
Hide resolved
|
@sabhyankar Changes are done. The PR is up for review |
|
@sabhyankar Removed the unrelated files |
sabhyankar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add unit tests for each of the WriteToXXX transforms. Also format the classes using the IntelliJ plugin to make it easier.
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/ConstructGenericRecordsFn.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
sabhyankar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor corrects and nits - Please address and rebase
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCSAvro.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCSText.java
Outdated
Show resolved
Hide resolved
v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/WriteToGCSUtility.java
Outdated
Show resolved
Hide resolved
v2/common/src/test/java/com/google/cloud/teleport/v2/transforms/WriteToGCSTextTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we passing null? Should we even use withNumShards if are going to pass null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sabhyankar Though of making NumShards a required parameter because when I was trying to set its default value to 0 it threw [java.lang.IllegalArgumentException: When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly] this Exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sabhyankar
I made the following changes
- Changed withNumShards() to setNumShards().
- Setting its default value to be 0 using @Default.Integer(0) in the options Interface
- Letting withNumShards() method from IO's (i.e. AvroIO, TextIO, FileIO) to handle this exception
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/transforms/FileFormatFactory.java
Outdated
Show resolved
Hide resolved
17959ab to
bfb196b
Compare
|
All (the pull request submitter and all commit authors) CLAs are signed, but one or more commits were authored or co-authored by someone other than the pull request submitter. We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that by leaving a comment that contains only Note to project maintainer: There may be cases where the author cannot leave a comment, or the comment is not properly detected as consent. In those cases, you can manually confirm consent of the commit author(s), and set the ℹ️ Googlers: Go here for more info. |
|
CLAs look good, thanks! ℹ️ Googlers: Go here for more info. |
|
@sabhyankar The PR has been rebased. |
sabhyankar
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PiperOrigin-RevId: 281563732
* Dml integration (#53) * Added extensive UT Added extensive UT * Cassandra pr bug fixes (#57) * Cassandra Consolidate Unit Test case and Regression testing fixes (#58) * Added Mapping fixes * Added Spoltles fixes * Added Consolidated fixes * Added TODO * Addess Data and Time * Cassandra pr bug fixes (#64) * Handle TypeHandler Parsing issue fixes (#65) Co-authored-by: pawankashyapollion <v-pawan.kumar@ollion.com> * Added Safe handle (#68) * Handle LocalTime For Time Data Type In Cassandra (#69) * Cassandra pr bug fixes (#70) * Handle Timestamp Fixes (#72) * Added Code Combined in a single way * Address The Unwanted Hop * Cassandra pr bug fixes (#75) * Added PR Review Comments * Remove NamesCol Dependecy as spannerTableName is same as In Given Mapping * Added spannerTableId for fetching Mapping * Removed SpannerToID and also Updated Session file with proper structure * Timestamp in milisecond * removed assertNotNull from UT wherever possible * Added Fixes * Added Note Instead of Question * -- review fixes (#78) * Added Bytes to hex to blob conversion * Handling Bytes as Binary encoded As of now * Passing Null Value to Primary Key as well for cassandra * Added UT fixes * Added UT refectoring * Reverse merge confict fixes --------- Co-authored-by: pawankashyapollion <v-pawan.kumar@ollion.com> Co-authored-by: Akash Thawait <aakash@ollion.com>
@sabhyankar Pull request for Kafka To GCS Template.