Skip to content

Commit c0784a9

Browse files
authored
Merge pull request src-d#179 from erizocosmico/feature/index-catalog
sql: implement index catalog
2 parents ad35890 + c2f9e5c commit c0784a9

File tree

3 files changed

+406
-10
lines changed

3 files changed

+406
-10
lines changed

sql/catalog.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ var ErrDatabaseNotFound = errors.NewKind("database not found: %s")
1111
type Catalog struct {
1212
Databases
1313
FunctionRegistry
14+
*IndexRegistry
1415
}
1516

1617
// NewCatalog returns a new empty Catalog.
1718
func NewCatalog() *Catalog {
1819
return &Catalog{
1920
Databases: Databases{},
2021
FunctionRegistry: NewFunctionRegistry(),
22+
IndexRegistry: NewIndexRegistry(),
2123
}
2224
}
2325

sql/index.go

Lines changed: 267 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
package sql
22

3-
import "io"
3+
import (
4+
"context"
5+
"io"
6+
"reflect"
7+
"strings"
8+
"sync"
9+
10+
"gopkg.in/src-d/go-errors.v1"
11+
)
412

513
// IndexKeyValueIter is an iterator of index key values, that is, a tuple of
614
// the values that will be index keys.
@@ -28,6 +36,10 @@ type Index interface {
2836
Has(key interface{}) (bool, error)
2937
// ID returns the identifier of the index.
3038
ID() string
39+
// Database returns the database name this index belongs to.
40+
Database() string
41+
// Table returns the table name this index belongs to.
42+
Table() string
3143
// Expression returns the indexed expression.
3244
Expression() Expression
3345
}
@@ -87,18 +99,263 @@ type Mergeable interface {
8799
IsMergeable(IndexLookup) bool
88100
}
89101

