5 changes: 2 additions & 3 deletions accessor.go
@@ -75,7 +75,7 @@ func (sa *ShardAccessor) Blockstore() (ReadBlockstore, error) {
}

// Close terminates this shard accessor, releasing any resources associated
// with it, and decrementing internal refcounts.
// with it.
func (sa *ShardAccessor) Close() error {
if err := sa.data.Close(); err != nil {
log.Warnf("failed to close mount when closing shard accessor: %s", err)
@@ -88,6 +88,5 @@ func (sa *ShardAccessor) Close() error {
}
sa.lk.Unlock()

tsk := &task{op: OpShardRelease, shard: sa.shard}
return sa.shard.d.queueTask(tsk, sa.shard.d.externalCh)
return nil
}
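For illustration, a minimal self-contained sketch of the new `Close` semantics, using toy stand-in types rather than the package's real ones: closing an accessor now releases only the accessor's own resources and no longer enqueues an `OpShardRelease` task on the event loop.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// accessor is a toy stand-in for ShardAccessor.
type accessor struct {
	data io.ReadCloser
}

// Close mirrors the new behavior: close the underlying mount reader,
// warn on failure, and return nil without queueing a release task.
func (a *accessor) Close() error {
	if err := a.data.Close(); err != nil {
		fmt.Printf("warn: failed to close mount when closing shard accessor: %s\n", err)
	}
	// Previously: a release task was queued on the dagstore event loop here.
	return nil
}

func main() {
	a := &accessor{data: io.NopCloser(strings.NewReader("shard bytes"))}
	fmt.Println("close err:", a.Close()) // close err: <nil>
}
```

One consequence worth noting: callers can no longer rely on `Close` to decrement dagstore-side refcounts, so shard release presumably has to happen elsewhere in this refactor.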
241 changes: 72 additions & 169 deletions dagstore.go
@@ -69,7 +69,6 @@
type DAGStore struct {
lk sync.RWMutex
mounts *mount.Registry
shards map[shard.Key]*Shard
config Config
indices index.FullIndexRepo
store ds.Datastore
@@ -94,8 +93,6 @@ type DAGStore struct {
// back to the application. Serviced by a dispatcher goroutine.
// See note in dispatchResultsCh for background.
dispatchFailuresCh chan *dispatch
// gcCh is where requests for GC are sent.
gcCh chan chan *GCResult

// Channels not owned by us.
//
@@ -106,8 +103,8 @@

// Throttling.
//
throttleReaadyFetch throttle.Throttler
throttleIndex throttle.Throttler
throttleReadyFetch throttle.Throttler
throttleIndex throttle.Throttler

// Lifecycle.
//
@@ -223,86 +220,44 @@ func NewDAGStore(cfg Config) (*DAGStore, error) {

ctx, cancel := context.WithCancel(context.Background())
dagst := &DAGStore{
mounts: cfg.MountRegistry,
config: cfg,
indices: cfg.IndexRepo,
TopLevelIndex: cfg.TopLevelIndex,
shards: make(map[shard.Key]*Shard),
store: cfg.Datastore,
externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure.
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
dispatchResultsCh: make(chan *dispatch, 128), // len=128, same as externalCh.
gcCh: make(chan chan *GCResult, 8),
traceCh: cfg.TraceCh,
failureCh: cfg.FailureCh,
throttleIndex: throttle.Noop(),
throttleReaadyFetch: throttle.Noop(),
ctx: ctx,
cancelFn: cancel,
mounts: cfg.MountRegistry,
config: cfg,
indices: cfg.IndexRepo,
TopLevelIndex: cfg.TopLevelIndex,
store: cfg.Datastore,
externalCh: make(chan *task, 128), // len=128, concurrent external tasks that can be queued up before exercising backpressure.
internalCh: make(chan *task, 1), // len=1, because eventloop will only ever stage another internal event.
completionCh: make(chan *task, 64), // len=64, hitting this limit will just make async tasks wait.
dispatchResultsCh: make(chan *dispatch, 128), // len=128, same as externalCh.
traceCh: cfg.TraceCh,
failureCh: cfg.FailureCh,
throttleIndex: throttle.Noop(),
throttleReadyFetch: throttle.Noop(),
ctx: ctx,
cancelFn: cancel,
}

if max := cfg.MaxConcurrentIndex; max > 0 {
dagst.throttleIndex = throttle.Fixed(max)
}

if max := cfg.MaxConcurrentReadyFetches; max > 0 {
dagst.throttleReaadyFetch = throttle.Fixed(max)
dagst.throttleReadyFetch = throttle.Fixed(max)
}

return dagst, nil
}
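The buffer-size comments above encode the backpressure model: a send into a full channel blocks until the event loop drains it, bounded only by the caller's context. `queueTask` itself is mostly elided in this diff, so the following is a self-contained model of that pattern, not the real implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type task struct{ op string }

// queueTask models the bounded-channel backpressure: the send blocks
// once the buffer (len=128 for externalCh above) is full, and the
// caller's context bounds the wait.
func queueTask(ctx context.Context, tsk *task, ch chan<- *task) error {
	select {
	case ch <- tsk:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("failed to queue task: %w", ctx.Err())
	}
}

func main() {
	externalCh := make(chan *task, 2) // tiny buffer to demonstrate backpressure
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	for i := 0; i < 3; i++ {
		// nobody drains the channel here, so the third send times out.
		fmt.Printf("task %d: %v\n", i, queueTask(ctx, &task{op: "register"}, externalCh))
	}
}
```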

// Start starts a DAG store.
func (d *DAGStore) Start(ctx context.Context) error {
if err := d.restoreState(); err != nil {
// TODO add a lenient mode.
return fmt.Errorf("failed to restore dagstore state: %w", err)
if d.config.RecoverOnStart == RecoverNow {
// TODO: query shards in error state and queue them for recovery
// TODO: this may not scale; consider allowing manual recovery only.
}

if err := d.clearOrphaned(); err != nil {
log.Warnf("failed to clear orphaned files on startup: %s", err)
}

// Reset in-progress states.
//
// Queue shards whose registration needs to be restarted. Release those
// ops after we spawn the control goroutine. Otherwise, having more shards
// in this state than the externalCh buffer size would exceed the channel
// buffer, and we'd block forever.
var toRegister, toRecover []*Shard
for _, s := range d.shards {
switch s.state {
case ShardStateErrored:
switch d.config.RecoverOnStart {
case DoNotRecover:
log.Infow("start: skipping recovery of shard in errored state", "shard", s.key, "error", s.err)
case RecoverOnAcquire:
log.Infow("start: failed shard will recover on next acquire", "shard", s.key, "error", s.err)
s.recoverOnNextAcquire = true
case RecoverNow:
log.Infow("start: recovering failed shard immediately", "shard", s.key, "error", s.err)
toRecover = append(toRecover, s)
}

case ShardStateServing:
// reset to available, as we have no active acquirers at start.
s.state = ShardStateAvailable
case ShardStateAvailable:
// Noop: An available shard whose index has disappeared across restarts
// will fail on the first acquisition.
case ShardStateInitializing:
// handle shards that were initializing when we shut down.
// if we already have the index for the shard, there's nothing else to do.
if istat, err := d.indices.StatFullIndex(s.key); err == nil && istat.Exists {
s.state = ShardStateAvailable
} else {
// reset back to new, and queue the OpShardRegister.
s.state = ShardStateNew
toRegister = append(toRegister, s)
}
}
}
// Find shards in the New state and queue them for initialization
var toRegister []*Shard // TODO: query for shards in New state

// spawn the control goroutine.
d.wg.Add(1)
@@ -325,10 +280,10 @@ func (d *DAGStore) Start(ctx context.Context) error {
_ = d.queueTask(&task{op: OpShardRegister, shard: s, waiter: &waiter{ctx: ctx}}, d.externalCh)
}

// queue shard recovery for shards in the errored state before we return.
for _, s := range toRecover {
_ = d.queueTask(&task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: ctx}}, d.externalCh)
}
//// queue shard recovery for shards in the errored state before we return.
//for _, s := range toRecover {
// _ = d.queueTask(&task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: ctx}}, d.externalCh)
//}

return nil
}
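The TODOs above leave the startup queries unwritten. A hedged sketch of what they might look like, reusing only identifiers that already appear in this diff (`store.Query`, `NextSync`, `Shard.UnmarshalJSON`, `shard.KeyFromString`); the helper name and the state filter are assumptions:

```go
// shardsInState is a hypothetical helper for the TODOs in Start: scan
// the persisted shard records and collect those in the wanted state
// (ShardStateNew for registration, ShardStateErrored for recovery).
func (d *DAGStore) shardsInState(want ShardState) ([]*Shard, error) {
	results, err := d.store.Query(d.ctx, query.Query{})
	if err != nil {
		return nil, fmt.Errorf("failed to query dagstore state: %w", err)
	}
	var out []*Shard
	for {
		res, ok := results.NextSync()
		if !ok {
			return out, nil
		}
		s := &Shard{d: d}
		if err := s.UnmarshalJSON(res.Value); err != nil {
			log.Warnf("failed to decode shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
			continue
		}
		if s.state == want {
			out = append(out, s)
		}
	}
}
```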
@@ -373,31 +328,22 @@ type RegisterOpts struct {
// Otherwise, it queues the shard for registration. The caller should monitor
// supplied channel for a result.
func (d *DAGStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan ShardResult, opts RegisterOpts) error {
d.lk.Lock()
if _, ok := d.shards[key]; ok {
d.lk.Unlock()
return fmt.Errorf("%s: %w", key.String(), ErrShardExists)
}

// wrap the original mount in an upgrader.
upgraded, err := mount.Upgrade(mnt, d.throttleReaadyFetch, d.config.TransientsDir, key.String(), opts.ExistingTransient)
upgraded, err := mount.Upgrade(mnt, d.throttleReadyFetch, d.config.TransientsDir, key.String(), opts.ExistingTransient)
if err != nil {
d.lk.Unlock()
return err
}

w := &waiter{outCh: out, ctx: ctx}

// add the shard to the shard catalogue, and drop the lock.
// queue the shard for registration
s := &Shard{
d: d,
key: key,
state: ShardStateNew,
mount: upgraded,
lazy: opts.LazyInitialization,
}
d.shards[key] = s
d.lk.Unlock()

tsk := &task{op: OpShardRegister, shard: s, waiter: w}
return d.queueTask(tsk, d.externalCh)
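A usage sketch for the new flow: registration is fire-and-forget into the event loop (the in-memory duplicate check is gone, so `ErrShardExists` detection presumably moves there), and the caller watches the result channel. The `Error` field on `ShardResult` is assumed from the package's result type:

```go
// Sketch: register a shard and wait for the event loop's verdict.
out := make(chan ShardResult, 1)
if err := dagst.RegisterShard(ctx, key, mnt, out, RegisterOpts{}); err != nil {
	return err // queueing failed: context cancelled or store shut down
}
select {
case res := <-out:
	if res.Error != nil {
		return fmt.Errorf("shard registration failed: %w", res.Error)
	}
case <-ctx.Done():
	return ctx.Err()
}
```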
@@ -407,14 +353,7 @@ type DestroyOpts struct {
}

func (d *DAGStore) DestroyShard(ctx context.Context, key shard.Key, out chan ShardResult, _ DestroyOpts) error {
d.lk.Lock()
s, ok := d.shards[key]
if !ok {
d.lk.Unlock()
return ErrShardUnknown // TODO: encode shard key
}
d.lk.Unlock()

s := &Shard{d: d, key: key}
tsk := &task{op: OpShardDestroy, shard: s, waiter: &waiter{ctx: ctx, outCh: out}}
return d.queueTask(tsk, d.externalCh)
}
@@ -433,14 +372,7 @@ type AcquireOpts struct {
// Otherwise, it queues the shard for acquisition. The caller should monitor
// supplied channel for a result.
func (d *DAGStore) AcquireShard(ctx context.Context, key shard.Key, out chan ShardResult, _ AcquireOpts) error {
d.lk.Lock()
s, ok := d.shards[key]
if !ok {
d.lk.Unlock()
return fmt.Errorf("%s: %w", key.String(), ErrShardUnknown)
}
d.lk.Unlock()

s := &Shard{d: d, key: key}
tsk := &task{op: OpShardAcquire, shard: s, waiter: &waiter{ctx: ctx, outCh: out}}
return d.queueTask(tsk, d.externalCh)
}
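And the acquire side, tying back to the accessor.go change above: the returned accessor's `Close` now frees only accessor-local resources. The `Accessor` field on `ShardResult` is assumed from the package's result type:

```go
// Sketch: acquire a shard, read through its blockstore, release it.
out := make(chan ShardResult, 1)
if err := dagst.AcquireShard(ctx, key, out, AcquireOpts{}); err != nil {
	return err
}
res := <-out
if res.Error != nil {
	return res.Error
}
defer res.Accessor.Close() // frees accessor resources; no refcount side effects anymore
bs, err := res.Accessor.Blockstore()
if err != nil {
	return err
}
_ = bs // read blocks as needed
```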
@@ -462,14 +394,7 @@ type RecoverOpts struct {
// TODO add an operation identifier to ShardResult -- starts to look like
// a Trace event?
func (d *DAGStore) RecoverShard(ctx context.Context, key shard.Key, out chan ShardResult, _ RecoverOpts) error {
d.lk.Lock()
s, ok := d.shards[key]
if !ok {
d.lk.Unlock()
return fmt.Errorf("%s: %w", key.String(), ErrShardUnknown)
}
d.lk.Unlock()

s := &Shard{d: d, key: key}
tsk := &task{op: OpShardRecover, shard: s, waiter: &waiter{ctx: ctx, outCh: out}}
return d.queueTask(tsk, d.externalCh)
}
@@ -483,61 +408,49 @@ type Trace struct {
type ShardInfo struct {
ShardState
Error error
refs uint32
}

// GetShardInfo returns the current state of shard with key k.
//
// If the shard is not known, ErrShardUnknown is returned.
func (d *DAGStore) GetShardInfo(k shard.Key) (ShardInfo, error) {
d.lk.RLock()
defer d.lk.RUnlock()
s, ok := d.shards[k]
if !ok {
return ShardInfo{}, ErrShardUnknown
val, err := d.store.Get(d.ctx, ds.NewKey(k.String()))
if err != nil {
return ShardInfo{}, fmt.Errorf("getting info for shard %s: %w", k, err)
}

s.lk.RLock()
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs}
s.lk.RUnlock()
s := Shard{d: d}
if err := s.UnmarshalJSON(val); err != nil {
return ShardInfo{}, fmt.Errorf("unmarshalling shard %s: %w", k, err)
}
info := ShardInfo{ShardState: s.state, Error: s.err}
return info, nil
}
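Note the behavioral shift here: `GetShardInfo` now does a datastore read plus a JSON decode per call instead of a map lookup, and a missing shard surfaces as a wrapped datastore error rather than `ErrShardUnknown`. A usage sketch:

```go
// Sketch: inspect a shard's persisted state before deciding to recover it.
info, err := dagst.GetShardInfo(key)
if err != nil {
	return err // with this change, a wrapped datastore error, not ErrShardUnknown
}
if info.ShardState == ShardStateErrored {
	out := make(chan ShardResult, 1)
	if err := dagst.RecoverShard(ctx, key, out, RecoverOpts{}); err != nil {
		return err
	}
}
```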

type AllShardsInfo map[shard.Key]ShardInfo

// AllShardsInfo returns the current state of all registered shards, as well as
// any errors.
func (d *DAGStore) AllShardsInfo() AllShardsInfo {
d.lk.RLock()
defer d.lk.RUnlock()

ret := make(AllShardsInfo, len(d.shards))
for k, s := range d.shards {
s.lk.RLock()
info := ShardInfo{ShardState: s.state, Error: s.err, refs: s.refs}
s.lk.RUnlock()
ret[k] = info
}
return ret
}

// GC performs DAG store garbage collection by reclaiming transient files of
// shards that are currently available but inactive, or errored.
//
// GC runs with exclusivity from the event loop.
func (d *DAGStore) GC(ctx context.Context) (*GCResult, error) {
ch := make(chan *GCResult)
select {
case d.gcCh <- ch:
case <-ctx.Done():
return nil, ctx.Err()
func (d *DAGStore) AllShardsInfo() (AllShardsInfo, error) {
ret := make(AllShardsInfo)
results, err := d.store.Query(d.ctx, query.Query{})
if err != nil {
return nil, fmt.Errorf("failed to get dagstore state from store: %w", err)
}
for {
res, ok := results.NextSync()
if !ok {
return ret, nil
}
s := Shard{d: d}
if err := s.UnmarshalJSON(res.Value); err != nil {
log.Warnf("failed to get state of shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
continue
}

select {
case res := <-ch:
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
ret[s.key] = ShardInfo{
ShardState: s.state,
Error: s.err,
}
}
}
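A usage sketch for the now-fallible `AllShardsInfo` (it returns an error because the datastore query can fail):

```go
infos, err := dagst.AllShardsInfo()
if err != nil {
	return err
}
for k, info := range infos {
	fmt.Printf("shard %s: state=%v err=%v\n", k, info.ShardState, info.Error)
}
```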

@@ -557,28 +470,6 @@ func (d *DAGStore) queueTask(tsk *task, ch chan<- *task) error {
}
}

func (d *DAGStore) restoreState() error {
results, err := d.store.Query(d.ctx, query.Query{})
if err != nil {
return fmt.Errorf("failed to recover dagstore state from store: %w", err)
}
for {
res, ok := results.NextSync()
if !ok {
return nil
}
s := &Shard{d: d}
if err := s.UnmarshalJSON(res.Value); err != nil {
log.Warnf("failed to recover state of shard %s: %s; skipping", shard.KeyFromString(res.Key), err)
continue
}

log.Debugw("restored shard state on dagstore startup", "shard", s.key, "shard state", s.state, "shard error", s.err,
"shard lazy", s.lazy)
d.shards[s.key] = s
}
}

// ensureDir checks whether the specified path is a directory, and if not it
// attempts to create it.
func ensureDir(path string) error {
@@ -601,3 +492,15 @@ func (d *DAGStore) failShard(s *Shard, ch chan *task, format string, args ...int
err := fmt.Errorf(format, args...)
return d.queueTask(&task{op: OpShardFail, shard: s, err: err}, ch)
}

func (d *DAGStore) shardFromPersistentState(s *Shard) error {
val, err := d.store.Get(d.ctx, ds.NewKey(s.key.String()))
if err != nil {
return fmt.Errorf("getting shard %s from datastore: %w", s.key, err)
}
if err := s.UnmarshalJSON(val); err != nil {
return fmt.Errorf("unmarshalling shard %s: %w", s.key, err)
}
s.d = d
return nil
}
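A hedged sketch of how the event loop might use this helper to hydrate the stub shards that `RegisterShard`/`AcquireShard`/`RecoverShard` now enqueue; the call site is an assumption, while `failShard` and `internalCh` come from this diff:

```go
// Hypothetical call site inside the event loop, before acting on a task:
s := &Shard{d: d, key: tsk.shard.key}
if err := d.shardFromPersistentState(s); err != nil {
	// no persisted record for this shard: fail the task.
	return d.failShard(s, d.internalCh, "failed to load shard %s: %w", s.key, err)
}
// s now carries the persisted state, error and lazy flag.
```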