Skip to content

added ExecutorWrapper in WorkerFactoryOptions #988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class SyncActivityWorker implements SuspendableWorker {

private final ActivityWorker worker;
private final POJOActivityTaskHandler taskHandler;
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
private final ScheduledExecutorService heartbeatExecutor;

public SyncActivityWorker(
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
heartbeatExecutor = options.getExecutorWrapper().wrap(Executors.newScheduledThreadPool(4));
taskHandler =
new POJOActivityTaskHandler(service, domain, options.getDataConverter(), heartbeatExecutor);
worker = new ActivityWorker(service, domain, taskList, options, taskHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.uber.cadence.internal.worker.SuspendableWorker;
import com.uber.cadence.internal.worker.WorkflowWorker;
import com.uber.cadence.serviceclient.IWorkflowService;
import com.uber.cadence.worker.ExecutorWrapper;
import com.uber.cadence.worker.WorkflowImplementationOptions;
import com.uber.cadence.workflow.Functions.Func;
import com.uber.cadence.workflow.WorkflowInterceptor;
Expand All @@ -51,8 +52,8 @@ public class SyncWorkflowWorker
private final POJOWorkflowImplementationFactory factory;
private final DataConverter dataConverter;
private final POJOActivityTaskHandler laTaskHandler;
private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(4);
private final ScheduledExecutorService ldaHeartbeatExecutor = Executors.newScheduledThreadPool(4);
private final ScheduledExecutorService heartbeatExecutor;
private final ScheduledExecutorService ldaHeartbeatExecutor;
private SuspendableWorker ldaWorker;
private POJOActivityTaskHandler ldaTaskHandler;
private final IWorkflowService service;
Expand All @@ -73,6 +74,11 @@ public SyncWorkflowWorker(
this.dataConverter = workflowOptions.getDataConverter();
this.service = service;

// heartbeat executors
ExecutorWrapper executorWrapper = localActivityOptions.getExecutorWrapper();
heartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4));
ldaHeartbeatExecutor = executorWrapper.wrap(Executors.newScheduledThreadPool(4));

factory =
new POJOWorkflowImplementationFactory(
workflowOptions.getDataConverter(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void start() {
getOrCreateActivityPollTask(),
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
options.getPollerOptions(),
options.getMetricsScope());
options.getMetricsScope(),
options.getExecutorWrapper());
poller.start();
setPoller(poller);
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void start() {
laPollTask,
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
options.getPollerOptions(),
options.getMetricsScope());
options.getMetricsScope(),
options.getExecutorWrapper());
poller.start();
setPoller(poller);
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ public interface TaskHandler<TT> {

this.options = options;
taskExecutor =
new ThreadPoolExecutor(
0,
options.getTaskExecutorThreadPoolSize(),
1,
TimeUnit.SECONDS,
new SynchronousQueue<>());
options
.getExecutorWrapper()
.wrap(
new ThreadPoolExecutor(
0,
options.getTaskExecutorThreadPoolSize(),
1,
TimeUnit.SECONDS,
new SynchronousQueue<>()));
taskExecutor.setThreadFactory(
new ExecutorThreadFactory(
options.getPollerOptions().getPollThreadNamePrefix().replaceFirst("Poller", "Executor"),
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/com/uber/cadence/internal/worker/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
import com.uber.cadence.worker.ExecutorWrapper;
import com.uber.m3.tally.Scope;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -72,24 +73,29 @@ interface ThrowingRunnable {

private final AutoScaler pollerAutoScaler;

private final ExecutorWrapper executorWrapper;

public Poller(
String identity,
PollTask<T> pollTask,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
Scope metricsScope) {
Scope metricsScope,
ExecutorWrapper executorWrapper) {
Objects.requireNonNull(identity, "identity cannot be null");
Objects.requireNonNull(pollTask, "poll service should not be null");
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
Objects.requireNonNull(metricsScope, "metricsScope should not be null");
Objects.requireNonNull(metricsScope, "executorWrapper should not be null");

this.identity = identity;
this.pollTask = pollTask;
this.taskExecutor = taskExecutor;
this.pollerOptions = pollerOptions;
this.metricsScope = metricsScope;
this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
this.executorWrapper = executorWrapper;
}

@Override
Expand All @@ -109,12 +115,13 @@ public void start() {
// As task enqueues next task the buffering is needed to queue task until the previous one
// releases a thread.
pollExecutor =
new ThreadPoolExecutor(
pollerOptions.getPollThreadCount(),
pollerOptions.getPollThreadCount(),
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount()));
executorWrapper.wrap(
new ThreadPoolExecutor(
pollerOptions.getPollThreadCount(),
pollerOptions.getPollThreadCount(),
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(pollerOptions.getPollThreadCount())));
pollExecutor.setThreadFactory(
new ExecutorThreadFactory(
pollerOptions.getPollThreadNamePrefix(), pollerOptions.getUncaughtExceptionHandler()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.converter.JsonDataConverter;
import com.uber.cadence.internal.metrics.NoopScope;
import com.uber.cadence.worker.ExecutorWrapper;
import com.uber.m3.tally.Scope;
import io.opentracing.Tracer;
import java.time.Duration;
Expand All @@ -46,6 +47,7 @@ public static final class Builder {
private boolean enableLoggingInReplay;
private List<ContextPropagator> contextPropagators;
private Tracer tracer;
private ExecutorWrapper executorWrapper;

private Builder() {}

Expand All @@ -59,6 +61,7 @@ public Builder(SingleWorkerOptions options) {
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
this.contextPropagators = options.getContextPropagators();
this.tracer = options.getTracer();
this.executorWrapper = options.getExecutorWrapper();
}

public Builder setIdentity(String identity) {
Expand Down Expand Up @@ -107,6 +110,11 @@ public Builder setTracer(Tracer tracer) {
return this;
}

public Builder setExecutorWrapper(ExecutorWrapper executorWrapper) {
this.executorWrapper = executorWrapper;
return this;
}

public SingleWorkerOptions build() {
if (pollerOptions == null) {
pollerOptions =
Expand Down Expand Up @@ -134,7 +142,8 @@ public SingleWorkerOptions build() {
metricsScope,
enableLoggingInReplay,
contextPropagators,
tracer);
tracer,
executorWrapper);
}
}

Expand All @@ -147,6 +156,7 @@ public SingleWorkerOptions build() {
private final boolean enableLoggingInReplay;
private List<ContextPropagator> contextPropagators;
private final Tracer tracer;
private final ExecutorWrapper executorWrapper;

private SingleWorkerOptions(
String identity,
Expand All @@ -157,7 +167,8 @@ private SingleWorkerOptions(
Scope metricsScope,
boolean enableLoggingInReplay,
List<ContextPropagator> contextPropagators,
Tracer tracer) {
Tracer tracer,
ExecutorWrapper executorWrapper) {
this.identity = identity;
this.dataConverter = dataConverter;
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
Expand All @@ -167,6 +178,7 @@ private SingleWorkerOptions(
this.enableLoggingInReplay = enableLoggingInReplay;
this.contextPropagators = contextPropagators;
this.tracer = tracer;
this.executorWrapper = executorWrapper;
}

public String getIdentity() {
Expand Down Expand Up @@ -204,4 +216,8 @@ public List<ContextPropagator> getContextPropagators() {
public Tracer getTracer() {
return tracer;
}

public ExecutorWrapper getExecutorWrapper() {
return executorWrapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public void start() {
options.getIdentity()),
pollTaskExecutor,
options.getPollerOptions(),
options.getMetricsScope());
options.getMetricsScope(),
options.getExecutorWrapper());
poller.start();
setPoller(poller);
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/com/uber/cadence/worker/ExecutorWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Modifications Copyright (c) 2017-2021 Uber Technologies Inc.
* Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.uber.cadence.worker;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

public interface ExecutorWrapper {
ExecutorService wrap(ExecutorService delegate);

ThreadPoolExecutor wrap(ThreadPoolExecutor delegate);

ScheduledExecutorService wrap(ScheduledExecutorService delegate);

static ExecutorWrapper newDefaultInstance() {
return new ExecutorWrapper() {
@Override
public ExecutorService wrap(ExecutorService delegate) {
return delegate;
}

@Override
public ThreadPoolExecutor wrap(ThreadPoolExecutor delegate) {
return delegate;
}

@Override
public ScheduledExecutorService wrap(ScheduledExecutorService delegate) {
return delegate;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public ShadowingWorker(
.setTaskListActivitiesPerSecond(options.getTaskListActivitiesPerSecond())
.setPollerOptions(options.getActivityPollerOptions())
.setMetricsScope(metricsScope)
.setExecutorWrapper(testOptions.getWorkerFactoryOptions().getExecutorWrapper())
.build();
activityWorker =
new SyncActivityWorker(
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/uber/cadence/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public final class Worker implements Suspendable {
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
.setContextPropagators(contextPropagators)
.setTracer(options.getTracer())
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
.build();
activityWorker =
new SyncActivityWorker(
Expand All @@ -117,6 +118,7 @@ public final class Worker implements Suspendable {
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
.setContextPropagators(contextPropagators)
.setTracer(options.getTracer())
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
.build();
SingleWorkerOptions localActivityOptions =
SingleWorkerOptions.newBuilder()
Expand All @@ -128,6 +130,7 @@ public final class Worker implements Suspendable {
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
.setContextPropagators(contextPropagators)
.setTracer(options.getTracer())
.setExecutorWrapper(factoryOptions.getExecutorWrapper())
.build();
workflowWorker =
new SyncWorkflowWorker(
Expand Down
18 changes: 11 additions & 7 deletions src/main/java/com/uber/cadence/worker/WorkerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
MoreObjects.firstNonNull(factoryOptions, WorkerFactoryOptions.defaultInstance());

workflowThreadPool =
new ThreadPoolExecutor(
0,
this.factoryOptions.getMaxWorkflowThreadCount(),
1,
TimeUnit.SECONDS,
new SynchronousQueue<>());
factoryOptions
.getExecutorWrapper()
.wrap(
new ThreadPoolExecutor(
0,
this.factoryOptions.getMaxWorkflowThreadCount(),
1,
TimeUnit.SECONDS,
new SynchronousQueue<>()));
workflowThreadPool.setThreadFactory(
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));

Expand Down Expand Up @@ -136,7 +139,8 @@ public WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factory
.setPollThreadNamePrefix(POLL_THREAD_NAME)
.setPollThreadCount(this.factoryOptions.getStickyPollerCount())
.build(),
stickyScope);
stickyScope,
factoryOptions.getExecutorWrapper());
}

/**
Expand Down
Loading