Skip to content

Commit

Permalink
Merge pull request #657 from woolfel/master
Browse files Browse the repository at this point in the history
support for CQL3
  • Loading branch information
zznate committed Mar 6, 2014
2 parents b21b0e3 + 8a7f7b1 commit 5301af2
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package me.prettyprint.cassandra.model;

import org.apache.cassandra.thrift.ConsistencyLevel;

import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
Expand All @@ -19,8 +21,14 @@ public abstract class AbstractBasicQuery<K, N, T> implements Query<T> {
protected String columnFamilyName;
protected Serializer<K> keySerializer;
protected Serializer<N> columnNameSerializer;
// add: FailoverPolicy, ConsistencyLevelPolicy, Credentials?
// For now keep it simple and use the Thrift consistency level directly
protected ConsistencyLevel consistency;
// add: FailoverPolicy, Credentials?
protected String cqlVersion;
// default is set to false. Subclasses should check the cqlVersion and
// set the flag as needed. Cassandra.Client class has CQL3 specific
// method that differs from CQL1 and 2.
protected boolean cql3 = false;

protected AbstractBasicQuery(Keyspace k, Serializer<K> keySerializer,
Serializer<N> nameSerializer) {
Expand Down Expand Up @@ -61,7 +69,12 @@ public AbstractBasicQuery setCqlVersion(String cqlVersion) {
this.cqlVersion = cqlVersion;
return this;
}

public ConsistencyLevel getConsistencyLevel() {
return this.consistency;
}



public void setConsistencyLevel(ConsistencyLevel level) {
this.consistency = level;
}
}
62 changes: 58 additions & 4 deletions core/src/main/java/me/prettyprint/cassandra/model/CqlQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.query.QueryResult;

import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
Expand Down Expand Up @@ -82,9 +83,17 @@ public CqlQuery<K, N, V> setQuery(ByteBuffer qeury) {
return this;
}

/**
* Method will check to see if the version starts with "3" and set
* the boolean cql3 to true. This makes it easier for execute
* method and avoids doing lots of cqlVersion.startsWith() tests.
*/
@Override
public CqlQuery<K, N, V> setCqlVersion(String version){
this.cqlVersion=version;
if (this.cqlVersion.startsWith("3")) {
cql3 = true;
}
return this;
}

Expand All @@ -98,7 +107,26 @@ public CqlQuery<K, N, V> useCompression() {
return this;
}

protected ConsistencyLevel getConsistency() {
if (consistency != null) {
return consistency;
} else {
return ConsistencyLevel.ONE;
}
}

/**
* For future releases, we should consider refactoring how execute
* method converts the thrift CqlResult to Hector CqlRows. Starting
* with CQL3 the KEY is no longer returned by default and is no
* longer treated as a "special" thing. To get the KEY from the
* resultset, we have to look at the columns. Using the KEY
* in CqlRows is nice, and makes it easy to get results by the KEY.
* The downside is that users have to explicitly include the KEY in
* the statement. Changing how CqlRows work "might" break some
* existing use cases. For now, the implementation finds the column
* and expects the statement to have the KEY.
*/
@Override
public QueryResult<CqlRows<K, N, V>> execute() {

Expand All @@ -112,8 +140,14 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
if (cqlVersion != null) {
cassandra.set_cql_version(cqlVersion);
}
CqlResult result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
CqlResult result = null;
if (cql3) {
result = cassandra.execute_cql3_query(query,
useCompression ? Compression.GZIP : Compression.NONE, getConsistency());
} else {
result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
}
if ( log.isDebugEnabled() ) {
log.debug("Found CqlResult: {}", result);
}
Expand All @@ -125,10 +159,30 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
default:
if ( result.getRowsSize() > 0 ) {
LinkedHashMap<ByteBuffer, List<Column>> ret = new LinkedHashMap<ByteBuffer, List<Column>>(result.getRowsSize());

int keyColumnIndex = -1;
for (Iterator<CqlRow> rowsIter = result.getRowsIterator(); rowsIter.hasNext(); ) {
CqlRow row = rowsIter.next();
ret.put(ByteBuffer.wrap(row.getKey()), filterKeyColumn(row));
ByteBuffer kbb = ByteBuffer.wrap(row.getKey());
// if CQL3 is used row.getKey() always returns null, so we have
// to find the KEY in the columns.
if (cql3) {
List<Column> rowcolumns = row.getColumns();
// first time through we find the column and then use the
// column index to avoid needless work.
if (keyColumnIndex == -1) {
for (Column c: rowcolumns) {
keyColumnIndex++;
String name = StringSerializer.get().fromBytes(c.getName());
if (name.toUpperCase().equals("KEY")) {
kbb = ByteBuffer.wrap(c.getValue());
break;
}
}
} else {
kbb = ByteBuffer.wrap(row.getColumns().get(keyColumnIndex).getValue());
}
}
ret.put(kbb, filterKeyColumn(row));
}
Map<K, List<Column>> thriftRet = keySerializer.fromBytesMap(ret);
rows = new CqlRows<K, N, V>((LinkedHashMap<K, List<Column>>)thriftRet, columnNameSerializer, valueSerializer);
Expand Down

0 comments on commit 5301af2

Please sign in to comment.