diff --git a/cluster.go b/cluster.go index 0dc61d268..a76298789 100644 --- a/cluster.go +++ b/cluster.go @@ -76,6 +76,7 @@ type ClusterConfig struct { SerialConsistency SerialConsistency // Sets the consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL (default: unset) Discovery DiscoveryConfig SslOpts *SslOptions + DefaultTimestamp bool // Sends a client side timestamp for all requests which overrides the timestamp at which it arrives at the server. (default: true, only enabled for protocol 3 and above) } // NewCluster generates a new config for the default cluster implementation. @@ -92,6 +93,7 @@ func NewCluster(hosts ...string) *ClusterConfig { DiscoverHosts: false, MaxPreparedStmts: defaultMaxPreparedStmts, MaxRoutingKeyInfo: 1000, + DefaultTimestamp: true, } return cfg } diff --git a/conn.go b/conn.go index 88da9cb02..8cad1b3df 100644 --- a/conn.go +++ b/conn.go @@ -443,8 +443,8 @@ func (c *Conn) executeQuery(qry *Query) *Iter { // frame checks that it is not 0 params.serialConsistency = qry.serialCons + params.defaultTimestamp = qry.defaultTimestamp - // TODO: Add DefaultTimestamp if len(qry.pageState) > 0 { params.pagingState = qry.pageState } @@ -616,6 +616,7 @@ func (c *Conn) executeBatch(batch *Batch) error { statements: make([]batchStatment, n), consistency: batch.Cons, serialConsistency: batch.serialCons, + defaultTimestamp: batch.defaultTimestamp, } stmts := make(map[string]string) diff --git a/frame.go b/frame.go index c6e520698..72ac3d2f2 100644 --- a/frame.go +++ b/frame.go @@ -918,12 +918,12 @@ type queryParams struct { pagingState []byte serialConsistency SerialConsistency // v3+ - timestamp *time.Time + defaultTimestamp bool } func (q queryParams) String() string { - return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v timestamp=%v values=%v]", - q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.timestamp, q.values) + return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v default_timestamp=%v values=%v]", + q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values) } func (f *framer) writeQueryParams(opts *queryParams) { @@ -954,9 +954,10 @@ func (f *framer) writeQueryParams(opts *queryParams) { // protoV3 specific things if f.proto > protoVersion2 { - if opts.timestamp != nil { + if opts.defaultTimestamp { flags |= flagDefaultTimestamp } + if len(opts.values) > 0 && opts.values[0].name != "" { flags |= flagWithNameValues names = true @@ -987,11 +988,9 @@ func (f *framer) writeQueryParams(opts *queryParams) { f.writeConsistency(Consistency(opts.serialConsistency)) } - if f.proto > protoVersion2 && opts.timestamp != nil { + if f.proto > protoVersion2 && opts.defaultTimestamp { // timestamp in microseconds - // TODO: should the timpestamp be set on the queryParams or should we set - // it here? - ts := opts.timestamp.UnixNano() / 1000 + ts := time.Now().UnixNano() / 1000 f.writeLong(ts) } } diff --git a/session.go b/session.go index 136854d5a..6d37401b0 100644 --- a/session.go +++ b/session.go @@ -96,7 +96,9 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query { s.mu.RLock() qry := &Query{stmt: stmt, values: values, cons: s.cons, session: s, pageSize: s.pageSize, trace: s.trace, - prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency} + prefetch: s.prefetch, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, + defaultTimestamp: s.cfg.DefaultTimestamp, + } s.mu.RUnlock() return qry } @@ -359,20 +361,21 @@ func (s *Session) ExecuteBatch(batch *Batch) error { // Query represents a CQL statement that can be executed. type Query struct { - stmt string - values []interface{} - cons Consistency - pageSize int - routingKey []byte - pageState []byte - prefetch float64 - trace Tracer - session *Session - rt RetryPolicy - binding func(q *QueryInfo) ([]interface{}, error) - attempts int - totalLatency int64 - serialCons SerialConsistency + stmt string + values []interface{} + cons Consistency + pageSize int + routingKey []byte + pageState []byte + prefetch float64 + trace Tracer + session *Session + rt RetryPolicy + binding func(q *QueryInfo) ([]interface{}, error) + attempts int + totalLatency int64 + serialCons SerialConsistency + defaultTimestamp bool } //Attempts returns the number of times the query was executed. @@ -418,6 +421,17 @@ func (q *Query) PageSize(n int) *Query { return q } +// DefaultTimestamp will enable the with default timestamp flag on the query. +// If enable, this will replace the server side assigned +// timestamp as default timestamp. Note that a timestamp in the query itself +// will still override this timestamp. This is entirely optional. +// +// Only available on protocol >= 3 +func (q *Query) DefaultTimestamp(enable bool) *Query { + q.defaultTimestamp = enable + return q +} + // RoutingKey sets the routing key to use when a token aware connection // pool is used to optimize the routing of this query. func (q *Query) RoutingKey(routingKey []byte) *Query { @@ -718,13 +732,14 @@ func (n *nextIter) fetch() *Iter { } type Batch struct { - Type BatchType - Entries []BatchEntry - Cons Consistency - rt RetryPolicy - attempts int - totalLatency int64 - serialCons SerialConsistency + Type BatchType + Entries []BatchEntry + Cons Consistency + rt RetryPolicy + attempts int + totalLatency int64 + serialCons SerialConsistency + defaultTimestamp bool } // NewBatch creates a new batch operation without defaults from the cluster @@ -735,7 +750,8 @@ func NewBatch(typ BatchType) *Batch { // NewBatch creates a new batch operation using defaults defined in the cluster func (s *Session) NewBatch(typ BatchType) *Batch { s.mu.RLock() - batch := &Batch{Type: typ, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, Cons: s.cons} + batch := &Batch{Type: typ, rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, + Cons: s.cons, defaultTimestamp: s.cfg.DefaultTimestamp} s.mu.RUnlock() return batch } @@ -794,6 +810,17 @@ func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch { return b } +// DefaultTimestamp will enable the with default timestamp flag on the query. +// If enable, this will replace the server side assigned +// timestamp as default timestamp. Note that a timestamp in the query itself +// will still override this timestamp. This is entirely optional. +// +// Only available on protocol >= 3 +func (b *Batch) DefaultTimestamp(enable bool) *Batch { + b.defaultTimestamp = enable + return b +} + type BatchType byte const (