forked from couchbase/gocb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoken.go
154 lines (126 loc) · 3.84 KB
/
token.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gocb
import (
"encoding/json"
"fmt"
"strconv"
gocbcore "github.com/couchbase/gocbcore/v8"
)
// MutationToken holds the mutation state information from an operation.
// It wraps a gocbcore.MutationToken together with the name of the bucket
// the token belongs to.
type MutationToken struct {
// token is the underlying core token; its VbUUID, VbID and SeqNo fields
// are exposed through PartitionUUID, PartitionID and SequenceNumber.
token gocbcore.MutationToken
// bucketName is the bucket this token belongs to. MutationState.Add
// skips tokens whose bucketName is empty.
bucketName string
}
// bucketToken is the per-vbucket entry used when a MutationState is
// serialized. Its MarshalJSON/UnmarshalJSON methods represent it as a JSON
// array of [SeqNo, VbUUID] rather than as an object.
type bucketToken struct {
// SeqNo is the sequence number recorded for the vbucket.
SeqNo uint64 `json:"seqno"`
// VbUUID is the vbucket UUID rendered as a decimal string
// (MutationState.MarshalJSON writes it with "%d").
VbUUID string `json:"vbuuid"`
}
// BucketName reports the bucket name stored in this token.
func (mt MutationToken) BucketName() string {
	name := mt.bucketName
	return name
}
// PartitionUUID reports the UUID of the vbucket recorded in this token,
// converted to uint64.
func (mt MutationToken) PartitionUUID() uint64 {
	uuid := mt.token.VbUUID
	return uint64(uuid)
}
// PartitionID reports the ID of the vbucket recorded in this token,
// converted to uint64.
func (mt MutationToken) PartitionID() uint64 {
	id := mt.token.VbID
	return uint64(id)
}
// SequenceNumber reports the sequence number recorded in this token,
// converted to uint64.
func (mt MutationToken) SequenceNumber() uint64 {
	seqNo := mt.token.SeqNo
	return uint64(seqNo)
}
// MarshalJSON encodes the token as a two-element JSON array holding the
// sequence number followed by the vbucket UUID string.
func (mt bucketToken) MarshalJSON() ([]byte, error) {
	return json.Marshal([]interface{}{mt.SeqNo, mt.VbUUID})
}
// UnmarshalJSON decodes a JSON array of [seqno, vbuuid] into the token.
// The slice handed to json.Unmarshal is pre-filled with pointers to the
// destination fields; this relies on encoding/json decoding each array
// element into the pointer already held at that position.
func (mt *bucketToken) UnmarshalJSON(data []byte) error {
	seqNo, vbUUID := &mt.SeqNo, &mt.VbUUID
	return json.Unmarshal(data, &[]interface{}{seqNo, vbUUID})
}
type (
	// bucketTokens maps a vbucket ID, written as a decimal string, to the
	// token recorded for that vbucket.
	bucketTokens map[string]*bucketToken

	// mutationStateData maps a bucket name to that bucket's vbucket tokens;
	// it is the shape MutationState is marshalled to and from.
	mutationStateData map[string]*bucketTokens

	// searchMutationState maps a bucket name to a map from "vbID/vbUUID"
	// keys to sequence numbers, as built by toSearchMutationState.
	searchMutationState map[string]map[string]int
)
// MutationState holds and aggregates MutationTokens across multiple operations.
type MutationState struct {
// tokens collects every token appended by Add or UnmarshalJSON. Tokens
// for the same bucket and vbucket are not merged here; MarshalJSON and
// toSearchMutationState keep the last one they encounter.
tokens []MutationToken
}
// NewMutationState builds a MutationState and seeds it with the supplied
// tokens by passing them to Add.
func NewMutationState(tokens ...MutationToken) *MutationState {
	state := new(MutationState)
	state.Add(tokens...)
	return state
}
// Add records the given operations' mutation tokens in this mutation state.
// Tokens with an empty bucket name are ignored.
func (mt *MutationState) Add(tokens ...MutationToken) {
	for i := range tokens {
		if tokens[i].bucketName == "" {
			continue
		}
		mt.tokens = append(mt.tokens, tokens[i])
	}
}
// MarshalJSON encodes this mutation state as a JSON object keyed by bucket
// name, then by vbucket ID, with each entry holding the sequence number and
// vbucket UUID. When several tokens share a bucket and vbucket ID, the one
// added last wins. A state with no tokens encodes as JSON null.
func (mt *MutationState) MarshalJSON() ([]byte, error) {
	// Left nil until the first token so that an empty state marshals as null.
	var state mutationStateData
	for _, tok := range mt.tokens {
		if state == nil {
			state = make(mutationStateData)
		}

		perBucket := state[tok.bucketName]
		if perBucket == nil {
			perBucket = &bucketTokens{}
			state[tok.bucketName] = perBucket
		}

		// Later tokens for the same vbucket replace earlier ones.
		key := fmt.Sprintf("%d", tok.token.VbID)
		(*perBucket)[key] = &bucketToken{
			SeqNo:  uint64(tok.token.SeqNo),
			VbUUID: fmt.Sprintf("%d", tok.token.VbUUID),
		}
	}
	return json.Marshal(state)
}
// UnmarshalJSON decodes a mutation state from the JSON layout produced by
// MarshalJSON and appends the decoded tokens to this state.
//
// vbucket IDs are parsed as unsigned 16-bit values and vbucket UUIDs as
// unsigned 64-bit values, so every UUID that MarshalJSON can emit round-trips,
// and signed or out-of-range values are rejected instead of being silently
// truncated. Null bucket or vbucket entries are skipped. If any entry fails to
// parse, the error is returned and this state is left unmodified.
func (mt *MutationState) UnmarshalJSON(data []byte) error {
	var stateData mutationStateData
	if err := json.Unmarshal(data, &stateData); err != nil {
		return err
	}

	// Collect into a local slice first so a failure part-way through does not
	// leave the receiver holding only some of the tokens.
	var parsed []MutationToken
	for bucketName, bTokens := range stateData {
		if bTokens == nil {
			// A JSON null for the bucket carries no tokens.
			continue
		}
		for vbIDStr, stateToken := range *bTokens {
			if stateToken == nil {
				// A JSON null for the vbucket carries no token.
				continue
			}

			// bitSize 16 matches the uint16 VbID field.
			vbID, err := strconv.ParseUint(vbIDStr, 10, 16)
			if err != nil {
				return err
			}
			// The UUID is written as an unsigned 64-bit decimal, which a
			// signed int parse cannot always hold.
			vbUUID, err := strconv.ParseUint(stateToken.VbUUID, 10, 64)
			if err != nil {
				return err
			}

			parsed = append(parsed, MutationToken{
				bucketName: bucketName,
				token: gocbcore.MutationToken{
					VbID:   uint16(vbID),
					VbUUID: gocbcore.VbUUID(vbUUID),
					SeqNo:  gocbcore.SeqNo(stateToken.SeqNo),
				},
			})
		}
	}

	mt.tokens = append(mt.tokens, parsed...)
	return nil
}
// toSearchMutationState converts this state into the layout used for search,
// which does not accept tokens in the same format as other services: bucket
// name -> "vbID/vbUUID" -> sequence number. When several tokens map to the
// same key, the one added last wins.
func (mt *MutationState) toSearchMutationState() searchMutationState {
	result := make(searchMutationState)
	for _, tok := range mt.tokens {
		perBucket, found := result[tok.bucketName]
		if !found {
			perBucket = make(map[string]int)
			result[tok.bucketName] = perBucket
		}

		key := fmt.Sprintf("%d/%d", tok.token.VbID, tok.token.VbUUID)
		perBucket[key] = int(tok.token.SeqNo)
	}
	return result
}