Skip to content

Commit

Permalink
Merge pull request #666 from horschi/master
Browse files Browse the repository at this point in the history
Make setConsistencyLevel method available for all queries/mutator and not just for CQL queries
  • Loading branch information
zznate committed Sep 25, 2014
2 parents b9d09b0 + 3d44bb7 commit 7bcd151
Show file tree
Hide file tree
Showing 32 changed files with 146 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ public AbstractBasicQuery setCqlVersion(String cqlVersion) {
return this;
}

@Override
public HConsistencyLevel getConsistencyLevel() {
return this.consistency;
}

@Override
public void setConsistencyLevel(HConsistencyLevel level) {
this.consistency = level;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;

import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.ColumnSlice;
Expand Down Expand Up @@ -55,6 +56,16 @@ public SubColumnQuery<K, SN, N, V> setColumnFamily(String cf) {
return this;
}

@Override
public HConsistencyLevel getConsistencyLevel() {
return this.subSliceQuery.getConsistencyLevel();
}

@Override
public void setConsistencyLevel(HConsistencyLevel level) {
this.subSliceQuery.setConsistencyLevel(level);
}

@Override
public QueryResult<HColumn<N, V>> execute() {
Assert.isTrue(subSliceQuery.getColumnNames().size() == 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.cassandra.service.KeyspaceServiceImpl;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.ConsistencyLevelPolicy;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.exceptions.HectorException;
import org.apache.cassandra.thrift.Cassandra;
Expand Down Expand Up @@ -80,12 +82,36 @@ public String toString() {
public long createClock() {
return connectionManager.createClock();
}

public <T> ExecutionResult<T> doExecute(KeyspaceOperationCallback<T> koc)
throws HectorException {
throws HectorException {
return doExecute(koc, null);
}

public <T> ExecutionResult<T> doExecute(KeyspaceOperationCallback<T> koc,
final HConsistencyLevel level) throws HectorException {
KeyspaceService ks = null;
try {
ks = new KeyspaceServiceImpl(keyspace, consistencyLevelPolicy,
final ConsistencyLevelPolicy policy;
if(level != null) {
policy = new ConsistencyLevelPolicy() {

@Override
public HConsistencyLevel get(OperationType op, String cfName) {
return level;
}

@Override
public HConsistencyLevel get(OperationType op) {
return level;
}
};
}
else {
policy = consistencyLevelPolicy;
}

ks = new KeyspaceServiceImpl(keyspace, policy,
connectionManager, failoverPolicy, credentials);
return koc.doInKeyspaceAndMeasure(ks);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import me.prettyprint.cassandra.service.FailoverPolicy;
import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.cassandra.service.VirtualKeyspaceOperation;
import me.prettyprint.cassandra.service.VirtualKeyspaceServiceImpl;
import me.prettyprint.hector.api.ConsistencyLevelPolicy;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.exceptions.HectorException;

Expand Down Expand Up @@ -41,14 +43,39 @@ public ExecutingVirtualKeyspace(String keyspace, E keyPrefix,
this.keyPrefixSerializer = keyPrefixSerializer;
prefixBytes = keyPrefixSerializer.toByteBuffer(keyPrefix);
}

@Override
public <T> ExecutionResult<T> doExecute(KeyspaceOperationCallback<T> koc)
throws HectorException {
return doExecute(koc, null);
}

@Override
public <T> ExecutionResult<T> doExecute(KeyspaceOperationCallback<T> koc,
final HConsistencyLevel level) throws HectorException {
KeyspaceService ks = null;
try {
final ConsistencyLevelPolicy policy;
if(level != null) {
policy = new ConsistencyLevelPolicy() {

@Override
public HConsistencyLevel get(OperationType op, String cfName) {
return level;
}

@Override
public HConsistencyLevel get(OperationType op) {
return level;
}
};
}
else {
policy = consistencyLevelPolicy;
}

ks = new VirtualKeyspaceServiceImpl(keyspace, keyPrefix,
keyPrefixSerializer, consistencyLevelPolicy, connectionManager,
keyPrefixSerializer, policy, connectionManager,
failoverPolicy, credentials);
return koc.doInKeyspaceAndMeasure(ks);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public OrderedRows<K, N, V> doInKeyspace(KeyspaceService ks)
(LinkedHashMap<K, List<Column>>) thriftRet,
columnNameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.exceptions.HectorException;
Expand All @@ -20,7 +21,8 @@ public class MultigetCountQuery<K,N> implements Query<Map<K, Integer>> {
protected final Serializer<K> keySerializer;
protected String columnFamily;
protected List<K> keys;

protected HConsistencyLevel consistency;

/** The slice predicate for which the count it performed*/
protected final HSlicePredicate<N> slicePredicate;

Expand Down Expand Up @@ -54,6 +56,16 @@ public MultigetCountQuery<K,N> setRange(N start, N finish, int count) {
return this;
}

@Override
public HConsistencyLevel getConsistencyLevel() {
return this.consistency;
}

@Override
public void setConsistencyLevel(HConsistencyLevel level) {
this.consistency = level;
}

@Override
public QueryResult<Map<K, Integer>> execute() {
Assert.notNull(keys, "keys list is null");
Expand All @@ -67,7 +79,7 @@ public Map<K,Integer> doInKeyspace(KeyspaceService ks) throws HectorException {
ks.multigetCount(keySerializer.toBytesList(keys), columnParent, slicePredicate.toThrift()));
return counts;
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ public Map<K,Integer> doInKeyspace(KeyspaceService ks) throws HectorException {
ks.multigetCount(keySerializer.toBytesList(keys), columnParent, slicePredicate.toThrift()));
return counts;
}
}), this);
}, consistency), this);
}
}
17 changes: 15 additions & 2 deletions core/src/main/java/me/prettyprint/cassandra/model/MutatorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import me.prettyprint.cassandra.model.thrift.ThriftFactory;
import me.prettyprint.cassandra.serializers.TypeInferringSerializer;
import me.prettyprint.cassandra.service.*;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.HColumn;
Expand Down Expand Up @@ -45,6 +46,8 @@ public final class MutatorImpl<K> implements Mutator<K> {
private BatchMutation<K> pendingMutations;

private BatchSizeHint sizeHint;

private HConsistencyLevel consistency;

public MutatorImpl(Keyspace keyspace, Serializer<K> keySerializer, BatchSizeHint sizeHint) {
this.keyspace = (ExecutingKeyspace) keyspace;
Expand All @@ -64,6 +67,16 @@ public MutatorImpl(Keyspace keyspace, BatchSizeHint sizeHint) {
this(keyspace, TypeInferringSerializer.<K> get(), sizeHint);
}

@Override
public HConsistencyLevel getConsistencyLevel() {
return this.consistency;
}

@Override
public void setConsistencyLevel(HConsistencyLevel level) {
this.consistency = level;
}

// Simple and immediate insertion of a column
@Override
public <N,V> MutationResult insert(final K key, final String cf, final HColumn<N,V> c) {
Expand Down Expand Up @@ -110,7 +123,7 @@ public Void doInKeyspace(KeyspaceService ks) throws HectorException {
supercolumnName, columnName, sNameSerializer, nameSerializer));
return null;
}
}));
}, consistency));
}

