Skip to content

Commit e2adf8f

Browse files
cameronlee314prateekm
authored andcommitted
SAMZA-2012: Add API for wiring an external context through to application processing code
This PR also refactors TestSamzaSqlRemoteTable to be in samza-test instead of samza-sql, since it seems to actually be an integration test. It is useful to move that test in this PR so that tests that may need an external context can be consolidated. Author: Cameron Lee <calee@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>, Shanthoosh Venkatraman <svenkatr@linkedin.com> Closes apache#829 from cameronlee314/external_context
1 parent 1a7e270 commit e2adf8f

File tree

40 files changed

+622
-379
lines changed

40 files changed

+622
-379
lines changed

samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.samza.context;
2020

2121
import java.io.Serializable;
22+
import org.apache.samza.annotation.InterfaceStability;
2223
import org.apache.samza.application.SamzaApplication;
2324
import org.apache.samza.application.descriptors.ApplicationDescriptor;
2425

@@ -35,14 +36,41 @@
3536
*
3637
* @param <T> concrete type of {@link ApplicationContainerContext} created by this factory
3738
*/
39+
@InterfaceStability.Evolving
3840
public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {
39-
4041
/**
4142
* Creates an instance of the application-defined {@link ApplicationContainerContext}.
43+
* <p>
44+
* Applications should implement this to provide a context for container initialization.
4245
*
46+
* @param externalContext external context provided for the application; null if it was not provided
4347
* @param jobContext framework-provided job context
4448
* @param containerContext framework-provided container context
4549
* @return a new instance of the application-defined {@link ApplicationContainerContext}
4650
*/
47-
T create(JobContext jobContext, ContainerContext containerContext);
51+
default T create(ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext) {
52+
return create(jobContext, containerContext);
53+
}
54+
55+
/**
56+
* New implementations should not implement this directly. Implement
57+
* {@link #create(ExternalContext, JobContext, ContainerContext)} instead.
58+
* <p>
59+
* This is the same as {@link #create(ExternalContext, JobContext, ContainerContext)}, except it does not provide
60+
* access to external context.
61+
* <p>
62+
* This is being left here for backwards compatibility.
63+
*
64+
* @param jobContext framework-provided job context
65+
* @param containerContext framework-provided container context
66+
* @return a new instance of the application-defined {@link ApplicationContainerContext}
67+
*
68+
* Deprecated: Applications should implement {@link #create(ExternalContext, JobContext, ContainerContext)} directly.
69+
* This is being left here for backwards compatibility.
70+
*/
71+
@Deprecated
72+
default T create(JobContext jobContext, ContainerContext containerContext) {
73+
// adding this here so that new apps do not need to implement this
74+
throw new UnsupportedOperationException("Please implement a version of create for the factory implementation.");
75+
}
4876
}

samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.samza.context;
2020

2121
import java.io.Serializable;
22+
import org.apache.samza.annotation.InterfaceStability;
2223
import org.apache.samza.application.SamzaApplication;
2324
import org.apache.samza.application.descriptors.ApplicationDescriptor;
2425

@@ -35,17 +36,49 @@
3536
*
3637
* @param <T> concrete type of {@link ApplicationTaskContext} created by this factory
3738
*/
39+
@InterfaceStability.Evolving
3840
public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {
39-
4041
/**
4142
* Creates an instance of the application-defined {@link ApplicationTaskContext}.
43+
* <p>
44+
* Applications should implement this to provide a context for task initialization.
45+
*
46+
* @param externalContext external context provided for the application; null if it was not provided
47+
* @param jobContext framework-provided job context
48+
* @param containerContext framework-provided container context
49+
* @param taskContext framework-provided task context
50+
* @param applicationContainerContext application-defined container context; null if it was not provided
51+
* @return a new instance of the application-defined {@link ApplicationTaskContext}
52+
*/
53+
default T create(ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext,
54+
TaskContext taskContext, ApplicationContainerContext applicationContainerContext) {
55+
return create(jobContext, containerContext, taskContext, applicationContainerContext);
56+
}
57+
58+
/**
59+
* New implementations should not implement this directly. Implement
60+
* {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)} instead.
61+
* <p>
62+
* This is the same as
63+
* {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)}, except it
64+
* does not provide access to external context.
65+
* <p>
66+
* This is being left here for backwards compatibility.
4267
*
4368
* @param jobContext framework-provided job context
4469
* @param containerContext framework-provided container context
4570
* @param taskContext framework-provided task context
46-
* @param applicationContainerContext application-defined container context
71+
* @param applicationContainerContext application-defined container context; null if it was not provided
4772
* @return a new instance of the application-defined {@link ApplicationTaskContext}
73+
*
74+
* Deprecated: Applications should implement
75+
* {@link #create(ExternalContext, JobContext, ContainerContext, TaskContext, ApplicationContainerContext)} directly.
76+
* This is being left here for backwards compatibility.
4877
*/
49-
T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
50-
ApplicationContainerContext applicationContainerContext);
78+
@Deprecated
79+
default T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
80+
ApplicationContainerContext applicationContainerContext) {
81+
// adding this here so that new apps do not need to implement this
82+
throw new UnsupportedOperationException("Please implement a version of create for the factory implementation.");
83+
}
5184
}

