sync data to arweave (dbpunk-labs#475)
* feat: add compress function

* feat: support upload data to arweave

* fix: fix add rollup record test case

* fix: fix the code style error

* feat: add scan rollup records

* fix: fix test case
imotai authored Jun 9, 2023
1 parent d020ad9 commit abad032
Showing 18 changed files with 616 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -1,3 +1,6 @@
[submodule "thirdparty/merkdb"]
path = thirdparty/merkdb
url = https://github.com/dbpunk-labs/merkdb.git
[submodule "thirdparty/arweave-rs"]
path = thirdparty/arweave-rs
url = https://github.com/imotai/arweave-rs.git
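For reference, an entry like the one above is normally produced by `git submodule add https://github.com/imotai/arweave-rs.git thirdparty/arweave-rs`, and consumers pull it in with `git submodule update --init --recursive` after cloning (commands assumed; they are not part of this commit).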
1 change: 1 addition & 0 deletions Cargo.toml
@@ -18,4 +18,5 @@ fastcrypto = {git = "https://github.com/MystenLabs/fastcrypto", rev = "306465d4f
ethers = {git="https://github.com/imotai/ethers-rs", rev="d526191b7972e8cf4412fee8b71cbf42e0ce7995"}
tonic = {git="https://github.com/hyperium/tonic", rev="ae7580160431cd25c1eecda4c85014ef6ce8d93f"}
tonic-web = {git="https://github.com/hyperium/tonic", rev="ae7580160431cd25c1eecda4c85014ef6ce8d93f"}
arweave-rs = {git="https://github.com/imotai/arweave-rs", rev="31d708a98e4d7199593f7554c2e184272b0bb8dd"}
serde_json= "1.0"
2 changes: 2 additions & 0 deletions src/error/src/lib.rs
@@ -123,6 +123,8 @@ pub enum DB3Error {
WriteStoreError(String),
#[error("fail to read store for error {0}")]
ReadStoreError(String),
#[error("fail to rollup data for error {0}")]
RollupError(String),
}

pub type Result<T> = std::result::Result<T, DB3Error>;
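The new variant follows the crate's existing thiserror pattern: the underlying error's message is wrapped into a String. A minimal sketch of the mapping idiom used throughout rollup_executor.rs below (read_wallet is a hypothetical helper, not part of this commit):

use db3_error::{DB3Error, Result};

// Hypothetical helper: an I/O failure surfaces as
// "fail to rollup data for error <cause>".
fn read_wallet(path: &str) -> Result<Vec<u8>> {
    std::fs::read(path).map_err(|e| DB3Error::RollupError(format!("{e}")))
}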
4 changes: 4 additions & 0 deletions src/node/Cargo.toml
@@ -52,5 +52,9 @@ ed25519-dalek = "2.0.0-rc.2"
dirs = "5.0.0"
redb = "0.14.0"
chrono = "0.4.22"
arrow = "41.0.0"
parquet = {version="41.0.0", features=["arrow", "lz4", "flate2"], default-features = false}
arweave-rs = {workspace=true}
url = "2.4.0"
[build-dependencies]
shadow-rs = "0.21.0"
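Note that `arweave-rs = {workspace=true}` inherits the git revision pinned in the root Cargo.toml above, so the dependency is maintained in one place. The trimmed-down `parquet` feature set matters here: `arrow` enables the ArrowWriter the rollup executor uses, and `flate2` provides the GZIP codec it selects.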
45 changes: 44 additions & 1 deletion src/node/src/command.rs
@@ -20,6 +20,7 @@ use crate::auth_storage::AuthStorage;
use crate::context::Context;
use crate::indexer_impl::{IndexerBlockSyncer, IndexerNodeImpl};
use crate::node_storage::NodeStorage;
use crate::rollup_executor::RollupExecutorConfig;
use crate::storage_node_impl::StorageNodeImpl;
use crate::storage_node_light_impl::{StorageNodeV2Config, StorageNodeV2Impl};
use clap::Parser;
@@ -78,7 +79,7 @@ pub enum DB3Command {
/// The port of grpc api
#[clap(long, default_value = "26619")]
public_grpc_port: u16,
/// Bind the abci server to this port.
/// Log more logs
#[clap(short, long)]
verbose: bool,
/// The database path for mutation
@@ -90,6 +91,21 @@ pub enum DB3Command {
/// The network id
#[clap(long, default_value = "10")]
network_id: u64,
/// The block interval
#[clap(long, default_value = "2000")]
block_interval: u64,
/// The interval of rollup
#[clap(long, default_value = "60000")]
rollup_interval: u64,
/// The data path of rollup
#[clap(long, default_value = "./rollup_data")]
rollup_data_path: String,
/// The Ar miner node
#[clap(long, default_value = "http://127.0.0.1:1984/")]
ar_node_url: String,
/// The Ar wallet path
#[clap(long, default_value = "./wallet.json")]
ar_key_path: String,
},

/// Start db3 network
@@ -218,6 +234,11 @@ impl DB3Command {
mutation_db_path,
state_db_path,
network_id,
block_interval,
rollup_interval,
rollup_data_path,
ar_node_url,
ar_key_path,
} => {
let log_level = if verbose {
LevelFilter::DEBUG
@@ -232,6 +253,11 @@ impl DB3Command {
mutation_db_path.as_str(),
state_db_path.as_str(),
network_id,
block_interval,
rollup_interval,
rollup_data_path.as_str(),
ar_node_url.as_str(),
ar_key_path.as_str(),
)
.await;
let running = Arc::new(AtomicBool::new(true));
@@ -406,12 +432,24 @@ impl DB3Command {
mutation_db_path: &str,
state_db_path: &str,
network_id: u64,
block_interval: u64,
rollup_interval: u64,
rollup_data_path: &str,
ar_node_url: &str,
ar_key_path: &str,
) {
let addr = format!("{public_host}:{public_grpc_port}");
let rollup_config = RollupExecutorConfig {
rollup_interval,
temp_data_path: rollup_data_path.to_string(),
ar_node_url: ar_node_url.to_string(),
ar_key_path: ar_key_path.to_string(),
};
let store_config = MutationStoreConfig {
db_path: mutation_db_path.to_string(),
block_store_cf_name: "block_store_cf".to_string(),
tx_store_cf_name: "tx_store_cf".to_string(),
rollup_store_cf_name: "rollup_store_cf".to_string(),
message_max_buffer: 4 * 1024,
scan_max_limit: 50,
};
@@ -422,13 +460,18 @@ impl DB3Command {
let config = StorageNodeV2Config {
store_config,
state_config,
rollup_config,
network_id,
block_interval,
};
let storage_node = StorageNodeV2Impl::new(config).unwrap();
info!(
"start db3 store node on public addr {} and network {}",
addr, network_id
);
std::fs::create_dir_all(rollup_data_path).unwrap();
storage_node.start_to_produce_block();
storage_node.start_to_rollup().await;
let cors_layer = CorsLayer::new()
.allow_methods([Method::GET, Method::POST, Method::OPTIONS])
.allow_headers(Any)
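The five new flags map one-to-one onto the rollup and block-production settings. As a sketch, a node started with every flag left at its default builds the following rollup configuration (values copied from the clap attributes above; this mirrors the construction shown in the diff):

let rollup_config = RollupExecutorConfig {
    rollup_interval: 60_000,                           // ms, from --rollup-interval
    temp_data_path: "./rollup_data".to_string(),       // from --rollup-data-path
    ar_node_url: "http://127.0.0.1:1984/".to_string(), // local Ar miner, from --ar-node-url
    ar_key_path: "./wallet.json".to_string(),          // Arweave wallet JSON, from --ar-key-path
};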
1 change: 1 addition & 0 deletions src/node/src/lib.rs
@@ -23,5 +23,6 @@ pub mod indexer_impl;
mod mutation_utils;
pub mod node_key;
pub mod node_storage;
pub mod rollup_executor;
pub mod storage_node_impl;
pub mod storage_node_light_impl;
189 changes: 189 additions & 0 deletions src/node/src/rollup_executor.rs
@@ -0,0 +1,189 @@
//
// rollup_executor.rs
// Copyright (C) 2023 db3.network Author imotai <codego.me@gmail.com>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

use arrow::array::{ArrayRef, BinaryBuilder, StringBuilder, UInt32Builder, UInt64Builder};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use arweave_rs::Arweave;
use db3_base::times;
use db3_error::{DB3Error, Result};
use db3_proto::db3_mutation_v2_proto::{MutationBody, MutationHeader};
use db3_proto::db3_rollup_proto::RollupRecord;
use db3_storage::mutation_store::MutationStore;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::basic::Compression;
use parquet::basic::GzipLevel;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use tempdir::TempDir;
use tracing::info;

#[derive(Clone)]
pub struct RollupExecutorConfig {
// the interval in ms
pub rollup_interval: u64,
pub temp_data_path: String,
pub ar_key_path: String,
pub ar_node_url: String,
}

pub struct RollupExecutor {
config: RollupExecutorConfig,
storage: MutationStore,
schema: SchemaRef,
arweave: Arweave,
}

impl RollupExecutor {
pub fn new(config: RollupExecutorConfig, storage: MutationStore) -> Result<Self> {
let schema = Arc::new(Schema::new(vec![
Field::new("payload", DataType::Binary, true),
Field::new("signature", DataType::Utf8, true),
Field::new("block", DataType::UInt64, true),
Field::new("order", DataType::UInt32, true),
]));
let arweave_url = url::Url::from_str(config.ar_node_url.as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let path = Path::new(config.ar_key_path.as_str());
let arweave = Arweave::from_keypair_path(path, arweave_url)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
info!(
"start rollup executor with ar account {}",
arweave.get_wallet_address().as_str()
);
Ok(Self {
config,
storage,
schema,
arweave,
})
}

fn convert_to_recordbatch(
&self,
mutations: &[(MutationHeader, MutationBody)],
) -> Result<RecordBatch> {
//TODO limit the memory usage
let mut payload_builder = BinaryBuilder::new();
let mut signature_builder = StringBuilder::new();
let mut block_builder = UInt64Builder::new();
let mut order_builder = UInt32Builder::new();
for (header, body) in mutations {
let body_ref: &[u8] = &body.payload;
payload_builder.append_value(body_ref);
signature_builder.append_value(body.signature.as_str());
block_builder.append_value(header.block_id);
order_builder.append_value(header.order_id);
}
let array_refs: Vec<ArrayRef> = vec![
Arc::new(payload_builder.finish()),
Arc::new(signature_builder.finish()),
Arc::new(block_builder.finish()),
Arc::new(order_builder.finish()),
];
let record_batch = RecordBatch::try_new(self.schema.clone(), array_refs)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
info!(
"converted {} mutations into a recordbatch with memory size {}",
mutations.len(),
record_batch.get_array_memory_size()
);
Ok(record_batch)
}

fn dump_recordbatch(&self, path: &Path, recordbatch: &RecordBatch) -> Result<(u64, u64)> {
let properties = WriterProperties::builder()
.set_compression(Compression::GZIP(GzipLevel::default()))
.build();
let fd = File::create(path).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let mut writer = ArrowWriter::try_new(fd, self.schema.clone(), Some(properties))
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
writer
.write(recordbatch)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let meta = writer
.close()
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let metadata =
std::fs::metadata(path).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
Ok((meta.num_rows as u64, metadata.len()))
}

async fn upload_data(&self, path: &Path) -> Result<(String, u64)> {
let metadata =
std::fs::metadata(path).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let fee = self
.arweave
.get_fee_by_size(metadata.len())
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
//TODO add app name
self.arweave
.upload_file_from_path(path, vec![], fee)
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))
}

pub async fn process(&self) -> Result<()> {
let next_rollup_start_block = match self.storage.get_last_rollup_record()? {
Some(r) => r.end_block + 1,
_ => 0_u64,
};
let current_block = self.storage.get_current_block()?;
if current_block <= next_rollup_start_block {
info!("no block to rollup");
return Ok(());
}
let now = Instant::now();
info!("the next rollup start block {next_rollup_start_block} and the newest block {current_block}");
let mutations = self
.storage
.get_range_mutations(next_rollup_start_block, current_block)?;
if mutations.is_empty() {
info!("no block to rollup");
return Ok(());
}
let recordbatch = self.convert_to_recordbatch(&mutations)?;
let memory_size = recordbatch.get_array_memory_size();
let tmp_dir = TempDir::new_in(&self.config.temp_data_path, "compression")
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let file_path = tmp_dir.path().join("rollup.parquet.gz");
let (num_rows, size) = self.dump_recordbatch(&file_path, &recordbatch)?;
let (id, reward) = self.upload_data(&file_path).await?;
info!("the process rollup done with num mutations {num_rows}, raw data size {memory_size}, compress data size {size} and processed time {} id {} cost {}", now.elapsed().as_secs(),
id.as_str(), reward
);
let record = RollupRecord {
end_block: current_block - 1,
raw_data_size: memory_size as u64,
compress_data_size: size,
processed_time: now.elapsed().as_secs(),
arweave_tx: id,
time: times::get_current_time_in_secs(),
mutation_count: num_rows,
cost: reward,
};
self.storage
.add_rollup_record(&record)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
Ok(())
}
}
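process() is intentionally single-shot: it rolls up everything between the last RollupRecord and the current block, then persists a new record. A minimal sketch (assumed, not part of this commit) of how a caller such as StorageNodeV2Impl::start_to_rollup could drive it on the configured interval:

use std::time::Duration;

// Illustrative loop shape only; the real task wiring lives in the
// storage node, which is not shown in this diff.
async fn run_rollup_loop(executor: RollupExecutor, interval_ms: u64) {
    loop {
        tokio::time::sleep(Duration::from_millis(interval_ms)).await;
        if let Err(e) = executor.process().await {
            // keep ticking; the next pass resumes from the last rollup record
            tracing::warn!("rollup failed for error {e}");
        }
    }
}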