Skip to content

Commit

Permalink
feat(indexer): Extend the NEAR Indexer API with an ability to communi…
Browse files Browse the repository at this point in the history
…cate the starting point of streaming (#3236)
  • Loading branch information
khorolets authored Aug 24, 2020
1 parent c2b0fc1 commit 6eb8ab5
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 67 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2018"
actix = "0.9"
tracing = "0.1.13"
futures = "0.3.5"
rocksdb = { git = "https://github.com/nearprotocol/rust-rocksdb", branch="disable-thread" }
tokio = { version = "0.2", features = ["time", "sync"] }

neard = { path = "../../neard" }
Expand Down
15 changes: 15 additions & 0 deletions chain/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ $ cargo run --release --home-dir ~/.near/testnet init --chain-id testnet --downl

The above code will download the official genesis config and generate necessary configs. You can replace `testnet` in the command above to different network ID `betanet`.

**NB!** Due to changes in `nearcore` config generation, not all of the necessary fields are filled in the generated config file. While this issue is open https://github.com/nearprotocol/nearcore/issues/3156 you need to download the config for the network you want and replace the generated one manually:
- [testnet config.json](https://s3-us-west-1.amazonaws.com/build.nearprotocol.com/nearcore-deploy/testnet/config.json)
- [betanet config.json](https://s3-us-west-1.amazonaws.com/build.nearprotocol.com/nearcore-deploy/betanet/config.json)

Replace `config.json` in your `--home-dir` (e.g. `~/.near/testnet/config.json`) with downloaded one.

Configs for the specified network are in the `--home-dir` provided folder. We need to ensure that NEAR Indexer follows all the necessary shards, so the `"tracked_shards"` parameter in `~/.near/testnet/config.json` needs to be configured properly. For example, with a single-shard network, you just add shard #0 to the list:

```
Expand Down Expand Up @@ -85,6 +91,15 @@ As already has been mentioned in this README, the most common tweak you need to
...
```


You can choose Indexer Framework sync mode by setting what to stream:
- `LatestSynced` - Real-time syncing, always taking the latest finalized block to stream
- `FromInterruption` - Starts syncing from the block at which NEAR Indexer was interrupted the last time it ran
- `BlockHeight(u64)` - Specific block height to start syncing from

Refer to the `main()` function in [Indexer Example](https://github.com/nearprotocol/nearcore/blob/master/tools/indexer/example/src/main.rs)


Another tweak changes the default "fast" sync process to a "full" sync process. When the node gets online and observes that its state is missing or outdated, it will do state sync, and that can be done in two strategies:

1. ("fast" / default) sync enough information (only block headers) to ensure that the chain is valid; that means that the node won't have transactions, receipts, and execution outcomes, only the proofs, so Indexer will skip these blocks
Expand Down
51 changes: 35 additions & 16 deletions chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,38 @@
//! See the [example] for further details.
//!
//! [example]: https://github.com/nearprotocol/nearcore/tree/master/tools/indexer/example
use std::path::PathBuf;

use actix::System;
use tokio::sync::mpsc;

pub use neard::{get_default_home, init_configs, NearConfig};
mod streamer;

pub use self::streamer::{BlockResponse, Outcome};
pub use self::streamer::{Outcome, StreamerMessage};
pub use near_primitives;

/// Mode of syncing for a NEAR Indexer instance: selects the block the
/// streamer starts from when `Indexer::streamer()` boots up.
#[derive(Debug, Clone)]
pub enum SyncModeEnum {
    /// Real-time syncing: always take the latest finalized block to stream
    LatestSynced,
    /// Start syncing from the block at which NEAR Indexer was interrupted
    /// the last time it ran (falls back to the latest block when no prior
    /// sync position has been persisted)
    FromInterruption,
    /// Start syncing from a specific block height
    BlockHeight(u64),
}

/// NEAR Indexer configuration to be provided to `Indexer::new(IndexerConfig)`
#[derive(Debug, Clone)]
pub struct IndexerConfig {
    /// Path to `home_dir` where configs and keys can be found
    /// (e.g. `~/.near/testnet`); `config.json` must list at least one
    /// tracked shard or `Indexer::new` will panic
    pub home_dir: std::path::PathBuf,
    /// Mode of syncing for the NEAR Indexer instance; see [`SyncModeEnum`]
    pub sync_mode: SyncModeEnum,
}

/// This is the core component, which handles `nearcore` and internal `streamer`.
pub struct Indexer {
indexer_config: IndexerConfig,
near_config: neard::NearConfig,
actix_runtime: actix::SystemRunner,
view_client: actix::Addr<near_client::ViewClientActor>,
Expand All @@ -28,31 +47,31 @@ pub struct Indexer {

impl Indexer {
/// Initialize Indexer by configuring `nearcore`
pub fn new(custom_home_dir: Option<&std::path::Path>) -> Self {
let home_dir = if !custom_home_dir.is_some() {
PathBuf::from(get_default_home())
} else {
PathBuf::from(custom_home_dir.unwrap())
};

let near_config = neard::load_config(&home_dir);
pub fn new(indexer_config: IndexerConfig) -> Self {
let near_config = neard::load_config(&indexer_config.home_dir);
let system = System::new("NEAR Indexer");
neard::genesis_validate::validate_genesis(&near_config.genesis);
assert!(
!&near_config.client_config.tracked_shards.is_empty(),
"Indexer should track at least one shard. \n\
Tip: You may want to update {} with `\"tracked_shards\": [0]`
",
home_dir.join("config.json").display()
indexer_config.home_dir.join("config.json").display()
);
let (client, view_client, _) = neard::start_with_config(&home_dir, near_config.clone());
Self { actix_runtime: system, view_client, client, near_config }
let (client, view_client, _) =
neard::start_with_config(&indexer_config.home_dir, near_config.clone());
Self { actix_runtime: system, view_client, client, near_config, indexer_config }
}

/// Boots up `near_indexer::streamer`, so it monitors the new blocks with chunks, transactions, receipts, and execution outcomes inside. The returned stream handler should be drained and handled on the user side.
pub fn streamer(&self) -> mpsc::Receiver<streamer::BlockResponse> {
pub fn streamer(&self) -> mpsc::Receiver<streamer::StreamerMessage> {
let (sender, receiver) = mpsc::channel(16);
actix::spawn(streamer::start(self.view_client.clone(), self.client.clone(), sender));
actix::spawn(streamer::start(
self.view_client.clone(),
self.client.clone(),
self.indexer_config.clone(),
sender,
));
receiver
}

Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod streamer;
pub(crate) use self::streamer::start;
pub use self::streamer::{BlockResponse, Outcome};
pub use self::streamer::{Outcome, StreamerMessage};
56 changes: 43 additions & 13 deletions chain/indexer/src/streamer/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use actix::{Addr, MailboxError};
use futures::stream::StreamExt;
use rocksdb::DB;
use tokio::sync::mpsc;
use tokio::time;
use tracing::{debug, info};
Expand All @@ -12,6 +13,8 @@ use near_client;
pub use near_primitives::hash::CryptoHash;
pub use near_primitives::{types, views};

use crate::IndexerConfig;

const INTERVAL: Duration = Duration::from_millis(500);
const INDEXER: &str = "indexer";

Expand All @@ -29,13 +32,13 @@ impl From<MailboxError> for FailedToFetchData {
}

struct FetchBlockResponse {
block_response: BlockResponse,
block_response: StreamerMessage,
new_outcomes_to_get: Vec<types::TransactionOrReceiptId>,
}

/// Resulting struct represents block with chunks
#[derive(Debug)]
pub struct BlockResponse {
pub struct StreamerMessage {
pub block: views::BlockView,
pub chunks: Vec<views::ChunkView>,
pub outcomes: Vec<Outcome>,
Expand Down Expand Up @@ -103,15 +106,15 @@ async fn fetch_block_response(
outcomes_to_retry.extend(chunk.receipts.iter().map(|receipt| {
types::TransactionOrReceiptId::Receipt {
receipt_id: receipt.receipt_id,
receiver_id: receipt.receiver_id.to_string().clone(),
receiver_id: receipt.receiver_id.to_string(),
}
}));
}

let state_changes = fetch_state_changes(&client, block.header.hash).await?;

Ok(FetchBlockResponse {
block_response: BlockResponse { block, chunks, outcomes, state_changes },
block_response: StreamerMessage { block, chunks, outcomes, state_changes },
new_outcomes_to_get: outcomes_to_retry,
})
}
Expand Down Expand Up @@ -194,11 +197,23 @@ async fn fetch_chunks(
pub(crate) async fn start(
view_client: Addr<near_client::ViewClientActor>,
client: Addr<near_client::ClientActor>,
mut blocks_sink: mpsc::Sender<BlockResponse>,
indexer_config: IndexerConfig,
mut blocks_sink: mpsc::Sender<StreamerMessage>,
) {
info!(target: INDEXER, "Starting Streamer...");
let mut indexer_db_path = neard::get_store_path(&indexer_config.home_dir);
indexer_db_path.push_str("/indexer");

// TODO: implement proper error handling
let db = DB::open_default(indexer_db_path).unwrap();
let mut outcomes_to_get = Vec::<types::TransactionOrReceiptId>::new();
let mut last_synced_block_height: types::BlockHeight = 0;
let mut last_synced_block_height: Option<types::BlockHeight> = None;

info!(
target: INDEXER,
"Last synced block height in db is {}",
last_synced_block_height.unwrap_or(0)
);
'main: loop {
time::delay_for(INTERVAL).await;
let status = fetch_status(&client).await;
Expand All @@ -215,16 +230,30 @@ pub(crate) async fn start(
};

let latest_block_height = block.header.height;
if last_synced_block_height == 0 {
last_synced_block_height = latest_block_height;
}
let start_syncing_block_height = if let Some(last_synced_block_height) =
last_synced_block_height
{
last_synced_block_height + 1
} else {
match indexer_config.sync_mode {
crate::SyncModeEnum::FromInterruption => {
match db.get(b"last_synced_block_height").unwrap() {
Some(value) => String::from_utf8(value).unwrap().parse::<u64>().unwrap(),
None => latest_block_height,
}
}
crate::SyncModeEnum::LatestSynced => latest_block_height,
crate::SyncModeEnum::BlockHeight(height) => height,
}
};

debug!(
target: INDEXER,
"The last synced block is #{} and the latest block is #{}",
last_synced_block_height,
"Streaming is about to start from block #{} and the latest block is #{}",
start_syncing_block_height,
latest_block_height
);
for block_height in (last_synced_block_height + 1)..=latest_block_height {
for block_height in start_syncing_block_height..=latest_block_height {
if let Ok(block) = fetch_block_by_height(&view_client, block_height).await {
let response =
fetch_block_response(&view_client, block, outcomes_to_get.drain(..)).await;
Expand All @@ -251,7 +280,8 @@ pub(crate) async fn start(
}
}
}
last_synced_block_height = block_height;
db.put(b"last_synced_block_height", &block_height.to_string()).unwrap();
last_synced_block_height = Some(block_height);
}
}
}
1 change: 1 addition & 0 deletions tools/indexer/example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ clap = "3.0.0-beta.1"
openssl-probe = { version = "0.1.2" }
serde_json = "1.0.55"
tokio = { version = "0.2", features = ["sync"] }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"

near-indexer = { path = "../../../chain/indexer" }
37 changes: 5 additions & 32 deletions tools/indexer/example/src/configs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::env;
use std::io;

use clap::Clap;

use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::EnvFilter;

/// NEAR Indexer Example
Expand Down Expand Up @@ -54,35 +50,12 @@ pub(crate) struct InitConfigArgs {
pub download_genesis_url: Option<String>,
}

pub(crate) fn init_logging(verbose: bool) {
let mut env_filter = EnvFilter::new("tokio_reactor=info,near=info,stats=info,telemetry=info");

if verbose {
env_filter = env_filter
.add_directive("cranelift_codegen=warn".parse().unwrap())
.add_directive("cranelift_codegen=warn".parse().unwrap())
.add_directive("h2=warn".parse().unwrap())
.add_directive("trust_dns_resolver=warn".parse().unwrap())
.add_directive("trust_dns_proto=warn".parse().unwrap());

env_filter = env_filter.add_directive(LevelFilter::DEBUG.into());
} else {
env_filter = env_filter.add_directive(LevelFilter::WARN.into());
}

if let Ok(rust_log) = env::var("RUST_LOG") {
for directive in rust_log.split(',').filter_map(|s| match s.parse() {
Ok(directive) => Some(directive),
Err(err) => {
eprintln!("Ignoring directive `{}`: {}", s, err);
None
}
}) {
env_filter = env_filter.add_directive(directive);
}
}
/// Initialize the global `tracing` subscriber for the example binary.
///
/// Logs the indexer-related targets at `info` level and writes to stderr,
/// leaving stdout free for any data the example wants to emit.
///
/// NOTE(review): unlike the previous implementation, this no longer honors
/// the `RUST_LOG` environment variable or a verbose flag — confirm that
/// simplification is intended.
pub(crate) fn init_logging() {
    // The filter previously listed the `near` target twice with conflicting
    // levels ("near=info,near=error"); keep a single directive per target.
    let env_filter = EnvFilter::new(
        "tokio_reactor=info,near=info,stats=info,telemetry=info,indexer_example=info,indexer=info",
    );
    tracing_subscriber::fmt::Subscriber::builder()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();
}
24 changes: 19 additions & 5 deletions tools/indexer/example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use actix;

use clap::derive::Clap;
use tokio::sync::mpsc;
use tracing::info;

use configs::{init_logging, Opts, SubCommand};
use near_indexer;

mod configs;

async fn listen_blocks(mut stream: mpsc::Receiver<near_indexer::BlockResponse>) {
while let Some(block) = stream.recv().await {
async fn listen_blocks(mut stream: mpsc::Receiver<near_indexer::StreamerMessage>) {
while let Some(streamer_message) = stream.recv().await {
// TODO: handle data as you need
// Example of `block` with all the data
//
Expand Down Expand Up @@ -176,15 +177,24 @@ async fn listen_blocks(mut stream: mpsc::Receiver<near_indexer::BlockResponse>)
// ),
// ],
// }
eprintln!("{:#?}", block);
info!(
target: "indexer_example",
"#{} {} Chunks: {}, Transactions: {}, Receipts: {}, ExecutionOutcomes: {}",
streamer_message.block.header.height,
streamer_message.block.header.hash,
streamer_message.chunks.len(),
streamer_message.chunks.iter().map(|chunk| chunk.transactions.len()).sum::<usize>(),
streamer_message.chunks.iter().map(|chunk| chunk.receipts.len()).sum::<usize>(),
streamer_message.outcomes.len(),
);
}
}

fn main() {
// We use it to automatically search the for root certificates to perform HTTPS calls
// (sending telemetry and downloading genesis)
openssl_probe::init_ssl_cert_env_vars();
init_logging(true);
init_logging();

let opts: Opts = Opts::parse();

Expand All @@ -193,7 +203,11 @@ fn main() {

match opts.subcmd {
SubCommand::Run => {
let indexer = near_indexer::Indexer::new(Some(&home_dir));
let indexer_config = near_indexer::IndexerConfig {
home_dir,
sync_mode: near_indexer::SyncModeEnum::FromInterruption,
};
let indexer = near_indexer::Indexer::new(indexer_config);
let stream = indexer.streamer();
actix::spawn(listen_blocks(stream));
indexer.start();
Expand Down

0 comments on commit 6eb8ab5

Please sign in to comment.