samza-api/src/main/java/org/apache/samza/context/Context.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.samza.context;
2020

2121
import org.apache.samza.application.descriptors.ApplicationDescriptor;
22+
import org.apache.samza.runtime.ApplicationRunner;
23+
2224

2325
/**
2426
* A holder for all framework and application defined contexts at runtime.
@@ -76,4 +78,13 @@ public interface Context {
7678
* @throws IllegalStateException if no {@link ApplicationTaskContextFactory} was provided for the application
7779
*/
7880
ApplicationTaskContext getApplicationTaskContext();
81+
82+
/**
83+
* Gets the {@link ExternalContext} that was created outside of the application.
84+
* <p>
85+
* Use {@link ApplicationRunner#run(ExternalContext)} to provide this context.
86+
*
87+
* @return the external context provided for the application
88+
*/
89+
ExternalContext getExternalContext();
7990
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.context;
20+
21+
/**
22+
* An {@link ExternalContext} can be used to pass components created and managed outside of Samza into a Samza
23+
* application. This will be made accessible through the {@link Context}.
24+
* <p>
25+
* This is passed to {@link org.apache.samza.runtime.ApplicationRunner#run(ExternalContext)} and propagated down to the
26+
* {@link Context} object provided to tasks.
27+
* <p>
28+
* {@link ExternalContext} can be used to inject objects that need to be created by other frameworks, such as Spring.
29+
* <p>
30+
* This is currently just a marker interface for the object passed into Samza.
31+
*/
32+
public interface ExternalContext {
33+
}

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.time.Duration;
2222
import org.apache.samza.annotation.InterfaceStability;
23+
import org.apache.samza.context.ExternalContext;
2324
import org.apache.samza.job.ApplicationStatus;
2425

2526

