Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

sql: implement index catalog #179

Merged
merged 1 commit into from
May 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sql/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ var ErrDatabaseNotFound = errors.NewKind("database not found: %s")
type Catalog struct {
// Databases is embedded, so its methods are available directly on Catalog.
Databases
// FunctionRegistry is embedded; NewCatalog fills it with
// NewFunctionRegistry().
FunctionRegistry
// IndexRegistry is embedded by pointer; NewCatalog fills it with
// NewIndexRegistry(), so a zero-value Catalog has a nil index registry.
*IndexRegistry
}

// NewCatalog builds a Catalog that has no databases yet and owns a freshly
// created function registry and index registry.
func NewCatalog() *Catalog {
	catalog := new(Catalog)
	catalog.Databases = Databases{}
	catalog.FunctionRegistry = NewFunctionRegistry()
	catalog.IndexRegistry = NewIndexRegistry()
	return catalog
}

Expand Down
277 changes: 267 additions & 10 deletions sql/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package sql

import "io"
import (
"context"
"io"
"reflect"
"strings"
"sync"

"gopkg.in/src-d/go-errors.v1"
)

// IndexKeyValueIter is an iterator of index key values, that is, a tuple of
// the values that will be index keys.
Expand Down Expand Up @@ -28,6 +36,10 @@ type Index interface {
Has(key interface{}) (bool, error)
// ID returns the identifier of the index.
ID() string
// Database returns the database name this index belongs to.
Database() string
// Table returns the table name this index belongs to.
Table() string
// Expression returns the indexed expression.
Expression() Expression
}
Expand Down Expand Up @@ -87,18 +99,263 @@ type Mergeable interface {
IsMergeable(IndexLookup) bool
}

