Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lostman committed Sep 22, 2023
1 parent 04be8a1 commit a818c19
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 45 deletions.
27 changes: 25 additions & 2 deletions packages/fuel-indexer-database/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use bigdecimal::ToPrimitive;
use fuel_indexer_database_types::*;
use fuel_indexer_lib::utils::sha256_digest;
use fuel_indexer_types::fuel::BlockData;
use sqlx::{pool::PoolConnection, postgres::PgRow, types::JsonValue, Postgres, Row};
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -496,9 +497,9 @@ pub async fn register_indexer(
})
}

pub async fn save_blockdata(
pub async fn save_block_data(
conn: &mut PoolConnection<Postgres>,
blockdata: &[fuel_indexer_types::fuel::BlockData],
blockdata: &[BlockData],
) -> sqlx::Result<()> {
if blockdata.is_empty() {
return Ok(());
Expand Down Expand Up @@ -526,6 +527,28 @@ pub async fn save_blockdata(
Ok(())
}

pub async fn load_block_data(
conn: &mut PoolConnection<Postgres>,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
let end_condition = end_block
.map(|x| format!("AND block_height <= {x}"))
.unwrap_or("".to_string());
let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}");

let rows = sqlx::query(&query).fetch_all(conn).await?;

let mut blocks = Vec::new();
for row in rows {
let bytes = row.get::<Vec<u8>, usize>(0);
let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
blocks.push(blockdata);
}
Ok(blocks)
}

/// Return all indexers registered to this indexer serivce.
#[cfg_attr(feature = "metrics", metrics)]
pub async fn all_registered_indexers(
Expand Down
22 changes: 19 additions & 3 deletions packages/fuel-indexer-database/src/queries.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{types::*, IndexerConnection};
use fuel_indexer_postgres as postgres;
use fuel_indexer_types::fuel::BlockData;
use sqlx::types::{
chrono::{DateTime, Utc},
JsonValue,
Expand Down Expand Up @@ -219,13 +220,28 @@ pub async fn register_indexer(
}
}

pub async fn save_blockdata(
/// Save `BlockData` in the database.
pub async fn save_block_data(
conn: &mut IndexerConnection,
blockdata: &[fuel_indexer_types::fuel::BlockData],
blockdata: &[BlockData],
) -> sqlx::Result<()> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::save_blockdata(c, blockdata).await
postgres::save_block_data(c, blockdata).await
}
}
}

/// Load `BlockData` from the database.
pub async fn load_block_data(
conn: &mut IndexerConnection,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::load_block_data(c, start_block, end_block, limit).await
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/fuel-indexer-lib/src/config/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ pub struct IndexerArgs {
#[clap(long, help = "Amount of blocks to return in a request to a Fuel node.", default_value_t = defaults::NODE_BLOCK_PAGE_SIZE)]
pub block_page_size: usize,

/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting it up.
/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up.
#[clap(
long,
help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting it up."
help = "Store blocks in the database and use these stored blocks to fast-forward an indexer starting up."
)]
pub enable_block_store: bool,
}
Expand Down
3 changes: 1 addition & 2 deletions packages/fuel-indexer-lib/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,5 @@ pub const REMOVE_DATA: bool = false;
/// Allow the web server to accept raw SQL queries.
pub const ACCEPT_SQL: bool = false;

/// Store blocks in the database and use these stored blocks to fast-forward an
/// indexer starting it up.
/// Store blocks in the database and use these stored blocks to fast-forward an indexer starting up.
pub const ENABLE_BLOCK_STORE: bool = false;
32 changes: 2 additions & 30 deletions packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,6 @@ impl From<ExecutorSource> for Vec<u8> {
}
}

pub async fn load_blocks(
pool: &IndexerConnectionPool,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> IndexerResult<Vec<BlockData>> {
use sqlx::Row;

let end_condition = end_block
.map(|x| format!("AND block_height <= {x}"))
.unwrap_or("".to_string());
let query = format!("SELECT block_data FROM index_block_data WHERE block_height >= {start_block} {end_condition} ORDER BY block_height ASC LIMIT {limit}");

let pool = match pool {
IndexerConnectionPool::Postgres(pool) => pool.clone(),
};

let rows = sqlx::query(&query).fetch_all(&pool).await?;

let mut blocks = Vec::new();
for row in rows {
let bytes = row.get::<Vec<u8>, usize>(0);
let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
blocks.push(blockdata);
}
Ok(blocks)
}

/// Run the executor task until the kill switch is flipped, or until some other
/// stop criteria is met.
//
Expand Down Expand Up @@ -174,8 +146,8 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

// Fetch the next page of blocks, and the starting cursor for the subsequent page
let (block_info, next_cursor, _has_next_page) = if enable_block_store {
let result = load_blocks(
&pool,
let result = fuel_indexer_database::queries::load_block_data(
&mut conn,
start_block,
executor.manifest().end_block(),
node_block_page_size,
Expand Down
12 changes: 6 additions & 6 deletions packages/fuel-indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,20 +408,20 @@ pub(crate) async fn create_block_sync_task(
) {
let mut conn = pool.acquire().await.unwrap();

let start_block_height = queries::last_block_height_for_stored_blocks(&mut conn)
let last_height = queries::last_block_height_for_stored_blocks(&mut conn)
.await
.unwrap();

let mut cursor = Some(start_block_height.to_string());
let mut cursor = Some(last_height.to_string());

info!("Block sync: starting from Block#{}", start_block_height + 1);
let task_id = "Block Sync";

info!("{task_id}: starting from Block#{}", last_height + 1);

let client =
fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string())
.unwrap_or_else(|e| panic!("Client node connection failed: {e}."));

let task_id = "Block Sync";

loop {
// Get the next page of blocks, and the starting cursor for the subsequent page
let (block_info, next_cursor, _has_next_page) =
Expand Down Expand Up @@ -462,7 +462,7 @@ pub(crate) async fn create_block_sync_task(
// Blocks must be in order, and there can be no missing blocks. This
// is enforced when saving to the database by a trigger. If
// `save_blockdata` succeeds, all is well.
fuel_indexer_database::queries::save_blockdata(&mut conn, &block_info)
fuel_indexer_database::queries::save_block_data(&mut conn, &block_info)
.await
.unwrap();

Expand Down

0 comments on commit a818c19

Please sign in to comment.