Skip to content

Commit 5587ebf

Browse files
committed
SAMZA-1796: PassthroughJobCoordinator doesn't create changelog streams
Currently only the ClusterBasedJobCoordinator and ZkJobCoordinator are creating changelog streams. The Passthrough one should also do it. Author: xinyuiscool <xiliu@linkedin.com> Reviewers: Bharath K <bharathkk@gmail.com> Closes apache#595 from xinyuiscool/SAMZA-1796
1 parent 1d2f054 commit 5587ebf

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.samza.job.model.JobModel;
3030
import org.apache.samza.coordinator.JobCoordinatorListener;
3131
import org.apache.samza.runtime.ProcessorIdGenerator;
32+
import org.apache.samza.storage.ChangelogStreamManager;
3233
import org.apache.samza.system.StreamMetadataCache;
3334
import org.apache.samza.system.SystemAdmins;
3435
import org.apache.samza.util.*;
@@ -81,6 +82,8 @@ public void start() {
8182
if (checkpointManager != null) {
8283
checkpointManager.createResources();
8384
}
85+
86+
ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
8487
} catch (Exception e) {
8588
LOGGER.error("Exception while trying to getJobModel.", e);
8689
if (coordinatorListener != null) {

0 commit comments

Comments
 (0)