Skip to content

Commit

Permalink
support for CQL3
Browse files Browse the repository at this point in the history
Support for CQL3 is working now. Users have to make sure their CQL
queries use the new format, which means double quotes around all text
column names.
  • Loading branch information
woolfel committed Mar 6, 2014
1 parent 220626e commit 8a7f7b1
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package me.prettyprint.cassandra.model;

import org.apache.cassandra.thrift.ConsistencyLevel;

import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
Expand All @@ -19,8 +21,14 @@ public abstract class AbstractBasicQuery<K, N, T> implements Query<T> {
protected String columnFamilyName;
protected Serializer<K> keySerializer;
protected Serializer<N> columnNameSerializer;
// add: FailoverPolicy, ConsistencyLevelPolicy, Credentials?
// For now keep it simple and use the Thrift consistency level directly
protected ConsistencyLevel consistency;
// add: FailoverPolicy, Credentials?
protected String cqlVersion;
// default is set to false. Subclasses should check the cqlVersion and
// set the flag as needed. Cassandra.Client class has CQL3 specific
// method that differs from CQL1 and 2.
protected boolean cql3 = false;

protected AbstractBasicQuery(Keyspace k, Serializer<K> keySerializer,
Serializer<N> nameSerializer) {
Expand Down Expand Up @@ -61,7 +69,12 @@ public AbstractBasicQuery setCqlVersion(String cqlVersion) {
this.cqlVersion = cqlVersion;
return this;
}

public ConsistencyLevel getConsistencyLevel() {
return this.consistency;
}



public void setConsistencyLevel(ConsistencyLevel level) {
this.consistency = level;
}
}
62 changes: 58 additions & 4 deletions core/src/main/java/me/prettyprint/cassandra/model/CqlQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.query.QueryResult;

import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
Expand Down Expand Up @@ -82,9 +83,17 @@ public CqlQuery<K, N, V> setQuery(ByteBuffer qeury) {
return this;
}

/**
* Method will check to see if the version starts with "3" and set
* the boolean cql3 to true. This makes it easier for execute
* method and avoids doing lots of cqlVersion.startsWith() tests.
*/
@Override
public CqlQuery<K, N, V> setCqlVersion(String version){
this.cqlVersion=version;
if (this.cqlVersion.startsWith("3")) {
cql3 = true;
}
return this;
}

Expand All @@ -98,7 +107,26 @@ public CqlQuery<K, N, V> useCompression() {
return this;
}

protected ConsistencyLevel getConsistency() {
if (consistency != null) {
return consistency;
} else {
return ConsistencyLevel.ONE;
}
}

/**
* For future releases, we should consider refactoring how execute
* method converts the thrift CqlResult to Hector CqlRows. Starting
* with CQL3 the KEY is no longer returned by default and is no
* longer treated as a "special" thing. To get the KEY from the
* resultset, we have to look at the columns. Using the KEY
* in CqlRows is nice, and makes it easy to get results by the KEY.
* The downside is that users have to explicitly include the KEY in
* the statement. Changing how CqlRows work "might" break some
* existing use cases. For now, the implementation finds the column
* and expects the statement to have the KEY.
*/
@Override
public QueryResult<CqlRows<K, N, V>> execute() {

Expand All @@ -112,8 +140,14 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
if (cqlVersion != null) {
cassandra.set_cql_version(cqlVersion);
}
CqlResult result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
CqlResult result = null;
if (cql3) {
result = cassandra.execute_cql3_query(query,
useCompression ? Compression.GZIP : Compression.NONE, getConsistency());
} else {
result = cassandra.execute_cql_query(query,
useCompression ? Compression.GZIP : Compression.NONE);
}
if ( log.isDebugEnabled() ) {
log.debug("Found CqlResult: {}", result);
}
Expand All @@ -125,10 +159,30 @@ public CqlRows<K, N, V> execute(Client cassandra) throws HectorException {
default:
if ( result.getRowsSize() > 0 ) {
LinkedHashMap<ByteBuffer, List<Column>> ret = new LinkedHashMap<ByteBuffer, List<Column>>(result.getRowsSize());

int keyColumnIndex = -1;
for (Iterator<CqlRow> rowsIter = result.getRowsIterator(); rowsIter.hasNext(); ) {
CqlRow row = rowsIter.next();
ret.put(ByteBuffer.wrap(row.getKey()), filterKeyColumn(row));
ByteBuffer kbb = ByteBuffer.wrap(row.getKey());
// if CQL3 is used row.getKey() always returns null, so we have
// to find the KEY in the columns.
if (cql3) {
List<Column> rowcolumns = row.getColumns();
// first time through we find the column and then use the
// column index to avoid needless work.
if (keyColumnIndex == -1) {
for (Column c: rowcolumns) {
keyColumnIndex++;
String name = StringSerializer.get().fromBytes(c.getName());
if (name.toUpperCase().equals("KEY")) {
kbb = ByteBuffer.wrap(c.getValue());
break;
}
}
} else {
kbb = ByteBuffer.wrap(row.getColumns().get(keyColumnIndex).getValue());
}
}
ret.put(kbb, filterKeyColumn(row));
}
Map<K, List<Column>> thriftRet = keySerializer.fromBytesMap(ret);
rows = new CqlRows<K, N, V>((LinkedHashMap<K, List<Column>>)thriftRet, columnNameSerializer, valueSerializer);
Expand Down

0 comments on commit 8a7f7b1

Please sign in to comment.