Skip to content

tapgarden: fix races and deadlocks in caretaker #693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 8, 2023
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COMMIT := $(shell git describe --tags --dirty)

GOBUILD := GOEXPERIMENT=loopvar GO111MODULE=on go build -v
GOINSTALL := GOEXPERIMENT=loopvar GO111MODULE=on go install -v
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
GOMOD := GO111MODULE=on go mod

GOLIST := go list -deps $(PKG)/... | grep '$(PKG)'
Expand Down Expand Up @@ -168,7 +168,7 @@ unit-cover: $(GOACC_BIN)

unit-race:
@$(call print, "Running unit race tests.")
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(GOLIST) | $(XARGS) env $(GOTEST) -race -test.timeout=20m
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(UNIT_RACE)

itest: build-itest itest-only

Expand Down
17 changes: 17 additions & 0 deletions fn/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,20 @@ func First[T any](xs []*T, pred func(*T) bool) (*T, error) {

return nil, fmt.Errorf("no item found")
}

// Last returns the last item in the slice that matches the predicate, or an
// error if none matches.
func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {
var matches []*T
for i := range xs {
if pred(xs[i]) {
matches = append(matches, xs[i])
}
}

if len(matches) == 0 {
return nil, fmt.Errorf("no item found")
}

return matches[len(matches)-1], nil
}
4 changes: 3 additions & 1 deletion make/testing_flags.mk
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ LOG_TAGS =
TEST_FLAGS =
ITEST_FLAGS = -logoutput
COVER_PKG = $$(go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)' | grep -v lnrpc)
RACE_PKG = go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)'
COVER_HTML = go tool cover -html=coverage.txt -o coverage.html
POSTGRES_START_DELAY = 5

Expand All @@ -19,6 +20,7 @@ ifneq ($(pkg),)
UNITPKG := $(PKG)/$(pkg)
UNIT_TARGETED = yes
COVER_PKG = $(PKG)/$(pkg)
RACE_PKG = $(PKG)/$(pkg)
endif

# If a specific unit test case is being target, construct test.run filter.
Expand Down Expand Up @@ -78,7 +80,7 @@ TEST_FLAGS += -test.timeout=$(timeout)
else ifneq ($(optional),)
TEST_FLAGS += -test.timeout=240m
else
TEST_FLAGS += -test.timeout=60m
TEST_FLAGS += -test.timeout=20m
endif

GOLIST := go list -tags="$(DEV_TAGS)" -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/'
Expand Down
184 changes: 122 additions & 62 deletions tapgarden/caretaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,62 +166,82 @@ func (b *BatchCaretaker) Start() error {
func (b *BatchCaretaker) Stop() error {
var stopErr error
b.stopOnce.Do(func() {
log.Infof("BatchCaretaker(%x): Stopping", b.batchKey[:])

close(b.Quit)
b.Wg.Wait()
})

return stopErr
}

// Cancel signals for a batch caretaker to stop advancing a batch if possible.
// A batch can only be cancelled if it has not reached BatchStateBroadcast yet.
func (b *BatchCaretaker) Cancel() CancelResp {
// Cancel signals for a batch caretaker to stop advancing a batch. A batch can
// only be cancelled if it has not reached BatchStateBroadcast yet. If
// cancellation succeeds, we forward the batch state after cancellation. If the
// batch could not be cancelled, the planter will handle caretaker shutdown and
// batch state.
func (b *BatchCaretaker) Cancel() error {
ctx, cancel := b.WithCtxQuit()
defer cancel()

batchKey := b.cfg.Batch.BatchKey.PubKey.SerializeCompressed()
batchKey := b.batchKey[:]
batchState := b.cfg.Batch.State()
var cancelResp CancelResp

// This function can only be called before the caretaker state stepping
// function, so the batch state read is the next state that has not yet
// been executed. Seedlings are converted to asset sprouts in the Frozen
// state, and broadcast in the Broadast state.
log.Debugf("BatchCaretaker(%x): Trying to cancel", batchKey)
switch batchState {
// In the pending state, the batch seedlings have not sprouted yet.
case BatchStatePending, BatchStateFrozen:
finalBatchState := BatchStateSeedlingCancelled
err := b.cfg.Log.UpdateBatchState(
ctx, b.cfg.Batch.BatchKey.PubKey,
finalBatchState,
BatchStateSeedlingCancelled,
)
if err != nil {
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
"cancel failed: %w", batchKey, batchState, err)
}

b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")

return CancelResp{&finalBatchState, err}
cancelResp = CancelResp{true, err}

case BatchStateCommitted:
finalBatchState := BatchStateSproutCancelled
err := b.cfg.Log.UpdateBatchState(
ctx, b.cfg.Batch.BatchKey.PubKey,
finalBatchState,
BatchStateSproutCancelled,
)
if err != nil {
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
"cancel failed: %w", batchKey, batchState, err)
}

b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")

return CancelResp{&finalBatchState, err}
cancelResp = CancelResp{true, err}

default:
err := fmt.Errorf("BatchCaretaker(%x), batch not cancellable",
b.cfg.Batch.BatchKey.PubKey.SerializeCompressed())
return CancelResp{nil, err}
cancelResp = CancelResp{false, err}
}

b.cfg.CancelRespChan <- cancelResp

// If the batch was cancellable, the final write of the cancelled batch
// may still have failed. That error will be handled by the planter. At
// this point, the caretaker should shut down gracefully if cancellation
// was attempted.
if cancelResp.cancelAttempted {
log.Infof("BatchCaretaker(%x), attempted batch cancellation, "+
"shutting down", b.batchKey[:])

return nil
}

// If the cancellation failed, that error will be handled by the
// planter.
return fmt.Errorf("BatchCaretaker(%x) cancellation failed",
b.batchKey[:])
}

