Skip to content

Commit 4782c61

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 0440f75 + a9ff093 commit 4782c61

File tree

4 files changed

+83
-64
lines changed

4 files changed

+83
-64
lines changed

samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ public AbstractApplicationRunner(Config config) {
5858
this.graphSpec = new StreamGraphSpec(config);
5959
}
6060

61-
public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager streamManager) throws Exception {
62-
return getExecutionPlan(app, null, streamManager);
61+
public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
62+
return getExecutionPlan(app, null);
6363
}
6464

6565
/* package private */
66-
ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManager streamManager) throws Exception {
66+
ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
6767
// build stream graph
6868
app.init(graphSpec, config);
6969
OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
@@ -82,8 +82,14 @@ ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManage
8282
cfg.put(ApplicationConfig.APP_MODE, mode.name());
8383

8484
// create the physical execution plan
85-
ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
86-
return planner.plan(specGraph);
85+
Config generatedConfig = new MapConfig(cfg);
86+
StreamManager streamManager = buildAndStartStreamManager(generatedConfig);
87+
try {
88+
ExecutionPlanner planner = new ExecutionPlanner(generatedConfig, streamManager);
89+
return planner.plan(specGraph);
90+
} finally {
91+
streamManager.stop();
92+
}
8793
}
8894

8995
/**
@@ -108,8 +114,8 @@ final void writePlanJsonFile(String planJson) {
108114
}
109115

110116
@VisibleForTesting
111-
StreamManager buildAndStartStreamManager() {
112-
StreamManager streamManager = new StreamManager(this.config);
117+
StreamManager buildAndStartStreamManager(Config config) {
118+
StreamManager streamManager = new StreamManager(config);
113119
streamManager.start();
114120
return streamManager;
115121
}

samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -158,32 +158,38 @@ public void runTask() {
158158

159159
@Override
160160
public void run(StreamApplication app) {
161-
StreamManager streamManager = null;
162-
try {
163-
streamManager = buildAndStartStreamManager();
164161

162+
try {
165163
// 1. initialize and plan
166-
ExecutionPlan plan = getExecutionPlan(app, streamManager);
164+
ExecutionPlan plan = getExecutionPlan(app);
167165

168166
String executionPlanJson = plan.getPlanAsJson();
169167
writePlanJsonFile(executionPlanJson);
170168
LOG.info("Execution Plan: \n" + executionPlanJson);
171-
172-
// 2. create the necessary streams
173-
// TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
174169
String planId = String.valueOf(executionPlanJson.hashCode());
175-
createStreams(planId, plan.getIntermediateStreams(), streamManager);
176170

177-
// 3. create the StreamProcessors
178171
if (plan.getJobConfigs().isEmpty()) {
179172
throw new SamzaException("No jobs to run.");
180173
}
174+
181175
plan.getJobConfigs().forEach(jobConfig -> {
182-
LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
183-
LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
184-
StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener);
185-
listener.setProcessor(processor);
186-
processors.add(processor);
176+
StreamManager streamManager = null;
177+
try {
178+
// 2. create the necessary streams
179+
streamManager = buildAndStartStreamManager(jobConfig);
180+
createStreams(planId, plan.getIntermediateStreams(), streamManager);
181+
182+
// 3. create the StreamProcessors
183+
LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig);
184+
LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
185+
StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener);
186+
listener.setProcessor(processor);
187+
processors.add(processor);
188+
} finally {
189+
if (streamManager != null) {
190+
streamManager.stop();
191+
}
192+
}
187193
});
188194
numProcessorsToStart.set(processors.size());
189195

@@ -193,10 +199,6 @@ public void run(StreamApplication app) {
193199
appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
194200
shutdownLatch.countDown();
195201
throw new SamzaException(String.format("Failed to start application: %s.", app), throwable);
196-
} finally {
197-
if (streamManager != null) {
198-
streamManager.stop();
199-
}
200202
}
201203
}
202204

@@ -256,11 +258,10 @@ public boolean waitForFinish(Duration timeout) {
256258
* stream creation.
257259
* @param planId a unique identifier representing the plan used for coordination purpose
258260
* @param intStreams list of intermediate {@link StreamSpec}s
259-
* @throws TimeoutException exception for latch timeout
260261
*/
261262
private void createStreams(String planId,
262263
List<StreamSpec> intStreams,
263-
StreamManager streamManager) throws TimeoutException {
264+
StreamManager streamManager) {
264265
if (intStreams.isEmpty()) {
265266
LOG.info("Set of intermediate streams is empty. Nothing to create.");
266267
return;

samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,36 +61,38 @@ public void runTask() {
6161
*/
6262
@Override
6363
public void run(StreamApplication app) {
64-
StreamManager streamManager = null;
6564
try {
66-
streamManager = buildAndStartStreamManager();
6765
// TODO: run.id needs to be set for standalone: SAMZA-1531
6866
// run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
6967
String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
7068
LOG.info("The run id for this run is {}", runId);
7169

7270
// 1. initialize and plan
73-
ExecutionPlan plan = getExecutionPlan(app, runId, streamManager);
71+
ExecutionPlan plan = getExecutionPlan(app, runId);
7472
writePlanJsonFile(plan.getPlanAsJson());
7573

76-
// 2. create the necessary streams
77-
if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
78-
streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
79-
}
80-
streamManager.createStreams(plan.getIntermediateStreams());
81-
82-
// 3. submit jobs for remote execution
8374
plan.getJobConfigs().forEach(jobConfig -> {
84-
LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
85-
JobRunner runner = new JobRunner(jobConfig);
86-
runner.run(true);
75+
StreamManager streamManager = null;
76+
try {
77+
// 2. create the necessary streams
78+
streamManager = buildAndStartStreamManager(jobConfig);
79+
if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
80+
streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
81+
}
82+
streamManager.createStreams(plan.getIntermediateStreams());
83+
84+
// 3. submit jobs for remote execution
85+
LOG.info("Starting job {} with config {}", jobConfig.getName(), jobConfig);
86+
JobRunner runner = new JobRunner(jobConfig);
87+
runner.run(true);
88+
} finally {
89+
if (streamManager != null) {
90+
streamManager.stop();
91+
}
92+
}
8793
});
8894
} catch (Throwable t) {
8995
throw new SamzaException("Failed to run application", t);
90-
} finally {
91-
if (streamManager != null) {
92-
streamManager.stop();
93-
}
9496
}
9597
}
9698

samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.samza.runtime;
2121

2222
import com.google.common.collect.ImmutableList;
23+
2324
import java.time.Duration;
2425
import java.util.Collections;
2526
import java.util.HashMap;
@@ -56,7 +57,13 @@
5657
import static org.junit.Assert.assertTrue;
5758
import static org.mockito.Matchers.anyObject;
5859
import static org.mockito.Matchers.anyString;
59-
import static org.mockito.Mockito.*;
60+
import static org.mockito.Mockito.any;
61+
import static org.mockito.Mockito.anyLong;
62+
import static org.mockito.Mockito.doAnswer;
63+
import static org.mockito.Mockito.mock;
64+
import static org.mockito.Mockito.spy;
65+
import static org.mockito.Mockito.verify;
66+
import static org.mockito.Mockito.when;
6067
import static org.powermock.api.mockito.PowerMockito.doReturn;
6168

6269

@@ -74,17 +81,18 @@ public class TestLocalApplicationRunner {
7481
@Test
7582
public void testStreamCreation()
7683
throws Exception {
77-
Map<String, String> config = new HashMap<>();
78-
LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
84+
Config config = new MapConfig(new HashMap<>());
85+
LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
7986
StreamApplication app = mock(StreamApplication.class);
8087

8188
StreamManager streamManager = mock(StreamManager.class);
82-
doReturn(streamManager).when(runner).buildAndStartStreamManager();
89+
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
8390

8491
ExecutionPlan plan = mock(ExecutionPlan.class);
8592
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
8693
when(plan.getPlanAsJson()).thenReturn("");
87-
doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
94+
when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config)));
95+
doReturn(plan).when(runner).getExecutionPlan(any());
8896

