Skip to content

Commit e0b5a32

Browse files
dnishimurarmatharu-zz
authored andcommitted
SAMZA-2298: Fix CoordinatorStreamStore creation for LocalApplicationRunner (apache#1136)
1 parent 767a5d4 commit e0b5a32

File tree

2 files changed

+140
-16
lines changed

2 files changed

+140
-16
lines changed

samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,15 @@
5757
import org.apache.samza.metrics.MetricsRegistryMap;
5858
import org.apache.samza.metrics.MetricsReporter;
5959
import org.apache.samza.processor.StreamProcessor;
60+
import org.apache.samza.system.SystemAdmin;
61+
import org.apache.samza.system.SystemAdmins;
62+
import org.apache.samza.system.SystemStream;
6063
import org.apache.samza.task.TaskFactory;
6164
import org.apache.samza.task.TaskFactoryUtil;
65+
import org.apache.samza.util.CoordinatorStreamUtil;
6266
import org.apache.samza.util.ReflectionUtil;
6367
import org.apache.samza.util.Util;
68+
import org.apache.samza.zk.ZkJobCoordinatorFactory;
6469
import org.apache.samza.zk.ZkMetadataStoreFactory;
6570
import org.slf4j.Logger;
6671
import org.slf4j.LoggerFactory;
@@ -96,7 +101,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
96101
* @param config configuration for the application
97102
*/
98103
public LocalApplicationRunner(SamzaApplication app, Config config) {
99-
this(app, config, getMetadataStoreFactory(new JobConfig(config)));
104+
this(app, config, getDefaultCoordinatorStreamStoreFactory(new JobConfig(config)));
100105
}
101106

102107
/**
@@ -121,15 +126,22 @@ public LocalApplicationRunner(SamzaApplication app, Config config, MetadataStore
121126
this.appDesc = appDesc;
122127
this.isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
123128
this.coordinationUtils = coordinationUtils;
124-
this.metadataStoreFactory = Optional.ofNullable(getMetadataStoreFactory(new JobConfig(appDesc.getConfig())));
129+
this.metadataStoreFactory = Optional.ofNullable(getDefaultCoordinatorStreamStoreFactory(new JobConfig(appDesc.getConfig())));
125130
}
126131

127-
static MetadataStoreFactory getMetadataStoreFactory(JobConfig jobConfig) {
128-
if (jobConfig.getCoordinatorSystemNameOrNull() != null) {
132+
static MetadataStoreFactory getDefaultCoordinatorStreamStoreFactory(JobConfig jobConfig) {
133+
String coordinatorSystemName = jobConfig.getCoordinatorSystemNameOrNull();
134+
JobCoordinatorConfig jobCoordinatorConfig = new JobCoordinatorConfig(jobConfig);
135+
String jobCoordinatorFactoryClassName = jobCoordinatorConfig.getJobCoordinatorFactoryClassName();
136+
137+
// TODO: Remove restriction to only ZkJobCoordinator after next phase of metadata store abstraction.
138+
if (StringUtils.isNotBlank(coordinatorSystemName) && ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
129139
return new CoordinatorStreamMetadataStoreFactory();
130140
}
131-
LOG.warn("{} or {} not configured. No coordinator stream metadata store will be created.",
132-
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
141+
142+
LOG.warn("{} or {} not configured, or {} is not {}. No default coordinator stream metadata store will be created.",
143+
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM,
144+
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName());
133145
return null;
134146
}
135147

@@ -272,15 +284,49 @@ CountDownLatch getShutdownLatch() {
272284
}
273285

274286
@VisibleForTesting
275-
MetadataStore createCoordinatorStreamStore(Config jobConfig) {
287+
MetadataStore createCoordinatorStreamStore(Config config) {
276288
if (metadataStoreFactory.isPresent()) {
277-
MetadataStore coordinatorStreamStore =
278-
metadataStoreFactory.get().getMetadataStore("NoOp", jobConfig, new MetricsRegistryMap());
279-
return coordinatorStreamStore;
289+
// TODO: Add missing metadata store abstraction for creating the underlying store to address SAMZA-2182
290+
if (metadataStoreFactory.get() instanceof CoordinatorStreamMetadataStoreFactory) {
291+
if (createUnderlyingCoordinatorStream(config)) {
292+
MetadataStore coordinatorStreamStore =
293+
metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
294+
LOG.info("Created coordinator stream store of type: {}", coordinatorStreamStore.getClass().getSimpleName());
295+
return coordinatorStreamStore;
296+
}
297+
} else {
298+
MetadataStore otherMetadataStore =
299+
metadataStoreFactory.get().getMetadataStore("NoOp", config, new MetricsRegistryMap());
300+
LOG.info("Created alternative coordinator stream store of type: {}", otherMetadataStore.getClass().getSimpleName());
301+
return otherMetadataStore;
302+
}
280303
}
304+
305+
LOG.warn("No coordinator stream store created.");
281306
return null;
282307
}
283308

309+
@VisibleForTesting
310+
boolean createUnderlyingCoordinatorStream(Config config) {
311+
// TODO: This work around method is necessary due to SAMZA-2182 - Metadata store: disconnect between creation and usage of the underlying storage
312+
// and will be addressed in the next phase of metadata store abstraction
313+
if (new JobConfig(config).getCoordinatorSystemNameOrNull() == null) {
314+
LOG.warn("{} or {} not configured. Coordinator stream not created.",
315+
JobConfig.JOB_COORDINATOR_SYSTEM, JobConfig.JOB_DEFAULT_SYSTEM);
316+
return false;
317+
}
318+
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
319+
SystemAdmins systemAdmins = new SystemAdmins(config);
320+
systemAdmins.start();
321+
try {
322+
SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
323+
CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
324+
} finally {
325+
systemAdmins.stop();
326+
}
327+
return true;
328+
}
329+
284330
@VisibleForTesting
285331
StreamProcessor createStreamProcessor(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
286332
StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory,

samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,25 @@
3535
import org.apache.samza.config.Config;
3636
import org.apache.samza.config.ConfigException;
3737
import org.apache.samza.config.JobConfig;
38+
import org.apache.samza.config.JobCoordinatorConfig;
3839
import org.apache.samza.config.MapConfig;
3940
import org.apache.samza.context.ExternalContext;
4041
import org.apache.samza.coordinator.ClusterMembership;
4142
import org.apache.samza.coordinator.CoordinationConstants;
4243
import org.apache.samza.coordinator.CoordinationUtils;
4344
import org.apache.samza.coordinator.DistributedLock;
45+
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
4446
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
4547
import org.apache.samza.execution.LocalJobPlanner;
4648
import org.apache.samza.job.ApplicationStatus;
49+
import org.apache.samza.metadatastore.InMemoryMetadataStore;
50+
import org.apache.samza.metadatastore.InMemoryMetadataStoreFactory;
51+
import org.apache.samza.metadatastore.MetadataStore;
4752
import org.apache.samza.metadatastore.MetadataStoreFactory;
53+
import org.apache.samza.metrics.MetricsRegistry;
4854
import org.apache.samza.processor.StreamProcessor;
55+
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
56+
import org.apache.samza.system.SystemAdmins;
4957
import org.apache.samza.task.IdentityStreamTask;
5058
import org.apache.samza.zk.ZkMetadataStore;
5159
import org.apache.samza.zk.ZkMetadataStoreFactory;
@@ -476,24 +484,94 @@ private void prepareTestForRunId() throws Exception {
476484
doReturn(coordinatorStreamStore).when(runner).createCoordinatorStreamStore(any(Config.class));
477485
}
478486

487+
/**
488+
* Default metadata store factory should be null if no job coordinator system defined and the default
489+
* ZkJobCoordinator is used.
490+
*/
479491
@Test
480-
public void testGetMetadataStoreFactoryWithoutJobCoordinatorSystem() {
492+
public void testGetCoordinatorStreamStoreFactoryWithoutJobCoordinatorSystem() {
481493
MetadataStoreFactory metadataStoreFactory =
482-
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig()));
494+
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig()));
483495
assertNull(metadataStoreFactory);
484496
}
485497

