Skip to content

Commit

Permalink
Add Minimal Implementation of MEV-Share SSE Collector (#8)
Browse files Browse the repository at this point in the history
* Minimal implementation of MEV-Share collector event source via SSE

* Comments

* Better error handling

* Change SSE lib to mev-share-rs
  • Loading branch information
grayleonard authored May 9, 2023
1 parent f1b7bab commit 41fecab
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/artemis-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ futures = "0.3"
tokio-tungstenite = "*"
async-trait = "0.1.64"
opensea-stream = { git = "https://github.com/FrankieIsLost/opensea-stream-rs"}
mev-share-rs = { git = "https://github.com/mattsse/mev-share-rs" }
ethers-flashbots = { git = "https://github.com/FrankieIsLost/ethers-flashbots", features = ["rustls"] }
reqwest = { version = "0.11.14", default-features = false, features = ["rustls-tls"] }
serde = "1.0.152"
Expand Down
33 changes: 33 additions & 0 deletions crates/artemis-core/src/collectors/mevshare_collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use async_trait::async_trait;
use tokio_stream::{StreamExt};
use mev_share_rs::{EventClient, sse::{Event}};
use crate::types::{Collector, CollectorStream};
use anyhow::Result;

/// A collector that streams from MEV-Share SSE endpoint
/// and generates [events](Event), which return tx hash, logs, and bundled txs.
pub struct MevShareCollector {
mevshare_sse_url: String,
}

impl MevShareCollector {
pub fn new(mevshare_sse_url: String) -> Self {
Self { mevshare_sse_url }
}
}

/// Implementation of the [Collector](Collector) trait for the
/// [MevShareCollector](MevShareCollector).
#[async_trait]
impl Collector<Event> for MevShareCollector
{
async fn get_event_stream(&self) -> Result<CollectorStream<Event>> {
let client = EventClient::default();
let stream = client.subscribe(&self.mevshare_sse_url).await.unwrap();
let stream = stream.filter_map(|event| match event {
Ok(evt) => Some(evt),
Err(_) => None
});
Ok(Box::pin(stream))
}
}
2 changes: 2 additions & 0 deletions crates/artemis-core/src/collectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub mod mempool_collector;

/// This collector listens to a stream of new Opensea orders.
pub mod opensea_order_collector;

pub mod mevshare_collector;
12 changes: 11 additions & 1 deletion crates/artemis-core/tests/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use artemis_core::{
collectors::{block_collector::BlockCollector, mempool_collector::MempoolCollector},
collectors::{block_collector::BlockCollector, mempool_collector::MempoolCollector, mevshare_collector::MevShareCollector},
executors::mempool_executor::{MempoolExecutor, SubmitTxToMempool},
types::{Collector, Executor},
};
Expand Down Expand Up @@ -86,3 +86,13 @@ async fn test_mempool_executor_sends_tx_simple() {
let tx = provider.get_transaction_count(account, None).await.unwrap();
assert_eq!(tx, 1.into());
}

/// Test that mevshare collector correctly emits blocks.
#[tokio::test]
async fn test_mevshare_collector_sends_events() {
let mevshare_collector = MevShareCollector::new(String::from("https://mev-share.flashbots.net"));
let block_stream = mevshare_collector.get_event_stream().await.unwrap();
let block_a = block_stream.into_future().await.0.unwrap();
assert_eq!(block_a.hash, block_a.hash);
}

0 comments on commit 41fecab

Please sign in to comment.