8997
CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
9098
JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
@@ -109,19 +117,19 @@ public void testStreamCreation()
109117
@Test
110118
public void testStreamCreationWithCoordination()
111119
throws Exception {
112-
Map<String, String> config = new HashMap<>();
113-
LocalApplicationRunner localRunner = new LocalApplicationRunner(new MapConfig(config));
114-
LocalApplicationRunner runner = spy(localRunner);
120+
Config config = new MapConfig(new HashMap<>());
121+
LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
115122

116123
StreamApplication app = mock(StreamApplication.class);
117124

118125
StreamManager streamManager = mock(StreamManager.class);
119-
doReturn(streamManager).when(runner).buildAndStartStreamManager();
126+
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
120127

121128
ExecutionPlan plan = mock(ExecutionPlan.class);
122129
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
123130
when(plan.getPlanAsJson()).thenReturn("");
124-
doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
131+
when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config)));
132+
doReturn(plan).when(runner).getExecutionPlan(any());
125133

126134
CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
127135
CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
@@ -183,19 +191,20 @@ public void testRunStreamTask()
183191
@Test
184192
public void testRunComplete()
185193
throws Exception {
186-
final Map<String, String> config = new HashMap<>();
187-
config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
194+
HashMap<String, String> configMap = new HashMap<>();
195+
configMap.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
196+
Config config = new MapConfig(configMap);
188197
LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
189198
StreamApplication app = mock(StreamApplication.class);
190199

191200
// buildAndStartStreamManager already includes start, so not going to verify it gets called
192201
StreamManager streamManager = mock(StreamManager.class);
193-
when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
202+
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
194203
ExecutionPlan plan = mock(ExecutionPlan.class);
195204
when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
196205
when(plan.getPlanAsJson()).thenReturn("");
197206
when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
198-
doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
207+
doReturn(plan).when(runner).getExecutionPlan(any());
199208

200209
StreamProcessor sp = mock(StreamProcessor.class);
201210
ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -221,19 +230,20 @@ public void testRunComplete()
221230
@Test
222231
public void testRunFailure()
223232
throws Exception {
224-
final Map<String, String> config = new HashMap<>();
225-
config.put(ApplicationConfig.PROCESSOR_ID, "0");
226-
LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
233+
final Map<String, String> configMap = new HashMap<>();
234+
configMap.put(ApplicationConfig.PROCESSOR_ID, "0");
235+
MapConfig config = new MapConfig(configMap);
236+
LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
227237
StreamApplication app = mock(StreamApplication.class);
228238

229239
// buildAndStartStreamManager already includes start, so not going to verify it gets called
230240
StreamManager streamManager = mock(StreamManager.class);
231-
when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
241+
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
232242
ExecutionPlan plan = mock(ExecutionPlan.class);
233243
when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
234244
when(plan.getPlanAsJson()).thenReturn("");
235-
when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
236-
doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
245+
when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(config)));
246+
doReturn(plan).when(runner).getExecutionPlan(any());
237247

238248
StreamProcessor sp = mock(StreamProcessor.class);
239249
ArgumentCaptor<StreamProcessorLifecycleListener> captor =

0 commit comments

Comments
 (0)