Skip to content

Commit

Permalink
added limit on how long client pool can be exhausted
Browse files Browse the repository at this point in the history
  • Loading branch information
shaunkalley authored and Shaun Kalley committed Sep 6, 2013
1 parent eaf1e78 commit 024a32d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
boolean removed = getHosts().contains(cassandraHost);
String message;
if ( removed ) {
HClientPool pool = hostPools.remove(cassandraHost);
message = "Removed from hostPools";
HClientPool pool = hostPools.remove(cassandraHost);
message = "Removed from hostPools";
if ( pool == null ) {
log.info("removeCassandraHost looking for host {} in suspendedHostPools", cassandraHost);
pool = suspendedHostPools.remove(cassandraHost);
Expand All @@ -145,12 +145,12 @@ public boolean removeCassandraHost(CassandraHost cassandraHost) {
log.info("removeCassandraHost attempt miss for CassandraHost {} May have been beaten by another thread?", cassandraHost);
}
} else if ( cassandraHostRetryService != null && cassandraHostRetryService.contains(cassandraHost)) {
log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
removed = cassandraHostRetryService.remove(cassandraHost);
message = "Removed from Downed hosts";
log.info("Host {} not in active pools, but found in retry service.", cassandraHost);
removed = cassandraHostRetryService.remove(cassandraHost);
message = "Removed from Downed hosts";
} else {
message = "Host not found";
log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
message = "Host not found";
log.info("Remove requested on a host that was not found in active or disabled pools: {}", cassandraHost);
}
log.info("Remove status for CassandraHost pool {} was {}", cassandraHost, removed);
listenerHandler.fireOnRemoveHost(cassandraHost, removed, message);
Expand Down Expand Up @@ -211,7 +211,7 @@ public Set<CassandraHost> getHosts() {
public List<String> getStatusPerPool() {
List<String> stats = new ArrayList<String>();
for (HClientPool clientPool : hostPools.values()) {
stats.add(clientPool.getStatusAsString());
stats.add(clientPool.getStatusAsString());
}
return stats;
}
Expand Down Expand Up @@ -272,12 +272,9 @@ public void operateWithFailover(Operation<?> op) throws HectorException {
// TODO timecheck on how long we've been waiting on timeouts here
// suggestion per user moores on hector-users
} else if (he instanceof HPoolExhaustedException) {
if (pool.getExhaustedTime() >= pool.getCassandraHost().getMaxExhaustedTimeBeforeSuspending()) {
if (suspendCassandraHost(pool.getCassandraHost())) {
log.warn("Client pool for {} was exhausted for {} ms and was suspended", pool.getCassandraHost(), pool.getExhaustedTime());
} else {
log.warn("Client pool for {} was exhausted for {} ms but could not be suspended", pool.getCassandraHost(), pool.getExhaustedTime());
}
if (pool.getExhaustedTime() >= pool.getCassandraHost().getMaxExhaustedTimeBeforeMarkingAsDown()) {
markHostAsDown(pool.getCassandraHost());
log.warn("Client pool for {} was exhausted for {} ms and was marked as down", pool.getCassandraHost(), pool.getExhaustedTime());
}
if (hostPools.isEmpty()) {
throw he;
Expand Down Expand Up @@ -338,22 +335,22 @@ public void setTimer(HOpTimer timer) {
* @param listener - a {@link me.prettyprint.cassandra.connection.ConnectionManagerListener} listener
*/
public void addListener(String listenerName, ConnectionManagerListener listener){
listenerHandler.put(listenerName, listener);
listenerHandler.put(listenerName, listener);
}

/**
* removes a listener from the connectionManager
* @param listenerName - the name of the listener to remove
*/
public void removeListener(String listenerName){
listenerHandler.remove(listenerName);
listenerHandler.remove(listenerName);
}

/**
* removes all listeners from the connectionManager
*/
public void removeAllListeners(){
listenerHandler.clear();
listenerHandler.clear();
}

/**
Expand All @@ -370,22 +367,22 @@ private void doTimeoutCheck(CassandraHost cassandraHost) {
}

/**
* Sleeps for the specified time as determined by sleepBetweenHostsMilli.
* In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
* hosts may help reduce load on the cluster.
*/
private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
if (failoverPolicy.sleepBetweenHostsMilli > 0) {
if ( log.isDebugEnabled() ) {
log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
}
try {
Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
} catch (InterruptedException e) {
log.warn("Sleep between hosts interrupted", e);
}
* Sleeps for the specified time as determined by sleepBetweenHostsMilli.
* In many cases failing over to other hosts is done b/c the cluster is too busy, so the sleep b/w
* hosts may help reduce load on the cluster.
*/
private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
if (failoverPolicy.sleepBetweenHostsMilli > 0) {
if ( log.isDebugEnabled() ) {
log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
}
try {
Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
} catch (InterruptedException e) {
log.warn("Sleep between hosts interrupted", e);
}
}
}

private HClientPool getClientFromLBPolicy(Set<CassandraHost> excludeHosts) {
if ( hostPools.isEmpty() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void configure(Reference resourceRef) throws Exception {
// optional
RefAddr maxActiveRefAddr = resourceRef.get("maxActive");
RefAddr maxWaitTimeWhenExhausted = resourceRef.get("maxWaitTimeWhenExhausted");
RefAddr maxExhaustedTimeBeforeSuspending = resourceRef.get("maxExhaustedTimeBeforeSuspending");
RefAddr maxExhaustedTimeBeforeMarkingAsDown = resourceRef.get("maxExhaustedTimeBeforeMarkingAsDown");
RefAddr autoDiscoverHosts = resourceRef.get("autoDiscoverHosts");
RefAddr runAutoDiscoverAtStartup = resourceRef.get("runAutoDiscoveryAtStartup");
RefAddr retryDownedHostDelayInSeconds = resourceRef.get("retryDownedHostDelayInSeconds");
Expand All @@ -120,8 +120,8 @@ private void configure(Reference resourceRef) throws Exception {
cassandraHostConfigurator.setMaxActive(Integer.parseInt((String)maxActiveRefAddr.getContent()));
if ( maxWaitTimeWhenExhausted != null )
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(Integer.parseInt((String)maxWaitTimeWhenExhausted.getContent()));
if (maxExhaustedTimeBeforeSuspending != null) {
cassandraHostConfigurator.setMaxExhaustedTimeBeforeSuspending(Integer.parseInt((String) maxExhaustedTimeBeforeSuspending.getContent()));
if (maxExhaustedTimeBeforeMarkingAsDown != null) {
cassandraHostConfigurator.setMaxExhaustedTimeBeforeMarkingAsDown(Integer.parseInt((String) maxExhaustedTimeBeforeMarkingAsDown.getContent()));
}

if ( log.isDebugEnabled() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class CassandraHost {
* The default max exhausted time before suspending. Default value is set to
* maximum so that it won't suspend.
*/
public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING = Long.MAX_VALUE;
public static final long DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN = Long.MAX_VALUE;

public static final boolean DEFAULT_LIFO = true;
/**
Expand All @@ -66,7 +66,7 @@ public final class CassandraHost {
private boolean lifo = DEFAULT_LIFO;

private long maxWaitTimeWhenExhausted = DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
private long maxExhaustedTimeBeforeSuspending = DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING;
private long maxExhaustedTimeBeforeMarkingAsDown = DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
Expand Down Expand Up @@ -175,12 +175,12 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}

public long getMaxExhaustedTimeBeforeSuspending() {
return maxExhaustedTimeBeforeSuspending;
public long getMaxExhaustedTimeBeforeMarkingAsDown() {
return maxExhaustedTimeBeforeMarkingAsDown;
}

public void setMaxExhaustedTimeBeforeSuspending(long maxExhaustedTimeBeforeSuspending) {
this.maxExhaustedTimeBeforeSuspending = maxExhaustedTimeBeforeSuspending;
public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown;
}

public int getCassandraThriftSocketTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public final class CassandraHostConfigurator implements Serializable {
private int maxActive = CassandraHost.DEFAULT_MAX_ACTIVE;
private boolean lifo = CassandraHost.DEFAULT_LIFO;
private long maxWaitTimeWhenExhausted = CassandraHost.DEFAULT_MAX_WAITTIME_WHEN_EXHAUSTED;
private long maxExhaustedTimeBeforeSuspending = CassandraHost.DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_SUSPENDING;
private long maxExhaustedTimeBeforeMarkingAsDown = CassandraHost.DEFAULT_MAX_EXHAUSTED_TIME_BEFORE_MARKING_AS_DOWN;
private int cassandraThriftSocketTimeout;
private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;
private int maxFrameSize = CassandraHost.DEFAULT_MAX_FRAME_SIZE;
Expand Down Expand Up @@ -83,7 +83,7 @@ public void applyConfig(CassandraHost cassandraHost) {
cassandraHost.setMaxActive(maxActive);
cassandraHost.setLifo(lifo);
cassandraHost.setMaxWaitTimeWhenExhausted(maxWaitTimeWhenExhausted);
cassandraHost.setMaxExhaustedTimeBeforeSuspending(maxExhaustedTimeBeforeSuspending);
cassandraHost.setMaxExhaustedTimeBeforeMarkingAsDown(maxExhaustedTimeBeforeMarkingAsDown);
cassandraHost.setUseThriftFramedTransport(useThriftFramedTransport);
cassandraHost.setMaxFrameSize(maxFrameSize);
cassandraHost.setUseSocketKeepalive(useSocketKeepalive);
Expand Down Expand Up @@ -114,8 +114,8 @@ public void setMaxWaitTimeWhenExhausted(long maxWaitTimeWhenExhausted) {
this.maxWaitTimeWhenExhausted = maxWaitTimeWhenExhausted;
}

public void setMaxExhaustedTimeBeforeSuspending(long maxExhaustedTimeBeforeSuspending) {
this.maxExhaustedTimeBeforeSuspending = maxExhaustedTimeBeforeSuspending;
public void setMaxExhaustedTimeBeforeMarkingAsDown(long maxExhaustedTimeBeforeMarkingAsDown) {
this.maxExhaustedTimeBeforeMarkingAsDown = maxExhaustedTimeBeforeMarkingAsDown;
}

/**
Expand Down Expand Up @@ -178,8 +178,8 @@ public String toString() {
s.append(cassandraThriftSocketTimeout);
s.append("&maxWaitTimeWhenExhausted=");
s.append(maxWaitTimeWhenExhausted);
s.append("&maxExhaustedTimeBeforeSuspending=");
s.append(maxExhaustedTimeBeforeSuspending);
s.append("&maxExhaustedTimeBeforeMarkingAsDown=");
s.append(maxExhaustedTimeBeforeMarkingAsDown);
s.append("&maxActive=");
s.append(maxActive);
s.append("&hosts=");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package me.prettyprint.cassandra.connection;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -21,6 +18,9 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class HConnectionManagerTest extends BaseEmbededServerSetupTest {

@Test
Expand Down Expand Up @@ -83,19 +83,20 @@ public void clientPoolShouldBeSuspendedWhenExhaustedForTooLong() throws Interrup
configurator.setClientFactoryClass(TestClientFactory.class.getName());
configurator.setMaxActive(maxActive);
configurator.setMaxWaitTimeWhenExhausted(50);
configurator.setMaxExhaustedTimeBeforeSuspending(0);
configurator.setMaxExhaustedTimeBeforeMarkingAsDown(0);
configurator.setRetryDownedHosts(false);

final HConnectionManager connectionManager = new HConnectionManager("TestCluster", configurator);
CassandraHost host = connectionManager.getHosts().iterator().next();
ConnectionManagerListener listener = mock(ConnectionManagerListener.class);
final MutableBoolean wasSuspended = new MutableBoolean();
final MutableBoolean wasRemoved = new MutableBoolean();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
wasSuspended.setValue(true);
wasRemoved.setValue(true);
return null;
}
}).when(listener).onSuspendHost(host, true);
}).when(listener).onHostDown(host);
connectionManager.addListener("TestListener", listener);

ExecutorService exec = Executors.newCachedThreadPool();
Expand All @@ -116,7 +117,7 @@ public void run() {

latch.await();

assertTrue(wasSuspended.booleanValue());
assertTrue(wasRemoved.booleanValue());

exec.shutdownNow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ public void testConfigValuesPropogated() {
cassandraHostConfigurator.setMaxActive(20);
cassandraHostConfigurator.setCassandraThriftSocketTimeout(3000);
cassandraHostConfigurator.setMaxWaitTimeWhenExhausted(4000);
cassandraHostConfigurator.setMaxExhaustedTimeBeforeSuspending(5000);
cassandraHostConfigurator.setMaxExhaustedTimeBeforeMarkingAsDown(5000);
CassandraHost[] cassandraHosts = cassandraHostConfigurator.buildCassandraHosts();
// no need to test all, just a smattering
assertEquals(20, cassandraHosts[1].getMaxActive());
assertEquals(20, cassandraHosts[0].getMaxActive());
assertEquals(4000, cassandraHosts[1].getMaxWaitTimeWhenExhausted());
assertEquals(5000, cassandraHosts[0].getMaxExhaustedTimeBeforeSuspending());
assertEquals(5000, cassandraHosts[0].getMaxExhaustedTimeBeforeMarkingAsDown());
assertEquals(3000, cassandraHosts[2].getCassandraThriftSocketTimeout());
assertEquals(3000, cassandraHosts[0].getCassandraThriftSocketTimeout());
}
Expand Down

0 comments on commit 024a32d

Please sign in to comment.