Skip to content

Commit

Permalink
KEYCLOAK-5371 Fix SessionExpirationCrossDCTest, Added ExecutorsProvid…
Browse files Browse the repository at this point in the history
…er. Debug support for cache-servers in tests
  • Loading branch information
mposolda committed Oct 10, 2017
1 parent a5c1bf1 commit f5ff24c
Show file tree
Hide file tree
Showing 18 changed files with 787 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void lazyInit(KeycloakSession session) {
String myAddress = InfinispanUtil.getMyAddress(session);
String mySite = InfinispanUtil.getMySite(session);

notificationsManager = InfinispanNotificationsManager.create(workCache, myAddress, mySite, remoteStores);
notificationsManager = InfinispanNotificationsManager.create(session, workCache, myAddress, mySite, remoteStores);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.infinispan.Cache;
Expand All @@ -48,6 +50,8 @@
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
import org.keycloak.executors.ExecutorsProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;

/**
Expand All @@ -71,17 +75,20 @@ public class InfinispanNotificationsManager {

private final String mySite;

private final ExecutorService listenersExecutor;

protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite) {

protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
this.workCache = workCache;
this.workRemoteCache = workRemoteCache;
this.myAddress = myAddress;
this.mySite = mySite;
this.listenersExecutor = listenersExecutor;
}


// Create and init manager including all listeners etc
public static InfinispanNotificationsManager create(Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
RemoteCache workRemoteCache = null;

if (!remoteStores.isEmpty()) {
Expand All @@ -93,7 +100,8 @@ public static InfinispanNotificationsManager create(Cache<String, Serializable>
}
}

InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite);
ExecutorService listenersExecutor = session.getProvider(ExecutorsProvider.class).getExecutor("work-cache-event-listener");
InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite, listenersExecutor);

// We need CacheEntryListener for communication within current DC
workCache.addListener(manager.new CacheEntryListener());
Expand Down Expand Up @@ -206,8 +214,20 @@ public void removed(ClientCacheEntryRemovedEvent event) {

private void hotrodEventReceived(String key) {
// TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request
Object value = workCache.get(key);
eventReceived(key, (Serializable) value);
try {
listenersExecutor.submit(() -> {

Object value = workCache.get(key);
eventReceived(key, (Serializable) value);

});
} catch (RejectedExecutionException ree) {
logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key);

if (logger.isDebugEnabled()) {
logger.debug(ree.getMessage(), ree);
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.models.sessions.infinispan.remotestore;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.event.ClientEvent;
import org.jboss.logging.Logger;
import org.keycloak.common.util.MultivaluedHashMap;

import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;

/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ClientListenerExecutorDecorator<K> {

private static final Logger logger = Logger.getLogger(ClientListenerExecutorDecorator.class);

private final Object lock = new Object();

private final ExecutorService decorated;

// Both "eventsInProgress" and "eventsQueue" maps are guarded by the "lock", so doesn't need to be concurrency safe

// Events currently submitted to the ExecutorService
private Map<K, MyClientEvent> eventsInProgress = new HashMap<>();

// Queue of the events waiting to process. We don't want events of same key to be processed concurrently
private MultivaluedHashMap<K, MyClientEventContext> eventsQueue = new MultivaluedHashMap<>();


public ClientListenerExecutorDecorator(ExecutorService decorated) {
this.decorated = decorated;
}


// Explicitly use 3 submit methods to ensure that different type of ClientEvent is not used

public void submit(ClientCacheEntryCreatedEvent<K> cacheEntryCreatedEvent, Runnable r) {
MyClientEvent event = convertIspnClientEvent(cacheEntryCreatedEvent);
submit(event, r);
}


public void submit(ClientCacheEntryModifiedEvent<K> cacheEntryModifiedEvent, Runnable r) {
MyClientEvent event = convertIspnClientEvent(cacheEntryModifiedEvent);
submit(event, r);
}


public void submit(ClientCacheEntryRemovedEvent<K> cacheEntryRemovedEvent, Runnable r) {
MyClientEvent event = convertIspnClientEvent(cacheEntryRemovedEvent);
submit(event, r);
}


// IMPL

private void submit(MyClientEvent event, Runnable r) {
K key = event.key;

synchronized (lock) {
if (!eventsInProgress.containsKey(key)) {
submitImpl(key, event, r);
} else {
putEventToTheQueue(key, event, r);
}
}
}


// Assume it's called from the synchronized block
private void submitImpl(K key, MyClientEvent event, Runnable r) {
logger.debugf("Submitting event to the executor: %s", event.toString());

eventsInProgress.put(key, event);

Runnable decoratedRunnable = () -> {
try {
r.run();
} finally {
synchronized (lock) {
logger.debugf("Finished processing event by the executor: %s", event.toString());
eventsInProgress.remove(key);

pollQueue(key);
}
}
};

decorated.submit(decoratedRunnable);
}


// Assume it's called from the synchronized block
private void pollQueue(K key) {
if (eventsQueue.containsKey(key)) {
List<MyClientEventContext> events = eventsQueue.get(key);

if (events.size() > 0) {
MyClientEventContext nextEvent = events.remove(0);

// Was last event in the queue for that key
if (events.size() == 0) {
eventsQueue.remove(key);
}

submitImpl(key, nextEvent.event, nextEvent.r);

} else {
// Shouldn't happen
throw new IllegalStateException("Illegal state. Size was 0 for key " + key);
}
}
}


// Assume it's called from the synchronized block
private void putEventToTheQueue(K key, MyClientEvent event, Runnable r) {
logger.debugf("Calling putEventToTheQueue: %s", event.toString());

if (!eventsQueue.containsKey(key)) {
eventsQueue.putSingle(key, new MyClientEventContext(event, r));
} else {

List<MyClientEventContext> existingEvents = eventsQueue.get(key);
MyClientEventContext myNewEvent = new MyClientEventContext(event, r);

// Try to optimize queue (EG. in case we have REMOVE event, we can ignore the previous CREATE or MODIFIED events)
switch (event.type) {
case CLIENT_CACHE_ENTRY_CREATED:
boolean add = true;
for (MyClientEventContext ctx : existingEvents) {
if (ctx.event.type == CLIENT_CACHE_ENTRY_REMOVED) {
// Ignore. TODO: Log me?
add = false;
break;
} else if (ctx.event.type == CLIENT_CACHE_ENTRY_CREATED) {
// Ignore. Already on the list
add = false;
break;
}
}

// Add to the beginning before the MODIFIED events
if (add) {
existingEvents.add(0, myNewEvent);
}
break;
case CLIENT_CACHE_ENTRY_MODIFIED:

boolean addd = true;
for (int i=0 ; i<existingEvents.size() ; i++) {
MyClientEventContext ctx = existingEvents.get(i);
if (ctx.event.type == CLIENT_CACHE_ENTRY_REMOVED) {
// Ignore.
addd = false;
break;
} else if (ctx.event.type == CLIENT_CACHE_ENTRY_CREATED) {
// Shift to the next element. CREATE event go first.
} else {
// Can ignore the previous MODIFY event if we have newer version
if (ctx.event.version < myNewEvent.event.version) {
existingEvents.remove(i);
} else {
addd = false;
}
}

if (addd) {
// Add to the end
existingEvents.add(myNewEvent);
}
}
break;

case CLIENT_CACHE_ENTRY_REMOVED:
// Can just ignore the other events in the queue in case of REMOVE
eventsQueue.putSingle(key, new MyClientEventContext(event, r));
break;
default:
throw new IllegalStateException("Unsupported event type: " + event.type);
}

}

logger.debugf("Event queued. Current events for the key '%s': %s", key.toString(), eventsQueue.getList(key));
}


public MyClientEvent convertIspnClientEvent(ClientEvent ispnClientEvent) {
if (ispnClientEvent instanceof ClientCacheEntryCreatedEvent) {
ClientCacheEntryCreatedEvent<K> ev = (ClientCacheEntryCreatedEvent<K>) ispnClientEvent;
return new MyClientEvent(ev.getKey(), ev.getVersion(), ev.getType());
} else if (ispnClientEvent instanceof ClientCacheEntryModifiedEvent) {
ClientCacheEntryModifiedEvent<K> ev = (ClientCacheEntryModifiedEvent<K>) ispnClientEvent;
return new MyClientEvent(ev.getKey(), ev.getVersion(), ev.getType());
} else if (ispnClientEvent instanceof ClientCacheEntryRemovedEvent) {
ClientCacheEntryRemovedEvent<K> ev = (ClientCacheEntryRemovedEvent<K>) ispnClientEvent;
return new MyClientEvent(ev.getKey(), -1l, ev.getType());
} else {
throw new IllegalStateException("Unsupported event type: " + ispnClientEvent.getType());
}
}


private class MyClientEventContext {
private final MyClientEvent event;
private final Runnable r;

private MyClientEventContext(MyClientEvent event, Runnable r) {
this.event = event;
this.r = r;
}

@Override
public String toString() {
return event.toString();
}
}


// Using separate class as ISPN ClientEvent type doesn't provide access to key and version :/
private class MyClientEvent {
private final K key;
private final long version;
private final ClientEvent.Type type;

private MyClientEvent(K key, long version, ClientEvent.Type type) {
this.key = key;
this.version = version;
this.type = type;
}


@Override
public String toString() {
return String.format("ClientEvent [ type=%s, key=%s, version=%d ]", type, key, version);
}
}

}


Loading

0 comments on commit f5ff24c

Please sign in to comment.