Skip to content

Commit ae6ca52

Browse files
authored
Merge pull request cdapio#99 from cdapio/feature/same-thread-event
Update the EventExecutor behavior
2 parents 4a07c2f + 0de0b6b commit ae6ca52

File tree

4 files changed

+587
-2
lines changed

4 files changed

+587
-2
lines changed

src/main/java/io/cdap/http/NettyHttpService.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.cdap.http;
1818

1919
import io.cdap.http.internal.BasicHandlerContext;
20+
import io.cdap.http.internal.ForwardingEventExecutorGroup;
21+
import io.cdap.http.internal.ForwardingOrderedEventExecutor;
2022
import io.cdap.http.internal.HttpDispatcher;
2123
import io.cdap.http.internal.HttpResourceHandler;
2224
import io.cdap.http.internal.NonStickyEventExecutorGroup;
@@ -37,9 +39,11 @@
3739
import io.netty.handler.codec.http.HttpServerKeepAliveHandler;
3840
import io.netty.handler.ssl.SslHandler;
3941
import io.netty.handler.stream.ChunkedWriteHandler;
42+
import io.netty.util.concurrent.EventExecutor;
4043
import io.netty.util.concurrent.EventExecutorGroup;
4144
import io.netty.util.concurrent.Future;
4245
import io.netty.util.concurrent.ImmediateEventExecutor;
46+
import io.netty.util.concurrent.OrderedEventExecutor;
4347
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
4448
import org.slf4j.Logger;
4549
import org.slf4j.LoggerFactory;
@@ -49,6 +53,7 @@
4953
import java.util.Arrays;
5054
import java.util.Collections;
5155
import java.util.HashMap;
56+
import java.util.Iterator;
5257
import java.util.List;
5358
import java.util.Map;
5459
import java.util.concurrent.RejectedExecutionHandler;
@@ -283,8 +288,9 @@ private EventExecutorGroup createEventExecutorGroup(int size) {
283288
return null;
284289
}
285290

291+
ThreadGroup threadGroup = new ThreadGroup(serviceName + "-executor-thread");
292+
286293
ThreadFactory threadFactory = new ThreadFactory() {
287-
private final ThreadGroup threadGroup = new ThreadGroup(serviceName + "-executor-thread");
288294
private final AtomicLong count = new AtomicLong(0);
289295

290296
@Override
@@ -301,7 +307,68 @@ public Thread newThread(Runnable r) {
301307
executor.setKeepAliveTime(execThreadKeepAliveSecs, TimeUnit.SECONDS);
302308
executor.allowCoreThreadTimeOut(true);
303309
}
304-
return new NonStickyEventExecutorGroup(executor);
310+
311+
// Returns a EventExecutorGroup that overrides the inEventLoop behavior of the EventExecutor returned by the group.
312+
// The inEventLoop will return true if it is calling from a thread in the same thread group, hence was created
313+
// from the same executor.
314+
// This is to make sure channel events coming from a thread owned by the EventExecutor
315+
// will be executed in the same thread instead of submitting a new task.
316+
return new ForwardingEventExecutorGroup(new NonStickyEventExecutorGroup(executor)) {
317+
318+
private final EventExecutorGroup group = this;
319+
320+
@Override
321+
public EventExecutor next() {
322+
return wrapEventExecutor(super.next());
323+
}
324+
325+
@Override
326+
public Iterator<EventExecutor> iterator() {
327+
Iterator<EventExecutor> iterator = super.iterator();
328+
329+
return new Iterator<EventExecutor>() {
330+
@Override
331+
public boolean hasNext() {
332+
return iterator.hasNext();
333+
}
334+
335+
@Override
336+
public EventExecutor next() {
337+
return wrapEventExecutor(iterator.next());
338+
}
339+
340+
@Override
341+
public void remove() {
342+
iterator.remove();
343+
}
344+
};
345+
}
346+
347+
private EventExecutor wrapEventExecutor(EventExecutor executor) {
348+
if (!(executor instanceof OrderedEventExecutor)) {
349+
// This should never happen since we use the NonStickyEventExecutorGroup above.
350+
throw new IllegalStateException("The executor is not an OrderedEventExecutor: " + executor.getClass());
351+
}
352+
353+
return new ForwardingOrderedEventExecutor((OrderedEventExecutor) executor) {
354+
355+
@Override
356+
public EventExecutorGroup parent() {
357+
return group;
358+
}
359+
360+
@Override
361+
public boolean inEventLoop() {
362+
return inEventLoop(Thread.currentThread());
363+
}
364+
365+
@Override
366+
public boolean inEventLoop(Thread thread) {
367+
return threadGroup == thread.getThreadGroup();
368+
}
369+
};
370+
}
371+
};
305372
}
306373

