|
1 | 1 | package sql
|
2 | 2 |
|
3 |
| -import "io" |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "io" |
| 6 | + "reflect" |
| 7 | + "strings" |
| 8 | + "sync" |
| 9 | + |
| 10 | + "gopkg.in/src-d/go-errors.v1" |
| 11 | +) |
4 | 12 |
|
5 | 13 | // IndexKeyValueIter is an iterator of index key values, that is, a tuple of
|
6 | 14 | // the values that will be index keys.
|
@@ -28,6 +36,10 @@ type Index interface {
|
28 | 36 | Has(key interface{}) (bool, error)
|
29 | 37 | // ID returns the identifier of the index.
|
30 | 38 | ID() string
|
| 39 | + // Database returns the database name this index belongs to. |
| 40 | + Database() string |
| 41 | + // Table returns the table name this index belongs to. |
| 42 | + Table() string |
31 | 43 | // Expression returns the indexed expression.
|
32 | 44 | Expression() Expression
|
33 | 45 | }
|
@@ -87,18 +99,263 @@ type Mergeable interface {
|
87 | 99 | IsMergeable(IndexLookup) bool
|
88 | 100 | }
|
89 | 101 |
|
90 |
| -// IndexLoader is the piece that loads indexes from disk. |
91 |
| -type IndexLoader interface { |
92 |
| - // ID returns the unique name of the index loader. |
| 102 | +// IndexDriver manages the coordination between the indexes and their |
| 103 | +// representation in disk. |
| 104 | +type IndexDriver interface { |
| 105 | + // ID returns the unique name of the driver. |
93 | 106 | ID() string
|
| 107 | + // Create a new index for the given expression and id. |
| 108 | + Create(path, db, id string, expr Expression) (Index, error) |
94 | 109 | // Load the index at the given path.
|
95 | 110 | Load(path string) (Index, error)
|
| 111 | + // Save the given index at the given path. |
| 112 | + Save(ctx context.Context, path string, index Index, iter IndexKeyValueIter) error |
| 113 | + // Delete the index with the given path. |
| 114 | + Delete(path string, index Index) error |
96 | 115 | }
|
97 | 116 |
|
98 |
| -// IndexSaver is the piece that stores indexes in disk. |
99 |
| -type IndexSaver interface { |
100 |
| - // ID returns the unique name of the index saver. |
101 |
| - ID() string |
102 |
| - // Save the given index at the given path. |
103 |
| - Save(index Index, path string) error |
| 117 | +type indexKey struct { |
| 118 | + db, id string |
| 119 | +} |
| 120 | + |
| 121 | +// IndexRegistry keeps track of all indexes in the engine. |
| 122 | +type IndexRegistry struct { |
| 123 | + root string |
| 124 | + |
| 125 | + mut sync.RWMutex |
| 126 | + indexes map[indexKey]Index |
| 127 | + statuses map[indexKey]IndexStatus |
| 128 | + |
| 129 | + driversMut sync.RWMutex |
| 130 | + drivers map[string]IndexDriver |
| 131 | + |
| 132 | + rcmut sync.RWMutex |
| 133 | + refCounts map[indexKey]int |
| 134 | + deleteIndexQueue map[indexKey]chan<- struct{} |
| 135 | +} |
| 136 | + |
| 137 | +// NewIndexRegistry returns a new Index Registry. |
| 138 | +func NewIndexRegistry() *IndexRegistry { |
| 139 | + return &IndexRegistry{ |
| 140 | + indexes: make(map[indexKey]Index), |
| 141 | + statuses: make(map[indexKey]IndexStatus), |
| 142 | + drivers: make(map[string]IndexDriver), |
| 143 | + refCounts: make(map[indexKey]int), |
| 144 | + deleteIndexQueue: make(map[indexKey]chan<- struct{}), |
| 145 | + } |
| 146 | +} |
| 147 | + |
| 148 | +// IndexDriver returns the IndexDriver with the given ID. |
| 149 | +func (r *IndexRegistry) IndexDriver(id string) IndexDriver { |
| 150 | + r.driversMut.RLock() |
| 151 | + defer r.driversMut.RUnlock() |
| 152 | + return r.drivers[id] |
| 153 | +} |
| 154 | + |
| 155 | +// RegisterIndexDriver registers a new index driver. |
| 156 | +func (r *IndexRegistry) RegisterIndexDriver(driver IndexDriver) { |
| 157 | + r.driversMut.Lock() |
| 158 | + defer r.driversMut.Unlock() |
| 159 | + r.drivers[driver.ID()] = driver |
| 160 | +} |
| 161 | + |
| 162 | +func (r *IndexRegistry) retainIndex(db, id string) { |
| 163 | + r.rcmut.Lock() |
| 164 | + defer r.rcmut.Unlock() |
| 165 | + key := indexKey{db, id} |
| 166 | + r.refCounts[key] = r.refCounts[key] + 1 |
| 167 | +} |
| 168 | + |
| 169 | +// CanUseIndex returns whether the given index is ready to use or not. |
| 170 | +func (r *IndexRegistry) CanUseIndex(idx Index) bool { |
| 171 | + r.mut.RLock() |
| 172 | + defer r.mut.RUnlock() |
| 173 | + return bool(r.statuses[indexKey{idx.Database(), idx.ID()}]) |
| 174 | +} |
| 175 | + |
| 176 | +// setStatus is not thread-safe, it should be guarded using mut. |
| 177 | +func (r *IndexRegistry) setStatus(idx Index, status IndexStatus) { |
| 178 | + r.statuses[indexKey{idx.Database(), idx.ID()}] = status |
| 179 | +} |
| 180 | + |
| 181 | +// ReleaseIndex releases an index after it's been used. |
| 182 | +func (r *IndexRegistry) ReleaseIndex(idx Index) { |
| 183 | + r.rcmut.Lock() |
| 184 | + defer r.rcmut.Unlock() |
| 185 | + key := indexKey{idx.Database(), idx.ID()} |
| 186 | + r.refCounts[key] = r.refCounts[key] - 1 |
| 187 | + |
| 188 | + if r.refCounts[key] > 0 { |
| 189 | + return |
| 190 | + } |
| 191 | + |
| 192 | + if ch, ok := r.deleteIndexQueue[key]; ok { |
| 193 | + close(ch) |
| 194 | + delete(r.deleteIndexQueue, key) |
| 195 | + } |
| 196 | +} |
| 197 | + |
| 198 | +// Index returns the index with the given id. It may return nil if the index is |
| 199 | +// not found. |
| 200 | +func (r *IndexRegistry) Index(db, id string) Index { |
| 201 | + r.mut.RLock() |
| 202 | + defer r.mut.RUnlock() |
| 203 | + return r.indexes[indexKey{db, strings.ToLower(id)}] |
| 204 | +} |
| 205 | + |
| 206 | +// IndexByExpression returns an index by the given expression. It will return |
| 207 | +// nil it the index is not found. |
| 208 | +func (r *IndexRegistry) IndexByExpression(db string, expr Expression) Index { |
| 209 | + r.mut.RLock() |
| 210 | + defer r.mut.RUnlock() |
| 211 | + |
| 212 | + for _, idx := range r.indexes { |
| 213 | + if reflect.DeepEqual(idx.Expression(), expr) && idx.Database() == db { |
| 214 | + r.retainIndex(db, idx.ID()) |
| 215 | + return idx |
| 216 | + } |
| 217 | + } |
| 218 | + return nil |
| 219 | +} |
| 220 | + |
| 221 | +var ( |
| 222 | + // ErrIndexIDAlreadyRegistered is the error returned when there is already |
| 223 | + // an index with the same ID. |
| 224 | + ErrIndexIDAlreadyRegistered = errors.NewKind("an index with id %q has already been registered") |
| 225 | + |
| 226 | + // ErrIndexExpressionAlreadyRegistered is the error returned when there is |
| 227 | + // already an index with the same expression. |
| 228 | + ErrIndexExpressionAlreadyRegistered = errors.NewKind("there is already an index registered for the expression %q") |
| 229 | + |
| 230 | + // ErrIndexNotFound is returned when the index could not be found. |
| 231 | + ErrIndexNotFound = errors.NewKind("index %q was not found") |
| 232 | + |
| 233 | + // ErrIndexDeleteInvalidStatus is returned when the index trying to delete |
| 234 | + // does not have a ready state. |
| 235 | + ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage") |
| 236 | +) |
| 237 | + |
| 238 | +func (r *IndexRegistry) validateIndexToAdd(idx Index) error { |
| 239 | + r.mut.RLock() |
| 240 | + defer r.mut.RUnlock() |
| 241 | + |
| 242 | + for _, i := range r.indexes { |
| 243 | + if i.Database() != idx.Database() { |
| 244 | + continue |
| 245 | + } |
| 246 | + |
| 247 | + if i.ID() == idx.ID() { |
| 248 | + return ErrIndexIDAlreadyRegistered.New(idx.ID()) |
| 249 | + } |
| 250 | + |
| 251 | + if reflect.DeepEqual(i.Expression(), idx.Expression()) { |
| 252 | + return ErrIndexExpressionAlreadyRegistered.New(idx.Expression()) |
| 253 | + } |
| 254 | + } |
| 255 | + |
| 256 | + return nil |
| 257 | +} |
| 258 | + |
| 259 | +// AddIndex adds the given index to the registry. The added index will be |
| 260 | +// marked as creating, so nobody can't register two indexes with the same |
| 261 | +// expression or id while the other is still being created. |
| 262 | +// When something is sent through the returned channel, it means the index has |
| 263 | +// finished it's creation and will be marked as ready. |
| 264 | +func (r *IndexRegistry) AddIndex(idx Index) (chan<- struct{}, error) { |
| 265 | + if err := r.validateIndexToAdd(idx); err != nil { |
| 266 | + return nil, err |
| 267 | + } |
| 268 | + |
| 269 | + r.mut.Lock() |
| 270 | + r.setStatus(idx, IndexNotReady) |
| 271 | + r.indexes[indexKey{idx.Database(), idx.ID()}] = idx |
| 272 | + r.mut.Unlock() |
| 273 | + |
| 274 | + var created = make(chan struct{}) |
| 275 | + go func() { |
| 276 | + <-created |
| 277 | + r.mut.Lock() |
| 278 | + defer r.mut.Unlock() |
| 279 | + r.setStatus(idx, IndexReady) |
| 280 | + }() |
| 281 | + |
| 282 | + return created, nil |
| 283 | +} |
| 284 | + |
| 285 | +// DeleteIndex deletes an index from the registry by its id. First, it marks |
| 286 | +// the index for deletion but does not remove it, so queries that are using it |
| 287 | +// may still do so. The returned channel will send a message when the index can |
| 288 | +// be deleted from disk. |
| 289 | +func (r *IndexRegistry) DeleteIndex(db, id string) (<-chan struct{}, error) { |
| 290 | + r.mut.RLock() |
| 291 | + var key indexKey |
| 292 | + for k, idx := range r.indexes { |
| 293 | + if strings.ToLower(id) == idx.ID() { |
| 294 | + if !r.CanUseIndex(idx) { |
| 295 | + r.mut.RUnlock() |
| 296 | + return nil, ErrIndexDeleteInvalidStatus.New(id) |
| 297 | + } |
| 298 | + r.setStatus(idx, IndexNotReady) |
| 299 | + key = k |
| 300 | + break |
| 301 | + } |
| 302 | + } |
| 303 | + r.mut.RUnlock() |
| 304 | + |
| 305 | + if key.id == "" { |
| 306 | + return nil, ErrIndexNotFound.New(id) |
| 307 | + } |
| 308 | + |
| 309 | + var done = make(chan struct{}, 1) |
| 310 | + |
| 311 | + r.rcmut.Lock() |
| 312 | + // If no query is using this index just delete it right away |
| 313 | + if r.refCounts[key] == 0 { |
| 314 | + r.mut.Lock() |
| 315 | + defer r.mut.Unlock() |
| 316 | + defer r.rcmut.Unlock() |
| 317 | + |
| 318 | + delete(r.indexes, key) |
| 319 | + close(done) |
| 320 | + return done, nil |
| 321 | + } |
| 322 | + |
| 323 | + var onReadyToDelete = make(chan struct{}) |
| 324 | + r.deleteIndexQueue[key] = onReadyToDelete |
| 325 | + r.rcmut.Unlock() |
| 326 | + |
| 327 | + go func() { |
| 328 | + <-onReadyToDelete |
| 329 | + r.mut.Lock() |
| 330 | + defer r.mut.Unlock() |
| 331 | + delete(r.indexes, key) |
| 332 | + |
| 333 | + done <- struct{}{} |
| 334 | + }() |
| 335 | + |
| 336 | + return done, nil |
| 337 | +} |
| 338 | + |
| 339 | +// IndexStatus represents the current status in which the index is. |
| 340 | +type IndexStatus bool |
| 341 | + |
| 342 | +const ( |
| 343 | + // IndexNotReady means the index is not ready to be used. |
| 344 | + IndexNotReady IndexStatus = false |
| 345 | + // IndexReady means the index can be used. |
| 346 | + IndexReady IndexStatus = true |
| 347 | +) |
| 348 | + |
| 349 | +// IsUsable returns whether the index can be used or not based on the status. |
| 350 | +func (s IndexStatus) IsUsable() bool { |
| 351 | + return s == IndexReady |
| 352 | +} |
| 353 | + |
| 354 | +func (s IndexStatus) String() string { |
| 355 | + switch s { |
| 356 | + case IndexReady: |
| 357 | + return "ready" |
| 358 | + default: |
| 359 | + return "not ready" |
| 360 | + } |
104 | 361 | }
|
0 commit comments