Skip to content

Conversation

@b923
Copy link

@b923 b923 commented Sep 25, 2018

No description provided.

@b923 b923 requested a review from dmvk September 25, 2018 08:59
Copy link

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definitely heads the right direction! I've added a few comments. Good job 👍

try {
JobID jobId = HadoopUtils.createJobId();
TaskAttemptContext setupTaskContext =
HadoopUtils.createSetupTaskContext(getConfiguration().get(), jobId);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reckon this method should work with the configuration object from the passed argument. Maybe we can make this method static?

PCollectionView<SerializableConfiguration> configView = null;
if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())) {
configView = createGlobalConfigCollectionView(input);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should throw an exception for non default windowing

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix it in the streaming iteration

return PDone.in(
input
.apply(
TRANSFORM_NAME + "/TaskAssignee",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is not need to prefix transformation names. These will be always nested under user provided name.

* @param <KeyT>
* @param <ValueT>
*/
private static class TaskIdContextHolder<KeyT, ValueT> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about TaskContext?

}
}

static int getReducersCountFromConfig(Configuration conf) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shorter getReducersCount(Configuration conf) & getPartitioner(Configuration conf) method names are ok :)


/**
* Coder for {@link CollectionAccumulator} class.
* @param <T>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing doc (javadoc task would fail)

}
}

} No newline at end of file
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newline

* Collects all items of defined type into one {@link Iterable} container.
* @param <T> Type of the elements to collect
*/
public class IterableCombinerFn<T>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I guess this should be package private though

@dmvk dmvk force-pushed the BEAM-3912-HadoopOutputFormatIO branch from 074af32 to 044a5b6 Compare October 4, 2018 16:18
@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch from 81aa1ec to d298e6b Compare October 5, 2018 14:49
Copy link

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks awesome, good job 👍 There are few more issues to address, but we are getting closer ;)

Integer taskID = element.getKey();

if (bundleTaskContext == null) {
bundleTaskContext = setupTask(taskID, conf);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMPORTANT: this seems weird, I think that we can receive multiple windowed taskids in the same bundle, am I missing something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be probably more complicated..
I have found this article: https://beam.apache.org/documentation/execution-model/ which describes how are the data distributed across bundles. I will probably have to move task setup and commit from WriteFn to separate ParDos.
I will look on that more closely tommorow.

Copy link
Author

@b923 b923 Oct 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still doubt if the WriteFn is written correctly.
There are following points which should be changed by my opinion:

  • Setup of the tasks should be moved into taskIds creation
    • Hash branch can do this for every partition after job setup
    • NonHash branch can do it after taskId lock acquisition
  • Commit of tasks should be done differently for Hash and non Hash branch:
    • Hash branch should commit task after all writes are done - for example before commitJob
    • NonHash branch should commit task after bundle processing (As it is now)
  • My final dubiety is releated to dependent parallellism failures hadnling in relation to WriteFn. If I understand to the document correctly than we cannot expect that KV pairs of one taksId will be processed in one bundle (at least for Hashing branch).

I am not sure about my thoughts, so I will need some advice probably...

@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch from 933f417 to e457e04 Compare October 25, 2018 07:52
@b923 b923 changed the base branch from BEAM-3912-HadoopOutputFormatIO to master October 25, 2018 07:55
@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch from 0150688 to 20e3e24 Compare October 31, 2018 08:28
@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch 3 times, most recently from 4193ac9 to 721a422 Compare November 13, 2018 12:25
@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch from 721a422 to b13e55d Compare November 19, 2018 15:44
@b923 b923 force-pushed the b923/LROB-86-HadoopOutputFormatIO branch from b13e55d to 4adc254 Compare November 29, 2018 11:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants