Skip to content

Commit

Permalink
session.go: custom table partitioners for statements and batches
Browse files Browse the repository at this point in the history
This change causes gocql to use a custom table partitioner, if
available, in order to calculate the token for the partition key.

This fixes token- and shard-aware routing for tables with custom
partitioners.
  • Loading branch information
piodul authored and mmatczuk committed Dec 2, 2020
1 parent 729cf53 commit 09df52a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
7 changes: 6 additions & 1 deletion policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,12 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
return t.fallback.Pick(qry)
}

token := meta.tokenRing.partitioner.Hash(routingKey)
partitioner := qry.GetCustomPartitioner()
if partitioner == nil {
partitioner = meta.tokenRing.partitioner
}

token := partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)

var replicas []*HostInfo
Expand Down
1 change: 1 addition & 0 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ExecutableQuery interface {
Keyspace() string
IsIdempotent() bool
IsLWT() bool
GetCustomPartitioner() partitioner

withContext(context.Context) ExecutableQuery

Expand Down
48 changes: 38 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,16 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI
return nil, nil
}

table := info.request.columns[0].Table
keyspace := info.request.columns[0].Keyspace

partitioner, err := scyllaGetTablePartitioner(s, keyspace, table)
if err != nil {
// don't cache this error
s.routingKeyInfoCache.Remove(stmt)
return nil, inflight.err
}

if len(info.request.pkeyColumns) > 0 {
// proto v4 dont need to calculate primary key columns
types := make([]TypeInfo, len(info.request.pkeyColumns))
Expand All @@ -574,17 +584,17 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI
}

routingKeyInfo := &routingKeyInfo{
indexes: info.request.pkeyColumns,
types: types,
lwt: info.request.lwt,
indexes: info.request.pkeyColumns,
types: types,
lwt: info.request.lwt,
partitioner: partitioner,
}

inflight.value = routingKeyInfo
return routingKeyInfo, nil
}

// get the table metadata
table := info.request.columns[0].Table

var keyspaceMetadata *KeyspaceMetadata
keyspaceMetadata, inflight.err = s.KeyspaceMetadata(info.request.columns[0].Keyspace)
Expand All @@ -609,9 +619,10 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI

size := len(partitionKey)
routingKeyInfo := &routingKeyInfo{
indexes: make([]int, size),
types: make([]TypeInfo, size),
lwt: info.request.lwt,
indexes: make([]int, size),
types: make([]TypeInfo, size),
lwt: info.request.lwt,
partitioner: partitioner,
}

for keyIndex, keyColumn := range partitionKey {
Expand Down Expand Up @@ -850,6 +861,9 @@ type Query struct {
// In effect if the query is of the form "INSERT/UPDATE/DELETE ... IF ..."
// For more details see https://docs.scylladb.com/using-scylla/lwt/
lwt bool

// If not nil, represents a custom partitioner for the table
partitioner partitioner
}

func (q *Query) defaultsFromSession() {
Expand Down Expand Up @@ -1069,6 +1083,7 @@ func (q *Query) GetRoutingKey() ([]byte, error) {
}
if routingKeyInfo != nil {
q.lwt = routingKeyInfo.lwt
q.partitioner = routingKeyInfo.partitioner
}
return createRoutingKey(routingKeyInfo, q.values)
}
Expand Down Expand Up @@ -1128,6 +1143,10 @@ func (q *Query) IsLWT() bool {
return q.lwt
}

func (q *Query) GetCustomPartitioner() partitioner {
return q.partitioner
}

// Idempotent marks the query as being idempotent or not depending on
// the value.
func (q *Query) Idempotent(value bool) *Query {
Expand Down Expand Up @@ -1596,6 +1615,9 @@ type Batch struct {
// It is sufficient that one batch entry is a conditional query for the
// whole batch to be considered for LWT optimization.
lwt bool

// If not nil, represents a custom partitioner for the table
partitioner partitioner
}

// NewBatch creates a new batch operation without defaults from the cluster
Expand Down Expand Up @@ -1690,6 +1712,10 @@ func (b *Batch) IsLWT() bool {
return b.lwt
}

func (b *Batch) GetCustomPartitioner() partitioner {
return b.partitioner
}

func (b *Batch) speculativeExecutionPolicy() SpeculativeExecutionPolicy {
return b.spec
}
Expand Down Expand Up @@ -1832,6 +1858,7 @@ func (b *Batch) GetRoutingKey() ([]byte, error) {
}
if routingKeyInfo != nil {
b.lwt = routingKeyInfo.lwt
b.partitioner = routingKeyInfo.partitioner
}

return createRoutingKey(routingKeyInfo, entry.Args)
Expand Down Expand Up @@ -1907,9 +1934,10 @@ type routingKeyInfoLRU struct {
}

type routingKeyInfo struct {
indexes []int
types []TypeInfo
lwt bool
indexes []int
types []TypeInfo
lwt bool
partitioner partitioner
}

func (r *routingKeyInfo) String() string {
Expand Down

0 comments on commit 09df52a

Please sign in to comment.