Skip to content

Commit

Permalink
KEYCLOAK-5618 Fix SessionsPreloadCrossDCTest. Update HOW-TO-RUN docs.…
Browse files Browse the repository at this point in the history
… Ensure it's executed in travis.
  • Loading branch information
mposolda committed Nov 9, 2017
1 parent 128ff12 commit a98f085
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 97 deletions.
4 changes: 3 additions & 1 deletion misc/CrossDataCenter.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ Infinispan Server setup
<transaction mode="NON_DURABLE_XA" locking="PESSIMISTIC"/>
<locking acquire-timeout="0" />
<backups>
<backup site="site2" failure-policy="FAIL" strategy="SYNC" enabled="true"/>
<backup site="site2" failure-policy="FAIL" strategy="SYNC" enabled="true">
<take-offline min-wait="60000" after-failures="3" />
</backup>
</backups>
</replicated-cache-configuration>

Expand Down
2 changes: 1 addition & 1 deletion misc/Testsuite.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ But additionally you can enable Kerberos authentication in LDAP provider with th
* KeyTab: $KEYCLOAK_SOURCES/testsuite/integration-arquillian/tests/base/src/test/resources/kerberos/http.keytab (Replace $KEYCLOAK_SOURCES with correct absolute path of your sources)

Once you do this, you should also ensure that your Kerberos client configuration file is properly configured with KEYCLOAK.ORG domain.
See [../testsuite/integration-arquillian/src/test/resources/kerberos/test-krb5.conf](../testsuite/integration-arquillian/src/test/resources/kerberos/test-krb5.conf) for inspiration. The location of Kerberos configuration file
See [../testsuite/integration-arquillian/tests/base/src/test/resources/kerberos/test-krb5.conf](../testsuite/integration-arquillian/tests/base/src/test/resources/kerberos/test-krb5.conf) for inspiration. The location of Kerberos configuration file
is platform dependent (In linux it's file `/etc/krb5.conf` )

Then you need to configure your browser to allow SPNEGO/Kerberos login from `localhost` .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;

import java.util.concurrent.Callable;
Expand Down Expand Up @@ -140,7 +141,7 @@ private LockEntry createLockEntry() {
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
LockEntry myLock = createLockEntry();

LockEntry existingLock = (LockEntry) crossDCAwareCacheFactory.getCache().putIfAbsent(cacheKey, myLock, taskTimeoutInSeconds, TimeUnit.SECONDS);
LockEntry existingLock = InfinispanClusterProviderFactory.putIfAbsentWithRetries(crossDCAwareCacheFactory, cacheKey, myLock, taskTimeoutInSeconds);
if (existingLock != null) {
if (logger.isTraceEnabled()) {
logger.tracef("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
Expand All @@ -156,22 +157,15 @@ private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {


private void removeFromCache(String cacheKey) {
// 3 attempts to send the message (it may fail if some node fails in the meantime)
int retry = 3;
while (true) {
try {
crossDCAwareCacheFactory.getCache().remove(cacheKey);
if (logger.isTraceEnabled()) {
logger.tracef("Task %s removed from the cache", cacheKey);
}
return;
} catch (RuntimeException e) {
retry--;
if (retry == 0) {
throw e;
}
// More attempts to send the message (it may fail if some node fails in the meantime)
Retry.executeWithBackoff((int iteration) -> {

crossDCAwareCacheFactory.getCache().remove(cacheKey);
if (logger.isTraceEnabled()) {
logger.tracef("Task %s removed from the cache", cacheKey);
}
}

}, 10, 10);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.keycloak.cluster.infinispan;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
Expand All @@ -29,6 +30,7 @@
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
Expand All @@ -42,6 +44,8 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,7 +115,7 @@ protected int initClusterStartupTime(KeycloakSession session) {
// clusterStartTime not yet initialized. Let's try to put our startupTime
int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);

existingClusterStartTime = (Integer) crossDCAwareCacheFactory.getCache().putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
existingClusterStartTime = putIfAbsentWithRetries(crossDCAwareCacheFactory, InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime, -1);
if (existingClusterStartTime == null) {
logger.debugf("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
return serverStartTime;
Expand All @@ -123,6 +127,35 @@ protected int initClusterStartupTime(KeycloakSession session) {
}


// Will retry few times for the case when backup site not available in cross-dc environment.
// The site might be taken offline automatically if "take-offline" properly configured
static <V extends Serializable> V putIfAbsentWithRetries(CrossDCAwareCacheFactory crossDCAwareCacheFactory, String key, V value, int taskTimeoutInSeconds) {
AtomicReference<V> resultRef = new AtomicReference<>();

Retry.executeWithBackoff((int iteration) -> {

try {
V result;
if (taskTimeoutInSeconds > 0) {
result = (V) crossDCAwareCacheFactory.getCache().putIfAbsent(key, value);
} else {
result = (V) crossDCAwareCacheFactory.getCache().putIfAbsent(key, value, taskTimeoutInSeconds, TimeUnit.SECONDS);
}
resultRef.set(result);

} catch (HotRodClientException re) {
logger.warnf(re, "Failed to write key '%s' and value '%s' in iteration '%d' . Retrying", key, value, iteration);

// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}

}, 10, 10);

return resultRef.get();
}



@Override
public void init(Config.Scope config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private void loadSessionsFromRemoteCaches(KeycloakSession session) {


private void loadSessionsFromRemoteCache(final KeycloakSessionFactory sessionFactory, String cacheName, final int sessionsPerSegment, final int maxErrors) {
log.debugf("Check pre-loading userSessions from remote cache '%s'", cacheName);
log.debugf("Check pre-loading sessions from remote cache '%s'", cacheName);

KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {

Expand All @@ -293,7 +293,7 @@ public void run(KeycloakSession session) {

});

log.debugf("Pre-loading userSessions from remote cache '%s' finished", cacheName);
log.debugf("Pre-loading sessions from remote cache '%s' finished", cacheName);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.keycloak.models.sessions.infinispan.initializer;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Retry;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
Expand Down Expand Up @@ -101,10 +104,24 @@ public boolean isFinished(BaseCacheInitializer initializer) {
public void afterAllSessionsLoaded(BaseCacheInitializer initializer) {
Cache<String, Serializable> workCache = initializer.getWorkCache();

// Cross-DC aware flag
workCache
.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP)
.put(PERSISTENT_SESSIONS_LOADED, true);
// Will retry few times for the case when backup site not available in cross-dc environment.
// The site might be taken offline automatically if "take-offline" properly configured
Retry.executeWithBackoff((int iteration) -> {

try {
// Cross-DC aware flag
workCache
.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP)
.put(PERSISTENT_SESSIONS_LOADED, true);

} catch (HotRodClientException re) {
log.warnf(re, "Failed to write flag PERSISTENT_SESSIONS_LOADED in iteration '%d' . Retrying", iteration);

// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}

}, 10, 10);

// Just local-DC aware flag
workCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public boolean isFinished(BaseCacheInitializer initializer) {
.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
.get(OfflinePersistentUserSessionLoader.PERSISTENT_SESSIONS_LOADED_IN_CURRENT_DC);

if (cacheName.equals(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME) && sessionsLoaded != null && sessionsLoaded) {
if ((cacheName.equals(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME) || (cacheName.equals(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME)))
&& sessionsLoaded != null && sessionsLoaded) {
log.debugf("Sessions already loaded in current DC. Skip sessions loading from remote cache '%s'", cacheName);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.keycloak.cluster.infinispan;

import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;

/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ConcurrencyJDGOfflineBackupsTest {

protected static final Logger logger = Logger.getLogger(ConcurrencyJDGOfflineBackupsTest.class);

public static void main(String[] args) throws Exception {

Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

try {
// Create initial item
UserSessionEntity session = new UserSessionEntity();
session.setId("123");
session.setRealmId("foo");
session.setBrokerSessionId("!23123123");
session.setBrokerUserId(null);
session.setUser("foo");
session.setLoginUsername("foo");
session.setIpAddress("123.44.143.178");
session.setStarted(Time.currentTime());
session.setLastSessionRefresh(Time.currentTime());

// AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
// clientSession.setAuthMethod("saml");
// clientSession.setAction("something");
// clientSession.setTimestamp(1234);
// clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
// clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
// session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);

// Some dummy testing of remoteStore behaviour
logger.info("Before put");


AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger errorsCount = new AtomicInteger(0);
for (int i=0 ; i<100 ; i++) {
try {
cache1
.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore . Just doesn't propagate to cluster
.put("123", wrappedSession);
successCount.incrementAndGet();
Thread.sleep(1000);
logger.infof("Success in the iteration: %d", i);
} catch (HotRodClientException hrce) {
logger.errorf("Failed to put the item in the iteration: %d ", i);
errorsCount.incrementAndGet();
}
}

logger.infof("SuccessCount: %d, ErrorsCount: %d", successCount.get(), errorsCount.get());

// logger.info("After put");
//
// cache1.replace("123", wrappedSession);
//
// logger.info("After replace");
//
// cache1.get("123");
//
// logger.info("After cache1.get");

// cache2.get("123");
//
// logger.info("After cache2.get");

} finally {
// Finish JVM
cache1.getCacheManager().stop();
}

}

private static EmbeddedCacheManager createManager(int threadId) {
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
}

}
Loading

0 comments on commit a98f085

Please sign in to comment.