Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions backend/ethereum/channel/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
subErr := make(chan error, 1)
waitCtx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
subErr <- sub.Read(ctx, events)
cancel()
}()

// In final Register calls, as the non-initiator, we optimistically wait for
Expand All @@ -71,17 +71,12 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
}

// No conclude event found in the past, send transaction.
if req.Tx.IsFinal {
err = errors.WithMessage(a.callConcludeFinal(ctx, req), "calling concludeFinal")
} else {
err = errors.WithMessage(a.callConclude(ctx, req, subStates), "calling conclude")
}
if IsErrTxFailed(err) {
a.log.Warn("Calling conclude(Final) failed, waiting for event anyways...")
} else if err != nil {
return err
err = a.conclude(ctx, req, subStates)
if err != nil {
return errors.WithMessage(err, "concluding")
}

// Wait for concluded event.
for {
select {
case _e := <-events:
Expand All @@ -100,6 +95,25 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
}
}

func (a *Adjudicator) conclude(ctx context.Context, req channel.AdjudicatorReq, subStates channel.StateMap) error {
// If the on-chain state resulted from forced execution, we do not have a fully-signed state and cannot call concludeFinal.
forceExecuted, err := a.isForceExecuted(ctx, req.Params.ID())
if err != nil {
return errors.WithMessage(err, "checking force execution")
}
if req.Tx.IsFinal && !forceExecuted {
err = errors.WithMessage(a.callConcludeFinal(ctx, req), "calling concludeFinal")
} else {
err = errors.WithMessage(a.callConclude(ctx, req, subStates), "calling conclude")
}
if IsErrTxFailed(err) {
a.log.WithError(err).Warn("Calling conclude(Final) failed, waiting for event anyways...")
} else if err != nil {
return err
}
return nil
}

// isConcluded returns whether a channel is already concluded.
func (a *Adjudicator) isConcluded(ctx context.Context, sub *subscription.EventSub) (bool, error) {
events := make(chan *subscription.Event, 10)
Expand All @@ -119,6 +133,36 @@ func (a *Adjudicator) isConcluded(ctx context.Context, sub *subscription.EventSu
return false, errors.WithMessage(<-subErr, "reading past events")
}

// isForceExecuted returns whether a channel is in the forced execution phase.
func (a *Adjudicator) isForceExecuted(_ctx context.Context, c channel.ID) (bool, error) {
ctx, cancel := context.WithCancel(_ctx)
defer cancel()
sub, err := subscription.NewEventSub(ctx, a.ContractBackend, a.bound, updateEventType(c), startBlockOffset)
if err != nil {
return false, errors.WithMessage(err, "subscribing")
}
defer sub.Close()
events := make(chan *subscription.Event, 10)
subErr := make(chan error, 1)
// Write the events into events.
go func() {
defer close(events)
subErr <- sub.ReadPast(ctx, events)
}()
// Read all events and check for force execution.
var lastEvent *subscription.Event
for _e := range events {
lastEvent = _e
}
if lastEvent != nil {
e := lastEvent.Data.(*adjudicator.AdjudicatorChannelUpdate)
if e.Phase == phaseForceExec {
return true, nil
}
}
return false, errors.WithMessage(<-subErr, "reading past events")
}

func updateEventType(channelID [32]byte) subscription.EventFactory {
return func() *subscription.Event {
return &subscription.Event{
Expand Down