@@ -32,12 +33,19 @@
3233
*/
3334
@InterfaceStability.Evolving
3435
public interface ApplicationRunner {
36+
/**
37+
* This is like {@link #run(ExternalContext)}, except it provides a null {@link ExternalContext}.
38+
*/
39+
default void run() {
40+
run(null);
41+
}
3542

3643
/**
3744
* Deploy and run the Samza jobs to execute {@link org.apache.samza.application.SamzaApplication}.
3845
* It is non-blocking so it doesn't wait for the application running.
46+
* @param externalContext nullable {@link ExternalContext} to pass through to the application
3947
*/
40-
void run();
48+
void run(ExternalContext externalContext);
4149

4250
/**
4351
* Kill the Samza jobs represented by {@link org.apache.samza.application.SamzaApplication}

samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.samza.application.StreamApplication;
2626
import org.apache.samza.config.Config;
2727
import org.apache.samza.config.MapConfig;
28+
import org.apache.samza.context.ExternalContext;
2829
import org.apache.samza.job.ApplicationStatus;
2930
import org.junit.Test;
3031

@@ -60,7 +61,7 @@ public MockApplicationRunner(SamzaApplication userApp, Config config) {
6061
}
6162

6263
@Override
63-
public void run() {
64+
public void run(ExternalContext externalContext) {
6465

6566
}
6667

samza-core/src/main/java/org/apache/samza/context/ContextImpl.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,26 @@ public class ContextImpl implements Context {
3030
private final TaskContext taskContext;
3131
private final Optional<ApplicationContainerContext> applicationContainerContextOptional;
3232
private final Optional<ApplicationTaskContext> applicationTaskContextOptional;
33+
private final Optional<ExternalContext> externalContextOptional;
3334

3435
/**
3536
* @param jobContext non-null job context
3637
* @param containerContext non-null framework container context
3738
* @param taskContext non-null framework task context
3839
* @param applicationContainerContextOptional optional application-defined container context
3940
* @param applicationTaskContextOptional optional application-defined task context
41+
* @param externalContextOptional optional external context
4042
*/
41-
public ContextImpl(JobContext jobContext,
42-
ContainerContext containerContext,
43-
TaskContext taskContext,
43+
public ContextImpl(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
4444
Optional<ApplicationContainerContext> applicationContainerContextOptional,
45-
Optional<ApplicationTaskContext> applicationTaskContextOptional) {
45+
Optional<ApplicationTaskContext> applicationTaskContextOptional,
46+
Optional<ExternalContext> externalContextOptional) {
4647
this.jobContext = Preconditions.checkNotNull(jobContext, "Job context can not be null");
4748
this.containerContext = Preconditions.checkNotNull(containerContext, "Container context can not be null");
4849
this.taskContext = Preconditions.checkNotNull(taskContext, "Task context can not be null");
4950
this.applicationContainerContextOptional = applicationContainerContextOptional;
5051
this.applicationTaskContextOptional = applicationTaskContextOptional;
52+
this.externalContextOptional = externalContextOptional;
5153
}
5254

5355
@Override
@@ -67,20 +69,24 @@ public TaskContext getTaskContext() {
6769

6870
@Override
6971
public ApplicationContainerContext getApplicationContainerContext() {
70-
if (!this.applicationContainerContextOptional.isPresent()) {
71-
throw new IllegalStateException("No application-defined container context exists");
72-
}
72+
Preconditions.checkState(this.applicationContainerContextOptional.isPresent(),
73+
"No application-defined container context exists");
7374
return this.applicationContainerContextOptional.get();
7475
}
7576

7677
@Override
7778
public ApplicationTaskContext getApplicationTaskContext() {
78-
if (!this.applicationTaskContextOptional.isPresent()) {
79-
throw new IllegalStateException("No application-defined task context exists");
80-
}
79+
Preconditions.checkState(this.applicationTaskContextOptional.isPresent(),
80+
"No application-defined task context exists");
8181
return this.applicationTaskContextOptional.get();
8282
}
8383

84+
@Override
85+
public ExternalContext getExternalContext() {
86+
Preconditions.checkState(this.externalContextOptional.isPresent(), "No external context exists");
87+
return this.externalContextOptional.get();
88+
}
89+
8490
@Override
8591
public boolean equals(Object o) {
8692
if (this == o) {
@@ -93,12 +99,13 @@ public boolean equals(Object o) {
9399
return Objects.equals(jobContext, context.jobContext) && Objects.equals(containerContext, context.containerContext)
94100
&& Objects.equals(taskContext, context.taskContext) && Objects.equals(applicationContainerContextOptional,
95101
context.applicationContainerContextOptional) && Objects.equals(applicationTaskContextOptional,
96-
context.applicationTaskContextOptional);
102+
context.applicationTaskContextOptional) && Objects.equals(externalContextOptional,
103+
context.externalContextOptional);
97104
}
98105

99106
@Override
100107
public int hashCode() {
101108
return Objects.hash(jobContext, containerContext, taskContext, applicationContainerContextOptional,
102-
applicationTaskContextOptional);
109+
applicationTaskContextOptional, externalContextOptional);
103110
}
104111
}

samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.samza.context.ApplicationContainerContextFactory;
4141
import org.apache.samza.context.ApplicationTaskContext;
4242
import org.apache.samza.context.ApplicationTaskContextFactory;
43+
import org.apache.samza.context.ExternalContext;
4344
import org.apache.samza.context.JobContextImpl;
4445
import org.apache.samza.coordinator.JobCoordinator;
4546
import org.apache.samza.coordinator.JobCoordinatorFactory;
@@ -115,6 +116,7 @@ public class StreamProcessor {
115116
* context object.
116117
*/
117118
private final Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional;
119+
private final Optional<ExternalContext> externalContextOptional;
118120
private final Map<String, MetricsReporter> customMetricsReporter;
119121
private final Config config;
120122
private final long taskShutdownMs;
@@ -169,7 +171,7 @@ public State getState() {
169171
* @param taskFactory task factory to instantiate the Task
170172
* @param processorListener listener to the StreamProcessor life cycle
171173
*
172-
* Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
174+
* Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
173175
* StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
174176
*/
175177
@Deprecated
@@ -179,7 +181,7 @@ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetrics
179181
}
180182

181183
/**
182-
* Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
184+
* Same as {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
183185
* StreamProcessorLifecycleListenerFactory, JobCoordinator)}, with the following differences:
184186
* <ol>
185187
* <li>Passes null for application-defined context factories</li>
@@ -193,14 +195,14 @@ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetrics
193195
* @param processorListener listener to the StreamProcessor life cycle
194196
* @param jobCoordinator the instance of {@link JobCoordinator}
195197
*
196-
* Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional,
198+
* Deprecated: Use {@link #StreamProcessor(Config, Map, TaskFactory, Optional, Optional, Optional,
197199
* StreamProcessorLifecycleListenerFactory, JobCoordinator)} instead.
198200
*/
199201
@Deprecated
200202
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
201203
ProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
202-
this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), sp -> processorListener,
203-
jobCoordinator);
204+
this(config, customMetricsReporters, taskFactory, Optional.empty(), Optional.empty(), Optional.empty(),
205+
sp -> processorListener, jobCoordinator);
204206
}
205207

