Skip to content

Commit

Permalink
Merge pull request #646 from jancona/master
Browse files Browse the repository at this point in the history
Make LeastActiveBalancingPolicy.ShufflingCompare comparisons stable
  • Loading branch information
zznate committed Jan 13, 2014
2 parents 2724dcb + f0304a7 commit d48e723
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
* The list of hosts is shuffled on each pass to account for the case
* where a number of hosts are at the minimum number of connections
* (ie. they are not busy).
*
*
*
*
* @author zznate
*/
public class LeastActiveBalancingPolicy implements LoadBalancingPolicy {

private static final long serialVersionUID = 329849818218657061L;
private static final Logger log = LoggerFactory.getLogger(LeastActiveBalancingPolicy.class);

@Override
public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {
List<HClientPool> vals = Lists.newArrayList(pools);
Expand All @@ -34,7 +34,7 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
Iterator<HClientPool> iterator = vals.iterator();
HClientPool concurrentHClientPool = iterator.next();
if ( excludeHosts != null && excludeHosts.size() > 0 ) {
while (iterator.hasNext()) {
while (iterator.hasNext()) {
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
break;
}
Expand All @@ -44,17 +44,34 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
return concurrentHClientPool;
}

private final class ShufflingCompare implements Comparator<HClientPool> {

/**
* Make the results of this Comparator stable (and thus transitive) by caching the numActive value
* for each HClientPool as they are seen, then reusing the cached value instead of the current value
* (which may have changed) if the same pool is compared again.
*
* Without this change the new TimSort algorithm in Java 7 sometimes throws a:
* java.lang.IllegalArgumentException: Comparison method violates its general contract!
*/
static final class ShufflingCompare implements Comparator<HClientPool> {
private Map<HClientPool, Integer> cachedActive = new HashMap<HClientPool, Integer>();

public int compare(HClientPool o1, HClientPool o2) {
if ( log.isDebugEnabled() ) {
log.debug("comparing 1: {} and count {} with 2: {} and count {}",
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o2.getCassandraHost(), o2.getNumActive()});
}
return o1.getNumActive() - o2.getNumActive();
return getNumActive(o1) - getNumActive(o2);
}
private int getNumActive(HClientPool p) {
Integer ret = cachedActive.get(p);
if (ret == null) {
ret = p.getNumActive();
cachedActive.put(p, ret);
}
return ret;
}
}

@Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host, monitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;

import org.junit.Test;
import org.mockito.Mockito;

public class LeastActiveBalancingPolicyTest extends BaseBalancingPolicyTest {

private LeastActiveBalancingPolicy leastActiveBalancingPolicy;

@Test
public void testGetPoolOk() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
Mockito.when(poolWith5Active.getNumActive()).thenReturn(8);
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
Expand All @@ -30,34 +36,108 @@ public void testGetPoolOk() {
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
}

@Test
public void testSkipExhausted() {
public void testSkipExhausted() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.1:9160")))));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.2:9161")))));
}

@Test
public void testShuffleOnAllEqual() {
ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active2.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active2.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.4:9163"));
ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active3.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active3.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.5:9164"));

pools.add(poolWith5Active2);
pools.add(poolWith5Active3);

leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
// should hit all three equal hosts over the course of 50 runs
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>(3);
for (int i = 0; i < 50; i++) {
HClientPool foundPool = leastActiveBalancingPolicy.getPool(pools, null);
foundHosts.add(foundPool.getCassandraHost());
assert 5 == foundPool.getNumActive();
assert 5 == foundPool.getNumActive();
}
assertEquals(3, foundHosts.size());
}

@Test
public void testShufflingCompareStability() {
final int POOL_SIZE = 360;
List<HClientPool> pools = new ArrayList<HClientPool>(POOL_SIZE);
for (int i = 0; i < POOL_SIZE; i++) {
pools.add(new TestPool());
}
// Do it enough times and it will fail
for (int i = 0; i < 50; i++) {
Collections.shuffle(pools);
Collections.sort(pools, new LeastActiveBalancingPolicy.ShufflingCompare());
}

}
private static class TestPool implements HClientPool {
private Random rand = new Random();

public int getNumActive() {
return rand.nextInt(30);
}

public int getNumIdle() {
return 0;
}

public int getNumBlockedThreads() {
return 0;
}

public String getName() {
return null;
}

public boolean getIsActive() {
return false;
}

public long getExhaustedTime() {
return 0;
}

public HClient borrowClient() throws HectorException {
return null;
}

public CassandraHost getCassandraHost() {
return null;
}

public int getNumBeforeExhausted() {
return 0;
}

public boolean isExhausted() {
return false;
}

public int getMaxActive() {
return 0;
}

public String getStatusAsString() {
return null;
}

public void releaseClient(HClient client) throws HectorException {

}

public void shutdown() {

}
}
}

0 comments on commit d48e723

Please sign in to comment.