@Override
Expand All @@ -125,7 +138,7 @@ public Void doInKeyspace(KeyspaceService ks) throws HectorException {
ThriftFactory.createSuperColumnPath(cf, supercolumnName, sNameSerializer));
return null;
}
}));
}, consistency));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import me.prettyprint.cassandra.model.QueryResultImpl;
import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.cassandra.utils.Assert;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.exceptions.HectorException;
Expand All @@ -27,7 +28,8 @@
protected final Serializer<K> keySerializer;
/** The slice predicate for which the count it performed*/
protected final HSlicePredicate<N> slicePredicate;

protected HConsistencyLevel consistency;

public AbstractThriftCountQuery(Keyspace k, Serializer<K> keySerializer,
Serializer<N> nameSerializer) {
Assert.notNull(k, "keyspaceOperator can't be null");
Expand All @@ -48,6 +50,16 @@ public Query<Integer> setColumnFamily(String cf) {
return this;
}

@Override
public HConsistencyLevel getConsistencyLevel() {
return this.consistency;
}

@Override
public void setConsistencyLevel(HConsistencyLevel level) {
this.consistency = level;
}

protected QueryResult<Integer> countColumns() {
Assert.notNull(key, "key is null");
Assert.notNull(columnFamily, "columnFamily is null");
Expand All @@ -59,7 +71,7 @@ public Integer doInKeyspace(KeyspaceService ks) throws HectorException {
return ks.getCount(keySerializer.toByteBuffer(key), columnParent,
slicePredicate.toThrift());
}
}), this);
}, consistency), this);
}