90-
// IndexLoader is the piece that loads indexes from disk.
91-
type IndexLoader interface {
92-
// ID returns the unique name of the index loader.
102+
// IndexDriver manages the coordination between the indexes and their
103+
// representation in disk.
104+
type IndexDriver interface {
105+
// ID returns the unique name of the driver.
93106
ID() string
107+
// Create a new index for the given expression and id.
108+
Create(path, db, id string, expr Expression) (Index, error)
94109
// Load the index at the given path.
95110
Load(path string) (Index, error)
111+
// Save the given index at the given path.
112+
Save(ctx context.Context, path string, index Index, iter IndexKeyValueIter) error
113+
// Delete the index with the given path.
114+
Delete(path string, index Index) error
96115
}
97116

98-
// IndexSaver is the piece that stores indexes in disk.
99-
type IndexSaver interface {
100-
// ID returns the unique name of the index saver.
101-
ID() string
102-
// Save the given index at the given path.
103-
Save(index Index, path string) error
117+
type indexKey struct {
118+
db, id string
119+
}
120+
121+
// IndexRegistry keeps track of all indexes in the engine.
122+
type IndexRegistry struct {
123+
root string
124+
125+
mut sync.RWMutex
126+
indexes map[indexKey]Index
127+
statuses map[indexKey]IndexStatus
128+
129+
driversMut sync.RWMutex
130+
drivers map[string]IndexDriver
131+
132+
rcmut sync.RWMutex
133+
refCounts map[indexKey]int
134+
deleteIndexQueue map[indexKey]chan<- struct{}
135+
}
136+
137+
// NewIndexRegistry returns a new Index Registry.
138+
func NewIndexRegistry() *IndexRegistry {
139+
return &IndexRegistry{
140+
indexes: make(map[indexKey]Index),
141+
statuses: make(map[indexKey]IndexStatus),
142+
drivers: make(map[string]IndexDriver),
143+
refCounts: make(map[indexKey]int),
144+
deleteIndexQueue: make(map[indexKey]chan<- struct{}),
145+
}
146+
}
147+
148+
// IndexDriver returns the IndexDriver with the given ID.
149+
func (r *IndexRegistry) IndexDriver(id string) IndexDriver {
150+
r.driversMut.RLock()
151+
defer r.driversMut.RUnlock()
152+
return r.drivers[id]
153+
}
154+
155+
// RegisterIndexDriver registers a new index driver.
156+
func (r *IndexRegistry) RegisterIndexDriver(driver IndexDriver) {
157+
r.driversMut.Lock()
158+
defer r.driversMut.Unlock()
159+
r.drivers[driver.ID()] = driver
160+
}
161+
162+
func (r *IndexRegistry) retainIndex(db, id string) {
163+
r.rcmut.Lock()
164+
defer r.rcmut.Unlock()
165+
key := indexKey{db, id}
166+
r.refCounts[key] = r.refCounts[key] + 1
167+
}
168+
169+
// CanUseIndex returns whether the given index is ready to use or not.
170+
func (r *IndexRegistry) CanUseIndex(idx Index) bool {
171+
r.mut.RLock()
172+
defer r.mut.RUnlock()
173+
return bool(r.statuses[indexKey{idx.Database(), idx.ID()}])
174+
}
175+
176+
// setStatus is not thread-safe, it should be guarded using mut.
177+
func (r *IndexRegistry) setStatus(idx Index, status IndexStatus) {
178+
r.statuses[indexKey{idx.Database(), idx.ID()}] = status
179+
}
180+
181+
// ReleaseIndex releases an index after it's been used.
182+
func (r *IndexRegistry) ReleaseIndex(idx Index) {
183+
r.rcmut.Lock()
184+
defer r.rcmut.Unlock()
185+
key := indexKey{idx.Database(), idx.ID()}
186+
r.refCounts[key] = r.refCounts[key] - 1
187+
188+
if r.refCounts[key] > 0 {
189+
return
190+
}
191+
192+
if ch, ok := r.deleteIndexQueue[key]; ok {
193+
close(ch)
194+
delete(r.deleteIndexQueue, key)
195+
}
196+
}
197+
198+
// Index returns the index with the given id. It may return nil if the index is
199+
// not found.
200+
func (r *IndexRegistry) Index(db, id string) Index {
201+
r.mut.RLock()
202+
defer r.mut.RUnlock()
203+
return r.indexes[indexKey{db, strings.ToLower(id)}]
204+
}
205+
206+
// IndexByExpression returns an index by the given expression. It will return
207+
// nil it the index is not found.
208+
func (r *IndexRegistry) IndexByExpression(db string, expr Expression) Index {
209+
r.mut.RLock()
210+
defer r.mut.RUnlock()
211+
212+
for _, idx := range r.indexes {
213+
if reflect.DeepEqual(idx.Expression(), expr) && idx.Database() == db {
214+
r.retainIndex(db, idx.ID())
215+
return idx
216+
}
217+
}
218+
return nil
219+
}
220+
221+
var (
222+
// ErrIndexIDAlreadyRegistered is the error returned when there is already
223+
// an index with the same ID.
224+
ErrIndexIDAlreadyRegistered = errors.NewKind("an index with id %q has already been registered")
225+
226+
// ErrIndexExpressionAlreadyRegistered is the error returned when there is
227+
// already an index with the same expression.
228+
ErrIndexExpressionAlreadyRegistered = errors.NewKind("there is already an index registered for the expression %q")
229+
230+
// ErrIndexNotFound is returned when the index could not be found.
231+
ErrIndexNotFound = errors.NewKind("index %q was not found")
232+
233+
// ErrIndexDeleteInvalidStatus is returned when the index trying to delete
234+
// does not have a ready state.
235+
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage")
236+
)
237+
238+
func (r *IndexRegistry) validateIndexToAdd(idx Index) error {
239+
r.mut.RLock()
240+
defer r.mut.RUnlock()
241+
242+
for _, i := range r.indexes {
243+
if i.Database() != idx.Database() {
244+
continue
245+
}
246+
247+
if i.ID() == idx.ID() {
248+
return ErrIndexIDAlreadyRegistered.New(idx.ID())
249+
}
250+
251+
if reflect.DeepEqual(i.Expression(), idx.Expression()) {
252+
return ErrIndexExpressionAlreadyRegistered.New(idx.Expression())
253+
}
254+
}
255+
256+
return nil
257+
}
258+
259+
// AddIndex adds the given index to the registry. The added index will be
260+
// marked as creating, so nobody can't register two indexes with the same
261+
// expression or id while the other is still being created.
262+
// When something is sent through the returned channel, it means the index has
263+
// finished it's creation and will be marked as ready.
264+
func (r *IndexRegistry) AddIndex(idx Index) (chan<- struct{}, error) {
265+
if err := r.validateIndexToAdd(idx); err != nil {
266+
return nil, err
267+
}
268+
269+
r.mut.Lock()
270+
r.setStatus(idx, IndexNotReady)
271+
r.indexes[indexKey{idx.Database(), idx.ID()}] = idx
272+
r.mut.Unlock()
273+
274+
var created = make(chan struct{})
275+
go func() {
276+
<-created
277+
r.mut.Lock()
278+
defer r.mut.Unlock()
279+
r.setStatus(idx, IndexReady)
280+
}()
281+
282+
return created, nil
283+
}
284+
285+
// DeleteIndex deletes an index from the registry by its id. First, it marks
286+
// the index for deletion but does not remove it, so queries that are using it
287+
// may still do so. The returned channel will send a message when the index can
288+
// be deleted from disk.
289+
func (r *IndexRegistry) DeleteIndex(db, id string) (<-chan struct{}, error) {
290+
r.mut.RLock()
291+
var key indexKey
292+
for k, idx := range r.indexes {
293+
if strings.ToLower(id) == idx.ID() {
294+
if !r.CanUseIndex(idx) {
295+
r.mut.RUnlock()
296+
return nil, ErrIndexDeleteInvalidStatus.New(id)
297+
}
298+
r.setStatus(idx, IndexNotReady)
299+
key = k
300+
break
301+
}
302+
}
303+
r.mut.RUnlock()
304+
305+
if key.id == "" {
306+
return nil, ErrIndexNotFound.New(id)
307+
}
308+
309+
var done = make(chan struct{}, 1)
310+
311+
r.rcmut.Lock()
312+
// If no query is using this index just delete it right away
313+
if r.refCounts[key] == 0 {
314+
r.mut.Lock()
315+
defer r.mut.Unlock()
316+
defer r.rcmut.Unlock()
317+
318+
delete(r.indexes, key)
319+
close(done)
320+
return done, nil
321+
}
322+
323+
var onReadyToDelete = make(chan struct{})
324+
r.deleteIndexQueue[key] = onReadyToDelete
325+
r.rcmut.Unlock()
326+
327+
go func() {
328+
<-onReadyToDelete
329+
r.mut.Lock()
330+
defer r.mut.Unlock()
331+
delete(r.indexes, key)
332+
333+
done <- struct{}{}
334+
}()
335+
336+
return done, nil
337+
}
338+
339+
// IndexStatus represents the current status in which the index is.
340+
type IndexStatus bool
341+
342+
const (
343+
// IndexNotReady means the index is not ready to be used.
344+
IndexNotReady IndexStatus = false
345+
// IndexReady means the index can be used.
346+
IndexReady IndexStatus = true
347+
)
348+
349+
// IsUsable returns whether the index can be used or not based on the status.
350+
func (s IndexStatus) IsUsable() bool {
351+
return s == IndexReady
352+
}
353+
354+
func (s IndexStatus) String() string {
355+
switch s {
356+
case IndexReady:
357+
return "ready"
358+
default:
359+
return "not ready"
360+
}
104361
}

0 commit comments

Comments
 (0)