LROB-86: HadoopOutputFileIO #41
base: master
Conversation
dmvk
left a comment
This is definitely heading in the right direction! I've added a few comments. Good job 👍
...ut-format/src/main/java/org/apache/beam/sdk/io/hadoop/outputformat/HadoopOutputFormatIO.java (resolved comment)
try {
  JobID jobId = HadoopUtils.createJobId();
  TaskAttemptContext setupTaskContext =
      HadoopUtils.createSetupTaskContext(getConfiguration().get(), jobId);
I reckon this method should work with the configuration object passed in as an argument. Maybe we can make this method static?
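A minimal sketch of what that suggestion could look like; the method name and return type here are assumptions, only `HadoopUtils.createJobId` and `HadoopUtils.createSetupTaskContext` come from the diff above:

```java
// Hypothetical static variant: the method depends only on the
// Configuration passed in, not on instance state such as
// getConfiguration().
private static TaskAttemptContext setupJob(Configuration conf) {
  JobID jobId = HadoopUtils.createJobId();
  return HadoopUtils.createSetupTaskContext(conf, jobId);
}
```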
PCollectionView<SerializableConfiguration> configView = null;
if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
  configView = createGlobalConfigCollectionView(input);
}
We should throw an exception for non-default windowing.
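A minimal sketch of such a guard, assuming it lives in the transform's expand method; the exception type and message are illustrative, only the `WindowingStrategy.globalDefault()` check comes from the diff above:

```java
// Hypothetical guard: reject any windowing strategy other than the
// global default instead of silently leaving configView null.
if (!input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
  throw new IllegalStateException(
      "Write supports only the global default windowing strategy");
}
PCollectionView<SerializableConfiguration> configView =
    createGlobalConfigCollectionView(input);
```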
I will fix it in the streaming iteration
return PDone.in(
    input
        .apply(
            TRANSFORM_NAME + "/TaskAssignee",
There is no need to prefix transform names; they will always be nested under the user-provided name.
 * @param <KeyT>
 * @param <ValueT>
 */
private static class TaskIdContextHolder<KeyT, ValueT> {
How about TaskContext?
...doop-output-format/src/main/java/org/apache/beam/sdk/io/hadoop/outputformat/HadoopUtils.java (resolved comment)
  }
}

static int getReducersCountFromConfig(Configuration conf) {
The shorter method names getReducersCount(Configuration conf) and getPartitioner(Configuration conf) are fine :)
/**
 * Coder for {@link CollectionAccumulator} class.
 * @param <T>
Missing documentation for the @param tag (the javadoc task would fail).
  }
}

}
(no newline at end of file)
Missing newline at end of file.
 * Collects all items of defined type into one {@link Iterable} container.
 * @param <T> Type of the elements to collect
 */
public class IterableCombinerFn<T>
👍 I guess this should be package-private though.
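For illustration, the combine pattern this class implements can be sketched in plain Java with no Beam dependency; the class name and method set below mirror Beam's CombineFn lifecycle but are illustrative, not the actual PR code:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the pattern IterableCombinerFn implements:
// accumulate all elements of one type into a single container.
// Names here are placeholders, not the Beam API.
class IterableCombinerSketch<T> {
  // Start a new, empty accumulator.
  List<T> createAccumulator() {
    return new ArrayList<>();
  }

  // Fold one input element into an accumulator.
  List<T> addInput(List<T> accumulator, T input) {
    accumulator.add(input);
    return accumulator;
  }

  // Merge partial accumulators (e.g. from different bundles) into one.
  List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
    List<T> merged = new ArrayList<>();
    for (List<T> acc : accumulators) {
      merged.addAll(acc);
    }
    return merged;
  }

  // Expose the final accumulator as the output Iterable.
  Iterable<T> extractOutput(List<T> accumulator) {
    return accumulator;
  }
}
```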
Force-pushed from 074af32 to 044a5b6.
Force-pushed from 81aa1ec to d298e6b.
dmvk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks awesome, good job 👍 There are a few more issues to address, but we are getting closer ;)
...java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java (multiple resolved comments)
Integer taskID = element.getKey();

if (bundleTaskContext == null) {
  bundleTaskContext = setupTask(taskID, conf);
IMPORTANT: this seems weird. I think we can receive multiple windowed task IDs in the same bundle; am I missing something?
This will probably be more complicated.
I found this article: https://beam.apache.org/documentation/execution-model/ which describes how data are distributed across bundles. I will probably have to move task setup and commit out of WriteFn into separate ParDos.
I will look at this more closely tomorrow.
I still doubt that WriteFn is written correctly.
In my opinion, the following points should be changed:
- Task setup should be moved into the taskId creation step:
  - the hash branch can do this for every partition after job setup;
  - the non-hash branch can do it after acquiring the taskId lock.
- Task commit should be done differently for the hash and non-hash branches:
  - the hash branch should commit a task after all writes are done, for example before commitJob;
  - the non-hash branch should commit a task after bundle processing (as it is now).
- My final doubt relates to the handling of dependent-parallelism failures in WriteFn. If I understand the document correctly, we cannot expect that all KV pairs of one taskId will be processed in one bundle (at least for the hashing branch).
I am not sure about my thoughts, so I will probably need some advice...
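The restructuring described above could be sketched roughly as follows. Every transform label and DoFn class name here is a hypothetical placeholder, not code from the PR; only the general idea of splitting setup, write, and commit into separate ParDos comes from the discussion:

```java
// Hypothetical pipeline shape: task setup and commit pulled out of
// WriteFn into separate ParDos, so WriteFn only performs writes per
// bundle. AssignTaskFn, SetupTaskFn, WriteFn, CommitTaskFn and
// CommitJobFn are placeholder names.
input
    .apply("AssignTask", ParDo.of(new AssignTaskFn<>()))  // create taskIds
    .apply("SetupTask", ParDo.of(new SetupTaskFn<>()))    // setup moved out of WriteFn
    .apply("Write", ParDo.of(new WriteFn<>()))            // pure writes
    .apply("CommitTask", ParDo.of(new CommitTaskFn<>()))  // per-task commit
    .apply("CommitJob", ParDo.of(new CommitJobFn<>()));   // single job commit
```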
Force-pushed from 933f417 to e457e04.
Force-pushed from 0150688 to 20e3e24.
Force-pushed from 4193ac9 to 721a422.
Force-pushed from 721a422 to b13e55d.
Force-pushed from b13e55d to 4adc254.