Skip to content

Commit c1c4289

Browse files
prateekm authored and Xinyu Liu committed
SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.
SAMZA-1221: Separated the OperatorSpec and MessageStream DAGs so that they're now duals of each other. Users interact with and construct the MessageStream DAG; we create and use the OperatorSpec DAG internally. Moved common OperatorSpec functionality (getId, getOpCode, getOpName etc.) to the OperatorSpec abstract base class. Added a new JoinOperatorSpec and PartialJoinOperatorImpls which are created from JoinOperatorSpec in OperatorGraphImpl. Added a new InputOperatorSpec and InputOperatorImpl (previously RootOperatorImpl). InputOperatorSpec is created when StreamGraph#getInputStream is called. SAMZA-1101: Added a new OutputOperatorSpec and OutputOperatorImpl for partitionBy and sendTo. These are separate from the SinkOperatorSpec and SinkOperatorImpl used for sink. We don't need to create a sinkFn for partitionBy and sendTo anymore. Updated most unit tests to use the new classes and avoid reflection. Author: Prateek Maheshwari <pmaheshw@linkedin.com> Reviewers: Jagadish V <vjagadish1989@apache.org> Closes apache#194 from prateekm/internal-cleanup
1 parent 29cf374 commit c1c4289

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1405
-2060
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ docs/learn/documentation/*/rest/javadocs
2828
out/
2929
*.patch
3030
**.pyc
31+
samza-shell/src/main/visualizer/plan.json

samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@
3131
import org.apache.samza.SamzaException;
3232
import org.apache.samza.config.Config;
3333
import org.apache.samza.config.JobConfig;
34-
import org.apache.samza.operators.MessageStream;
35-
import org.apache.samza.operators.MessageStreamImpl;
3634
import org.apache.samza.operators.StreamGraphImpl;
35+
import org.apache.samza.operators.spec.JoinOperatorSpec;
3736
import org.apache.samza.operators.spec.OperatorSpec;
38-
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
3937
import org.apache.samza.system.StreamSpec;
4038
import org.apache.samza.system.SystemStream;
4139
import org.slf4j.Logger;
@@ -77,7 +75,7 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
7775
*/
7876
/* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
7977
JobGraph jobGraph = new JobGraph(config);
80-
Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
78+
Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
8179
Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
8280
Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
8381
intStreams.retainAll(sinkStreams);
@@ -120,7 +118,7 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
120118
/**
121119
* Fetch the partitions of source/sink streams and update the StreamEdges.
122120
* @param jobGraph {@link JobGraph}
123-
* @param streamManager the {@StreamManager} to interface with the streams.
121+
* @param streamManager the {@link StreamManager} to interface with the streams.
124122
*/
125123
/* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
126124
Set<StreamEdge> existingStreams = new HashSet<>();
@@ -157,20 +155,16 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
157155
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
158156
// reverse mapping of the above
159157
Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
160-
// Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
161-
// will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
162-
// (who register first in the map wins).
163-
Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
164158
// A queue of joins with known input partitions
165159
Queue<OperatorSpec> joinQ = new LinkedList<>();
166160
// The visited set keeps track of the join specs that have been already inserted in the queue before
167161
Set<OperatorSpec> visited = new HashSet<>();
168162

169-
streamGraph.getInputStreams().entrySet().forEach(entry -> {
163+
streamGraph.getInputOperators().entrySet().forEach(entry -> {
170164
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
171165
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
172166
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
173-
outputStreamToJoinSpec, joinQ, visited);
167+
joinQ, visited);
174168
});
175169

176170
// At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -209,44 +203,33 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
209203
}
210204

211205
/**
212-
* This function traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
213-
* @param inputMessageStream next input MessageStream to traverse {@link MessageStream}
206+
* This function traverses the {@link OperatorSpec} graph to find and update mappings for all Joins reachable
207+
* from this input {@link StreamEdge}.
208+
* @param operatorSpec the {@link OperatorSpec} to traverse
214209
* @param sourceStreamEdge source {@link StreamEdge}
215210
* @param joinSpecToStreamEdges mapping from join spec to its source {@link StreamEdge}s
216211
* @param streamEdgeToJoinSpecs mapping from source {@link StreamEdge} to the join specs that consume it
217-
* @param outputStreamToJoinSpec mapping from the output stream to the join spec
218212
* @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
219213
*/
220-
private static void findReachableJoins(MessageStream inputMessageStream, StreamEdge sourceStreamEdge,
221-
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges, Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
222-
Map<MessageStream, OperatorSpec> outputStreamToJoinSpec, Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
223-
Collection<OperatorSpec> specs = ((MessageStreamImpl) inputMessageStream).getRegisteredOperatorSpecs();
224-
for (OperatorSpec spec : specs) {
225-
if (spec instanceof PartialJoinOperatorSpec) {
226-
// every join will have two partial join operators
227-
// we will choose one of them in order to consolidate the inputs
228-
// the first one who registered with the outputStreamToJoinSpec will win
229-
MessageStream output = spec.getNextStream();
230-
OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
231-
if (joinSpec == null) {
232-
joinSpec = spec;
233-
outputStreamToJoinSpec.put(output, joinSpec);
234-
}
235-
236-
joinSpecToStreamEdges.put(joinSpec, sourceStreamEdge);
237-
streamEdgeToJoinSpecs.put(sourceStreamEdge, joinSpec);
238-
239-
if (!visited.contains(joinSpec) && sourceStreamEdge.getPartitionCount() > 0) {
240-
// put the joins with known input partitions into the queue
241-
joinQ.add(joinSpec);
242-
visited.add(joinSpec);
243-
}
214+
private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
215+
Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
216+
Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
217+
Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
218+
if (operatorSpec instanceof JoinOperatorSpec) {
219+
joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
220+
streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
221+
222+
if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
223+
// put the joins with known input partitions into the queue and mark as visited
224+
joinQ.add(operatorSpec);
225+
visited.add(operatorSpec);
244226
}
227+
}
245228

246-
if (spec.getNextStream() != null) {
247-
findReachableJoins(spec.getNextStream(), sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, outputStreamToJoinSpec, joinQ,
248-
visited);
249-
}
229+
Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
230+
for (OperatorSpec registeredOpSpec : registeredOperatorSpecs) {
231+
findReachableJoins(registeredOpSpec, sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ,
232+
visited);
250233
}
251234
}
252235

samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,21 @@
1919

2020
package org.apache.samza.execution;
2121

22-
import com.google.common.base.Joiner;
23-
import com.google.common.collect.HashMultimap;
24-
import com.google.common.collect.Multimap;
2522
import java.io.ByteArrayOutputStream;
2623
import java.util.ArrayList;
2724
import java.util.Collection;
28-
import java.util.Collections;
2925
import java.util.HashMap;
3026
import java.util.HashSet;
3127
import java.util.List;
3228
import java.util.Map;
3329
import java.util.Set;
3430
import java.util.stream.Collectors;
3531
import org.apache.samza.config.ApplicationConfig;
36-
import org.apache.samza.operators.MessageStream;
37-
import org.apache.samza.operators.MessageStreamImpl;
32+
import org.apache.samza.operators.spec.JoinOperatorSpec;
3833
import org.apache.samza.operators.spec.OperatorSpec;
39-
import org.apache.samza.operators.util.OperatorJsonUtils;
34+
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
35+
import org.apache.samza.operators.spec.OutputOperatorSpec;
36+
import org.apache.samza.operators.spec.OutputStreamImpl;
4037
import org.codehaus.jackson.annotate.JsonProperty;
4138
import org.codehaus.jackson.map.ObjectMapper;
4239

@@ -73,8 +70,6 @@ static final class OperatorGraphJson {
7370
List<StreamJson> outputStreams;
7471
@JsonProperty("operators")
7572
Map<Integer, Map<String, Object>> operators = new HashMap<>();
76-
@JsonProperty("canonicalOpIds")
77-
Map<Integer, String> canonicalOpIds = new HashMap<>();
7873
}
7974

8075
static final class StreamJson {
@@ -108,11 +103,6 @@ static final class JobGraphJson {
108103
String applicationId;
109104
}
110105

111-
// Mapping from the output stream to the ids.
112-
// Logically they belong to the same operator, but in code we generate one operator for each input.
113-
// This is to associate the operators that output to the same MessageStream.
114-
Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();
115-
116106
/**
117107
* Returns the JSON representation of a {@link JobGraph}
118108
* @param jobGraph {@link JobGraph}
@@ -157,28 +147,21 @@ private JobNodeJson buildJobNodeJson(JobNode jobNode) {
157147
}
158148

159149
/**
160-
* Traverse the {@StreamGraph} and build the operator graph JSON POJO.
150+
* Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
161151
* @param jobNode job node in the {@link JobGraph}
162152
* @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
163153
*/
164154
private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
165155
OperatorGraphJson opGraph = new OperatorGraphJson();
166156
opGraph.inputStreams = new ArrayList<>();
167-
jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
157+
jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
168158
StreamJson inputJson = new StreamJson();
169159
opGraph.inputStreams.add(inputJson);
170160
inputJson.streamId = streamSpec.getId();
171-
Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs();
161+
Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
172162
inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
173163

174-
updateOperatorGraphJson((MessageStreamImpl) stream, opGraph);
175-
176-
for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) {
177-
List<Integer> sortedIds = new ArrayList<>(entry.getValue());
178-
Collections.sort(sortedIds);
179-
String canonicalId = Joiner.on(',').join(sortedIds);
180-
sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId));
181-
}
164+
updateOperatorGraphJson(operatorSpec, opGraph);
182165
});
183166

