Skip to content

Commit

Permalink
Add with serial consistency support
Browse files Browse the repository at this point in the history
Add an option to Query to set the serial consistency for the
conditional update phase of a query.
  • Loading branch information
Zariel committed Apr 9, 2015
1 parent bc6c54e commit 4eb0e05
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 2 deletions.
1 change: 1 addition & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ func TestCAS(t *testing.T) {

session := createSession(t)
defer session.Close()
session.cfg.SerialConsistency = LocalSerial

if err := createTable(session, `CREATE TABLE cas_table (
title varchar,
Expand Down
1 change: 1 addition & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type ClusterConfig struct {
MaxPreparedStmts int // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
MaxRoutingKeyInfo int // Sets the maximum cache size for query info about statements for each session (default: 1000)
PageSize int // Default page size to use for created sessions (default: 0)
SerialConsistency Consistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
Discovery DiscoveryConfig
SslOpts *SslOptions
}
Expand Down
5 changes: 4 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
consistency: qry.cons,
}

// TODO: Add DefaultTimestamp, SerialConsistency
// frame checks that it is not 0
params.serialConsistency = qry.serialCons

// TODO: Add DefaultTimestamp
if len(qry.pageState) > 0 {
params.pagingState = qry.pageState
}
Expand Down
16 changes: 15 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
s.mu.RLock()
qry := &Query{stmt: stmt, values: values, cons: s.cons,
session: s, pageSize: s.pageSize, trace: s.trace,
prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency}
s.mu.RUnlock()
return qry
}
Expand Down Expand Up @@ -372,6 +372,7 @@ type Query struct {
binding func(q *QueryInfo) ([]interface{}, error)
attempts int
totalLatency int64
serialCons Consistency
}

//Attempts returns the number of times the query was executed.
Expand Down Expand Up @@ -517,6 +518,19 @@ func (q *Query) Bind(v ...interface{}) *Query {
return q
}

// SerialConsistency sets the consistencyc level for the
// serial phase of conditional updates. That consitency can only be
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
// SERIAL. This option will be ignored for anything else that a
// conditional update/insert.
func (q *Query) SerialConsistency(cons Consistency) *Query {
if !(cons == Serial || cons == LocalSerial) {
panic("only acceptable consistency for serial_consistency is SERIAL or LOCAL_SERIAL")
}
q.serialCons = cons
return q
}

// Exec executes the query without returning any rows.
func (q *Query) Exec() error {
iter := q.Iter()
Expand Down

0 comments on commit 4eb0e05

Please sign in to comment.