Skip to content

Commit b68add0

Browse files
authored
[WIP] Refactoring SequentialExecutorService (#4969)
* Refactoring SequentialExecutorService Step 1: - create a `CallbackExecutor` and `AutoExecutor` as subclasses of SequentialExecutor. * Adding CallbackExecutor.submit() for encapsulation * Moving resume into `CallbackExecutor` * create an abstract class for execute(key, deque) This removest the last bit of code that directly used the TaskCompleteAction enum.
1 parent 7732506 commit b68add0

File tree

1 file changed

+112
-124
lines changed

1 file changed

+112
-124
lines changed

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java

Lines changed: 112 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import java.util.logging.Level;
3131
import java.util.logging.Logger;
3232

33+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
34+
3335
interface CancellableRunnable extends Runnable {
34-
public void cancel(Throwable e);
36+
void cancel(Throwable e);
3537
}
3638

3739
/**
@@ -42,108 +44,40 @@ interface CancellableRunnable extends Runnable {
4244
final class SequentialExecutorService<T> {
4345
private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName());
4446

45-
private final SequentialExecutor manageableSequentialExecutor;
46-
private final SequentialExecutor autoSequentialExecutor;
47+
private final CallbackExecutor callbackExecutor;
48+
private final AutoExecutor autoExecutor;
4749

4850
SequentialExecutorService(Executor executor) {
49-
this.manageableSequentialExecutor =
50-
SequentialExecutor.newManageableSequentialExecutor(executor);
51-
this.autoSequentialExecutor = SequentialExecutor.newAutoSequentialExecutor(executor);
51+
this.callbackExecutor = new CallbackExecutor(executor);
52+
this.autoExecutor = new AutoExecutor(executor);
5253
}
5354

5455
/**
5556
* Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks
5657
* with the same key that have not been executed will be cancelled.
5758
*/
5859
ApiFuture<T> submit(final String key, final Callable<ApiFuture> callable) {
59-
final SettableApiFuture<T> future = SettableApiFuture.<T>create();
60-
manageableSequentialExecutor.execute(
61-
key,
62-
new CancellableRunnable() {
63-
private boolean cancelled = false;
64-
65-
@Override
66-
public void run() {
67-
if (cancelled) {
68-
return;
69-
}
70-
try {
71-
ApiFuture<T> callResult = callable.call();
72-
ApiFutures.addCallback(
73-
callResult,
74-
new ApiFutureCallback<T>() {
75-
@Override
76-
public void onSuccess(T msg) {
77-
future.set(msg);
78-
manageableSequentialExecutor.resume(key);
79-
}
80-
81-
@Override
82-
public void onFailure(Throwable e) {
83-
future.setException(e);
84-
manageableSequentialExecutor.cancelQueuedTasks(
85-
key,
86-
new CancellationException(
87-
"Execution cancelled because executing previous runnable failed."));
88-
}
89-
});
90-
} catch (Exception e) {
91-
future.setException(e);
92-
}
93-
}
94-
95-
@Override
96-
public void cancel(Throwable e) {
97-
this.cancelled = true;
98-
future.setException(e);
99-
}
100-
});
101-
return future;
60+
return callbackExecutor.submit(key, callable);
10261
}
10362

10463
/** Runs synchronous {@code Runnable} tasks sequentially. */
105-
void submit(final String key, final Runnable runnable) {
106-
autoSequentialExecutor.execute(key, runnable);
64+
void submit( String key, Runnable runnable) {
65+
autoExecutor.execute(key, runnable);
10766
}
10867

10968
/**
110-
* Internal implemenation of SequentialExecutorService. Takes a serial stream of string keys and
69+
* Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and
11170
* {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same
11271
* key will be run only when its predecessor has been completed while tasks with different keys
11372
* can be run in parallel.
11473
*/
115-
static class SequentialExecutor {
74+
static abstract class SequentialExecutor {
11675
// Maps keys to tasks.
117-
private final Map<String, Deque<Runnable>> tasksByKey;
118-
private final Executor executor;
119-
120-
enum TaskCompleteAction {
121-
EXECUTE_NEXT_TASK,
122-
WAIT_UNTIL_RESUME,
123-
}
124-
125-
private TaskCompleteAction taskCompleteAction;
76+
protected final Map<String, Deque<Runnable>> tasksByKey;
77+
protected final Executor executor;
12678

127-
/**
128-
* Creates a AutoSequentialExecutor which executes the next queued task automatically when the
129-
* previous task has completed.
130-
*/
131-
static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
132-
return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK);
133-
}
134-
135-
/**
136-
* Creates a ManageableSequentialExecutor which allows users to decide when to execute the next
137-
* queued task. The first queued task is executed immediately, but the following tasks will be
138-
* executed only when {@link #resume(String)} is called explicitly.
139-
*/
140-
static SequentialExecutor newManageableSequentialExecutor(Executor executor) {
141-
return new SequentialExecutor(executor, TaskCompleteAction.WAIT_UNTIL_RESUME);
142-
}
143-
144-
private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAction) {
79+
private SequentialExecutor(Executor executor) {
14580
this.executor = executor;
146-
this.taskCompleteAction = taskCompleteAction;
14781
this.tasksByKey = new HashMap<>();
14882
}
14983

@@ -162,25 +96,11 @@ void execute(final String key, Runnable task) {
16296
tasksByKey.put(key, newTasks);
16397
}
16498

165-
final Deque<Runnable> finalTasks = newTasks;
166-
executor.execute(
167-
new Runnable() {
168-
@Override
169-
public void run() {
170-
switch (taskCompleteAction) {
171-
case EXECUTE_NEXT_TASK:
172-
invokeCallbackAndExecuteNext(key, finalTasks);
173-
break;
174-
case WAIT_UNTIL_RESUME:
175-
invokeCallback(finalTasks);
176-
break;
177-
default:
178-
// Nothing to do.
179-
}
180-
}
181-
});
99+
execute(key, newTasks);
182100
}
183101

102+
protected abstract void execute(String key, Deque<Runnable> finalTasks);
103+
184104
/** Cancels every task in the queue assoicated with {@code key}. */
185105
void cancelQueuedTasks(final String key, Throwable e) {
186106
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
@@ -203,59 +123,127 @@ void cancelQueuedTasks(final String key, Throwable e) {
203123
}
204124
}
205125

206-
/** Executes the next queued task associated with {@code key}. */
207-
void resume(final String key) {
208-
if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) {
209-
// resume() is no-op since tasks are executed automatically.
210-
return;
126+
protected void invokeCallback(final Deque<Runnable> tasks) {
127+
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
128+
Runnable task = tasks.poll();
129+
if (task != null) {
130+
task.run();
211131
}
212-
Deque<Runnable> tasks;
132+
}
133+
134+
protected void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
135+
invokeCallback(tasks);
213136
synchronized (tasksByKey) {
214-
tasks = tasksByKey.get(key);
215-
if (tasks == null) {
216-
return;
217-
}
218137
if (tasks.isEmpty()) {
138+
// Note that there can be a race if a task is added to `tasks` at this point. However,
139+
// tasks.add() is called only inside the block synchronized by `tasksByKey` object
140+
// in the execute() function. Therefore, we are safe to remove `tasks` here. This is not
141+
// optimal, but correct.
219142
tasksByKey.remove(key);
220143
return;
221144
}
222145
}
223-
final Deque<Runnable> finalTasks = tasks;
224-
// Run the next task.
225146
executor.execute(
226147
new Runnable() {
227148
@Override
228149
public void run() {
229-
invokeCallback(finalTasks);
150+
invokeCallbackAndExecuteNext(key, tasks);
230151
}
231152
});
232153
}
154+
}
233155