public Query<Integer> setColumnNames(N... columnNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ public HColumn<N, V> doInKeyspace(KeyspaceService ks) throws HectorException {
return null;
}
}
}), this);
}, consistency), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public HCounterColumn<N> doInKeyspace(KeyspaceService ks) throws HectorException
return null;
}
}
}), this);
}, consistency), this);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public CounterRows<K, N> doInKeyspace(KeyspaceService ks) throws HectorException
ks.multigetCounterSlice(keysList, columnParent, getPredicate()));
return new CounterRowsImpl<K, N>(thriftRet, columnNameSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Rows<K, N,V> doInKeyspace(KeyspaceService ks) throws HectorException {
ks.multigetSlice(keysList, columnParent, getPredicate()));
return new RowsImpl<K, N, V>(thriftRet, columnNameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Rows<K, N, V> doInKeyspace(KeyspaceService ks) throws HectorException {
keySerializer.toBytesList(keysList), columnParent, getPredicate()));
return new RowsImpl<K, N, V>(thriftRet, columnNameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public CounterSuperRows<K, SN, N> doInKeyspace(KeyspaceService ks) throws Hector
return new CounterSuperRowsImpl<K, SN, N>(thriftRet, keySerializer, columnNameSerializer,
nameSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public SuperRows<K, SN, N, V> doInKeyspace(KeyspaceService ks) throws HectorExce
return new SuperRowsImpl<K, SN, N, V>(thriftRet, keySerializer, columnNameSerializer,
nameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public OrderedCounterRows<K, N> doInKeyspace(KeyspaceService ks) throws HectorEx
ks.getRangeCounterSlices(columnParent, getPredicate(), keyRange.toThrift()));
return new OrderedCounterRowsImpl<K,N>((LinkedHashMap<K, List<CounterColumn>>) thriftRet, columnNameSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public OrderedRows<K, N,V > doInKeyspace(KeyspaceService ks) throws HectorExcept
ks.getRangeSlices(columnParent, getPredicate(), keyRange.toThrift()));
return new OrderedRowsImpl<K,N,V>((LinkedHashMap<K, List<Column>>) thriftRet, columnNameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public OrderedCounterRows<K,N> doInKeyspace(KeyspaceService ks) throws HectorExc
ks.getRangeCounterSlices(columnParent, getPredicate(), keyRange.toThrift()));
return new OrderedCounterRowsImpl<K,N>((LinkedHashMap<K, List<CounterColumn>>) thriftRet, columnNameSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public OrderedRows<K,N,V> doInKeyspace(KeyspaceService ks) throws HectorExceptio
ks.getRangeSlices(columnParent, getPredicate(), keyRange.toThrift()));
return new OrderedRowsImpl<K,N,V>((LinkedHashMap<K, List<Column>>) thriftRet, columnNameSerializer, valueSerializer);
}
}), this);
}, consistency), this);
}

@Override
Expand Down
Loading

0 comments on commit 7bcd151

Please sign in to comment.