Skip to content

Commit

Permalink
Add with default timestamp support
Browse files Browse the repository at this point in the history
Send client side timestamps by default for protocol v3+

Fixes apache#265
  • Loading branch information
Zariel committed Apr 15, 2015
1 parent 1ff8661 commit 6af1499
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 32 deletions.
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type ClusterConfig struct {
SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset)
Discovery DiscoveryConfig
SslOpts *SslOptions
DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above)
}

// NewCluster generates a new config for the default cluster implementation.
Expand All @@ -92,6 +93,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
DiscoverHosts: false,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
DefaultTimestamp: true,
}
return cfg
}
Expand Down
3 changes: 2 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ func (c *Conn) executeQuery(qry *Query) *Iter {

// frame checks that it is not 0
params.serialConsistency = qry.serialCons
params.defaultTimestamp = qry.defaultTimestamp

// TODO: Add DefaultTimestamp
if len(qry.pageState) > 0 {
params.pagingState = qry.pageState
}
Expand Down Expand Up @@ -616,6 +616,7 @@ func (c *Conn) executeBatch(batch *Batch) error {
statements: make([]batchStatment, n),
consistency: batch.Cons,
serialConsistency: batch.serialCons,
defaultTimestamp: batch.defaultTimestamp,
}

stmts := make(map[string]string)
Expand Down
15 changes: 7 additions & 8 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,12 +918,12 @@ type queryParams struct {
pagingState []byte
serialConsistency SerialConsistency
// v3+
timestamp *time.Time
defaultTimestamp bool
}

func (q queryParams) String() string {
return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v timestamp=%v values=%v]",
q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.timestamp, q.values)
return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v]",
q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values)
}

func (f *framer) writeQueryParams(opts *queryParams) {
Expand Down Expand Up @@ -954,9 +954,10 @@ func (f *framer) writeQueryParams(opts *queryParams) {

// protoV3 specific things
if f.proto > protoVersion2 {
if opts.timestamp != nil {
if opts.defaultTimestamp {
flags |= flagDefaultTimestamp
}

if len(opts.values) > 0 && opts.values[0].name != "" {
flags |= flagWithNameValues
names = true
Expand Down Expand Up @@ -987,11 +988,9 @@ func (f *framer) writeQueryParams(opts *queryParams) {
f.writeConsistency(Consistency(opts.serialConsistency))
}

if f.proto > protoVersion2 && opts.timestamp != nil {
if f.proto > protoVersion2 && opts.defaultTimestamp {
// timestamp in microseconds
// TODO: should the timpestamp be set on the queryParams or should we set
// it here?
ts := opts.timestamp.UnixNano() / 1000
ts := time.Now().UnixNano() / 1000
f.writeLong(ts)
}
}
Expand Down
73 changes: 50 additions & 23 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
s.mu.RLock()
qry := &Query{stmt: stmt, values: values, cons: s.cons,
session: s, pageSize: s.pageSize, trace: s.trace,
prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency}
prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency,
defaultTimestamp: s.cfg.DefaultTimestamp,
}
s.mu.RUnlock()
return qry
}
Expand Down Expand Up @@ -359,20 +361,21 @@ func (s *Session) ExecuteBatch(batch *Batch) error {

// Query represents a CQL statement that can be executed.
type Query struct {
stmt string
values []interface{}
cons Consistency
pageSize int
routingKey []byte
pageState []byte
prefetch float64
trace Tracer
session *Session
rt RetryPolicy
binding func(q *QueryInfo) ([]interface{}, error)
attempts int
totalLatency int64
serialCons SerialConsistency
stmt string
values []interface{}
cons Consistency
pageSize int
routingKey []byte
pageState []byte
prefetch float64
trace Tracer
session *Session
rt RetryPolicy
binding func(q *QueryInfo) ([]interface{}, error)
attempts int
totalLatency int64
serialCons SerialConsistency
defaultTimestamp bool
}

//Attempts returns the number of times the query was executed.
Expand Down Expand Up @@ -418,6 +421,17 @@ func (q *Query) PageSize(n int) *Query {
return q
}

// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enable, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
//
// Only available on protocol >= 3
func (q *Query) DefaultTimestamp(enable bool) *Query {
q.defaultTimestamp = enable
return q
}

// RoutingKey sets the routing key to use when a token aware connection
// pool is used to optimize the routing of this query.
func (q *Query) RoutingKey(routingKey []byte) *Query {
Expand Down Expand Up @@ -718,13 +732,14 @@ func (n *nextIter) fetch() *Iter {
}

type Batch struct {
Type BatchType
Entries []BatchEntry
Cons Consistency
rt RetryPolicy
attempts int
totalLatency int64
serialCons SerialConsistency
Type BatchType
Entries []BatchEntry
Cons Consistency
rt RetryPolicy
attempts int
totalLatency int64
serialCons SerialConsistency
defaultTimestamp bool
}

// NewBatch creates a new batch operation without defaults from the cluster
Expand All @@ -735,7 +750,8 @@ func NewBatch(typ BatchType) *Batch {
// NewBatch creates a new batch operation using defaults defined in the cluster
func (s *Session) NewBatch(typ BatchType) *Batch {
s.mu.RLock()
batch := &Batch{Type: typ, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, Cons: s.cons}
batch := &Batch{Type: typ, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency,
Cons: s.cons, defaultTimestamp: s.cfg.DefaultTimestamp}
s.mu.RUnlock()
return batch
}
Expand Down Expand Up @@ -794,6 +810,17 @@ func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
return b
}

// DefaultTimestamp will enable the with default timestamp flag on the query.
// If enable, this will replace the server side assigned
// timestamp as default timestamp. Note that a timestamp in the query itself
// will still override this timestamp. This is entirely optional.
//
// Only available on protocol >= 3
func (b *Batch) DefaultTimestamp(enable bool) *Batch {
b.defaultTimestamp = enable
return b
}

type BatchType byte

const (
Expand Down

0 comments on commit 6af1499

Please sign in to comment.