diff --git a/packages/fuel-indexer-database/postgres/src/lib.rs b/packages/fuel-indexer-database/postgres/src/lib.rs index 4fc3ca420..7ad569ecf 100644 --- a/packages/fuel-indexer-database/postgres/src/lib.rs +++ b/packages/fuel-indexer-database/postgres/src/lib.rs @@ -3,6 +3,7 @@ use bigdecimal::ToPrimitive; use fuel_indexer_database_types::*; use fuel_indexer_lib::utils::sha256_digest; +use fuel_indexer_types::fuel::BlockData; use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row}; use std::str::FromStr; use std::time::{SystemTime, UNIX_EPOCH}; @@ -496,9 +497,9 @@ pub async fn register_indexer( }) } -pub async fn save_blockdata( +pub async fn save_block_data( conn: &mut PoolConnection, - blockdata: &[fuel_indexer_types::fuel::BlockData], + blockdata: &[BlockData], ) -> sqlx::Result<()> { if blockdata.is_empty() { return Ok(()); @@ -526,6 +527,28 @@ pub async fn save_blockdata( Ok(()) } +pub async fn load_block_data( + conn: &mut PoolConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result> { + let end_condition = end_block + .map(|x| format!("AND block_height <= {x}")) + .unwrap_or("".to_string()); + let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}"); + + let rows = sqlx::query(&query).fetch_all(conn).await?; + + let mut blocks = Vec::new(); + for row in rows { + let bytes = row.get::, usize>(0); + let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); + blocks.push(blockdata); + } + Ok(blocks) +} + /// Return all indexers registered to this indexer serivce. #[cfg_attr(feature = "metrics", metrics)] pub async fn all_registered_indexers( diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 9c52facec..e396b08c6 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -1,5 +1,6 @@ use crate::{types::*, IndexerConnection}; use fuel_indexer_postgres as postgres; +use fuel_indexer_types::fuel::BlockData; use sqlx::types::{ chrono::{DateTime, Utc}, JsonValue, @@ -219,13 +220,28 @@ pub async fn register_indexer( } } -pub async fn save_blockdata( +/// Save `BlockData` in the database. +pub async fn save_block_data( conn: &mut IndexerConnection, - blockdata: &[fuel_indexer_types::fuel::BlockData], + blockdata: &[BlockData], ) -> sqlx::Result<()> { match conn { IndexerConnection::Postgres(ref mut c) => { - postgres::save_blockdata(c, blockdata).await + postgres::save_block_data(c, blockdata).await + } + } +} + +/// Load `BlockData` from the database. +pub async fn load_block_data( + conn: &mut IndexerConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::load_block_data(c, start_block, end_block, limit).await } } } diff --git a/packages/fuel-indexer-lib/src/config/cli.rs b/packages/fuel-indexer-lib/src/config/cli.rs index fe1a76ba4..c547e1397 100644 --- a/packages/fuel-indexer-lib/src/config/cli.rs +++ b/packages/fuel-indexer-lib/src/config/cli.rs @@ -192,10 +192,10 @@ pub struct IndexerArgs { #[clap(long, help = "Amount of blocks to return in a request to a Fuel node.", default_value_t = defaults::NODE_BLOCK_PAGE_SIZE)] pub block_page_size: usize, - /// Store blocks in the database and use these stored blocks to fast-forward an indexer starting it up. + /// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up. #[clap( long, - help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting it up." + help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting up." )] pub enable_block_store: bool, } diff --git a/packages/fuel-indexer-lib/src/defaults.rs b/packages/fuel-indexer-lib/src/defaults.rs index 091646612..0523559f9 100644 --- a/packages/fuel-indexer-lib/src/defaults.rs +++ b/packages/fuel-indexer-lib/src/defaults.rs @@ -135,6 +135,5 @@ pub const REMOVE_DATA: bool = false; /// Allow the web server to accept raw SQL queries. pub const ACCEPT_SQL: bool = false; -/// Store blocks in the database and use these stored blocks to fast-forward an -/// indexer starting it up. +/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up. pub const ENABLE_BLOCK_STORE: bool = false; diff --git a/packages/fuel-indexer/src/executor.rs b/packages/fuel-indexer/src/executor.rs index 498924289..5cc7089d4 100644 --- a/packages/fuel-indexer/src/executor.rs +++ b/packages/fuel-indexer/src/executor.rs @@ -69,34 +69,6 @@ impl From for Vec { } } -pub async fn load_blocks( - pool: &IndexerConnectionPool, - start_block: u32, - end_block: Option, - limit: usize, -) -> IndexerResult> { - use sqlx::Row; - - let end_condition = end_block - .map(|x| format!("AND block_height <= {x}")) - .unwrap_or("".to_string()); - let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}"); - - let pool = match pool { - IndexerConnectionPool::Postgres(pool) => pool.clone(), - }; - - let rows = sqlx::query(&query).fetch_all(&pool).await?; - - let mut blocks = Vec::new(); - for row in rows { - let bytes = row.get::, usize>(0); - let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); - blocks.push(blockdata); - } - Ok(blocks) -} - /// Run the executor task until the kill switch is flipped, or until some other /// stop criteria is met. // @@ -174,8 +146,8 @@ pub fn run_executor( // Fetch the next page of blocks, and the starting cursor for the subsequent page let (block_info, next_cursor, _has_next_page) = if enable_block_store { - let result = load_blocks( - &pool, + let result = fuel_indexer_database::queries::load_block_data( + &mut conn, start_block, executor.manifest().end_block(), node_block_page_size, diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index d967ea042..e72cdedd2 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -408,20 +408,20 @@ pub(crate) async fn create_block_sync_task( ) { let mut conn = pool.acquire().await.unwrap(); - let start_block_height = queries::last_block_height_for_stored_blocks(&mut conn) + let last_height = queries::last_block_height_for_stored_blocks(&mut conn) .await .unwrap(); - let mut cursor = Some(start_block_height.to_string()); + let mut cursor = Some(last_height.to_string()); - info!("Block sync: starting from Block#{}", start_block_height + 1); + let task_id = "Block Sync"; + + info!("{task_id}: starting from Block#{}", last_height + 1); let client = fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string()) .unwrap_or_else(|e| panic!("Client node connection failed: {e}.")); - let task_id = "Block Sync"; - loop { // Get the next page of blocks, and the starting cursor for the subsequent page let (block_info, next_cursor, _has_next_page) = @@ -462,7 +462,7 @@ pub(crate) async fn create_block_sync_task( // Blocks must be in order, and there can be no missing blocks. This // is enforced when saving to the database by a trigger. If // `save_blockdata` succeeds, all is well. - fuel_indexer_database::queries::save_blockdata(&mut conn, &block_info) + fuel_indexer_database::queries::save_block_data(&mut conn, &block_info) .await .unwrap();