forked from scylladb/gocql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscylla_cdc.go
93 lines (74 loc) · 2.42 KB
/
scylla_cdc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package gocql
import (
"encoding/binary"
"math"
"strings"
)
// cdc partitioner
const (
scyllaCDCPartitionerName = "CDCPartitioner"
scyllaCDCPartitionerFullName = "com.scylladb.dht.CDCPartitioner"
scyllaCDCPartitionKeyLength = 16
scyllaCDCVersionMask = 0x0F
scyllaCDCMinSupportedVersion = 1
scyllaCDCMaxSupportedVersion = 1
scyllaCDCMinToken = int64Token(math.MinInt64)
scyllaCDCLogTableNameSuffix = "_scylla_cdc_log"
scyllaCDCExtensionName = "cdc"
)
type scyllaCDCPartitioner struct{}
var _ Partitioner = scyllaCDCPartitioner{}
func (p scyllaCDCPartitioner) Name() string {
return scyllaCDCPartitionerName
}
func (p scyllaCDCPartitioner) Hash(partitionKey []byte) Token {
if len(partitionKey) < 8 {
// The key is too short to extract any sensible token,
// so return the min token instead
if gocqlDebug {
Logger.Printf("scylla: cdc partition key too short: %d < 8", len(partitionKey))
}
return scyllaCDCMinToken
}
upperQword := binary.BigEndian.Uint64(partitionKey[0:])
if gocqlDebug {
// In debug mode, do some more checks
if len(partitionKey) != scyllaCDCPartitionKeyLength {
// The token has unrecognized format, but the first quadword
// should be the token value that we want
Logger.Printf("scylla: wrong size of cdc partition key: %d", len(partitionKey))
}
lowerQword := binary.BigEndian.Uint64(partitionKey[8:])
version := lowerQword & scyllaCDCVersionMask
if version < scyllaCDCMinSupportedVersion || version > scyllaCDCMaxSupportedVersion {
// We don't support this version yet,
// the token may be wrong
Logger.Printf(
"scylla: unsupported version: %d is not in range [%d, %d]",
version,
scyllaCDCMinSupportedVersion,
scyllaCDCMaxSupportedVersion,
)
}
}
return int64Token(upperQword)
}
func (p scyllaCDCPartitioner) ParseString(str string) Token {
return parseInt64Token(str)
}
func scyllaIsCdcTable(session *Session, keyspaceName, tableName string) (bool, error) {
if !strings.HasSuffix(tableName, scyllaCDCLogTableNameSuffix) {
// Not a CDC table, use the default partitioner
return false, nil
}
// Get the table metadata to see if it has the cdc partitioner set
keyspaceMeta, err := session.KeyspaceMetadata(keyspaceName)
if err != nil {
return false, err
}
tableMeta, ok := keyspaceMeta.Tables[tableName]
if !ok {
return false, ErrNoMetadata
}
return tableMeta.Options.Partitioner == scyllaCDCPartitionerFullName, nil
}