498+
/**
499+
* Default metadata store factory should not be null if job coordinator system defined and the default
500+
* ZkJobCoordinator is used.
501+
*/
486502
@Test
487-
public void testGetMetadataStoreFactoryWithJobCoordinatorSystem() {
503+
public void testGetCoordinatorStreamStoreFactoryWithJobCoordinatorSystem() {
488504
MetadataStoreFactory metadataStoreFactory =
489-
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
505+
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_COORDINATOR_SYSTEM, "test-system"))));
490506
assertNotNull(metadataStoreFactory);
491507
}
492508

509+
/**
510+
* Default metadata store factory should not be null if default system defined and the default
511+
* ZkJobCoordinator is used.
512+
*/
493513
@Test
494-
public void testGetMetadataStoreFactoryWithDefaultSystem() {
514+
public void testGetCoordinatorStreamStoreFactoryWithDefaultSystem() {
495515
MetadataStoreFactory metadataStoreFactory =
496-
LocalApplicationRunner.getMetadataStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
516+
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM, "test-system"))));
497517
assertNotNull(metadataStoreFactory);
498518
}
519+
520+
/**
521+
* Default metadata store factory be null if job coordinator system or default system defined and a non ZkJobCoordinator
522+
* job coordinator is used.
523+
*/
524+
@Test
525+
public void testGetCoordinatorStreamStoreFactoryWithNonZkJobCoordinator() {
526+
MapConfig mapConfig = new MapConfig(
527+
ImmutableMap.of(
528+
JobConfig.JOB_DEFAULT_SYSTEM, "test-system",
529+
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()));
530+
MetadataStoreFactory metadataStoreFactory =
531+
LocalApplicationRunner.getDefaultCoordinatorStreamStoreFactory(new JobConfig(mapConfig));
532+
assertNull(metadataStoreFactory);
533+
}
534+
535+
/**
536+
* Underlying coordinator stream should be created if using CoordinatorStreamMetadataStoreFactory
537+
* @throws Exception
538+
*/
539+
@Test
540+
public void testCreateCoordinatorStreamWithCoordinatorFactory() throws Exception {
541+
CoordinatorStreamStore coordinatorStreamStore = mock(CoordinatorStreamStore.class);
542+
CoordinatorStreamMetadataStoreFactory coordinatorStreamMetadataStoreFactory = mock(CoordinatorStreamMetadataStoreFactory.class);
543+
doReturn(coordinatorStreamStore).when(coordinatorStreamMetadataStoreFactory).getMetadataStore(anyString(), any(Config.class), any(
544+
MetricsRegistry.class));
545+
SystemAdmins systemAdmins = mock(SystemAdmins.class);
546+
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
547+
LocalApplicationRunner localApplicationRunner =
548+
spy(new LocalApplicationRunner(mockApp, config, coordinatorStreamMetadataStoreFactory));
549+
550+
// create store only if successful in creating the underlying coordinator stream
551+
doReturn(true).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
552+
assertEquals(coordinatorStreamStore, localApplicationRunner.createCoordinatorStreamStore(config));
553+
verify(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
554+
555+
// do not create store if creating the underlying coordinator stream fails
556+
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
557+
assertNull(localApplicationRunner.createCoordinatorStreamStore(config));
558+
}
559+
560+
/**
561+
* Underlying coordinator stream should not be created if not using CoordinatorStreamMetadataStoreFactory
562+
* @throws Exception
563+
*/
564+
@Test
565+
public void testCreateCoordinatorStreamWithoutCoordinatorFactory() throws Exception {
566+
SystemAdmins systemAdmins = mock(SystemAdmins.class);
567+
PowerMockito.whenNew(SystemAdmins.class).withAnyArguments().thenReturn(systemAdmins);
568+
LocalApplicationRunner localApplicationRunner =
569+
spy(new LocalApplicationRunner(mockApp, config, new InMemoryMetadataStoreFactory()));
570+
doReturn(false).when(localApplicationRunner).createUnderlyingCoordinatorStream(eq(config));
571+
MetadataStore coordinatorStreamStore = localApplicationRunner.createCoordinatorStreamStore(config);
572+
assertTrue(coordinatorStreamStore instanceof InMemoryMetadataStore);
573+
574+
// creating underlying coordinator stream should not be called for other coordinator stream metadata store types.
575+
verify(localApplicationRunner, never()).createUnderlyingCoordinatorStream(eq(config));
576+
}
499577
}

0 commit comments

Comments
 (0)