* 1) Task-level statistic collection is done via vertex with {@link MetricCollectTransform}
* 2) Stage-level statistic aggregation is done via vertex with {@link AggregateMetricTransform}
* inserted before shuffle edges.
* We insert a {@link MessageBarrierVertex for each shuffle edge}
We must optimize multiple shuffle edges at once if a vertex receives many shuffle edges. Please check 327.
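One way to picture this request is to group the shuffle edges by their destination vertex first, so that every group can be handled in a single step. The sketch below is only an illustration under assumptions: `SketchEdge` and `ShuffleEdgeGrouping` are hypothetical stand-ins and are not Nemo's actual `IREdge` or pass classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an IR edge; not Nemo's actual IREdge class.
final class SketchEdge {
  final String src;
  final String dst;
  final boolean shuffle;

  SketchEdge(final String src, final String dst, final boolean shuffle) {
    this.src = src;
    this.dst = dst;
    this.shuffle = shuffle;
  }
}

// Hypothetical helper, named for illustration only.
final class ShuffleEdgeGrouping {
  private ShuffleEdgeGrouping() { }

  // Groups shuffle edges by destination vertex so that all shuffle edges
  // entering the same vertex can be processed together. Non-shuffle edges are skipped.
  static Map<String, List<SketchEdge>> groupShuffleEdgesByDst(final List<SketchEdge> edges) {
    final Map<String, List<SketchEdge>> groups = new LinkedHashMap<>();
    for (final SketchEdge edge : edges) {
      if (edge.shuffle) {
        groups.computeIfAbsent(edge.dst, k -> new ArrayList<>()).add(edge);
      }
    }
    return groups;
  }
}
```

With this shape, a vertex that receives many shuffle edges yields one group containing all of them, rather than one unit of work per edge.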
I've taken a brief look through the PR. I don't think it makes sense to use both `IRDAG` and `DAG<IRVertex, IREdge>` now, so I think it should all be changed to `IRDAG`. Also, for the reshaping pass and annotating pass, it looks like the `apply` and `optimize` methods actually do the same thing but have different names. I don't see a reason for this either, and it feels like it would confuse our users. Could you justify this or make appropriate changes throughout your PR? I'll take a second look once these changes have been made.
Thanks 😄
* @param <V> of the input pair.
* @param <O> of the output aggregated message.
*/
public class MessageAggregationVertex<K, V, O> extends OperatorVertex {
MessageAggregatorVertex?
@@ -48,11 +46,10 @@ public CompressionPass(final CompressionProperty.Value compression) {
}

@Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ public IRDAG optimize(final IRDAG dag) {
Does changing the method name still override the super method?
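For context on this question: Java checks `@Override` at compile time, so an annotated method that does not override or implement a supertype method is rejected by the compiler. A renamed `optimize` therefore compiles with `@Override` only if the supertype declares `optimize` as well. The sketch below illustrates this with hypothetical names (`SketchPass`, `SketchCompressionPass`) and a `String` standing in for the DAG; it is not the actual Nemo pass hierarchy.

```java
// Hypothetical names for illustration; not the actual Nemo pass hierarchy.
abstract class SketchPass {
  // The supertype declares optimize(), so subclasses may annotate their version with @Override.
  abstract String optimize(String dag);
}

final class SketchCompressionPass extends SketchPass {
  @Override // Compiles only because SketchPass declares optimize(String).
  String optimize(final String dag) {
    return dag + "+compressed";
  }
}
```

If `SketchPass` still declared only `apply`, the `@Override` on `optimize` would be a compile error, and the class would also fail to compile for leaving `apply` unimplemented.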
- new LargeShuffleDataPersistencePass(),
- new LargeShuffleResourceSlotPass()
+ new LargeShuffleReshapingPass(),
+ new LargeShuffleAnnotatingPass()
Should other composite passes be refactored this way as well?
Sounds good. I filed a JIRA.
https://issues.apache.org/jira/browse/NEMO-329
* Get the current underlying DAG for direct access.
* @return underlying DAG.
*/
public DAG<IRVertex, IREdge> getCurrentDAGSnapshot() {
How is this a 'snapshot'?
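One possible reading of this question is the difference between handing out the live underlying structure and handing out a point-in-time copy. The sketch below shows that distinction only; `SketchDagHolder` is hypothetical, a list of vertex names stands in for the DAG, and nothing here reflects how `IRDAG` is actually implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for illustration; a list of vertex names stands in for the DAG.
final class SketchDagHolder {
  private final List<String> vertices = new ArrayList<>();

  void addVertex(final String vertex) {
    vertices.add(vertex);
  }

  // Read-only view of the live structure: later additions remain visible through it.
  List<String> getLiveView() {
    return Collections.unmodifiableList(vertices);
  }

  // Point-in-time copy: later additions do not affect the returned list.
  List<String> getSnapshot() {
    return Collections.unmodifiableList(new ArrayList<>(vertices));
  }
}
```

Under this reading, a getter with "snapshot" in its name would be expected to behave like `getSnapshot` rather than `getLiveView`.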
* "Unsafe" direct reshaping (semantic-preserving is not guaranteed).
* @param unsafeReshaping a function that directly reshapes the underlying DAG.
*/
public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
Maybe something like `reshapeUnsafely` or `applyUnsafeDirectReshaping`? "Unsafe" is a single word, by the way. I think it would be better to keep the names of methods that perform a specific action as verbs. Could you double-check this throughout your PR?
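A minimal sketch of what the verb-style name could look like, assuming a wrapper that applies a caller-supplied `java.util.function.Function` to the structure it holds. `SketchGraphWrapper` is hypothetical and a list of vertex names stands in for `DAG<IRVertex, IREdge>`; only the name `reshapeUnsafely` comes from the suggestion above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical wrapper for illustration; a list of vertex names stands in for the DAG.
final class SketchGraphWrapper {
  private List<String> current;

  SketchGraphWrapper(final List<String> initial) {
    this.current = Collections.unmodifiableList(new ArrayList<>(initial));
  }

  // Verb-style name: applies the given function to the underlying graph
  // without checking that the result preserves the original semantics.
  void reshapeUnsafely(final Function<List<String>, List<String>> unsafeReshaping) {
    this.current = Collections.unmodifiableList(new ArrayList<>(unsafeReshaping.apply(current)));
  }

  List<String> getCurrent() {
    return current;
  }
}
```

A caller would pass a lambda that builds the new structure from the old one, for example one that copies the list and appends a vertex.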
DAG<IRVertex, IREdge> seems to be still used here. I don't see a reason for using both forms.
Thanks for the comments. I've updated the code. @sanha @wonook
Looks good. Just a few naming suggestions. I'll merge after the change.
* @param <V> the vertex type
* @param <E> the edge type
*/
public interface DAGQueryInterface<V extends Vertex, E extends Edge<V>> extends Serializable {
Maybe simply `DAGInterface` as its name?
private final AtomicInteger metricCollectionId;

private DAG<IRVertex, IREdge> lastSnapshotDAG; // the DAG that was saved most recently.
private DAG<IRVertex, IREdge> updatingDAG; // the DAG that is being updated.
`dagSnapshot` and `modifiedDAG`?
@sanha Anything else you want to check?
@wonook Thanks. I've renamed them as per your suggestions.
@johnyangk Thanks for the work! Please check my comment.
final IREdge edgeToOriginalDstV =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
edgeToOriginalDstV.setPropertyPermanently(MetricCollectionProperty.of(metricCollectionId));
Don't we need to annotate this `MetricCollectionProperty` elsewhere?
@sanha This has been moved to IRDAG#insert
https://github.com/apache/incubator-nemo/pull/190/files#diff-969693370fcffe6720d48d175502c466R210
@johnyangk Thanks for your explanation! I'll merge this.
JIRA: NEMO-328: Refactor IRDAG
Major changes:
Minor changes to note:
Tests for the changes:
Other comments:
Closes #190