Skip to content

Commit

Permalink
KEYCLOAK-5371 Use managed executors on Wildfly
Browse files Browse the repository at this point in the history
  • Loading branch information
mposolda committed Oct 11, 2017
1 parent f5ff24c commit 26f1107
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static InfinispanNotificationsManager create(KeycloakSession session, Cac
}
}

ExecutorService listenersExecutor = session.getProvider(ExecutorsProvider.class).getExecutor("work-cache-event-listener");
ExecutorService listenersExecutor = workRemoteCache==null ? null : session.getProvider(ExecutorsProvider.class).getExecutor("work-cache-event-listener");
InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite, listenersExecutor);

// We need CacheEntryListener for communication within current DC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;

import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.models.KeycloakSession;
Expand All @@ -43,8 +47,15 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory
private int DEFAULT_MIN_THREADS = 4;
private int DEFAULT_MAX_THREADS = 16;

private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";

// Default executor is bound on Wildfly under this name
private static final String DEFAULT_MANAGED_EXECUTORS_SERVICE_JNDI = MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX + "default";

private Config.Scope config;

private Boolean managed = null;

private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();


Expand Down Expand Up @@ -76,8 +87,11 @@ public void postInit(KeycloakSessionFactory factory) {

@Override
public void close() {
for (ExecutorService executor : executors.values()) {
executor.shutdown();
if (managed != null && !managed) {
for (Map.Entry<String, ExecutorService> executor : executors.entrySet()) {
logger.debugf("Shutting down executor for task '%s'", executor.getKey());
executor.getValue().shutdown();
}
}
}

Expand All @@ -95,17 +109,7 @@ protected ExecutorService getExecutor(String taskType, KeycloakSession session)
if (existing == null) {
synchronized (this) {
if (!executors.containsKey(taskType)) {
Config.Scope currentScope = config.scope(taskType);
int min = DEFAULT_MIN_THREADS;
int max = DEFAULT_MAX_THREADS;

if (currentScope != null) {
min = currentScope.getInt("min", DEFAULT_MIN_THREADS);
max = currentScope.getInt("max", DEFAULT_MAX_THREADS);
}

logger.debugf("Creating pool for task '%s': min=%d, max=%d", taskType, min, max);
ExecutorService executor = createPool(taskType, session, min, max);
ExecutorService executor = retrievePool(taskType, session);
executors.put(taskType, executor);
}

Expand All @@ -117,8 +121,82 @@ protected ExecutorService getExecutor(String taskType, KeycloakSession session)
}


protected ExecutorService createPool(String taskType, KeycloakSession session, int min, int max) {
ThreadFactory threadFactory = new ThreadFactory() {
protected ExecutorService retrievePool(String taskType, KeycloakSession session) {
if (managed == null) {
detectManaged();
}

if (managed) {
return getPoolManaged(taskType, session);
} else {
return createPoolEmbedded(taskType, session);
}
}

protected void detectManaged() {
String jndiName = MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX + "default";
try {
new InitialContext().lookup(jndiName);
logger.debugf("We are in managed environment. Executor '%s' was available.", jndiName);
managed = true;
} catch (NamingException nnfe) {
logger.debugf("We are not in managed environment. Executor '%s' was not available.", jndiName);
managed = false;
}
}


protected ExecutorService getPoolManaged(String taskType, KeycloakSession session) {
try {
InitialContext ctx = new InitialContext();

// First check if specific pool for the task
String jndiName = MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX + taskType;
try {
ExecutorService executor = (ExecutorService) ctx.lookup(jndiName);
logger.debugf("Found executor for '%s' under JNDI name '%s'", taskType, jndiName);
return executor;
} catch (NameNotFoundException nnfe) {
logger.debugf("Not found executor for '%s' under specific JNDI name '%s'. Fallback to the default pool", taskType, jndiName);

ExecutorService executor = (ExecutorService) ctx.lookup(DEFAULT_MANAGED_EXECUTORS_SERVICE_JNDI);
logger.debugf("Found default executor for '%s' of JNDI name '%s'", taskType, DEFAULT_MANAGED_EXECUTORS_SERVICE_JNDI);
return executor;
}
} catch (NamingException ne) {
throw new IllegalStateException(ne);
}
}


protected ExecutorService createPoolEmbedded(String taskType, KeycloakSession session) {
Config.Scope currentScope = config.scope(taskType);
int min = DEFAULT_MIN_THREADS;
int max = DEFAULT_MAX_THREADS;

if (currentScope != null) {
min = currentScope.getInt("min", DEFAULT_MIN_THREADS);
max = currentScope.getInt("max", DEFAULT_MAX_THREADS);
}

logger.debugf("Creating pool for task '%s': min=%d, max=%d", taskType, min, max);

ThreadFactory threadFactory = createThreadFactory(taskType, session);

if (min == max) {
return Executors.newFixedThreadPool(min, threadFactory);
} else {
// Same like Executors.newCachedThreadPool. Besides that "min" and "max" are configurable
return new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
}


protected ThreadFactory createThreadFactory(String taskType, KeycloakSession session) {
return new ThreadFactory() {

private AtomicInteger i = new AtomicInteger(0);
private int group = new Random().nextInt(2048);
Expand All @@ -136,16 +214,6 @@ public Thread newThread(Runnable r) {
}

};

if (min == max) {
return Executors.newFixedThreadPool(min, threadFactory);
} else {
// Same like Executors.newCachedThreadPool. Besides that "min" and "max" are configurable
return new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
}

}

0 comments on commit 26f1107

Please sign in to comment.