Skip to content

Commit

Permalink
add download arware and get last rollup tx in ar toolbox (#533)
Browse files Browse the repository at this point in the history
* add download arware and get last rollup tx in ar toolbox

* resolve comment
  • Loading branch information
jingchen2222 authored Jul 3, 2023
1 parent 3c1e5f4 commit 092e2ed
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ pub enum DB3Error {
ReadStoreError(String),
#[error("fail to rollup data for error {0}")]
RollupError(String),
#[error("fail to implement arware op for error {0}")]
ArwareOpError(String),
#[error("invalid collection name for error {0}")]
InvalidCollectionNameError(String),
#[error("invalid mutation for error {0}")]
Expand Down
83 changes: 80 additions & 3 deletions src/node/src/ar_toolbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ impl ArToolBox {
Ok((addr, balance.to_string()))
}

pub async fn download_and_parse_record_batch(&self, tx: &str) -> Result<Vec<RecordBatch>> {
let tmp_dir = TempDir::new_in(&self.temp_data_path, "download")
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let file_path = tmp_dir.path().join(format!("{}.gz.parquet", tx));
self.ar_filesystem
.download_file(file_path.as_path(), tx)
.await?;
Self::parse_gzip_file(file_path.as_path())
}
pub async fn get_prev_arware_tx(&self, tx_id: &str) -> Result<Option<String>> {
self.ar_filesystem.get_last_rollup_tag(tx_id).await
}
pub async fn compress_and_upload_record_batch(
&self,
tx: String,
Expand Down Expand Up @@ -206,8 +218,12 @@ impl ArToolBox {
mod tests {
use super::*;
use arrow::array::{Array, AsArray, BinaryArray, StringArray, UInt32Array, UInt64Array};
use arrow::compute::or;
use arrow::datatypes::{BinaryType, DataType, Field, Schema};
use std::env;
use std::path::PathBuf;
use tempdir::TempDir;

#[test]
fn it_works() {}

Expand Down Expand Up @@ -262,7 +278,6 @@ mod tests {
let res = ArToolBox::parse_gzip_file(parquet_file.as_path()).unwrap();
assert_eq!(res.len(), 1);
let rec = res[0].clone();
println!("schema: {}", rec.schema());
assert!(rec.num_columns() == 4);
assert_eq!(rec.num_rows(), 10);
let payloads = rec
Expand Down Expand Up @@ -310,8 +325,6 @@ mod tests {
let res = ArToolBox::parse_gzip_file(path.as_path()).unwrap();
assert_eq!(res.len(), 1);
let rec = res[0].clone();
println!("schema: {}", rec.schema());
println!("num_rows: {}", rec.num_rows());
assert_eq!(rec.num_columns(), 4);
assert_eq!(rec.num_rows(), 204);

Expand All @@ -322,4 +335,68 @@ mod tests {
assert_eq!(order, 1);
assert_eq!(mutation.signature, "0xf6afe1165ae87fa09375eabccdedc61f3e5af4ed1e5c6456f1b63d397862252667e1f13f0f076f30609754f787c80135c52f7c249e95c9b8fab1b9ed27846c1b1c");
}

#[tokio::test]
async fn download_arware_tx_ut() {
let temp_dir = TempDir::new("download_arware_tx_ut").expect("create temp dir");
let arweave_url = "https://arweave.net";
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let key_root_path = path
.parent()
.unwrap()
.parent()
.unwrap()
.join("tools/keys")
.to_str()
.unwrap()
.to_string();

let network_id = Arc::new(AtomicU64::new(1687961160));
let ar_toolbox = ArToolBox::new(
key_root_path.to_string(),
arweave_url.to_string(),
temp_dir.path().to_str().unwrap().to_string(),
network_id,
)
.unwrap();
let tx_id = "TY5SMaPPRk_TMvSDROaQWyc_WHyJrEL760-UhiNnHG4";
let res = ar_toolbox
.download_and_parse_record_batch(tx_id)
.await
.unwrap();
let rec1 = res[0].clone();
let mutations = ArToolBox::convert_recordbatch_to_mutation(&rec1).unwrap();
assert_eq!(mutations.len(), 8192);
let (mutation, block, order) = mutations[0].clone();
assert_eq!(block, 3712);
assert_eq!(order, 1);
}
#[tokio::test]
async fn get_prev_arware_tx_ut() {
let temp_dir = TempDir::new("download_arware_tx_ut").expect("create temp dir");
let arweave_url = "https://arweave.net";
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let key_root_path = path
.parent()
.unwrap()
.parent()
.unwrap()
.join("tools/keys")
.to_str()
.unwrap()
.to_string();

let network_id = Arc::new(AtomicU64::new(1687961160));
let ar_toolbox = ArToolBox::new(
key_root_path.to_string(),
arweave_url.to_string(),
temp_dir.path().to_str().unwrap().to_string(),
network_id,
)
.unwrap();
let tx_id = "TY5SMaPPRk_TMvSDROaQWyc_WHyJrEL760-UhiNnHG4";
let res = ar_toolbox.get_prev_arware_tx(tx_id).await.unwrap();
assert!(res.is_some());
assert_eq!(res.unwrap(), "ld2W-KnmHhmgYcSgc_DcqjjoU_ke9gkwrQEWk0A2Fpg");
}
}
3 changes: 0 additions & 3 deletions src/node/src/rollup_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
//

use crate::ar_toolbox::ArToolBox;
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use db3_base::times;
use db3_error::{DB3Error, Result};
use db3_proto::db3_mutation_v2_proto::{MutationBody, MutationHeader};
use db3_proto::db3_rollup_proto::{GcRecord, RollupRecord};
use db3_storage::key_store::{KeyStore, KeyStoreConfig};
use db3_storage::meta_store_client::MetaStoreClient;
Expand Down
92 changes: 63 additions & 29 deletions src/storage/src/ar_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ impl ArFileSystem {
addr.as_str()
);
let arweave_url = url::Url::from_str(config.arweave_url.as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;

let wallet = WalletInfoClient::new(arweave_url);
Ok(Self { arweave, wallet })
}

fn build_arweave(key_root_path: &str, url: &str) -> Result<Arweave> {
let arweave_url =
url::Url::from_str(url).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
url::Url::from_str(url).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
info!("recover ar key store from {}", key_root_path);
let key_store_config = KeyStoreConfig {
key_root_path: key_root_path.to_string(),
Expand All @@ -74,23 +74,23 @@ impl ArFileSystem {
let data = key_store.get_key("ar")?;
let data_ref: &[u8] = &data;
let priv_key: RsaPrivateKey = RsaPrivateKey::from_pkcs8_der(data_ref)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
Arweave::from_private_key(priv_key, arweave_url)
.map_err(|e| DB3Error::RollupError(format!("{e}")))
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))
}
false => {
let mut rng = rand::thread_rng();
let bits = 2048;
let priv_key = RsaPrivateKey::new(&mut rng, bits)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let doc = priv_key
.to_pkcs8_der()
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
key_store
.write_key("ar", doc.as_ref())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
Arweave::from_private_key(priv_key, arweave_url)
.map_err(|e| DB3Error::RollupError(format!("{e}")))
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))
}
}
}
Expand All @@ -104,30 +104,30 @@ impl ArFileSystem {
.wallet
.balance(self.arweave.get_wallet_address().as_str())
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let currency = Currency::from_str(balance.as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
Ok(currency)
}

pub async fn download_file(&self, path_to_write: &Path, tx: &str) -> Result<()> {
let tx_b64 = Base64::from_str(tx).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let tx_b64 = Base64::from_str(tx).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let (_status, data) = self
.arweave
.get_tx_data(&tx_b64)
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
if let Some(d) = data {
let mut f =
File::create(path_to_write).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
File::create(path_to_write).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
f.write_all(d.as_ref())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;

f.sync_all()
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
Ok(())
} else {
Err(DB3Error::RollupError("fail to download file".to_string()))
Err(DB3Error::ArwareOpError("fail to download file".to_string()))
}
}

Expand All @@ -142,18 +142,18 @@ impl ArFileSystem {
) -> Result<(String, u64)> {
let mut tags: Vec<Tag<Base64>> = {
let app_tag: Tag<Base64> = Tag::from_utf8_strs("App-Name", "DB3 Network")
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let block_start_tag: Tag<Base64> =
Tag::from_utf8_strs("Start-Block", start_block.to_string().as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let block_end_tag: Tag<Base64> =
Tag::from_utf8_strs("End-Block", end_block.to_string().as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let filename_tag: Tag<Base64> = Tag::from_utf8_strs("File-Name", filename)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let network_tag: Tag<Base64> =
Tag::from_utf8_strs("Network-Id", network_id.to_string().as_str())
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
vec![
app_tag,
block_start_tag,
Expand All @@ -165,41 +165,75 @@ impl ArFileSystem {

if !last_ar_tx.is_empty() {
let value = Base64::from_utf8_str(last_ar_tx)
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let name = Base64::from_utf8_str("Last-Rollup-Tx")
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let last_rollup_tx = Tag::<Base64> { value, name };
tags.push(last_rollup_tx);
}

let metadata =
std::fs::metadata(path).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
std::fs::metadata(path).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let fee = self
.arweave
.get_fee_by_size(metadata.len())
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
self.arweave
.upload_file_from_path(path, tags, fee)
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))
}

pub async fn get_tx_status(&self, id: &str) -> Result<Option<TxStatus>> {
let tx_id = Base64::from_str(id).map_err(|e| DB3Error::RollupError(format!("{e}")))?;
let tx_id = Base64::from_str(id).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let (code, status) = self
.arweave
.get_tx_status(&tx_id)
.await
.map_err(|e| DB3Error::RollupError(format!("{e}")))?;
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
if code == StatusCode::ACCEPTED {
Ok(None)
} else if code == StatusCode::OK {
Ok(status)
} else {
Err(DB3Error::RollupError("fail to get tx status ".to_string()))
Err(DB3Error::ArwareOpError(
"fail to get tx status ".to_string(),
))
}
}
pub async fn get_tags(&self, id_str: &str) -> Result<Vec<Tag<Base64>>> {
let tx_id =
Base64::from_str(id_str).map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;
let (_status, tx) = self
.arweave
.get_tx(&tx_id)
.await
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?;

if let Some(t) = tx {
Ok(t.tags)
} else {
Err(DB3Error::ArwareOpError("fail to get tx tags ".to_string()))
}
}

/// get last rollup tag
pub async fn get_last_rollup_tag(&self, id_str: &str) -> Result<Option<String>> {
let tags = self.get_tags(id_str).await?;
for tag in tags {
if let Ok(name) = tag.name.to_utf8_string() {
if name == "Last-Rollup-Tx" {
return Ok(Some(
tag.value
.to_utf8_string()
.map_err(|e| DB3Error::ArwareOpError(format!("{e}")))?,
));
}
}
}
Ok(None)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 092e2ed

Please sign in to comment.