Skip to content

Commit dcf626a

Browse files
committed
First step moving to virutal threads as part of reducing thread contention
1 parent 07873a8 commit dcf626a

File tree

4 files changed

+142
-56
lines changed

4 files changed

+142
-56
lines changed

operator/src/main/java/oracle/kubernetes/operator/BaseMain.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import oracle.kubernetes.operator.work.Fiber.CompletionCallback;
4646
import oracle.kubernetes.operator.work.Packet;
4747
import oracle.kubernetes.operator.work.Step;
48-
import oracle.kubernetes.operator.work.ThreadFactorySingleton;
4948
import oracle.kubernetes.utils.SystemClock;
5049

5150
/** An abstract base main class for the operator and the webhook. */
@@ -60,7 +59,7 @@ public abstract class BaseMain {
6059
static final Container container = new Container();
6160
static final ThreadFactory threadFactory = new WrappedThreadFactory();
6261
static ScheduledExecutorService wrappedExecutorService =
63-
Engine.wrappedExecutorService("operator", container); // non-final to allow change in unit tests
62+
Engine.wrappedExecutorService(container); // non-final to allow change in unit tests
6463
static final AtomicReference<OffsetDateTime> lastFullRecheck =
6564
new AtomicReference<>(SystemClock.now());
6665
static final Semaphore shutdownSignal = new Semaphore(0);
@@ -276,7 +275,7 @@ static Packet createPacketWithLoggingContext(String ns) {
276275
}
277276

278277
private static class WrappedThreadFactory implements ThreadFactory {
279-
private final ThreadFactory delegate = ThreadFactorySingleton.getInstance();
278+
private final ThreadFactory delegate = Thread.ofVirtual().factory();
280279

281280
@Override
282281
public Thread newThread(@Nonnull Runnable r) {

operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@
44
package oracle.kubernetes.operator.helpers;
55

66
import java.io.IOException;
7-
import java.util.concurrent.ExecutorService;
8-
import java.util.concurrent.SynchronousQueue;
7+
import java.util.concurrent.Executors;
98
import java.util.concurrent.ThreadFactory;
10-
import java.util.concurrent.ThreadPoolExecutor;
11-
import java.util.concurrent.TimeUnit;
129
import java.util.concurrent.atomic.AtomicBoolean;
1310
import java.util.concurrent.atomic.AtomicReference;
1411

@@ -116,22 +113,9 @@ public ApiClient get() {
116113
}
117114

118115
if (threadFactory != null) {
119-
ExecutorService exec =
120-
new ThreadPoolExecutor(
121-
0,
122-
Integer.MAX_VALUE,
123-
60,
124-
TimeUnit.SECONDS,
125-
new SynchronousQueue<>(),
126-
threadFactory) {
127-
@Override
128-
public void execute(Runnable command) {
129-
super.execute(wrapRunnable(command));
130-
}
131-
};
132116
OkHttpClient httpClient =
133117
client.getHttpClient().newBuilder().addInterceptor(new HeaderModifierInterceptor())
134-
.dispatcher(new Dispatcher(exec)).build();
118+
.dispatcher(new Dispatcher(Executors.newThreadPerTaskExecutor(threadFactory))).build();
135119
client.setHttpClient(httpClient);
136120
}
137121

Lines changed: 137 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,29 @@
1-
// Copyright (c) 2018, 2021, Oracle and/or its affiliates.
1+
// Copyright (c) 2018, 2023, Oracle and/or its affiliates.
22
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
33

44
package oracle.kubernetes.operator.work;
55

6+
import java.util.Collection;
7+
import java.util.List;
8+
import java.util.concurrent.Callable;
9+
import java.util.concurrent.ExecutionException;
610
import java.util.concurrent.Executor;
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.Future;
714
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.ScheduledFuture;
816
import java.util.concurrent.ScheduledThreadPoolExecutor;
9-
import java.util.concurrent.ThreadFactory;
10-
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.TimeoutException;
1119
import java.util.concurrent.atomic.AtomicReference;
1220
import javax.annotation.Nonnull;
1321

1422
/**
1523
* Collection of {@link Fiber}s. Owns an {@link Executor} to run them.
1624
*/
1725
public class Engine {
18-
private static final int DEFAULT_THREAD_COUNT = 10;
26+
private static final int DEFAULT_THREAD_COUNT = 2;
1927
private final AtomicReference<ScheduledExecutorService> threadPool = new AtomicReference<>();
2028

2129
/**
@@ -29,30 +37,146 @@ public Engine(ScheduledExecutorService threadPool) {
2937

3038
/**
3139
* Creates engine with the specified id and default container and executor.
32-
*
33-
* @param id Engine id
3440
*/
35-
public Engine(String id) {
36-
this(wrappedExecutorService(id, ContainerResolver.getDefault().getContainer()));
41+
public Engine() {
42+
this(wrappedExecutorService(ContainerResolver.getDefault().getContainer()));
3743
}
3844

3945
/**
4046
* wrapped executor service.
41-
* @param id id
4247
* @param container container
4348
* @return executor service
4449
*/
45-
public static ScheduledExecutorService wrappedExecutorService(String id, Container container) {
46-
ScheduledThreadPoolExecutor threadPool =
47-
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, new DaemonThreadFactory(id));
50+
public static ScheduledExecutorService wrappedExecutorService(Container container) {
51+
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT);
4852
threadPool.setRemoveOnCancelPolicy(true);
49-
return wrap(container, threadPool);
53+
return wrap(container, new VirtualScheduledExectuorService(threadPool));
5054
}
5155

5256
private static ScheduledExecutorService wrap(Container container, ScheduledExecutorService ex) {
5357
return container != null ? ContainerResolver.getDefault().wrapExecutor(container, ex) : ex;
5458
}
5559

60+
private static class VirtualScheduledExectuorService implements ScheduledExecutorService {
61+
private final ScheduledExecutorService service;
62+
private final ExecutorService virtualService = Executors.newVirtualThreadPerTaskExecutor();
63+
64+
public VirtualScheduledExectuorService(ScheduledExecutorService service) {
65+
this.service = service;
66+
}
67+
68+
private Runnable wrap(Runnable command) {
69+
return () -> virtualService.execute(command);
70+
}
71+
72+
@Nonnull
73+
@Override
74+
public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
75+
return service.schedule(wrap(command), delay, unit);
76+
}
77+
78+
@Nonnull
79+
@Override
80+
public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
81+
throw new UnsupportedOperationException();
82+
}
83+
84+
@Nonnull
85+
@Override
86+
public ScheduledFuture<?> scheduleAtFixedRate(
87+
@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
88+
return service.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
89+
}
90+
91+
@Nonnull
92+
@Override
93+
public ScheduledFuture<?> scheduleWithFixedDelay(
94+
@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
95+
return service.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
96+
}
97+
98+
@Override
99+
public void shutdown() {
100+
service.shutdown();
101+
}
102+
103+
@Nonnull
104+
@Override
105+
public List<Runnable> shutdownNow() {
106+
return service.shutdownNow();
107+
}
108+
109+
@Override
110+
public boolean isShutdown() {
111+
return service.isShutdown();
112+
}
113+
114+
@Override
115+
public boolean isTerminated() {
116+
return service.isTerminated();
117+
}
118+
119+
@Override
120+
public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
121+
return service.awaitTermination(timeout, unit);
122+
}
123+
124+
@Nonnull
125+
@Override
126+
public <T> Future<T> submit(@Nonnull Callable<T> task) {
127+
throw new UnsupportedOperationException();
128+
}
129+
130+
@Nonnull
131+
@Override
132+
public <T> Future<T> submit(@Nonnull Runnable task, T result) {
133+
return service.submit(wrap(task), result);
134+
}
135+
136+
@Nonnull
137+
@Override
138+
public Future<?> submit(@Nonnull Runnable task) {
139+
return service.submit(wrap(task));
140+
}
141+
142+
@Nonnull
143+
@Override
144+
public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) throws InterruptedException {
145+
throw new UnsupportedOperationException();
146+
}
147+
148+
@Nonnull
149+
@Override
150+
public <T> List<Future<T>> invokeAll(
151+
@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
152+
throws InterruptedException {
153+
throw new UnsupportedOperationException();
154+
}
155+
156+
@Nonnull
157+
@Override
158+
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
159+
throws InterruptedException, ExecutionException {
160+
throw new UnsupportedOperationException();
161+
}
162+
163+
@Override
164+
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
165+
throws InterruptedException, ExecutionException, TimeoutException {
166+
throw new UnsupportedOperationException();
167+
}
168+
169+
@Override
170+
public void close() {
171+
service.close();
172+
}
173+
174+
@Override
175+
public void execute(@Nonnull Runnable command) {
176+
virtualService.execute(command);
177+
}
178+
}
179+
56180
/**
57181
* Returns the executor.
58182
*
@@ -81,25 +205,4 @@ public Fiber createFiber() {
81205
Fiber createChildFiber(Fiber parent) {
82206
return new Fiber(this, parent);
83207
}
84-
85-
private static class DaemonThreadFactory implements ThreadFactory {
86-
final AtomicInteger threadNumber = new AtomicInteger(1);
87-
final String namePrefix;
88-
89-
DaemonThreadFactory(String id) {
90-
namePrefix = "engine-" + id + "-thread-";
91-
}
92-
93-
public Thread newThread(@Nonnull Runnable r) {
94-
Thread t = new Thread(r);
95-
t.setName(namePrefix + threadNumber.getAndIncrement());
96-
if (!t.isDaemon()) {
97-
t.setDaemon(true);
98-
}
99-
if (t.getPriority() != Thread.NORM_PRIORITY) {
100-
t.setPriority(Thread.NORM_PRIORITY);
101-
}
102-
return t;
103-
}
104-
}
105208
}

operator/src/test/java/oracle/kubernetes/operator/DomainPresenceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ public abstract static class DomainProcessorStub implements DomainProcessor {
619619
@NotNull
620620
private Map<String, FiberGate> createMakeRightFiberGateMap() {
621621
Map<String, FiberGate> map = new ConcurrentHashMap<>();
622-
map.put(NS, new TestFiberGate(new Engine("Test")));
622+
map.put(NS, new TestFiberGate(new Engine()));
623623
return map;
624624
}
625625

0 commit comments

Comments
 (0)