Skip to content

Commit

Permalink
Fix handshake workers to process pending events (#2742)
Browse files Browse the repository at this point in the history
* Fix e2e to show the handshake worker bugs

* Process pending handshake events on first NewBlock event.

* Terminate handshake on OpenAck and OpenConfirm events

* Address review comments

* Add test for completion of connection and channel handshakes on start

* Add handshake completion tests after try and ack, plus negative tests when the worker is disabled

Co-authored-by: Romain Ruetschi <romain@informal.systems>
  • Loading branch information
ancazamfir and romac authored Oct 27, 2022
1 parent cead6c3 commit 9ce6c21
Show file tree
Hide file tree
Showing 11 changed files with 622 additions and 19 deletions.
10 changes: 8 additions & 2 deletions crates/relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ChannelError> {
let res = match (state, self.counterparty_state()?) {
let event = match (state, self.counterparty_state()?) {
(State::Init, State::Uninitialized) => Some(self.build_chan_open_try_and_send()?),
(State::Init, State::Init) => Some(self.build_chan_open_try_and_send()?),
(State::TryOpen, State::Init) => Some(self.build_chan_open_ack_and_send()?),
Expand All @@ -762,7 +762,13 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
_ => None,
};

Ok((res, Next::Continue))
// Abort if the channel is at OpenAck or OpenConfirm stage, as there is nothing more for the worker to do
match event {
Some(IbcEvent::OpenConfirmChannel(_)) | Some(IbcEvent::OpenAckChannel(_)) => {
Ok((event, Next::Abort))
}
_ => Ok((event, Next::Continue)),
}
}

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<Next, u64> {
Expand Down
8 changes: 7 additions & 1 deletion crates/relayer/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,13 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Connection<ChainA, ChainB> {
_ => None,
};

Ok((event, Next::Continue))
// Abort if the connection is at OpenAck or OpenConfirm stage, as there is nothing more for the worker to do
match event {
Some(IbcEvent::OpenConfirmConnection(_)) | Some(IbcEvent::OpenAckConnection(_)) => {
Ok((event, Next::Abort))
}
_ => Ok((event, Next::Continue)),
}
}

pub fn step_state(&mut self, state: State, index: u64) -> RetryResult<Next, u64> {
Expand Down
30 changes: 27 additions & 3 deletions crates/relayer/src/worker/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
chains: ChainHandlePair<ChainA, ChainB>,
cmd_rx: Receiver<WorkerCmd>,
) -> TaskHandle {
let mut complete_handshake_on_new_block = true;
spawn_background_task(
error_span!("worker.channel", channel = %channel.short_name()),
Some(Duration::from_millis(200)),
Expand All @@ -31,6 +32,7 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
let last_event = batch.events.last();
debug!("starts processing {:?}", last_event);

complete_handshake_on_new_block = false;
if let Some(event_with_height) = last_event {
let mut handshake_channel = RelayChannel::restore_from_event(
chains.a.clone(),
Expand All @@ -48,11 +50,33 @@ pub fn spawn_channel_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
}
}

// nothing to do
WorkerCmd::NewBlock { .. } => Ok(Next::Continue),
WorkerCmd::NewBlock {
height: current_height,
new_block: _,
} if complete_handshake_on_new_block => {
debug!("starts processing block event at {:#?}", current_height);

let height = current_height
.decrement()
.map_err(|e| TaskError::Fatal(RunError::ics02(e)))?;

let (mut handshake_channel, state) = RelayChannel::restore_from_state(
chains.a.clone(),
chains.b.clone(),
channel.clone(),
height,
)
.map_err(|e| TaskError::Fatal(RunError::channel(e)))?;

complete_handshake_on_new_block = false;
retry_with_index(retry_strategy::worker_default_strategy(), |index| {
handshake_channel.step_state(state, index)
})
.map_err(|e| TaskError::Fatal(RunError::retry(e)))
}

// nothing to do
WorkerCmd::ClearPendingPackets => Ok(Next::Continue),
_ => Ok(Next::Continue),
}
} else {
Ok(Next::Continue)
Expand Down
32 changes: 29 additions & 3 deletions crates/relayer/src/worker/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
chains: ChainHandlePair<ChainA, ChainB>,
cmd_rx: Receiver<WorkerCmd>,
) -> TaskHandle {
let mut complete_handshake_on_new_block = true;
spawn_background_task(
error_span!("worker.connection", connection = %connection.short_name()),
Some(Duration::from_millis(200)),
Expand All @@ -32,6 +33,7 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(

debug!("starts processing {:?}", last_event_with_height);

complete_handshake_on_new_block = false;
if let Some(event_with_height) = last_event_with_height {
let mut handshake_connection = RelayConnection::restore_from_event(
chains.a.clone(),
Expand All @@ -49,11 +51,35 @@ pub fn spawn_connection_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
}
}

// nothing to do
WorkerCmd::NewBlock { .. } => Ok(Next::Continue),
WorkerCmd::NewBlock {
height: current_height,
new_block: _,
} if complete_handshake_on_new_block => {
debug!("starts processing block event at {}", current_height);

let height = current_height
.decrement()
.map_err(|e| TaskError::Fatal(RunError::ics02(e)))?;

let (mut handshake_connection, state) =
RelayConnection::restore_from_state(
chains.a.clone(),
chains.b.clone(),
connection.clone(),
height,
)
.map_err(|e| TaskError::Fatal(RunError::connection(e)))?;

complete_handshake_on_new_block = false;

retry_with_index(retry_strategy::worker_default_strategy(), |index| {
handshake_connection.step_state(state, index)
})
.map_err(|e| TaskError::Fatal(RunError::retry(e)))
}

// nothing to do
WorkerCmd::ClearPendingPackets => Ok(Next::Continue),
_ => Ok(Next::Continue),
}
} else {
Ok(Next::Continue)
Expand Down
7 changes: 2 additions & 5 deletions e2e/e2e/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,13 +548,10 @@ def verify_state(c: Config,
ibc1_chan_id: ChannelId, port_id: PortId):

mode = toml.load(c.config_file)['mode']
clients_enabled = mode['clients']['enabled']
conn_enabled = mode['connections']['enabled']
chan_enabled = mode['channels']['enabled']
packets_enabled = mode['packets']['enabled']

# verify connection state on both chains, should be 'Open' or 'Init' depending on config 'mode'
if clients_enabled and conn_enabled and chan_enabled and packets_enabled:
# verify channel state on both chains, should be 'Open' or 'Init' depending on config 'mode'
if chan_enabled:
sleep(10.0)
for i in range(20):
sleep(2.0)
Expand Down
5 changes: 1 addition & 4 deletions e2e/e2e/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,10 @@ def verify_state(c: Config,
ibc1_conn_id: ConnectionId):

mode = toml.load(c.config_file)['mode']
clients_enabled = mode['clients']['enabled']
conn_enabled = mode['connections']['enabled']
chan_enabled = mode['channels']['enabled']
packets_enabled = mode['packets']['enabled']

# verify connection state on both chains, should be 'Open' or 'Init' depending on config 'mode'
if clients_enabled and conn_enabled and chan_enabled and packets_enabled:
if conn_enabled:
sleep(10.0)
for i in range(20):
sleep(5.0)
Expand Down
Loading

0 comments on commit 9ce6c21

Please sign in to comment.