Skip to content

Commit f3d0dc7

Browse files
authored
SAMZA-2118: Improve the shutdown sequence of AsyncRunLoop. (apache#935)
* Wake up AsyncRunLoop on shutdown if it's waiting for dispatched messages to come back. * Code cleanup. * Address review comments.
1 parent 3b8dc03 commit f3d0dc7

File tree

3 files changed

+130
-8
lines changed

3 files changed

+130
-8
lines changed

samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ public double getWorkFactor() {
193193

194194
public void shutdown() {
195195
shutdownNow = true;
196+
resume();
196197
}
197198

198199
/**
@@ -220,15 +221,17 @@ private IncomingMessageEnvelope chooseEnvelope() {
220221
* Insert the envelope into the task pending queues and run all the tasks
221222
*/
222223
private void runTasks(IncomingMessageEnvelope envelope) {
223-
if (envelope != null) {
224-
PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
225-
for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
226-
worker.state.insertEnvelope(pendingEnvelope);
224+
if (!shutdownNow) {
225+
if (envelope != null) {
226+
PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
227+
for (AsyncTaskWorker worker : sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition())) {
228+
worker.state.insertEnvelope(pendingEnvelope);
229+
}
227230
}
228-
}
229231

230-
for (AsyncTaskWorker worker: taskWorkers) {
231-
worker.run();
232+
for (AsyncTaskWorker worker: taskWorkers) {
233+
worker.run();
234+
}
232235
}
233236
}
234237

@@ -280,7 +283,10 @@ private void blockIfBusyOrNoNewWork(IncomingMessageEnvelope envelope) {
280283
}
281284

282285
/**
283-
* Resume the runloop thread. It is triggered once a task becomes ready again or has failure.
286+
* Resume the runloop thread. This API is triggered in the following scenarios:
287+
* A. A task becomes ready to process a message.
288+
* B. A task has failed when processing a message.
289+
* C. User thread shuts down the run loop.
284290
*/
285291
private void resume() {
286292
log.trace("Resume loop thread");
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.test.processor;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
import org.apache.samza.application.TaskApplication;
24+
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
25+
import org.apache.samza.serializers.NoOpSerde;
26+
import org.apache.samza.system.IncomingMessageEnvelope;
27+
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
28+
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
29+
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
30+
import org.apache.samza.task.AsyncStreamTask;
31+
import org.apache.samza.task.AsyncStreamTaskFactory;
32+
import org.apache.samza.task.ClosableTask;
33+
import org.apache.samza.task.MessageCollector;
34+
import org.apache.samza.task.TaskCallback;
35+
import org.apache.samza.task.TaskCoordinator;
36+
import org.apache.samza.test.table.TestTableData;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
public class TestTaskApplication implements TaskApplication {
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(TestTaskApplication.class);
43+
44+
private final String systemName;
45+
private final String inputTopic;
46+
private final String outputTopic;
47+
private final CountDownLatch shutdownLatch;
48+
private final CountDownLatch processedMessageLatch;
49+
50+
public TestTaskApplication(String systemName, String inputTopic, String outputTopic,
51+
CountDownLatch processedMessageLatch, CountDownLatch shutdownLatch) {
52+
this.systemName = systemName;
53+
this.inputTopic = inputTopic;
54+
this.outputTopic = outputTopic;
55+
this.processedMessageLatch = processedMessageLatch;
56+
this.shutdownLatch = shutdownLatch;
57+
}
58+
59+
private class TestTaskImpl implements AsyncStreamTask, ClosableTask {
60+
61+
@Override
62+
public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) {
63+
processedMessageLatch.countDown();
64+
// Implementation deliberately does not invoke callback.complete, in order to block AsyncRunLoop.process() after it exhausts the
65+
// `task.max.concurrency` defined per task.
66+
}
67+
68+
@Override
69+
public void close() {
70+
LOG.info("Task instance is shutting down.");
71+
shutdownLatch.countDown();
72+
}
73+
}
74+
75+
@Override
76+
public void describe(TaskApplicationDescriptor appDescriptor) {
77+
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
78+
KafkaInputDescriptor<TestTableData.Profile> inputDescriptor = ksd.getInputDescriptor(inputTopic, new NoOpSerde<>());
79+
KafkaOutputDescriptor<TestTableData.EnrichedPageView> outputDescriptor = ksd.getOutputDescriptor(outputTopic, new NoOpSerde<>());
80+
appDescriptor.withInputStream(inputDescriptor)
81+
.withOutputStream(outputDescriptor)
82+
.withTaskFactory((AsyncStreamTaskFactory) () -> new TestTaskImpl());
83+
}
84+
}

samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.kafka.clients.producer.KafkaProducer;
4646
import org.apache.kafka.clients.producer.ProducerRecord;
4747
import org.apache.samza.Partition;
48+
import org.apache.samza.application.TaskApplication;
4849
import org.apache.samza.config.ApplicationConfig;
4950
import org.apache.samza.config.Config;
5051
import org.apache.samza.config.ClusterManagerConfig;
@@ -940,6 +941,37 @@ public void testStatefulSamzaApplicationShouldRedistributeInputPartitionsToCorre
940941
Assert.assertEquals(32, jobModel.maxChangeLogStreamPartitions);
941942
}
942943

944+
@Test
945+
public void testApplicationShutdownShouldBeIndependentOfPerMessageProcessingTime() throws Exception {
946+
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
947+
948+
// Create a TaskApplication with only one task per container.
949+
// The task does not invoke taskCallback.complete for any of the dispatched messages.
950+
CountDownLatch shutdownLatch = new CountDownLatch(1);
951+
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
952+
953+
TaskApplication taskApplication = new TestTaskApplication(TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, shutdownLatch);
954+
MapConfig taskApplicationConfig = new MapConfig(ImmutableList.of(applicationConfig1,
955+
ImmutableMap.of(TaskConfig.MAX_CONCURRENCY(), "1", JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory")));
956+
ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(taskApplication, taskApplicationConfig);
957+
958+
// Run the application.
959+
executeRun(appRunner, applicationConfig1);
960+
961+
// Wait for the task to receive at least one dispatched message.
962+
processedMessagesLatch1.await();
963+
964+
// Kill the application when none of the dispatched messages is acknowledged as completed by the task.
965+
appRunner.kill();
966+
appRunner.waitForFinish();
967+
968+
// Expect the shutdown latch to be triggered.
969+
shutdownLatch.await();
970+
971+
// Assert that the shutdown was successful.
972+
Assert.assertEquals(ApplicationStatus.SuccessfulFinish, appRunner.status());
973+
}
974+
943975
/**
944976
* Computes the task to partition assignment of the {@code jobModel}.
945977
* @param jobModel the jobModel to compute task to partition assignment for.

0 commit comments

Comments
 (0)