// advanceStateUntil attempts to advance the internal state machine until the
Expand All @@ -241,22 +261,20 @@ func (b *BatchCaretaker) advanceStateUntil(currentState,
return 0, fmt.Errorf("BatchCaretaker(%x), shutting "+
"down", b.batchKey[:])

// If the batch was cancellable, the finalState of the cancel
// response will be non-nil. If the cancellation failed, that
// error will be handled by the planter. At this point, the
// caretaker should always shut down gracefully.
case <-b.cfg.CancelReqChan:
cancelResp := b.Cancel()
b.cfg.CancelRespChan <- cancelResp

// TODO(jhb): Use concrete error types for caretaker
// shutdown cases
// If the batch was cancellable, the finalState of the
// cancel response will be non-nil. If the cancellation
// failed, that error will be handled by the planter.
// At this point, the caretaker should always shut down
// gracefully.
if cancelResp.finalState != nil {
cancelErr := b.Cancel()
if cancelErr == nil {
return 0, fmt.Errorf("BatchCaretaker(%x), "+
"attempted batch cancellation, "+
"shutting down", b.batchKey[:])
}

log.Info(cancelErr)

default:
}

Expand Down Expand Up @@ -313,7 +331,7 @@ func (b *BatchCaretaker) assetCultivator() {
currentBatchState, BatchStateBroadcast,
)
if err != nil {
log.Errorf("unable to advance state machine: %v", err)
log.Errorf("Unable to advance state machine: %v", err)
b.cfg.BroadcastErrChan <- err
return
}
Expand Down Expand Up @@ -360,7 +378,12 @@ func (b *BatchCaretaker) assetCultivator() {
return

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

log.Error(cancelErr)

case <-b.Quit:
return
Expand Down Expand Up @@ -740,7 +763,7 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
b.cfg.Batch.GenesisPacket.ChainFees = chainFees

log.Infof("BatchCaretaker(%x): GenesisPacket absolute fee: "+
"%d sats", chainFees)
"%d sats", b.batchKey[:], chainFees)
log.Infof("BatchCaretaker(%x): GenesisPacket finalized",
b.batchKey[:])
log.Tracef("GenesisPacket: %v", spew.Sdump(signedPkt))
Expand Down Expand Up @@ -848,48 +871,85 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
defer confCancel()
defer b.Wg.Done()

var confEvent *chainntnfs.TxConfirmation
select {
case confEvent = <-confNtfn.Confirmed:
log.Debugf("Got chain confirmation: %v",
confEvent.Tx.TxHash())

case err := <-errChan:
b.cfg.ErrChan <- fmt.Errorf("error getting "+
"confirmation: %w", err)
return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, context " +
"done")

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
var (
confEvent *chainntnfs.TxConfirmation
confRecv bool
)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, exiting")
return
for !confRecv {
select {
case confEvent = <-confNtfn.Confirmed:
confRecv = true

case err := <-errChan:
confErr := fmt.Errorf("error getting "+
"confirmation: %w", err)
log.Info(confErr)
b.cfg.ErrChan <- confErr

return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, " +
"context done")
confRecv = true

case <-b.cfg.CancelReqChan:
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

// Cancellation failed, continue to wait
// for transaction confirmation.
log.Info(cancelErr)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, " +
"exiting")
return
}
}

if confEvent == nil {
b.cfg.ErrChan <- fmt.Errorf("got empty " +
confErr := fmt.Errorf("got empty " +
"confirmation event in batch")
log.Info(confErr)
b.cfg.ErrChan <- confErr

return
}

select {
case b.confEvent <- confEvent:

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, context " +
"done")

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
if confEvent.Tx != nil {
log.Debugf("Got chain confirmation: %v",
confEvent.Tx.TxHash())
}

case <-b.Quit:
log.Debugf("Skipping TX confirmation, exiting")
return
for {
select {
case b.confEvent <- confEvent:
return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, " +
"context done")
return

case <-b.cfg.CancelReqChan:
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

// Cancellation failed, continue to try
// and send the confirmation event.
log.Info(cancelErr)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, " +
"exiting")
return
}
}
}()

Expand Down
Loading