307374
/**
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright © 2021 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.http.internal;
18+
19+
import io.netty.util.concurrent.EventExecutor;
20+
import io.netty.util.concurrent.EventExecutorGroup;
21+
import io.netty.util.concurrent.Future;
22+
import io.netty.util.concurrent.ScheduledFuture;
23+
24+
import java.util.Collection;
25+
import java.util.Iterator;
26+
import java.util.List;
27+
import java.util.concurrent.Callable;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
32+
/**
33+
* A {@link EventExecutorGroup} that forwards all methods to another {@link EventExecutorGroup}.
34+
*/
35+
public class ForwardingEventExecutorGroup implements EventExecutorGroup {
36+
37+
private final EventExecutorGroup delegate;
38+
39+
public ForwardingEventExecutorGroup(EventExecutorGroup delegate) {
40+
this.delegate = delegate;
41+
}
42+
43+
@Override
44+
public boolean isShuttingDown() {
45+
return delegate.isShuttingDown();
46+
}
47+
48+
@Override
49+
public Future<?> shutdownGracefully() {
50+
return delegate.shutdownGracefully();
51+
}
52+
53+
@Override
54+
public Future<?> shutdownGracefully(long l, long l1, TimeUnit timeUnit) {
55+
return delegate.shutdownGracefully(l, l1, timeUnit);
56+
}
57+
58+
@Override
59+
public Future<?> terminationFuture() {
60+
return delegate.terminationFuture();
61+
}
62+
63+
@Override
64+
@Deprecated
65+
public void shutdown() {
66+
delegate.shutdown();
67+
}
68+
69+
@Override
70+
@Deprecated
71+
public List<Runnable> shutdownNow() {
72+
return delegate.shutdownNow();
73+
}
74+
75+
@Override
76+
public EventExecutor next() {
77+
return delegate.next();
78+
}
79+
80+
@Override
81+
public Iterator<EventExecutor> iterator() {
82+
return delegate.iterator();
83+
}
84+
85+
@Override
86+
public Future<?> submit(Runnable runnable) {
87+
return delegate.submit(runnable);
88+
}
89+
90+
@Override
91+
public <T> Future<T> submit(Runnable runnable, T t) {
92+
return delegate.submit(runnable, t);
93+
}
94+
95+
@Override
96+
public <T> Future<T> submit(Callable<T> callable) {
97+
return delegate.submit(callable);
98+
}
99+
100+
@Override
101+
public ScheduledFuture<?> schedule(Runnable runnable, long l, TimeUnit timeUnit) {
102+
return delegate.schedule(runnable, l, timeUnit);
103+
}
104+
105+
@Override
106+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long l, TimeUnit timeUnit) {
107+
return delegate.schedule(callable, l, timeUnit);
108+
}
109+
110+
@Override
111+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long l, long l1, TimeUnit timeUnit) {
112+
return delegate.scheduleAtFixedRate(runnable, l, l1, timeUnit);
113+
}
114+
115+
@Override
116+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long l, long l1, TimeUnit timeUnit) {
117+
return delegate.scheduleWithFixedDelay(runnable, l, l1, timeUnit);
118+
}
119+
120+
@Override
121+
public boolean isShutdown() {
122+
return delegate.isShutdown();
123+
}
124+
125+
@Override
126+
public boolean isTerminated() {
127+
return delegate.isTerminated();
128+
}
129+
130+
@Override
131+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
132+
return delegate.awaitTermination(timeout, unit);
133+
}
134+
135+
@Override
136+
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
137+
throws InterruptedException {
138+
return delegate.invokeAll(tasks);
139+
}
140+
141+
@Override
142+
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
143+
long timeout, TimeUnit unit) throws InterruptedException {
144+
return delegate.invokeAll(tasks, timeout, unit);
145+
}
146+
147+
@Override
148+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
149+
return delegate.invokeAny(tasks);
150+
}
151+
152+
@Override
153+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
154+
throws InterruptedException, ExecutionException, TimeoutException {
155+
return delegate.invokeAny(tasks, timeout, unit);
156+
}
157+
158+
@Override
159+
public void execute(Runnable command) {
160+
delegate.execute(command);
161+
}
162+
}

0 commit comments

Comments
 (0)