Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make LeastActiveBalancingPolicy.ShufflingCompare comparisons stable #646

Merged
merged 1 commit into from
Jan 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
* The list of hosts is shuffled on each pass to account for the case
* where a number of hosts are at the minimum number of connections
* (ie. they are not busy).
*
*
*
*
* @author zznate
*/
public class LeastActiveBalancingPolicy implements LoadBalancingPolicy {

private static final long serialVersionUID = 329849818218657061L;
private static final Logger log = LoggerFactory.getLogger(LeastActiveBalancingPolicy.class);

@Override
public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {
List<HClientPool> vals = Lists.newArrayList(pools);
Expand All @@ -34,7 +34,7 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
Iterator<HClientPool> iterator = vals.iterator();
HClientPool concurrentHClientPool = iterator.next();
if ( excludeHosts != null && excludeHosts.size() > 0 ) {
while (iterator.hasNext()) {
while (iterator.hasNext()) {
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
break;
}
Expand All @@ -44,17 +44,34 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
return concurrentHClientPool;
}

private final class ShufflingCompare implements Comparator<HClientPool> {

/**
* Make the results of this Comparator stable (and thus transitive) by caching the numActive value
* for each HClientPool as they are seen, then reusing the cached value instead of the current value
* (which may have changed) if the same pool is compared again.
*
* Without this change the new TimSort algorithm in Java 7 sometimes throws a:
* java.lang.IllegalArgumentException: Comparison method violates its general contract!
*/
static final class ShufflingCompare implements Comparator<HClientPool> {
private Map<HClientPool, Integer> cachedActive = new HashMap<HClientPool, Integer>();

public int compare(HClientPool o1, HClientPool o2) {
if ( log.isDebugEnabled() ) {
log.debug("comparing 1: {} and count {} with 2: {} and count {}",
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o2.getCassandraHost(), o2.getNumActive()});
}
return o1.getNumActive() - o2.getNumActive();
return getNumActive(o1) - getNumActive(o2);
}
private int getNumActive(HClientPool p) {
Integer ret = cachedActive.get(p);
if (ret == null) {
ret = p.getNumActive();
cachedActive.put(p, ret);
}
return ret;
}
}

@Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host, monitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

import me.prettyprint.cassandra.connection.client.HClient;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.hector.api.exceptions.HectorException;

import org.junit.Test;
import org.mockito.Mockito;

public class LeastActiveBalancingPolicyTest extends BaseBalancingPolicyTest {

private LeastActiveBalancingPolicy leastActiveBalancingPolicy;

@Test
public void testGetPoolOk() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
Mockito.when(poolWith5Active.getNumActive()).thenReturn(8);
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, null));
Expand All @@ -30,34 +36,108 @@ public void testGetPoolOk() {
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, null));
}

@Test
public void testSkipExhausted() {
public void testSkipExhausted() {
leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
assertEquals(poolWith7Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.1:9160")))));
assertEquals(poolWith5Active, leastActiveBalancingPolicy.getPool(pools, new HashSet<CassandraHost>(Arrays.asList(new CassandraHost("127.0.0.2:9161")))));
}

@Test
public void testShuffleOnAllEqual() {
ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
ConcurrentHClientPool poolWith5Active2 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active2.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active2.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.4:9163"));
ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
ConcurrentHClientPool poolWith5Active3 = Mockito.mock(ConcurrentHClientPool.class);
Mockito.when(poolWith5Active3.getNumActive()).thenReturn(5);
Mockito.when(poolWith5Active3.getCassandraHost()).thenReturn(new CassandraHost("127.0.0.5:9164"));

pools.add(poolWith5Active2);
pools.add(poolWith5Active3);

leastActiveBalancingPolicy = new LeastActiveBalancingPolicy();
// should hit all three equal hosts over the course of 50 runs
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>(3);
for (int i = 0; i < 50; i++) {
HClientPool foundPool = leastActiveBalancingPolicy.getPool(pools, null);
foundHosts.add(foundPool.getCassandraHost());
assert 5 == foundPool.getNumActive();
assert 5 == foundPool.getNumActive();
}
assertEquals(3, foundHosts.size());
}

@Test
public void testShufflingCompareStability() {
final int POOL_SIZE = 360;
List<HClientPool> pools = new ArrayList<HClientPool>(POOL_SIZE);
for (int i = 0; i < POOL_SIZE; i++) {
pools.add(new TestPool());
}
// Do it enough times and it will fail
for (int i = 0; i < 50; i++) {
Collections.shuffle(pools);
Collections.sort(pools, new LeastActiveBalancingPolicy.ShufflingCompare());
}

}
private static class TestPool implements HClientPool {
private Random rand = new Random();

public int getNumActive() {
return rand.nextInt(30);
}

public int getNumIdle() {
return 0;
}

public int getNumBlockedThreads() {
return 0;
}

public String getName() {
return null;
}

public boolean getIsActive() {
return false;
}

public long getExhaustedTime() {
return 0;
}

public HClient borrowClient() throws HectorException {
return null;
}

public CassandraHost getCassandraHost() {
return null;
}

public int getNumBeforeExhausted() {
return 0;
}

public boolean isExhausted() {
return false;
}

public int getMaxActive() {
return 0;
}

public String getStatusAsString() {
return null;
}

public void releaseClient(HClient client) throws HectorException {

}

public void shutdown() {

}
}
}