Skip to content

Commit

Permalink
SAMZA-1175 - Removing CoordinationService from JobCoordinatorFactory …
Browse files Browse the repository at this point in the history
…interface

Removing CoordinationService from JobCoordinatorFactory interface

Author: navina <navina@apache.org>

Reviewers: Xinyu Liu <xinyuliu.us@apache.org>,Boris Shkolnik <boryas@apache.org>

Closes apache#102 from navina/SAMZA-1175
  • Loading branch information
navina authored and nramesh committed Mar 29, 2017
1 parent a989c08 commit d7fc811
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ public interface JobCoordinatorFactory {
* pause the container and add/remove tasks
* @return An instance of IJobCoordinator
*/
JobCoordinator getJobCoordinator(int processorId, Config config,
SamzaContainerController containerController, CoordinationUtils coordinationUtils);
JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.metrics.MetricsReporter;
Expand Down Expand Up @@ -119,24 +117,17 @@ private StreamProcessor(int processorId, Config config, Map<String, MetricsRepor
updatedConfigMap.put(PROCESSOR_ID, String.valueOf(this.processorId));
Config updatedConfig = new MapConfig(updatedConfigMap);


SamzaContainerController containerController = new SamzaContainerController(
taskFactory,
new TaskConfigJava(updatedConfig).getShutdownMs(),
String.valueOf(processorId),
customMetricsReporters);

CoordinationUtils jobCooridanationService = Util.
<CoordinationServiceFactory>getObj(
new JobCoordinatorConfig(updatedConfig)
.getJobCoordinationServiceFactoryClassName())
.getCoordinationService("groupId", String.valueOf(processorId), updatedConfig);

this.jobCoordinator = Util.
<JobCoordinatorFactory>getObj(
new JobCoordinatorConfig(updatedConfig)
.getJobCoordinatorFactoryClassName())
.getJobCoordinator(processorId, updatedConfig, containerController, jobCooridanationService);
.getJobCoordinator(processorId, updatedConfig, containerController);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
package org.apache.samza.standalone;

import org.apache.samza.config.Config;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.processor.SamzaContainerController;

public class StandaloneJobCoordinatorFactory implements JobCoordinatorFactory {
@Override
public JobCoordinator getJobCoordinator(int processorId, Config config,
SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
return new StandaloneJobCoordinator(processorId, config, containerController);
}
}
25 changes: 16 additions & 9 deletions samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@
*/
package org.apache.samza.zk;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.coordinator.CoordinationServiceFactory;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobModelManager;
Expand All @@ -40,6 +36,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* JobCoordinator for stand alone processor managed via Zookeeper.
*/
Expand All @@ -61,16 +64,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private String newJobModelVersion; // version published in ZK (by the leader)
private JobModel jobModel;

public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
public ZkJobCoordinator(int processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
SamzaContainerController containerController) {
this.zkUtils = zkUtils;
this.keyBuilder = zkUtils.getKeyBuilder();
this.debounceTimer = debounceTimer;
this.processorId = processorId;
this.containerController = containerController;
this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
this.config = config;
this.coordinationUtils = coordinationUtils;
this.coordinationUtils = Util.
<CoordinationServiceFactory>getObj(
new JobCoordinatorConfig(config)
.getJobCoordinationServiceFactoryClassName())
.getCoordinationService(groupId, String.valueOf(processorId), config);

streamMetadataCache = getStreamMetadataCache();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
import org.apache.samza.processor.SamzaContainerController;
Expand All @@ -37,7 +36,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
* @return An instance of IJobCoordinator
*/
@Override
public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController, CoordinationUtils coordinationUtils) {
public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
JobConfig jobConfig = new JobConfig(config);
String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
ZkConfig zkConfig = new ZkConfig(config);
Expand All @@ -47,6 +46,7 @@ public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaCon

return new ZkJobCoordinator(
processorId,
"groupId", // TODO: Usage of groupId to be resolved in SAMZA-1173
config,
debounceTimer,
new ZkUtils(
Expand All @@ -55,6 +55,6 @@ public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaCon
zkClient,
zkConfig.getZkConnectionTimeoutMs()
),
containerController, coordinationUtils);
containerController);
}
}

0 comments on commit d7fc811

Please sign in to comment.