
Commit eca0020

Merge remote-tracking branch 'upstream/master'
2 parents: 5156239 + 40f7430

27 files changed: +1523 −246 lines

build.gradle

Lines changed: 1 addition & 0 deletions
```diff
@@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "net.jodah:failsafe:$failsafeVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-core:$mockitoVersion"
```

gradle/dependency-versions.gradle

Lines changed: 1 addition & 0 deletions
```diff
@@ -45,4 +45,5 @@
   yarnVersion = "2.6.1"
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
+  failsafeVersion = "1.1.0"
 }
```
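The new dependency is net.jodah's Failsafe 1.x, presumably pulled in to back retry logic elsewhere in this commit (e.g. for remote table calls). A minimal, hedged sketch of the 1.1.0-era API; `fetchFromStore` and the policy values are illustrative only, not from this commit:

```java
import java.util.concurrent.TimeUnit;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;

public class FailsafeSketch {
  public static void main(String[] args) {
    // Retry up to 3 times on any exception, with exponential backoff
    // between 100 ms and 1 s (illustrative values).
    RetryPolicy retryPolicy = new RetryPolicy()
        .retryOn(Exception.class)
        .withBackoff(100, 1000, TimeUnit.MILLISECONDS)
        .withMaxRetries(3);

    // Failsafe.with(...).get(...) runs the callable, reapplying it per the policy.
    String value = Failsafe.with(retryPolicy).get(() -> fetchFromStore("some-key"));
    System.out.println(value);
  }

  // Hypothetical flaky remote call used only for this sketch.
  private static String fetchFromStore(String key) {
    return "value-for-" + key;
  }
}
```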

samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java

Lines changed: 101 additions & 77 deletions
Large diffs are not rendered by default.

samza-core/src/main/java/org/apache/samza/execution/JobGraph.java

Lines changed: 41 additions & 41 deletions
```diff
@@ -54,8 +54,8 @@

   private final Map<String, JobNode> nodes = new HashMap<>();
   private final Map<String, StreamEdge> edges = new HashMap<>();
-  private final Set<StreamEdge> sources = new HashSet<>();
-  private final Set<StreamEdge> sinks = new HashSet<>();
+  private final Set<StreamEdge> inputStreams = new HashSet<>();
+  private final Set<StreamEdge> outputStreams = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
   private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
@@ -115,26 +115,26 @@ public OperatorSpecGraph getSpecGraph() {

   /**
    * Add a source stream to a {@link JobNode}
-   * @param input source stream
-   * @param node the job node that consumes from the source
+   * @param streamSpec input stream
+   * @param node the job node that consumes from the streamSpec
    */
-  void addSource(StreamSpec input, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(input);
+  void addInputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addTargetNode(node);
     node.addInEdge(edge);
-    sources.add(edge);
+    inputStreams.add(edge);
   }

   /**
-   * Add a sink stream to a {@link JobNode}
-   * @param output sink stream
-   * @param node the job node that outputs to the sink
+   * Add an output stream to a {@link JobNode}
+   * @param streamSpec output stream
+   * @param node the job node that outputs to the output stream
    */
-  void addSink(StreamSpec output, JobNode node) {
-    StreamEdge edge = getOrCreateStreamEdge(output);
+  void addOutputStream(StreamSpec streamSpec, JobNode node) {
+    StreamEdge edge = getOrCreateStreamEdge(streamSpec);
     edge.addSourceNode(node);
     node.addOutEdge(edge);
-    sinks.add(edge);
+    outputStreams.add(edge);
   }

   /**
@@ -204,19 +204,19 @@ List<JobNode> getJobNodes() {
   }

   /**
-   * Returns the source streams in the graph
+   * Returns the input streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSources() {
-    return Collections.unmodifiableSet(sources);
+  Set<StreamEdge> getInputStreams() {
+    return Collections.unmodifiableSet(inputStreams);
   }

   /**
-   * Return the sink streams in the graph
+   * Return the output streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
-  Set<StreamEdge> getSinks() {
-    return Collections.unmodifiableSet(sinks);
+  Set<StreamEdge> getOutputStreams() {
+    return Collections.unmodifiableSet(outputStreams);
   }

   /**
@@ -236,22 +236,22 @@ Set<StreamEdge> getIntermediateStreamEdges() {
   }

   /**
-   * Validate the graph has the correct topology, meaning the sources are coming from external streams,
-   * sinks are going to external streams, and the nodes are connected with intermediate streams.
-   * Also validate all the nodes are reachable from the sources.
+   * Validate the graph has the correct topology, meaning the input streams are coming from external streams,
+   * output streams are going to external streams, and the nodes are connected with intermediate streams.
+   * Also validate all the nodes are reachable from the input streams.
    */
   void validate() {
-    validateSources();
-    validateSinks();
+    validateInputStreams();
+    validateOutputStreams();
     validateInternalStreams();
     validateReachability();
   }

   /**
-   * Validate the sources should have indegree being 0 and outdegree greater than 0
+   * Validate the input streams should have indegree being 0 and outdegree greater than 0
    */
-  private void validateSources() {
-    sources.forEach(edge -> {
+  private void validateInputStreams() {
+    inputStreams.forEach(edge -> {
       if (!edge.getSourceNodes().isEmpty()) {
         throw new IllegalArgumentException(
             String.format("Source stream %s should not have producers.", edge.getName()));
@@ -264,10 +264,10 @@ private void validateSources() {
   }

   /**
-   * Validate the sinks should have outdegree being 0 and indegree greater than 0
+   * Validate the output streams should have outdegree being 0 and indegree greater than 0
    */
-  private void validateSinks() {
-    sinks.forEach(edge -> {
+  private void validateOutputStreams() {
+    outputStreams.forEach(edge -> {
       if (!edge.getTargetNodes().isEmpty()) {
         throw new IllegalArgumentException(
             String.format("Sink stream %s should not have consumers", edge.getName()));
@@ -284,8 +284,8 @@ private void validateSinks() {
    */
   private void validateInternalStreams() {
     Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
-    internalEdges.removeAll(sources);
-    internalEdges.removeAll(sinks);
+    internalEdges.removeAll(inputStreams);
+    internalEdges.removeAll(outputStreams);

     internalEdges.forEach(edge -> {
       if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
@@ -296,10 +296,10 @@ private void validateInternalStreams() {
   }

   /**
-   * Validate all nodes are reachable by sources.
+   * Validate all nodes are reachable by input streams.
    */
   private void validateReachability() {
-    // validate all nodes are reachable from the sources
+    // validate all nodes are reachable from the input streams
     final Set<JobNode> reachable = findReachable();
     if (reachable.size() != nodes.size()) {
       Set<JobNode> unreachable = new HashSet<>(nodes.values());
@@ -317,8 +317,8 @@ Set<JobNode> findReachable() {
     Queue<JobNode> queue = new ArrayDeque<>();
     Set<JobNode> visited = new HashSet<>();

-    sources.forEach(source -> {
-      List<JobNode> next = source.getTargetNodes();
+    inputStreams.forEach(input -> {
+      List<JobNode> next = input.getTargetNodes();
       queue.addAll(next);
       visited.addAll(next);
     });
@@ -353,11 +353,11 @@ List<JobNode> topologicalSort() {
     pnodes.forEach(node -> {
       String nid = node.getId();
       //only count the degrees of intermediate streams
-      long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count();
+      long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count();
       indegree.put(nid, degree);

       if (degree == 0L) {
-        // start from the nodes that has no intermediate input streams, so it only consumes from sources
+        // start from the nodes that has no intermediate input streams, so it only consumes from input streams
         q.add(node);
         visited.add(node);
       }
@@ -410,9 +410,9 @@ List<JobNode> topologicalSort() {
       q.add(minNode);
       visited.add(minNode);
     } else {
-      // all the remaining nodes should be reachable from sources
-      // start from sources again to find the next node that hasn't been visited
-      JobNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream())
+      // all the remaining nodes should be reachable from input streams
+      // start from input streams again to find the next node that hasn't been visited
+      JobNode nextNode = inputStreams.stream().flatMap(input -> input.getTargetNodes().stream())
          .filter(node -> !visited.contains(node))
          .findAny().get();
       q.add(nextNode);
```
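For context on the `findReachable` hunk above: it is a plain breadth-first traversal seeded from the nodes that consume input streams. A self-contained sketch of the same pattern, with a string adjacency map standing in for `JobNode`/`StreamEdge` (the names here are illustrative, not Samza's):

```java
import java.util.*;

final class ReachabilitySketch {
  // BFS over the out-edge adjacency map; returns every node reachable from
  // at least one seed, mirroring what JobGraph.findReachable() computes.
  static Set<String> findReachable(Map<String, List<String>> outEdges, Collection<String> seeds) {
    Queue<String> queue = new ArrayDeque<>(seeds);
    Set<String> visited = new HashSet<>(seeds);
    while (!queue.isEmpty()) {
      String node = queue.poll();
      for (String next : outEdges.getOrDefault(node, Collections.emptyList())) {
        if (visited.add(next)) {  // add() returns false when already visited
          queue.add(next);
        }
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    Map<String, List<String>> graph = new HashMap<>();
    graph.put("job-1", Arrays.asList("job-2", "job-3"));
    graph.put("job-2", Collections.singletonList("job-3"));
    // A node with no path from the seeds would be reported as unreachable.
    System.out.println(findReachable(graph, Collections.singletonList("job-1")));
  }
}
```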

samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -134,8 +134,8 @@ static final class JobGraphJson {
     jobGraphJson.sinkStreams = new HashMap<>();
     jobGraphJson.intermediateStreams = new HashMap<>();
     jobGraphJson.tables = new HashMap<>();
-    jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
-    jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
+    jobGraph.getInputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
+    jobGraph.getOutputStreams().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
     jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
     jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
```

samza-core/src/main/java/org/apache/samza/table/TableManager.java

Lines changed: 20 additions & 14 deletions
```diff
@@ -37,12 +37,14 @@

 /**
  * A {@link TableManager} manages tables within a Samza task. For each table, it maintains
- * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
- * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for
- * read/write operations.
+ * the {@link TableSpec}, the {@link TableProvider} and the {@link Table} instance.
+ * It is used at execution for {@link org.apache.samza.container.TaskInstance} to retrieve
+ * table instances for read/write operations.
  *
  * A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
- * and {@link TableProvider} are constructed by processing the job configuration.
+ * and {@link TableProvider} are constructed by processing the job configuration
+ * during initialization. The {@link Table} is constructed when {@link #getTable(String)}
+ * is called and cached.
  *
  * After a {@link TableManager} is constructed, local tables are associated with
  * local store instances created during {@link org.apache.samza.container.SamzaContainer}
@@ -51,19 +53,19 @@
  * Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException},
  * if it's called before initialization.
  *
- * For store backed tables, the list of stores must be injected into the constructor.
  */
 public class TableManager {

   static public class TableCtx {
     private TableSpec tableSpec;
     private TableProvider tableProvider;
+    private Table table;
   }

   private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());

   // tableId -> TableCtx
-  private final Map<String, TableCtx> tables = new HashMap<>();
+  private final Map<String, TableCtx> tableContexts = new HashMap<>();

   private boolean initialized;

@@ -100,7 +102,7 @@ public TableManager(Config config, Map<String, Serde<Object>> serdes) {
    */
   public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
     Preconditions.checkNotNull(containerContext, "null container context.");
-    tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
     initialized = true;
   }

@@ -109,22 +111,22 @@ public void init(SamzaContainerContext containerContext, TaskContext taskContext
    * @param tableSpec the table spec
    */
   private void addTable(TableSpec tableSpec) {
-    if (tables.containsKey(tableSpec.getId())) {
+    if (tableContexts.containsKey(tableSpec.getId())) {
       throw new SamzaException("Table " + tableSpec.getId() + " already exists");
     }
     TableCtx ctx = new TableCtx();
     TableProviderFactory tableProviderFactory =
         Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
     ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
     ctx.tableSpec = tableSpec;
-    tables.put(tableSpec.getId(), ctx);
+    tableContexts.put(tableSpec.getId(), ctx);
   }

   /**
    * Shutdown the table manager, internally it shuts down all tables
    */
   public void close() {
-    tables.values().forEach(ctx -> ctx.tableProvider.close());
+    tableContexts.values().forEach(ctx -> ctx.tableProvider.close());
   }

   /**
@@ -133,10 +135,14 @@ public void close() {
    * @return table instance
    */
   public Table getTable(String tableId) {
-    if (!initialized) {
-      throw new IllegalStateException("TableManager has not been initialized.");
+    Preconditions.checkState(initialized, "TableManager has not been initialized.");
+
+    TableCtx ctx = tableContexts.get(tableId);
+    Preconditions.checkNotNull(ctx, "Unknown tableId " + tableId);
+
+    if (ctx.table == null) {
+      ctx.table = ctx.tableProvider.getTable();
     }
-    Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
-    return tables.get(tableId).tableProvider.getTable();
+    return ctx.table;
   }
 }
```
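The reworked `getTable` now constructs the `Table` on first request and caches it in the per-table context, instead of asking the provider on every call. The pattern in isolation, as a hedged sketch (a generic holder, not Samza code; the plain null-check is only safe when access is confined to a single thread, which the diff above likewise appears to assume):

```java
import java.util.function.Supplier;

// Memoizing holder: build once on first get(), reuse afterwards.
final class Lazy<T> {
  private final Supplier<T> factory;
  private T value;

  Lazy(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    if (value == null) {
      value = factory.get();  // first call constructs, later calls reuse
    }
    return value;
  }
}

// Usage in the spirit of the diff: Lazy<Table> table = new Lazy<>(tableProvider::getTable);
```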

samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -43,11 +43,11 @@
  * @param <V> the type of the value in this table
  */
 public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
-  private final TableWriteFunction<K, V> writeFn;

   private DefaultTableWriteMetrics writeMetrics;

   @VisibleForTesting
+  final TableWriteFunction<K, V> writeFn;
   final TableRateLimiter writeRateLimiter;

   public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
```

samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -80,10 +80,10 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;

-  private final TableReadFunction<K, V> readFn;
   private DefaultTableReadMetrics readMetrics;

   @VisibleForTesting
+  final TableReadFunction<K, V> readFn;
   final TableRateLimiter<K, V> readRateLimiter;

   /**
```
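Both remote-table changes widen `readFn`/`writeFn` from private to package-private under `@VisibleForTesting`, so same-package tests can reach the injected functions directly. A toy illustration of that visibility idiom (`Widget` and `collaborator` are hypothetical names, not from this commit):

```java
import com.google.common.annotations.VisibleForTesting;

class Widget {
  // Package-private instead of private: callers in other packages still
  // cannot touch it, but a test class in the same package can inspect or
  // verify the injected collaborator without reflection.
  @VisibleForTesting
  final Runnable collaborator;

  Widget(Runnable collaborator) {
    this.collaborator = collaborator;
  }
}
```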
