diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 5c53624b..03e99252 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -38,7 +38,8 @@ type Monitor struct { } type Config struct { - // Max time to wait for other side to accept open channel request before attempting restart + // Max time to wait for other side to accept open channel request before attempting restart. + // Set to 0 to disable timeout. AcceptTimeout time.Duration // Debounce when restart is triggered by multiple errors RestartDebounce time.Duration @@ -46,11 +47,9 @@ type Config struct { RestartBackoff time.Duration // Number of times to try to restart before failing MaxConsecutiveRestarts uint32 - // Max time to wait for the peer to acknowledge a restart request. - // Note: Does not include the time taken to reconnect to the peer. - RestartAckTimeout time.Duration // Max time to wait for the responder to send a Complete message once all - // data has been sent + // data has been sent. + // Set to 0 to disable timeout. CompleteTimeout time.Duration // Called when a restart completes successfully OnRestartComplete func(id datatransfer.ChannelID) @@ -74,17 +73,14 @@ func checkConfig(cfg *Config) { } prefix := "data-transfer channel monitor config " - if cfg.AcceptTimeout <= 0 { - panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout)) + if cfg.AcceptTimeout < 0 { + panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be >= 0", cfg.AcceptTimeout)) } if cfg.MaxConsecutiveRestarts == 0 { panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts)) } - if cfg.RestartAckTimeout <= 0 { - panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout)) - } - if cfg.CompleteTimeout <= 0 { - panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout)) + if cfg.CompleteTimeout < 0 { + panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be >= 0", cfg.CompleteTimeout)) } } @@ -275,6 +271,11 @@ func (mc *monitoredChannel) start() { // an Accept to our open channel request before the accept timeout. // Returns a function that can be used to cancel the timer. func (mc *monitoredChannel) watchForResponderAccept() func() { + // Check if the accept timeout is disabled + if mc.cfg.AcceptTimeout == 0 { + return func() {} + } + // Start a timer for the accept timeout timer := time.NewTimer(mc.cfg.AcceptTimeout) @@ -297,6 +298,11 @@ func (mc *monitoredChannel) watchForResponderAccept() func() { // Wait up to the configured timeout for the responder to send a Complete message func (mc *monitoredChannel) watchForResponderComplete() { + // Check if the complete timeout is disabled + if mc.cfg.CompleteTimeout == 0 { + return + } + // Start a timer for the complete timeout timer := time.NewTimer(mc.cfg.CompleteTimeout) defer timer.Stop() @@ -308,7 +314,7 @@ func (mc *monitoredChannel) watchForResponderComplete() { case <-timer.C: // Timer expired before we received a Complete message from the responder err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer", - mc.chid, mc.cfg.AcceptTimeout) + mc.chid, mc.cfg.CompleteTimeout) mc.closeChannelAndShutdown(err) } } @@ -414,8 +420,7 @@ func (mc *monitoredChannel) doRestartChannel() error { err := mc.sendRestartMessage(restartCount) if err != nil { log.Warnf("%s: restart failed, trying again: %s", mc.chid, err) - // If the restart message could not be sent, or there was a timeout - // waiting for the restart to be acknowledged, try again + // If the restart message could not be sent, try again return mc.doRestartChannel() } log.Infof("%s: restart completed successfully", mc.chid) @@ -438,25 +443,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) // Send a restart message for the channel - restartResult := mc.waitForRestartResponse() log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) if err != nil { return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) } - // The restart message is fire and forget, so we need to watch for a - // restart response to know that the restart message reached the peer. - select { - case <-mc.ctx.Done(): - return nil // channel shutdown so just bail out - case err = <-restartResult: - if err != nil { - return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) - } - } - log.Infof("%s: received restart response from %s", mc.chid, p) - // The restart message was sent successfully. // If a restart backoff is configured, backoff after a restart before // attempting another. @@ -490,47 +482,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err) } } - -// Wait for the peer to send an acknowledgement to the restart request -func (mc *monitoredChannel) waitForRestartResponse() chan error { - restartFired := make(chan struct{}) - restarted := make(chan error, 1) - timer := time.NewTimer(mc.cfg.RestartAckTimeout) - - unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { - if channelState.ChannelID() != mc.chid { - return - } - - // The Restart event is fired when we receive an acknowledgement - // from the peer that it has received a restart request - if event.Code == datatransfer.Restart { - close(restartFired) - } - }) - - go func() { - defer unsub() - defer timer.Stop() - - select { - - // Restart ack received from peer - case <-restartFired: - restarted <- nil - - // Channel monitor shutdown, just bail out - case <-mc.ctx.Done(): - restarted <- nil - - // Timer expired before receiving a restart ack from peer - case <-timer.C: - p := mc.chid.OtherParty(mc.mgr.PeerID()) - err := xerrors.Errorf("did not receive response to restart request from %s after %s", - p, mc.cfg.RestartAckTimeout) - restarted <- err - } - }() - - return restarted -} diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index 81f4177b..eefe3fc7 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { name string errReconnect bool errSendRestartMsg bool - timeoutRestartAck bool } testCases := []testCase{{ name: "attempt restart", @@ -37,9 +36,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { }, { name: "fail to send restart message", errSendRestartMsg: true, - }, { - name: "timeout waiting for restart message ack from peer", - timeoutRestartAck: true, }} runTest := func(name string, isPush bool) { @@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: time.Hour, MaxConsecutiveRestarts: 1, - RestartAckTimeout: 20 * time.Millisecond, CompleteTimeout: time.Hour, }) @@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { err := mockAPI.awaitRestartSent() require.NoError(t, err) - // If simulating a restart ack timeout, don't fire the restart - // ack event and expect the channel to be closed with an error - if tc.timeoutRestartAck { - mockAPI.verifyChannelClosed(t, true) - return - } - - // Simulate receiving restart message ack from responder - mockAPI.restartEvent() - if isPush { // Simulate sending the remaining data mockAPI.dataSent(5) @@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: time.Hour, MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), - RestartAckTimeout: time.Hour, CompleteTimeout: time.Hour, }) @@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { err := mockAPI.awaitRestartSent() require.NoError(t, err) - // Simulate receiving restart ack from peer - mockAPI.restartEvent() - err = awaitRestartComplete(mch) require.NoError(t, err) } @@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) { AcceptTimeout: time.Hour, RestartDebounce: 10 * time.Millisecond, MaxConsecutiveRestarts: 3, - RestartAckTimeout: time.Hour, CompleteTimeout: time.Hour, }) @@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) { // Trigger another error event before the restart has completed triggerErrorEvent() - // Simulate receiving restart ack from peer (for first restart) - mockAPI.restartEvent() - // A second restart should be sent because of the second error err = mockAPI.awaitRestartSent() require.NoError(t, err) @@ -273,9 +250,11 @@ func TestChannelMonitorQueuedRestart(t *testing.T) { func TestChannelMonitorTimeouts(t *testing.T) { type testCase struct { - name string - expectAccept bool - expectComplete bool + name string + expectAccept bool + expectComplete bool + acceptTimeoutDisabled bool + completeTimeoutDisabled bool } testCases := []testCase{{ name: "accept in time", @@ -284,6 +263,10 @@ func TestChannelMonitorTimeouts(t *testing.T) { }, { name: "accept too late", expectAccept: false, + }, { + name: "disable accept timeout", + acceptTimeoutDisabled: true, + expectAccept: true, }, { name: "complete in time", expectAccept: true, @@ -292,6 +275,11 @@ func TestChannelMonitorTimeouts(t *testing.T) { name: "complete too late", expectAccept: true, expectComplete: false, + }, { + name: "disable complete timeout", + completeTimeoutDisabled: true, + expectAccept: true, + expectComplete: true, }} runTest := func(name string, isPush bool) { @@ -309,10 +297,15 @@ func TestChannelMonitorTimeouts(t *testing.T) { acceptTimeout := 10 * time.Millisecond completeTimeout := 10 * time.Millisecond + if tc.acceptTimeoutDisabled { + acceptTimeout = 0 + } + if tc.completeTimeoutDisabled { + completeTimeout = 0 + } m := NewMonitor(mockAPI, &Config{ AcceptTimeout: acceptTimeout, MaxConsecutiveRestarts: 1, - RestartAckTimeout: time.Hour, CompleteTimeout: completeTimeout, }) @@ -520,10 +513,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() { m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch) } -func (m *mockMonitorAPI) restartEvent() { - m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch) -} - type mockChannelState struct { chid datatransfer.ChannelID queued uint64 diff --git a/impl/integration_test.go b/impl/integration_test.go index f08eebb1..2a4e48f8 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -731,7 +731,6 @@ func TestAutoRestart(t *testing.T) { RestartDebounce: 500 * time.Millisecond, RestartBackoff: 500 * time.Millisecond, MaxConsecutiveRestarts: 10, - RestartAckTimeout: 100 * time.Millisecond, CompleteTimeout: 100 * time.Millisecond, }) initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)