Skip to content

Commit 458aafc

Browse files
committed
feat: compress bulk txns with zlib compression
1 parent 493c323 commit 458aafc

File tree

6 files changed

+417
-3
lines changed

6 files changed

+417
-3
lines changed

metabased-sequencer/interceptor/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,9 @@ serde = "1.0.210"
3434
hyper-util = "0.1.8"
3535
alloy-primitives = "0.8.7"
3636
url = { version = "2", features = ["serde"] }
37+
brotli = "7.0.0"
38+
rand = "0.8.5"
39+
flate2 = "1.0.34"
40+
criterion = "0.5.1"
41+
hex = "0.4.3"
42+
prettytable-rs = "0.10.0"

metabased-sequencer/interceptor/src/application/send_raw_transaction.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ where
3939
)?;
4040
}
4141

42+
// 3. Submission/forwarding:
4243
chain.process_transaction(encoded).await?;
4344

4445
Ok(tx.tx_hash().to_owned())

metabased-sequencer/interceptor/src/domain/chain.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub trait MetabasedSequencerChainService {
99
async fn process_transaction(&self, tx: Bytes) -> Result<(), Self::Error>;
1010

1111
async fn process_bulk_transactions(&self, tx: Vec<Bytes>) -> Result<(), Self::Error>;
12+
13+
async fn process_bulk_transactions_compressed(&self, tx: Vec<Bytes>) -> Result<(), Self::Error>;
1214
}
1315

1416
#[cfg(test)]
@@ -20,6 +22,7 @@ mod tests {
2022
use std::convert::Infallible;
2123
use std::sync::Arc;
2224
use tokio::sync::RwLock;
25+
use crate::infrastructure::compress_transactions;
2326

2427
#[derive(Debug)]
2528
pub struct InMemoryMetabasedSequencerChain {
@@ -47,5 +50,12 @@ mod tests {
4750

4851
Ok(())
4952
}
53+
54+
async fn process_bulk_transactions_compressed(&self, tx: Vec<Bytes>) -> Result<(), Infallible> {
55+
let compressed_txns = compress_transactions(&tx)?;
56+
self.transactions.write().await.push(compressed_txns);
57+
58+
Ok(())
59+
}
5060
}
5161
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
mod sol;
2+
mod zlib_compression;
23

34
pub use sol::SolMetabasedSequencerChainService;
5+
pub use zlib_compression::{compress_transactions};

metabased-sequencer/interceptor/src/infrastructure/sol.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use crate::domain::primitives::{Address, Bytes};
22
use crate::domain::MetabasedSequencerChainService;
3-
use crate::infrastructure::sol::MetabasedSequencerChain::MetabasedSequencerChainInstance;
43
use alloy::network::Network;
54
use alloy::providers::Provider;
65
use alloy::sol;
76
use alloy::transports::Transport;
87
use async_trait::async_trait;
98
use std::marker::PhantomData;
9+
use crate::infrastructure::compress_transactions;
10+
use crate::infrastructure::sol::MetabasedSequencerChain::MetabasedSequencerChainInstance;
1011

1112
sol! {
1213
#[derive(Debug, PartialEq, Eq)]
@@ -17,6 +18,7 @@ sol! {
1718
function emitTransactionProcessed(bytes calldata encodedTxn) public;
1819
function processTransaction(bytes calldata encodedTxn) public;
1920
function processBulkTransactions(bytes[] calldata encodedTxns) public;
21+
function processBulkTransactionsCompressed(bytes calldata compressedTxns) public; // TODO: Contract needs to be updated
2022
}
2123
}
2224

@@ -52,14 +54,20 @@ impl<P: Provider<T, N>, T: Transport + Clone, N: Network> MetabasedSequencerChai
5254
type Error = alloy::contract::Error;
5355

5456
async fn process_transaction(&self, tx: Bytes) -> Result<(), Self::Error> {
57+
// Do not compress individual transaction, doesn't get smaller
5558
self.contract().processTransaction(tx).call().await?;
56-
5759
Ok(())
5860
}
5961

6062
async fn process_bulk_transactions(&self, tx: Vec<Bytes>) -> Result<(), Self::Error> {
6163
self.contract().processBulkTransactions(tx).call().await?;
64+
Ok(())
65+
}
6266

67+
async fn process_bulk_transactions_compressed(&self, tx: Vec<Bytes>) -> Result<(), Self::Error> {
68+
// Note: Compression starts being beneficial at ~5 transactions
69+
let compressed_txs = compress_transactions(&tx)?;
70+
self.contract().processBulkTransactionsCompressed(compressed_txs).call().await?;
6371
Ok(())
6472
}
65-
}
73+
}

0 commit comments

Comments
 (0)