// IndexLoader is the piece that loads indexes from disk.
type IndexLoader interface {
// ID returns the unique name of the index loader.
// IndexDriver manages the coordination between the indexes and their
// representation on disk. Drivers are looked up in the IndexRegistry by the
// value returned from ID.
type IndexDriver interface {
// ID returns the unique name of the driver.
ID() string
// Create a new index with the given id for the expression expr in the
// database db.
// NOTE(review): path is presumably the on-disk location for the new index;
// confirm against the driver implementations.
Create(path, db, id string, expr Expression) (Index, error)
// Load the index at the given path.
Load(path string) (Index, error)
// Save the given index at the given path.
// NOTE(review): iter presumably supplies the key/values to be written and
// ctx allows cancelling the save; confirm against the implementations.
Save(ctx context.Context, path string, index Index, iter IndexKeyValueIter) error
// Delete the index with the given path.
Delete(path string, index Index) error
}

// IndexSaver is the piece that stores indexes in disk.
type IndexSaver interface {
// ID returns the unique name of the index saver.
ID() string
// Save the given index at the given path.
Save(index Index, path string) error
// indexKey identifies an index inside the registry maps by the pair of
// database name and index id.
type indexKey struct {
db, id string
}

// IndexRegistry keeps track of all indexes in the engine, of the drivers able
// to handle them, and of how many users currently hold each index.
type IndexRegistry struct {
// root is neither read nor written by the code visible in this file.
// NOTE(review): presumably the base directory for index files; confirm.
root string

// mut guards indexes and statuses.
mut sync.RWMutex
// indexes holds the registered indexes keyed by database and index id.
indexes map[indexKey]Index
// statuses records whether each index is ready for use; a missing entry
// reads as IndexNotReady.
statuses map[indexKey]IndexStatus

// driversMut guards drivers.
driversMut sync.RWMutex
// drivers maps a driver ID to the registered IndexDriver.
drivers map[string]IndexDriver

// rcmut guards refCounts and deleteIndexQueue.
rcmut sync.RWMutex
// refCounts counts, per index, the retains not yet matched by a release.
refCounts map[indexKey]int
// deleteIndexQueue holds, for each index waiting to be deleted, the channel
// that ReleaseIndex closes once the reference count is no longer positive.
deleteIndexQueue map[indexKey]chan<- struct{}
}

// NewIndexRegistry creates an IndexRegistry whose internal maps are all
// initialised and empty, so it can be used right away.
func NewIndexRegistry() *IndexRegistry {
	registry := new(IndexRegistry)
	registry.indexes = map[indexKey]Index{}
	registry.statuses = map[indexKey]IndexStatus{}
	registry.drivers = map[string]IndexDriver{}
	registry.refCounts = map[indexKey]int{}
	registry.deleteIndexQueue = map[indexKey]chan<- struct{}{}
	return registry
}

// IndexDriver looks up the driver registered under the given ID. The result
// is nil when no driver with that ID has been registered.
func (r *IndexRegistry) IndexDriver(id string) IndexDriver {
	r.driversMut.RLock()
	driver := r.drivers[id]
	r.driversMut.RUnlock()
	return driver
}

// RegisterIndexDriver stores the driver under the name reported by its ID
// method, replacing any driver previously registered under that name.
func (r *IndexRegistry) RegisterIndexDriver(driver IndexDriver) {
	r.driversMut.Lock()
	defer r.driversMut.Unlock()

	name := driver.ID()
	r.drivers[name] = driver
}

// retainIndex increments the reference count of the index identified by the
// given database and id. It takes rcmut itself.
func (r *IndexRegistry) retainIndex(db, id string) {
	key := indexKey{db: db, id: id}

	r.rcmut.Lock()
	defer r.rcmut.Unlock()
	r.refCounts[key]++
}

// CanUseIndex reports whether the registry currently has the given index
// marked as ready. An index with no recorded status is reported as not usable.
func (r *IndexRegistry) CanUseIndex(idx Index) bool {
	r.mut.RLock()
	defer r.mut.RUnlock()

	key := indexKey{db: idx.Database(), id: idx.ID()}
	return r.statuses[key] == IndexReady
}

// setStatus records the status of the given index. It does no locking of its
// own: the caller must already hold mut for writing.
func (r *IndexRegistry) setStatus(idx Index, status IndexStatus) {
	key := indexKey{db: idx.Database(), id: idx.ID()}
	r.statuses[key] = status
}

// ReleaseIndex releases an index after it's been used, undoing one earlier
// retain (see IndexByExpression).
//
// When the last reference is dropped the counter entry is removed, and if the
// index is waiting to be deleted its pending-deletion channel is closed so the
// deletion can proceed. A release without a matching retain is treated as
// dropping the last reference: the counter never goes below zero, because a
// negative count would make a later DeleteIndex wait forever.
func (r *IndexRegistry) ReleaseIndex(idx Index) {
	r.rcmut.Lock()
	defer r.rcmut.Unlock()

	key := indexKey{idx.Database(), idx.ID()}
	if n := r.refCounts[key]; n > 1 {
		r.refCounts[key] = n - 1
		return
	}

	// Last (or unmatched) release: a missing entry reads back as zero, so
	// dropping it keeps the map from accumulating dead keys.
	delete(r.refCounts, key)

	if ch, ok := r.deleteIndexQueue[key]; ok {
		close(ch)
		delete(r.deleteIndexQueue, key)
	}
}

// Index looks up an index by database name and id. The id is lower-cased
// before the lookup. The result is nil when no such index is registered.
func (r *IndexRegistry) Index(db, id string) Index {
	key := indexKey{db: db, id: strings.ToLower(id)}

	r.mut.RLock()
	defer r.mut.RUnlock()
	return r.indexes[key]
}

// IndexByExpression searches the given database for an index whose expression
// is deeply equal to expr. It returns nil if no such index is found.
//
// A returned index has had its reference count incremented; ReleaseIndex is
// the counterpart that decrements it once the index is no longer in use.
func (r *IndexRegistry) IndexByExpression(db string, expr Expression) Index {
	r.mut.RLock()
	defer r.mut.RUnlock()

	for _, candidate := range r.indexes {
		if !reflect.DeepEqual(candidate.Expression(), expr) {
			continue
		}
		if candidate.Database() != db {
			continue
		}

		r.retainIndex(db, candidate.ID())
		return candidate
	}

	return nil
}

var (
// ErrIndexIDAlreadyRegistered is the error returned when there is already
// an index with the same ID in the same database.
ErrIndexIDAlreadyRegistered = errors.NewKind("an index with id %q has already been registered")

// ErrIndexExpressionAlreadyRegistered is the error returned when there is
// already an index with the same expression in the same database.
ErrIndexExpressionAlreadyRegistered = errors.NewKind("there is already an index registered for the expression %q")

// ErrIndexNotFound is returned when the index could not be found.
ErrIndexNotFound = errors.NewKind("index %q was not found")

// ErrIndexDeleteInvalidStatus is returned when the index to be deleted is
// not in the ready state.
ErrIndexDeleteInvalidStatus = errors.NewKind("can't delete index %q because it's not ready for usage")
)

// validateIndexToAddLocked checks that no index registered in the same
// database as idx shares its ID or its expression. The caller must hold mut
// (for reading or writing).
func (r *IndexRegistry) validateIndexToAddLocked(idx Index) error {
	for _, i := range r.indexes {
		if i.Database() != idx.Database() {
			continue
		}

		if i.ID() == idx.ID() {
			return ErrIndexIDAlreadyRegistered.New(idx.ID())
		}

		if reflect.DeepEqual(i.Expression(), idx.Expression()) {
			return ErrIndexExpressionAlreadyRegistered.New(idx.Expression())
		}
	}

	return nil
}

// validateIndexToAdd checks, under a read lock, that idx collides with no
// registered index of the same database by ID or by expression.
func (r *IndexRegistry) validateIndexToAdd(idx Index) error {
	r.mut.RLock()
	defer r.mut.RUnlock()

	return r.validateIndexToAddLocked(idx)
}

// AddIndex adds the given index to the registry. The added index will be
// marked as not ready, so nobody can register another index with the same
// expression or id while this one is still being created.
//
// Validation and insertion happen inside a single write-locked section;
// validating under a separate read lock would let two concurrent calls with
// the same id or expression both succeed.
//
// When something is sent through the returned channel (or it is closed), it
// means the index has finished its creation and it will be marked as ready.
// Until then the index stays not ready and the goroutine started here keeps
// waiting on the channel.
func (r *IndexRegistry) AddIndex(idx Index) (chan<- struct{}, error) {
	err := func() error {
		r.mut.Lock()
		defer r.mut.Unlock()

		if err := r.validateIndexToAddLocked(idx); err != nil {
			return err
		}

		r.setStatus(idx, IndexNotReady)
		r.indexes[indexKey{idx.Database(), idx.ID()}] = idx
		return nil
	}()
	if err != nil {
		return nil, err
	}

	created := make(chan struct{})
	go func() {
		<-created
		r.mut.Lock()
		defer r.mut.Unlock()
		r.setStatus(idx, IndexReady)
	}()

	return created, nil
}

// DeleteIndex deletes an index from the registry by its database and id (the
// id is matched in lower case). First, it marks the index as not ready but
// does not remove it, so queries that already hold it may keep using it.
//
// The returned channel signals when the index can be deleted from disk: it is
// closed right away when no query holds the index, otherwise it receives one
// value after the last holder calls ReleaseIndex.
//
// It returns ErrIndexNotFound when the database has no index with that id and
// ErrIndexDeleteInvalidStatus when the index is not ready (still being
// created, or already being deleted).
//
// Locking: mut is taken for writing because the status map is modified, and
// it is always acquired before rcmut, the same order IndexByExpression uses,
// so the two cannot deadlock against each other.
func (r *IndexRegistry) DeleteIndex(db, id string) (<-chan struct{}, error) {
	lowerID := strings.ToLower(id)

	r.mut.Lock()
	defer r.mut.Unlock()

	var (
		key   indexKey
		found bool
	)
	for k, idx := range r.indexes {
		// Index ids are only unique within a database, so both must match.
		if idx.Database() != db || idx.ID() != lowerID {
			continue
		}

		// Read the status directly: CanUseIndex would try to take mut again.
		if r.statuses[indexKey{idx.Database(), idx.ID()}] != IndexReady {
			return nil, ErrIndexDeleteInvalidStatus.New(id)
		}

		r.setStatus(idx, IndexNotReady)
		key, found = k, true
		break
	}

	if !found {
		return nil, ErrIndexNotFound.New(id)
	}

	// Buffered so the goroutine below never blocks on a caller that does not
	// read from the channel.
	done := make(chan struct{}, 1)

	r.rcmut.Lock()
	defer r.rcmut.Unlock()

	// If no query is using this index just delete it right away.
	if r.refCounts[key] <= 0 {
		delete(r.indexes, key)
		close(done)
		return done, nil
	}

	onReadyToDelete := make(chan struct{})
	r.deleteIndexQueue[key] = onReadyToDelete

	go func() {
		// Closed by ReleaseIndex once the last holder lets go of the index.
		<-onReadyToDelete
		r.mut.Lock()
		defer r.mut.Unlock()
		delete(r.indexes, key)

		done <- struct{}{}
	}()

	return done, nil
}

// IndexStatus tells whether an index can currently be used. Its zero value is
// IndexNotReady.
type IndexStatus bool

const (
	// IndexNotReady marks an index that must not be used yet.
	IndexNotReady IndexStatus = false
	// IndexReady marks an index that is available for use.
	IndexReady IndexStatus = true
)

// IsUsable reports whether the status allows the index to be used, which is
// the case only for IndexReady.
func (s IndexStatus) IsUsable() bool {
	return bool(s)
}

// String returns "ready" for IndexReady and "not ready" otherwise.
func (s IndexStatus) String() string {
	if s.IsUsable() {
		return "ready"
	}
	return "not ready"
}
Loading