@@ -54,8 +54,8 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
5454 subErr := make (chan error , 1 )
5555 waitCtx , cancel := context .WithCancel (ctx )
5656 go func () {
57+ defer cancel ()
5758 subErr <- sub .Read (ctx , events )
58- cancel ()
5959 }()
6060
6161 // In final Register calls, as the non-initiator, we optimistically wait for
@@ -71,17 +71,12 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
7171 }
7272
7373 // No conclude event found in the past, send transaction.
74- if req .Tx .IsFinal {
75- err = errors .WithMessage (a .callConcludeFinal (ctx , req ), "calling concludeFinal" )
76- } else {
77- err = errors .WithMessage (a .callConclude (ctx , req , subStates ), "calling conclude" )
78- }
79- if IsErrTxFailed (err ) {
80- a .log .Warn ("Calling conclude(Final) failed, waiting for event anyways..." )
81- } else if err != nil {
82- return err
74+ err = a .conclude (ctx , req , subStates )
75+ if err != nil {
76+ return errors .WithMessage (err , "concluding" )
8377 }
8478
79+ // Wait for concluded event.
8580 for {
8681 select {
8782 case _e := <- events :
@@ -100,6 +95,25 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
10095 }
10196}
10297
98+ func (a * Adjudicator ) conclude (ctx context.Context , req channel.AdjudicatorReq , subStates channel.StateMap ) error {
99+ // If the on-chain state resulted from forced execution, we do not have a fully-signed state and cannot call concludeFinal.
100+ forceExecuted , err := a .isForceExecuted (ctx , req .Params .ID ())
101+ if err != nil {
102+ return errors .WithMessage (err , "checking force execution" )
103+ }
104+ if req .Tx .IsFinal && ! forceExecuted {
105+ err = errors .WithMessage (a .callConcludeFinal (ctx , req ), "calling concludeFinal" )
106+ } else {
107+ err = errors .WithMessage (a .callConclude (ctx , req , subStates ), "calling conclude" )
108+ }
109+ if IsErrTxFailed (err ) {
110+ a .log .WithError (err ).Warn ("Calling conclude(Final) failed, waiting for event anyways..." )
111+ } else if err != nil {
112+ return err
113+ }
114+ return nil
115+ }
116+
103117// isConcluded returns whether a channel is already concluded.
104118func (a * Adjudicator ) isConcluded (ctx context.Context , sub * subscription.EventSub ) (bool , error ) {
105119 events := make (chan * subscription.Event , 10 )
@@ -119,6 +133,36 @@ func (a *Adjudicator) isConcluded(ctx context.Context, sub *subscription.EventSu
119133 return false , errors .WithMessage (<- subErr , "reading past events" )
120134}
121135
136+ // isForceExecuted returns whether a channel is in the forced execution phase.
137+ func (a * Adjudicator ) isForceExecuted (_ctx context.Context , c channel.ID ) (bool , error ) {
138+ ctx , cancel := context .WithCancel (_ctx )
139+ defer cancel ()
140+ sub , err := subscription .NewEventSub (ctx , a .ContractBackend , a .bound , updateEventType (c ), startBlockOffset )
141+ if err != nil {
142+ return false , errors .WithMessage (err , "subscribing" )
143+ }
144+ defer sub .Close ()
145+ events := make (chan * subscription.Event , 10 )
146+ subErr := make (chan error , 1 )
147+ // Write the events into events.
148+ go func () {
149+ defer close (events )
150+ subErr <- sub .ReadPast (ctx , events )
151+ }()
152+ // Read all events and check for force execution.
153+ var lastEvent * subscription.Event
154+ for _e := range events {
155+ lastEvent = _e
156+ }
157+ if lastEvent != nil {
158+ e := lastEvent .Data .(* adjudicator.AdjudicatorChannelUpdate )
159+ if e .Phase == phaseForceExec {
160+ return true , nil
161+ }
162+ }
163+ return false , errors .WithMessage (<- subErr , "reading past events" )
164+ }
165+
122166func updateEventType (channelID [32 ]byte ) subscription.EventFactory {
123167 return func () * subscription.Event {
124168 return & subscription.Event {
0 commit comments