Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

substrate runner: increase line read + dump CLI output if parsing fails #1781

Merged
merged 14 commits into from
Sep 24, 2024
Merged
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ thiserror = "1.0.63"
tokio = { version = "1.40", default-features = false }
tracing = { version = "0.1.40", default-features = false }
tracing-wasm = "0.2.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
trybuild = "1.0.99"
url = "2.5.2"
wabt = "0.10.0"
Expand Down
3 changes: 2 additions & 1 deletion testing/integration-tests/src/full_client/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use crate::{subxt_test, test_context};
use crate::{subxt_test, test_context, utils::consume_initial_blocks};
use codec::{Compact, Encode};
use futures::StreamExt;

Expand Down Expand Up @@ -94,6 +94,7 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> {
let api = ctx.client();

let mut sub = api.blocks().subscribe_finalized().await?;
consume_initial_blocks(&mut sub).await;

// check that the finalized block reported lines up with the `latest_finalized_block_ref`.
for _ in 0..2 {
Expand Down
48 changes: 25 additions & 23 deletions testing/integration-tests/src/full_client/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::collections::HashSet;

use crate::{
subxt_test, test_context, test_context_reconnecting_rpc_client,
utils::{node_runtime, wait_for_blocks},
utils::{consume_initial_blocks, node_runtime, wait_for_blocks},
};
use codec::{Decode, Encode};

Expand Down Expand Up @@ -420,29 +420,31 @@ async fn legacy_and_unstable_block_subscription_reconnect() {
let api = api.clone();
async move {
let mut missed_blocks = false;
(api.blocks()
.subscribe_finalized()
.await
.unwrap()
// Ignore `disconnected events`.
// This will be emitted by the legacy backend for every reconnection.
.filter(|item| {
let disconnected = match item {
Ok(_) => false,
Err(e) => {
if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) {
missed_blocks = true;
}
e.is_disconnected_will_reconnect()
let mut sub = api.blocks().subscribe_finalized().await.unwrap();

consume_initial_blocks(&mut sub).await;

let blocks =
// Ignore `disconnected events`.
// This will be emitted by the legacy backend for every reconnection.
sub.filter(|item| {
let disconnected = match item {
Ok(_) => false,
Err(e) => {
if matches!(e, Error::Rpc(subxt::error::RpcError::DisconnectedWillReconnect(e)) if e.contains("Missed at least one block when the connection was lost")) {
missed_blocks = true;
}
};

futures::future::ready(!disconnected)
})
.take(num)
.map(|x| x.unwrap().hash().to_string())
.collect::<Vec<String>>()
.await, missed_blocks)
e.is_disconnected_will_reconnect()
}
};

futures::future::ready(!disconnected)
})
.take(num)
.map(|x| x.unwrap().hash().to_string())
.collect::<Vec<String>>().await;

(blocks, missed_blocks)
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
//! Just sanity checking some of the new RPC methods to try and
//! catch differences as the implementations evolve.

use crate::{subxt_test, test_context, utils::node_runtime};
use crate::{
subxt_test, test_context,
utils::{consume_initial_blocks, node_runtime},
};
use assert_matches::assert_matches;
use codec::Encode;
use futures::Stream;
Expand Down Expand Up @@ -341,8 +344,11 @@ async fn transaction_v1_broadcast() {

// Subscribe to finalized blocks.
let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();

consume_initial_blocks(&mut finalized_sub).await;

// Expect the tx to be encountered in a maximum number of blocks.
let mut num_blocks: usize = 10;
let mut num_blocks: usize = 20;

// Submit the transaction.
let _operation_id = rpc
Expand Down
3 changes: 1 addition & 2 deletions testing/integration-tests/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ mod wait_for_blocks;

pub use context::*;
pub use node_proc::TestNodeProcess;
pub use wait_for_blocks::*;

pub use subxt_test_macro::subxt_test;
pub use wait_for_blocks::*;

/// The test timeout is set to 1 second.
/// However, the test is sleeping for 5 seconds.
Expand Down
27 changes: 26 additions & 1 deletion testing/integration-tests/src/utils/wait_for_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use subxt::{client::OnlineClientT, Config};
use subxt::{
backend::StreamOf, blocks::Block, client::OnlineClientT, Config, Error, OnlineClient,
SubstrateConfig,
};

/// Wait for blocks to be produced before running tests. Specifically, we
/// wait for one more finalized block to be produced, which is important because
Expand All @@ -23,3 +26,25 @@ pub async fn wait_for_number_of_blocks<C: Config>(
sub.next().await;
}
}

/// Consumes the initial blocks from the stream of blocks to ensure that the stream is up-to-date.
///
/// This may be useful on the unstable backend when the initial blocks may be large
/// and one relies on something to included in finalized block in ner future.
pub async fn consume_initial_blocks(
blocks: &mut StreamOf<Result<Block<SubstrateConfig, OnlineClient<SubstrateConfig>>, Error>>,
) {
use tokio::time::{interval_at, Duration, Instant};
const MAX_DURATION: Duration = Duration::from_millis(200);

let mut now = interval_at(Instant::now() + MAX_DURATION, MAX_DURATION);

loop {
tokio::select! {
_ = now.tick() => {
break;
}
_ = blocks.next() => {}
}
}
}
18 changes: 9 additions & 9 deletions testing/substrate-runner/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@
#[derive(Debug)]
pub enum Error {
Io(std::io::Error),
CouldNotExtractPort,
CouldNotExtractP2pAddress,
CouldNotExtractP2pPort,
CouldNotExtractPort(String),
CouldNotExtractP2pAddress(String),
CouldNotExtractP2pPort(String),
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(err) => write!(f, "IO error: {err}"),
Error::CouldNotExtractPort => write!(
Error::CouldNotExtractPort(log) => write!(
f,
"could not extract port from running substrate node's stdout"
"could not extract port from running substrate node's stdout: {log}"
),
Error::CouldNotExtractP2pAddress => write!(
Error::CouldNotExtractP2pAddress(log) => write!(
f,
"could not extract p2p address from running substrate node's stdout"
"could not extract p2p address from running substrate node's stdout: {log}"
),
Error::CouldNotExtractP2pPort => write!(
Error::CouldNotExtractP2pPort(log) => write!(
f,
"could not extract p2p port from running substrate node's stdout"
"could not extract p2p port from running substrate node's stdout: {log}"
),
}
}
Expand Down
57 changes: 48 additions & 9 deletions testing/substrate-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ impl SubstrateNodeBuilder {

// Wait for RPC port to be logged (it's logged to stderr).
let stderr = proc.stderr.take().unwrap();
let (ws_port, p2p_address, p2p_port) = try_find_substrate_port_from_output(stderr);
let running_node = try_find_substrate_port_from_output(stderr);

let ws_port = ws_port.ok_or(Error::CouldNotExtractPort)?;
let p2p_address = p2p_address.ok_or(Error::CouldNotExtractP2pAddress)?;
let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?;
let ws_port = running_node.ws_port()?;
let p2p_address = running_node.p2p_address()?;
let p2p_port = running_node.p2p_port()?;

Ok(SubstrateNode {
binary_path: bin_path,
Expand Down Expand Up @@ -244,16 +244,19 @@ impl Drop for SubstrateNode {

// Consume a stderr reader from a spawned substrate command and
// locate the port number that is logged out to it.
fn try_find_substrate_port_from_output(
r: impl Read + Send + 'static,
) -> (Option<u16>, Option<String>, Option<u32>) {
fn try_find_substrate_port_from_output(r: impl Read + Send + 'static) -> SubstrateNodeInfo {
let mut port = None;
let mut p2p_address = None;
let mut p2p_port = None;

for line in BufReader::new(r).lines().take(50) {
let mut log = String::new();

for line in BufReader::new(r).lines().take(100) {
let line = line.expect("failed to obtain next line from stdout for port discovery");

log.push_str(&line);
log.push('\n');

// Parse the port lines
let line_port = line
// oldest message:
Expand Down Expand Up @@ -301,7 +304,43 @@ fn try_find_substrate_port_from_output(
.unwrap_or_else(|_| panic!("valid port expected for log line, got '{port_str}'"));
p2p_port = Some(port_num);
}

if port.is_some() && p2p_address.is_some() && p2p_port.is_some() {
Copy link
Member Author

@niklasad1 niklasad1 Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bail early if we found our ports

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh good catch!

break;
}
}

SubstrateNodeInfo {
ws_port: port,
p2p_address,
p2p_port,
log,
}
}

/// Data extracted from the running node's stdout.
#[derive(Debug)]
pub struct SubstrateNodeInfo {
ws_port: Option<u16>,
p2p_address: Option<String>,
p2p_port: Option<u32>,
log: String,
}

impl SubstrateNodeInfo {
pub fn ws_port(&self) -> Result<u16, Error> {
self.ws_port
.ok_or_else(|| Error::CouldNotExtractPort(self.log.clone()))
}

(port, p2p_address, p2p_port)
pub fn p2p_address(&self) -> Result<String, Error> {
self.p2p_address
.clone()
.ok_or_else(|| Error::CouldNotExtractP2pAddress(self.log.clone()))
}

pub fn p2p_port(&self) -> Result<u32, Error> {
self.p2p_port
.ok_or_else(|| Error::CouldNotExtractP2pPort(self.log.clone()))
}
}
Loading