234-
private void invokeCallback(final Deque<Runnable> tasks) {
235-
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
236-
Runnable task = tasks.poll();
237-
if (task != null) {
238-
task.run();
239-
}
156+
private static class AutoExecutor extends SequentialExecutor {
157+
AutoExecutor(Executor executor) {
158+
super(executor);
240159
}
241160

242-
private void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
243-
invokeCallback(tasks);
161+
protected void execute(final String key, final Deque<Runnable> finalTasks) {
162+
executor.execute(new Runnable() {
163+
@Override public void run() {
164+
invokeCallbackAndExecuteNext(key, finalTasks);
165+
}
166+
});
167+
}
168+
}
169+
170+
private static class CallbackExecutor extends SequentialExecutor {
171+
CallbackExecutor(Executor executor) {
172+
super(executor);
173+
}
174+
175+
<T> ApiFuture<T> submit(final String key, final Callable<ApiFuture> callable) {
176+
final SettableApiFuture<T> future = SettableApiFuture.create();
177+
execute(
178+
key,
179+
new CancellableRunnable() {
180+
private boolean cancelled = false;
181+
182+
@Override
183+
public void run() {
184+
if (cancelled) {
185+
return;
186+
}
187+
try {
188+
ApiFuture<T> callResult = callable.call();
189+
ApiFutures.addCallback(callResult, new ApiFutureCallback<T>() {
190+
@Override
191+
public void onSuccess(T msg) {
192+
future.set(msg);
193+
resume(key);
194+
}
195+
196+
@Override
197+
public void onFailure(Throwable e) {
198+
future.setException(e);
199+
cancelQueuedTasks(
200+
key,
201+
new CancellationException(
202+
"Execution cancelled because executing previous runnable failed."));
203+
}
204+
}, directExecutor());
205+
} catch (Exception e) {
206+
future.setException(e);
207+
}
208+
}
209+
210+
@Override
211+
public void cancel(Throwable e) {
212+
this.cancelled = true;
213+
future.setException(e);
214+
}
215+
});
216+
return future;
217+
}
218+
219+
protected void execute(final String key, final Deque<Runnable> finalTasks) {
220+
executor.execute(new Runnable() {
221+
@Override public void run() {
222+
invokeCallback(finalTasks);
223+
}
224+
});
225+
}
226+
227+
/** Executes the next queued task associated with {@code key}. */
228+
void resume(final String key) {
229+
Deque<Runnable> tasks;
244230
synchronized (tasksByKey) {
231+
tasks = tasksByKey.get(key);
232+
if (tasks == null) {
233+
return;
234+
}
245235
if (tasks.isEmpty()) {
246-
// Note that there can be a race if a task is added to `tasks` at this point. However,
247-
// tasks.add() is called only inside the block synchronized by `tasksByKey` object
248-
// in the execute() function. Therefore, we are safe to remove `tasks` here. This is not
249-
// optimal, but correct.
250236
tasksByKey.remove(key);
251237
return;
252238
}
253239
}
240+
final Deque<Runnable> finalTasks = tasks;
241+
// Run the next task.
254242
executor.execute(
255243
new Runnable() {
256244
@Override
257245
public void run() {
258-
invokeCallbackAndExecuteNext(key, tasks);
246+
invokeCallback(finalTasks);
259247
}
260248
});
261249
}

0 commit comments

Comments
 (0)