206208
/**
@@ -211,19 +213,22 @@ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetrics
211213
* @param taskFactory task factory to instantiate the Task
212214
* @param applicationDefinedContainerContextFactoryOptional optional factory for application-defined container context
213215
* @param applicationDefinedTaskContextFactoryOptional optional factory for application-defined task context
216+
* @param externalContextOptional optional {@link ExternalContext} to pass through to the application
214217
* @param listenerFactory factory for creating a listener to the StreamProcessor life cycle
215218
* @param jobCoordinator the instance of {@link JobCoordinator}
216219
*/
217220
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, TaskFactory taskFactory,
218221
Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> applicationDefinedContainerContextFactoryOptional,
219222
Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> applicationDefinedTaskContextFactoryOptional,
220-
StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator jobCoordinator) {
223+
Optional<ExternalContext> externalContextOptional, StreamProcessorLifecycleListenerFactory listenerFactory,
224+
JobCoordinator jobCoordinator) {
221225
Preconditions.checkNotNull(listenerFactory, "StreamProcessorListenerFactory cannot be null.");
222226
this.config = config;
223227
this.customMetricsReporter = customMetricsReporters;
224228
this.taskFactory = taskFactory;
225229
this.applicationDefinedContainerContextFactoryOptional = applicationDefinedContainerContextFactoryOptional;
226230
this.applicationDefinedTaskContextFactoryOptional = applicationDefinedTaskContextFactoryOptional;
231+
this.externalContextOptional = externalContextOptional;
227232
this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
228233
this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : createJobCoordinator();
229234
this.jobCoordinatorListener = createJobCoordinatorListener();
@@ -318,7 +323,7 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
318323
this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
319324
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
320325
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
321-
null);
326+
Option.apply(this.externalContextOptional.orElse(null)), null);
322327
}
323328

324329
private JobCoordinator createJobCoordinator() {

samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.samza.util.CommandLine;
2727
import org.apache.samza.util.Util;
2828

29-
3029
/**
3130
* This class contains the main() method used by run-app.sh.
3231
* It creates the {@link ApplicationRunner} based on the config, and then run the application.
@@ -59,7 +58,7 @@ public static void main(String[] args) throws Exception {
5958

6059
switch (op) {
6160
case RUN:
62-
appRunner.run();
61+
appRunner.run(null);
6362
break;
6463
case KILL:
6564
appRunner.kill();

0 commit comments

Comments
 (0)