184167
opGraph.outputStreams = new ArrayList<>();
@@ -191,23 +174,43 @@ private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
191174
}
192175

193176
/**
194-
* Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
195-
* @param messageStream input
177+
* Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
178+
* @param operatorSpec input
196179
* @param opGraph operator graph to build
197180
*/
198-
private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) {
199-
Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
200-
specs.forEach(opSpec -> {
201-
opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec));
202-
203-
if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) {
204-
outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId());
205-
}
206-
207-
if (opSpec.getNextStream() != null) {
208-
updateOperatorGraphJson(opSpec.getNextStream(), opGraph);
209-
}
210-
});
181+
private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
182+
// TODO xiliu: render input operators instead of input streams
183+
if (operatorSpec.getOpCode() != OpCode.INPUT) {
184+
opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
185+
}
186+
Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
187+
specs.forEach(opSpec -> updateOperatorGraphJson(opSpec, opGraph));
188+
}
189+
190+
/**
191+
* Format the operator properties into a map
192+
* @param spec an {@link OperatorSpec} instance
193+
* @return map of the operator properties
194+
*/
195+
private Map<String, Object> operatorToMap(OperatorSpec spec) {
196+
Map<String, Object> map = new HashMap<>();
197+
map.put("opCode", spec.getOpCode().name());
198+
map.put("opId", spec.getOpId());
199+
map.put("sourceLocation", spec.getSourceLocation());
200+
201+
Collection<OperatorSpec> nextOperators = spec.getRegisteredOperatorSpecs();
202+
map.put("nextOperatorIds", nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
203+
204+
if (spec instanceof OutputOperatorSpec) {
205+
OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream();
206+
map.put("outputStreamId", outputStream.getStreamSpec().getId());
207+
}
208+
209+
if (spec instanceof JoinOperatorSpec) {
210+
map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
211+
}
212+
213+
return map;
211214
}
212215

213216
/**

samza-core/src/main/java/org/apache/samza/execution/JobNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import org.apache.samza.config.MapConfig;
3232
import org.apache.samza.config.TaskConfig;
3333
import org.apache.samza.operators.StreamGraphImpl;
34+
import org.apache.samza.operators.spec.JoinOperatorSpec;
3435
import org.apache.samza.operators.spec.OperatorSpec;
35-
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
3636
import org.apache.samza.operators.spec.WindowOperatorSpec;
3737
import org.apache.samza.operators.util.MathUtils;
3838
import org.apache.samza.util.Util;
@@ -145,7 +145,7 @@ private long computeTriggerInterval() {
145145
// Filter out the join operators, and obtain a list of their ttl values
146146
List<Long> joinTtlIntervals = operatorSpecs.stream()
147147
.filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
148-
.map(spec -> ((PartialJoinOperatorSpec) spec).getTtlMs())
148+
.map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
149149
.collect(Collectors.toList());
150150

151151
// Combine both the above lists

0 commit comments

Comments
 (0)