forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstore.go
251 lines (221 loc) · 8.31 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package kv
import (
"context"
"errors"
"io"
)
var (
// ErrKeyNotFound is the error returned when the key requested is not found.
ErrKeyNotFound = errors.New("key not found")
// ErrBucketNotFound is the error returned when the bucket cannot be found.
ErrBucketNotFound = errors.New("bucket not found")
// ErrTxNotWritable is the error returned when an mutable operation is called during
// a non-writable transaction.
ErrTxNotWritable = errors.New("transaction is not writable")
// ErrSeekMissingPrefix is returned when seek bytes is missing the prefix defined via
// WithCursorPrefix
ErrSeekMissingPrefix = errors.New("seek missing prefix bytes")
)
// IsNotFound returns a boolean indicating whether the error is known to report that a key or was not found.
func IsNotFound(err error) bool {
return err == ErrKeyNotFound
}
// SchemaStore is a superset of Store along with store schema change
// functionality like bucket creation and deletion.
//
// This type is made available via the `kv/migration` package.
// It should be consumed via this package to create and delete buckets using a migration.
// Checkout the internal tool `cmd/internal/kvmigrate` for building a new migration Go file into
// the correct location (in kv/migration/all.go).
// Configuring your bucket here will ensure it is created properly on initialization of InfluxDB.
type SchemaStore interface {
Store
// CreateBucket creates a bucket on the underlying store if it does not exist
CreateBucket(ctx context.Context, bucket []byte) error
// DeleteBucket deletes a bucket on the underlying store if it exists
DeleteBucket(ctx context.Context, bucket []byte) error
}
// Store is an interface for a generic key value store. It is modeled after
// the boltdb database struct.
type Store interface {
// View opens up a transaction that will not write to any data. Implementing interfaces
// should take care to ensure that all view transactions do not mutate any data.
View(context.Context, func(Tx) error) error
// Update opens up a transaction that will mutate data.
Update(context.Context, func(Tx) error) error
// Backup copies all K:Vs to a writer, file format determined by implementation.
Backup(ctx context.Context, w io.Writer) error
}
// Tx is a transaction in the store.
type Tx interface {
// Bucket possibly creates and returns bucket, b.
Bucket(b []byte) (Bucket, error)
// Context returns the context associated with this Tx.
Context() context.Context
// WithContext associates a context with this Tx.
WithContext(ctx context.Context)
}
type CursorPredicateFunc func(key, value []byte) bool
type CursorHints struct {
KeyPrefix *string
KeyStart *string
PredicateFn CursorPredicateFunc
}
// CursorHint configures CursorHints
type CursorHint func(*CursorHints)
// WithCursorHintPrefix is a hint to the store
// that the caller is only interested keys with the
// specified prefix.
func WithCursorHintPrefix(prefix string) CursorHint {
return func(o *CursorHints) {
o.KeyPrefix = &prefix
}
}
// WithCursorHintKeyStart is a hint to the store
// that the caller is interested in reading keys from
// start.
func WithCursorHintKeyStart(start string) CursorHint {
return func(o *CursorHints) {
o.KeyStart = &start
}
}
// WithCursorHintPredicate is a hint to the store
// to return only key / values which return true for the
// f.
//
// The primary concern of the predicate is to improve performance.
// Therefore, it should perform tests on the data at minimal cost.
// If the predicate has no meaningful impact on reducing memory or
// CPU usage, there is no benefit to using it.
func WithCursorHintPredicate(f CursorPredicateFunc) CursorHint {
return func(o *CursorHints) {
o.PredicateFn = f
}
}
// Bucket is the abstraction used to perform get/put/delete/get-many operations
// in a key value store.
type Bucket interface {
// TODO context?
// Get returns a key within this bucket. Errors if key does not exist.
Get(key []byte) ([]byte, error)
// GetBatch returns a corresponding set of values for the provided
// set of keys. If a value cannot be found for any provided key its
// value will be nil at the same index for the provided key.
GetBatch(keys ...[]byte) ([][]byte, error)
// Cursor returns a cursor at the beginning of this bucket optionally
// using the provided hints to improve performance.
Cursor(hints ...CursorHint) (Cursor, error)
// Put should error if the transaction it was called in is not writable.
Put(key, value []byte) error
// Delete should error if the transaction it was called in is not writable.
Delete(key []byte) error
// ForwardCursor returns a forward cursor from the seek position provided.
// Other options can be supplied to provide direction and hints.
ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error)
}
// Cursor is an abstraction for iterating/ranging through data. A concrete implementation
// of a cursor can be found in cursor.go.
type Cursor interface {
// Seek moves the cursor forward until reaching prefix in the key name.
Seek(prefix []byte) (k []byte, v []byte)
// First moves the cursor to the first key in the bucket.
First() (k []byte, v []byte)
// Last moves the cursor to the last key in the bucket.
Last() (k []byte, v []byte)
// Next moves the cursor to the next key in the bucket.
Next() (k []byte, v []byte)
// Prev moves the cursor to the prev key in the bucket.
Prev() (k []byte, v []byte)
}
// ForwardCursor is an abstraction for interacting/ranging through data in one direction.
type ForwardCursor interface {
// Next moves the cursor to the next key in the bucket.
Next() (k, v []byte)
// Err returns non-nil if an error occurred during cursor iteration.
// This should always be checked after Next returns a nil key/value.
Err() error
// Close is reponsible for freeing any resources created by the cursor.
Close() error
}
// CursorDirection is an integer used to define the direction
// a request cursor operates in.
type CursorDirection int
const (
// CursorAscending directs a cursor to range in ascending order
CursorAscending CursorDirection = iota
// CursorAscending directs a cursor to range in descending order
CursorDescending
)
// CursorConfig is a type used to configure a new forward cursor.
// It includes a direction and a set of hints
type CursorConfig struct {
Direction CursorDirection
Hints CursorHints
Prefix []byte
SkipFirst bool
}
// NewCursorConfig constructs and configures a CursorConfig used to configure
// a forward cursor.
func NewCursorConfig(opts ...CursorOption) CursorConfig {
conf := CursorConfig{}
for _, opt := range opts {
opt(&conf)
}
return conf
}
// CursorOption is a functional option for configuring a forward cursor
type CursorOption func(*CursorConfig)
// WithCursorDirection sets the cursor direction on a provided cursor config
func WithCursorDirection(direction CursorDirection) CursorOption {
return func(c *CursorConfig) {
c.Direction = direction
}
}
// WithCursorHints configs the provided hints on the cursor config
func WithCursorHints(hints ...CursorHint) CursorOption {
return func(c *CursorConfig) {
for _, hint := range hints {
hint(&c.Hints)
}
}
}
// WithCursorPrefix configures the forward cursor to retrieve keys
// with a particular prefix. This implies the cursor will start and end
// at a specific location based on the prefix [prefix, prefix + 1).
//
// The value of the seek bytes must be prefixed with the provided
// prefix, otherwise an error will be returned.
func WithCursorPrefix(prefix []byte) CursorOption {
return func(c *CursorConfig) {
c.Prefix = prefix
}
}
// WithCursorSkipFirstItem skips returning the first item found within
// the seek.
func WithCursorSkipFirstItem() CursorOption {
return func(c *CursorConfig) {
c.SkipFirst = true
}
}
// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
// which are found in the index bucket for a provided foreign key.
type VisitFunc func(k, v []byte) error
// WalkCursor consumers the forward cursor call visit for each k/v pair found
func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error) {
defer func() {
if cerr := cursor.Close(); cerr != nil && err == nil {
err = cerr
}
}()
for k, v := cursor.Next(); k != nil; k, v = cursor.Next() {
if err := visit(k, v); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return cursor.Err()
}