Merge remote-tracking branch 'origin/master' into cassandra-2.0.1
Conflicts:
	pom.xml
	test/src/main/java/me/prettyprint/hector/testutils/EmbeddedSchemaLoader.java
Shane Perry committed May 28, 2014
2 parents 36e6fb8 + da4187c commit fce4bd1
Showing 19 changed files with 335 additions and 104 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG
@@ -16,6 +16,8 @@ Fix for issue 600: critical bug in CQL version selection (@malexejev)
Modify retryDownedHosts() to only delete hosts not in the ring if autoDiscoverHosts is true (@jancona)
Fix all hosts down check (@jancona)
don't use an iterator just to get item 0 of a List (@mebigfatguy)
Upgrade Cassandra dependencies to 1.2.11 (@pauloricardomg)
Add support to LOCAL_ONE consistency level (@pauloricardomg)

1.1-4
=====
2 changes: 1 addition & 1 deletion README
@@ -3,7 +3,7 @@ The currently active branch is 1.0. The master tracks Apache Cassandra active de
For the impatient:
http://hector-client.github.com/hector/build/html/index.html

All Hector artificats are deployed to Maven Central. If you use maven for your build system, you need only include the hector-core dependency and all related dependencies will be managed automatically.
All Hector artifacts are deployed to Maven Central. If you use maven for your build system, you need only include the hector-core dependency and all related dependencies will be managed automatically.
----------------------------------------------------------------------------------------------------

Hector is a high level Java client for Apache Cassandra.
6 changes: 3 additions & 3 deletions core/pom.xml
@@ -52,7 +52,7 @@
<configuration>
<instructions>
<Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
<Bundle-Version>${pom.version}</Bundle-Version>
<Bundle-Version>${project.version}</Bundle-Version>
<Embed-Dependency>
libthrift;scope=compile|runtime;type=!pom;inline=false,
cassandra;scope=compile|runtime;type=!pom;inline=false,
@@ -61,7 +61,7 @@

<Export-Package>
<!-- !${bundle.namespace}.internal.*, -->
me.prettyprint.*;version="${pom.version}"
me.prettyprint.*;version="${project.version}"
</Export-Package>

<Import-Package>
@@ -163,7 +163,7 @@
<dependency>
<groupId>org.hectorclient</groupId>
<artifactId>hector-test</artifactId>
<version>${pom.version}</version>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -45,7 +45,7 @@ void add(double i) {
if (intervalupdates.intValue() >= UPDATES_PER_INTERVAL)
return;
if (!latencies.offer(i)) {
latencies.remove();
latencies.poll();
latencies.offer(i);
}
intervalupdates.getAndIncrement();
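The switch from remove() to poll() above avoids a NoSuchElementException when another thread drains the queue between the failed offer and the eviction; poll() simply returns null in that case. A minimal, self-contained sketch of the same evict-and-reoffer pattern (class and field names are illustrative, not Hector's):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedSampleWindow {
  // Bounded queue of latency samples, shared between threads.
  private final BlockingQueue<Double> samples = new ArrayBlockingQueue<Double>(100);

  void add(double latency) {
    if (!samples.offer(latency)) {
      // poll() returns null if the queue was emptied concurrently;
      // remove() would throw NoSuchElementException in that race.
      samples.poll();
      samples.offer(latency);
    }
  }
}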
LeastActiveBalancingPolicy.java
@@ -16,15 +16,15 @@
* The list of hosts is shuffled on each pass to account for the case
* where a number of hosts are at the minimum number of connections
* (ie. they are not busy).
*
*
*
*
* @author zznate
*/
public class LeastActiveBalancingPolicy implements LoadBalancingPolicy {

private static final long serialVersionUID = 329849818218657061L;
private static final Logger log = LoggerFactory.getLogger(LeastActiveBalancingPolicy.class);

@Override
public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> excludeHosts) {
List<HClientPool> vals = Lists.newArrayList(pools);
@@ -34,7 +34,7 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
Iterator<HClientPool> iterator = vals.iterator();
HClientPool concurrentHClientPool = iterator.next();
if ( excludeHosts != null && excludeHosts.size() > 0 ) {
while (iterator.hasNext()) {
while (iterator.hasNext()) {
if ( !excludeHosts.contains(concurrentHClientPool.getCassandraHost()) ) {
break;
}
@@ -44,17 +44,34 @@ public HClientPool getPool(Collection<HClientPool> pools, Set<CassandraHost> exc
return concurrentHClientPool;
}

private final class ShufflingCompare implements Comparator<HClientPool> {

/**
* Make the results of this Comparator stable (and thus transitive) by caching the numActive value
* for each HClientPool as they are seen, then reusing the cached value instead of the current value
* (which may have changed) if the same pool is compared again.
*
* Without this change the new TimSort algorithm in Java 7 sometimes throws a:
* java.lang.IllegalArgumentException: Comparison method violates its general contract!
*/
static final class ShufflingCompare implements Comparator<HClientPool> {
private Map<HClientPool, Integer> cachedActive = new HashMap<HClientPool, Integer>();

public int compare(HClientPool o1, HClientPool o2) {
if ( log.isDebugEnabled() ) {
log.debug("comparing 1: {} and count {} with 2: {} and count {}",
new Object[]{o1.getCassandraHost(), o1.getNumActive(), o2.getCassandraHost(), o2.getNumActive()});
}
return o1.getNumActive() - o2.getNumActive();
return getNumActive(o1) - getNumActive(o2);
}
private int getNumActive(HClientPool p) {
Integer ret = cachedActive.get(p);
if (ret == null) {
ret = p.getNumActive();
cachedActive.put(p, ret);
}
return ret;
}
}

@Override
public HClientPool createConnection(HClientFactory clientFactory, CassandraHost host, CassandraClientMonitor monitor) {
return new ConcurrentHClientPool(clientFactory, host, monitor);
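The caching comment above is the heart of this change: TimSort, the default sort since Java 7, assumes each pair of elements compares consistently for the whole sort, and a comparator that re-reads a live counter can violate that. A hypothetical, stand-alone sketch of the same snapshot-on-first-sight idea (types are illustrative, not Hector's):

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class Pool {
  final AtomicInteger active = new AtomicInteger();   // mutated by other threads
}

class SnapshotComparator implements Comparator<Pool> {
  // First value observed per pool; reused so the ordering cannot shift mid-sort.
  private final Map<Pool, Integer> snapshot = new HashMap<Pool, Integer>();

  public int compare(Pool a, Pool b) {
    return value(a) - value(b);
  }

  private int value(Pool p) {
    Integer v = snapshot.get(p);
    if (v == null) {
      v = p.active.get();          // read the live counter exactly once
      snapshot.put(p, v);
    }
    return v;
  }
}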
@@ -1,6 +1,7 @@
package me.prettyprint.cassandra.locking;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.service.AbstractCluster;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
@@ -17,6 +18,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.primitives.Ints;

/**
*
* @author patricioe (Patricio Echague - patricioe@gmail.com)
@@ -106,6 +109,7 @@ private ColumnFamilyDefinition createColumnFamilyDefinition() {
lockManagerConfigurator.getLockManagerCF(), ComparatorType.UTF8TYPE);
cfDef.setKeyValidationClass(ComparatorType.UTF8TYPE.getClassName());
cfDef.setRowCacheSize(lockManagerConfigurator.isRowsCacheEnabled() ? 10000 : 0);
cfDef.setGcGraceSeconds(Ints.saturatedCast(TimeUnit.MILLISECONDS.toSeconds(lockManagerConfigurator.getLocksTTLInMillis())));
return cfDef;
}

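The added line derives the lock column family's gc_grace_seconds from the configured lock TTL: the TTL is converted from milliseconds to seconds and clamped into an int with Guava's saturatedCast so an oversized TTL cannot overflow. A small sketch of just that conversion (the values are made up):

import java.util.concurrent.TimeUnit;
import com.google.common.primitives.Ints;

class GcGraceExample {
  public static void main(String[] args) {
    long locksTtlMillis = 30000L;   // hypothetical 30-second lock TTL
    int gcGrace = Ints.saturatedCast(TimeUnit.MILLISECONDS.toSeconds(locksTtlMillis));
    System.out.println(gcGrace);                             // 30
    System.out.println(Ints.saturatedCast(Long.MAX_VALUE));  // clamps to 2147483647
  }
}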
AbstractBasicQuery.java
@@ -1,6 +1,7 @@
package me.prettyprint.cassandra.model;

import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.query.Query;
@@ -19,8 +20,14 @@ public abstract class AbstractBasicQuery<K, N, T> implements Query<T> {
protected String columnFamilyName;
protected Serializer<K> keySerializer;
protected Serializer<N> columnNameSerializer;
// add: FailoverPolicy, ConsistencyLevelPolicy, Credentials?
// For now keep it simple and use the Thrift consistency level directly
protected HConsistencyLevel consistency;
// add: FailoverPolicy, Credentials?
protected String cqlVersion;
// default is set to false. Subclasses should check the cqlVersion and
// set the flag as needed. Cassandra.Client class has CQL3 specific
// method that differs from CQL1 and 2.
protected boolean cql3 = false;

protected AbstractBasicQuery(Keyspace k, Serializer<K> keySerializer,
Serializer<N> nameSerializer) {
@@ -61,7 +68,12 @@ public AbstractBasicQuery setCqlVersion(String cqlVersion) {
this.cqlVersion = cqlVersion;
return this;
}

public HConsistencyLevel getConsistencyLevel() {
return this.consistency;
}



public void setConsistencyLevel(HConsistencyLevel level) {
this.consistency = level;
}
}
@@ -2,7 +2,6 @@

import java.util.HashMap;
import java.util.Map;

import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.ConsistencyLevelPolicy;
@@ -40,10 +39,12 @@ public HConsistencyLevel get(OperationType op, String cfName) {

public void setReadCfConsistencyLevels(Map<String, HConsistencyLevel> columnFamilyConsistencyLevels) {
this.readCfConsistencyLevels = columnFamilyConsistencyLevels;
log.info("Read ColumnFamily ConsistencyLevels set to: {}", columnFamilyConsistencyLevels);
}

public void setWriteCfConsistencyLevels(Map<String, HConsistencyLevel> columnFamilyConsistencyLevels) {
this.writeCfConsistencyLevels = columnFamilyConsistencyLevels;
log.info("Write ColumnFamily ConsistencyLevels set to: {}", columnFamilyConsistencyLevels);
}

public void setConsistencyLevelForCfOperation(HConsistencyLevel consistencyLevel,
@@ -60,10 +61,12 @@ public void setConsistencyLevelForCfOperation(HConsistencyLevel consistencyLevel

public void setDefaultReadConsistencyLevel(HConsistencyLevel defaultReadConsistencyLevel) {
this.defaultReadConsistencyLevel = defaultReadConsistencyLevel;
log.info("Default read ConsistencyLevel set to: {}", defaultReadConsistencyLevel.toString() + ".");
}

public void setDefaultWriteConsistencyLevel(HConsistencyLevel defaultWriteConsistencyLevel) {
this.defaultWriteConsistencyLevel = defaultWriteConsistencyLevel;
log.info("Default write ConsistencyLevel set to: {}", defaultWriteConsistencyLevel.toString() + ".");
}


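The added log statements make it visible which levels a policy ends up with. A sketch of how a caller might exercise these setters, assuming this file is Hector's ConfigurableConsistencyLevel (the column family name below is illustrative):

import java.util.HashMap;
import java.util.Map;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.HConsistencyLevel;

class ConsistencyConfigExample {
  static ConfigurableConsistencyLevel buildPolicy() {
    ConfigurableConsistencyLevel policy = new ConfigurableConsistencyLevel();
    policy.setDefaultReadConsistencyLevel(HConsistencyLevel.QUORUM);    // now logged at INFO
    policy.setDefaultWriteConsistencyLevel(HConsistencyLevel.QUORUM);   // now logged at INFO

    Map<String, HConsistencyLevel> readLevels = new HashMap<String, HConsistencyLevel>();
    readLevels.put("Events", HConsistencyLevel.ONE);                    // per-CF read override
    policy.setReadCfConsistencyLevels(readLevels);                      // now logged at INFO
    return policy;
  }
}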
64 changes: 60 additions & 4 deletions core/src/main/java/me/prettyprint/cassandra/model/CqlQuery.java
@@ -6,14 +6,17 @@
import java.util.List;
import java.util.Map;

import me.prettyprint.cassandra.model.thrift.ThriftConverter;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.query.QueryResult;

import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
@@ -82,9 +85,17 @@ public CqlQuery<K, N, V> setQuery(ByteBuffer qeury) {
return this;
}

/**
* Method will check to see if the version starts with "3" and set
* the boolean cql3 to true. This makes it easier for execute
* method and avoids doing lots of cqlVersion.startsWith() tests.
*/
@Override
public CqlQuery<K, N, V> setCqlVersion(String version){
this.cqlVersion=version;
if (this.cqlVersion.startsWith("3")) {
cql3 = true;
}
return this;
}

@@ -98,7 +109,26 @@ public CqlQuery<K, N, V> useCompression() {
return this;
}

protected HConsistencyLevel getConsistency() {
if (consistency != null) {
return consistency;
} else {
return HConsistencyLevel.ONE;
}
}

/**
* For future releases, we should consider refactoring how execute
* method converts the thrift CqlResult to Hector CqlRows. Starting
* with CQL3 the KEY is no longer returned by default and is no
* longer treated as a "special" thing. To get the KEY from the
* resultset, we have to look at the columns. Using the KEY
* in CqlRows is nice, and makes it easy to get results by the KEY.
* The downside is that users have to explicitly include the KEY in
* the statement. Changing how CqlRows work "might" break some
* existing use cases. For now, the implementation finds the column
* and expects the statement to have the KEY.
*/
@Override
public QueryResult<CqlRows<K, N, V>> execute() {

@@ -112,8 +142,14 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
if (cqlVersion != null) {
cassandra.set_cql_version(cqlVersion);
}
CqlResult result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
CqlResult result = null;
if (cql3) {
result = cassandra.execute_cql3_query(query,
useCompression ? Compression.GZIP : Compression.NONE, ThriftConverter.consistencyLevel(getConsistency()));
} else {
result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
}
if ( log.isDebugEnabled() ) {
log.debug("Found CqlResult: {}", result);
}
@@ -125,10 +161,30 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
default:
if ( result.getRowsSize() > 0 ) {
LinkedHashMap<ByteBuffer, List<Column>> ret = new LinkedHashMap<ByteBuffer, List<Column>>(result.getRowsSize());

int keyColumnIndex = -1;
for (Iterator<CqlRow> rowsIter = result.getRowsIterator(); rowsIter.hasNext(); ) {
CqlRow row = rowsIter.next();
ret.put(ByteBuffer.wrap(row.getKey()), filterKeyColumn(row));
ByteBuffer kbb = ByteBuffer.wrap(row.getKey());
// if CQL3 is used row.getKey() always returns null, so we have
// to find the KEY in the columns.
if (cql3) {
List<Column> rowcolumns = row.getColumns();
// first time through we find the column and then use the
// column index to avoid needless work.
if (keyColumnIndex == -1) {
for (Column c: rowcolumns) {
keyColumnIndex++;
String name = StringSerializer.get().fromBytes(c.getName());
if (name.toUpperCase().equals("KEY")) {
kbb = ByteBuffer.wrap(c.getValue());
break;
}
}
} else {
kbb = ByteBuffer.wrap(row.getColumns().get(keyColumnIndex).getValue());
}
}
ret.put(kbb, filterKeyColumn(row));
}
Map<K, List<Column>> thriftRet = keySerializer.fromBytesMap(ret);
rows = new CqlRows<K, N, V>((LinkedHashMap<K, List<Column>>)thriftRet, columnNameSerializer, valueSerializer);
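With the changes above, a CQL3 statement goes through execute_cql3_query and the row key is recovered from the selected KEY column. A usage sketch, under the assumptions that CqlQuery takes (keyspace, key, name, value) serializers and accepts the statement as a String — neither is shown in this hunk — and with an illustrative table and column name:

import me.prettyprint.cassandra.model.CqlQuery;
import me.prettyprint.cassandra.model.CqlRows;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.query.QueryResult;

class Cql3QueryExample {
  static QueryResult<CqlRows<String, String, String>> run(Keyspace keyspace) {
    StringSerializer se = StringSerializer.get();
    CqlQuery<String, String, String> query =
        new CqlQuery<String, String, String>(keyspace, se, se, se);
    query.setQuery("SELECT KEY, first_name FROM users");  // KEY must be selected explicitly under CQL3
    query.setCqlVersion("3.0.0");                          // startsWith("3") flips the cql3 flag
    query.setConsistencyLevel(HConsistencyLevel.QUORUM);   // falls back to ONE when left unset
    return query.execute();
  }
}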
22 changes: 22 additions & 0 deletions core/src/main/java/me/prettyprint/cassandra/model/MutatorImpl.java
@@ -1,5 +1,6 @@
package me.prettyprint.cassandra.model;

import java.nio.ByteBuffer;
import java.util.Arrays;

import me.prettyprint.cassandra.model.thrift.ThriftConverter;
@@ -18,6 +19,7 @@
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;



@@ -228,6 +230,26 @@ public <N> Mutator<K> addDeletion(K key, String cf, N columnName, Serializer<N>
return this;
}

/**
* {@inheritDoc}
*/
@Override
public <N> Mutator<K> addDeletion(K key, String cf, N columnNameStart, N columnNameFinish, Serializer<N> nameSerializer) {
return addDeletion(key, cf, columnNameStart, columnNameFinish, nameSerializer, keyspace.createClock());
}

/**
* {@inheritDoc}
*/
@Override
public <N> Mutator<K> addDeletion(K key, String cf, N columnNameStart, N columnNameFinish, Serializer<N> nameSerializer, long clock) {
SlicePredicate sp = new SlicePredicate();
sp.setSlice_range(new SliceRange(nameSerializer.toByteBuffer(columnNameStart), nameSerializer.toByteBuffer(columnNameFinish), false, Integer.MAX_VALUE));
Deletion d = new Deletion().setTimestamp(clock).setPredicate(sp);
getPendingMutations().addDeletion(key, Arrays.asList(cf), d);
return this;
}

/**
* Batch executes all mutations scheduled to this Mutator instance by addInsertion, addDeletion etc.
* May throw a HectorException which is a RuntimeException.
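The two new overloads schedule a ranged deletion built from a SliceRange. A usage sketch; HFactory.createMutator is the usual way to obtain a Mutator, and the row, column family, and column names below are illustrative:

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

class RangeDeleteExample {
  static void deleteJanuary(Keyspace keyspace) {
    StringSerializer se = StringSerializer.get();
    Mutator<String> mutator = HFactory.createMutator(keyspace, se);
    // Schedules deletion of every column from "2014-01-01" through "2014-01-31"
    // in row "user42" of column family "Timeline".
    mutator.addDeletion("user42", "Timeline", "2014-01-01", "2014-01-31", se);
    mutator.execute();   // sends the scheduled mutations
  }
}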
ThriftConverter.java
@@ -33,11 +33,13 @@ public static ConsistencyLevel consistencyLevel(HConsistencyLevel c) {
return ConsistencyLevel.EACH_QUORUM;
case LOCAL_QUORUM:
return ConsistencyLevel.LOCAL_QUORUM;
case LOCAL_ONE:
return ConsistencyLevel.LOCAL_ONE;
default:
throw new RuntimeException("Unregornized consistency level " + c);
}
}

/**
* Converts a list of ColumnOrSuperColumn to Column
* @param columns
@@ -50,5 +52,5 @@ public static List<Column> getColumnList(List<ColumnOrSuperColumn> columns) {
}
return list;
}

}
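With the new case, LOCAL_ONE is translated to its Thrift counterpart instead of hitting the default RuntimeException branch. A tiny sketch, assuming the HConsistencyLevel.LOCAL_ONE constant is added elsewhere in this commit:

import me.prettyprint.cassandra.model.thrift.ThriftConverter;
import me.prettyprint.hector.api.HConsistencyLevel;
import org.apache.cassandra.thrift.ConsistencyLevel;

class LocalOneMappingExample {
  public static void main(String[] args) {
    ConsistencyLevel thriftLevel = ThriftConverter.consistencyLevel(HConsistencyLevel.LOCAL_ONE);
    System.out.println(thriftLevel);   // prints LOCAL_ONE
  }
}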