[NEMO-328] Refactor IRDAG #190

Merged
merged 38 commits into from
Feb 1, 2019

Conversation

@johnyangk johnyangk commented Jan 23, 2019

JIRA: NEMO-328: Refactor IRDAG

Major changes:

  • Modifies the Pass interface to use IRDAG, rather than directly using DAG
  • Modifies reshaping passes (e.g., LargeShuffle, Skew) to use the IRDAG reshaping methods

Minor changes to note:

  • Merges multiple annotating LargeShufflePasses into a single LargeShuffleAnnotatingPass

Tests for the changes:

  • Existing tests pass

Other comments:

  • N/A

Closes #190

@johnyangk johnyangk requested a review from sanha January 23, 2019 01:05
sanha previously requested changes Jan 24, 2019
* 1) Task-level statistic collection is done via vertex with {@link MetricCollectTransform}
* 2) Stage-level statistic aggregation is done via vertex with {@link AggregateMetricTransform}
* inserted before shuffle edges.
* We insert a {@link MessageBarrierVertex} for each shuffle edge
Contributor

We must optimize multiple shuffle edges at once if a vertex receives many shuffle edges. Please check 327.

Member

@wonook wonook left a comment

I've taken a brief look through the PR. I don't think it makes sense to use both IRDAG and DAG<IRVertex, IREdge> now, so it should all be changed to IRDAG. Also, for the reshaping pass and annotating pass, the apply and optimize methods appear to do the same thing under different names. I don't see a reason for this either, and it feels like it would confuse our users. Could you justify this or make the appropriate changes throughout your PR? I'll take a second look once these changes have been made.
Thanks 😄

* @param <V> of the input pair.
* @param <O> of the output aggregated message.
*/
public class MessageAggregationVertex<K, V, O> extends OperatorVertex {
Member

MessageAggregatorVertex?

@@ -48,11 +46,10 @@ public CompressionPass(final CompressionProperty.Value compression) {
}

@Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ public IRDAG optimize(final IRDAG dag) {
Member

does changing the method name still override the super method?
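
On the question above: in Java, @Override compiles only when a supertype declares a method with the same name and parameter types, so a renamed method can keep the annotation only if the super type declares the new name as well. The following is a minimal sketch of that rule, assuming a simplified Pass type that implements Function<IRDAG, IRDAG>. The IRDAG, Pass, and CompressionPass classes here are hypothetical stand-ins written for illustration, not the actual Nemo code.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the real IRDAG; it only carries a label. */
final class IRDAG {
  private final String description;

  IRDAG(final String description) {
    this.description = description;
  }

  String describe() {
    return description;
  }
}

/** Assumed shape of the super type: a pass is a function from IRDAG to IRDAG. */
abstract class Pass implements Function<IRDAG, IRDAG> {
}

final class CompressionPass extends Pass {
  @Override // compiles: Function<IRDAG, IRDAG> declares apply(IRDAG)
  public IRDAG apply(final IRDAG dag) {
    return new IRDAG(dag.describe() + "+compression");
  }

  // No supertype in this sketch declares optimize(IRDAG), so adding
  // @Override here would be a compile-time error.
  public IRDAG optimize(final IRDAG dag) {
    return apply(dag);
  }
}

final class OverrideDemo {
  public static void main(final String[] args) {
    final IRDAG result = new CompressionPass().apply(new IRDAG("dag"));
    System.out.println(result.describe()); // prints "dag+compression"
  }
}
```

In this sketch, keeping a single method name on the super type avoids having two methods that do the same thing, which is the concern raised in the review.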

new LargeShuffleDataPersistencePass(),
new LargeShuffleResourceSlotPass()
new LargeShuffleReshapingPass(),
new LargeShuffleAnnotatingPass()
Member

Should other composite passes be refactored this way as well?

Contributor Author

Sounds good. I filed a JIRA.
https://issues.apache.org/jira/browse/NEMO-329

* Get the current underlying DAG for direct access.
* @return underlying DAG.
*/
public DAG<IRVertex, IREdge> getCurrentDAGSnapshot() {
Member

How is this a 'snapshot'?

* "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
* @param unsafeReshaping a function that directly reshapes the underlying DAG.
*/
public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
Member

Maybe something like 'reshapeUnsafely' or 'applyUnsafeDirectReshaping'? 'Unsafe' is a single word, by the way. I think it would be better to keep the names of methods that perform a specific action as verbs. Could you double-check this throughout your PR?

Member

DAG<IRVertex, IREdge> seems to be still used here. I don't see a reason for using both forms.
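
To make the discussion above concrete, here is a rough sketch of a wrapper that owns an underlying graph and exposes one verb-named escape hatch for direct reshaping. SimpleDag and DagWrapper are hypothetical, simplified stand-ins for DAG<IRVertex, IREdge> and IRDAG, and reshapeUnsafely is one of the names suggested in the review rather than a confirmed method of the merged code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical immutable graph standing in for DAG<IRVertex, IREdge>. */
final class SimpleDag {
  private final List<String> vertices;

  SimpleDag(final List<String> vertices) {
    // Defensive copy so later changes to the caller's list do not leak in.
    this.vertices = Collections.unmodifiableList(new ArrayList<>(vertices));
  }

  List<String> getVertices() {
    return vertices;
  }
}

/** Hypothetical wrapper standing in for IRDAG. */
final class DagWrapper {
  private SimpleDag current;

  DagWrapper(final SimpleDag initial) {
    this.current = initial;
  }

  SimpleDag getCurrentDag() {
    return current;
  }

  /**
   * Replaces the underlying graph with whatever the given function returns.
   * Nothing is validated here, which is why the name carries "unsafely".
   */
  void reshapeUnsafely(final Function<SimpleDag, SimpleDag> unsafeReshaping) {
    current = unsafeReshaping.apply(current);
  }
}

final class ReshapeDemo {
  public static void main(final String[] args) {
    final DagWrapper wrapper = new DagWrapper(new SimpleDag(List.of("a", "b")));
    wrapper.reshapeUnsafely(dag -> {
      final List<String> updated = new ArrayList<>(dag.getVertices());
      updated.add("c");
      return new SimpleDag(updated);
    });
    System.out.println(wrapper.getCurrentDag().getVertices()); // prints "[a, b, c]"
  }
}
```

Funnelling direct access through a single method keeps the raw graph type out of most callers, so the remaining uses are easy to find when they are revisited later.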

@johnyangk
Contributor Author

Thanks for the comments. I've updated the code.

@sanha
I've modified IRDAG#insert to receive multiple IREdges so that #189 continues to function as intended. I've run NetworkTraceAnalysisITCase#testDataSkew and verified the DAG topology.

@wonook
I've addressed your comments except for IRDAG#unSafeDirectReshaping, because the loop-related passes require direct access to the underlying DAG. It'd be good to handle this issue in a separate PR (JIRA). There also remain around 15 instances of DAG<IRVertex, IREdge> outside of IRDAG, mostly in loop-related passes, plus 2 in Spark-related classes for building new IRDAGs from RDD applications. Other than those instances, I've changed the type to IRDAG for consistency.

Member

@wonook wonook left a comment

Looks good. Just a few naming suggestions. I'll merge after the change.

* @param <V> the vertex type
* @param <E> the edge type
*/
public interface DAGQueryInterface<V extends Vertex, E extends Edge<V>> extends Serializable {
Member

Maybe simply DAGInterface as its name?

private final AtomicInteger metricCollectionId;

private DAG<IRVertex, IREdge> lastSnapshotDAG; // the DAG that was saved most recently.
private DAG<IRVertex, IREdge> updatingDAG; // the DAG that is being updated.
Member

dagSnapshot and modifiedDAG?

@wonook
Member

wonook commented Jan 31, 2019

@sanha Anything else you want to check?

@johnyangk
Contributor Author

@wonook Thanks. I've renamed them as per your suggestions.

Contributor

@sanha sanha left a comment

@johnyangk Thanks for the work! Please check my comment.

final IREdge edgeToOriginalDstV =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
edgeToOriginalDstV.setPropertyPermanently(MetricCollectionProperty.of(metricCollectionId));
Contributor

Don't we need to annotate this MetricCollectionProperty elsewhere?

Contributor Author

Contributor

@sanha sanha left a comment

@johnyangk Thanks for your explanation! I'll merge this.

@sanha sanha merged commit a49b050 into apache:master Feb 1, 2019
@johnyangk johnyangk deleted the 328-refactor-irdag branch February 1, 2019 02:17
3 participants