Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Rewrite faucet with tokio v0.3 #14336

Merged
merged 2 commits into from
Dec 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ edition = "2018"
[dependencies]
bincode = "1.3.1"
byteorder = "1.3.4"
bytes = "0.4"
clap = "2.33"
log = "0.4.11"
serde = "1.0.112"
Expand All @@ -22,8 +21,7 @@ solana-logger = { path = "../logger", version = "1.6.0" }
solana-metrics = { path = "../metrics", version = "1.6.0" }
solana-sdk = { path = "../sdk", version = "1.6.0" }
solana-version = { path = "../version", version = "1.6.0" }
tokio = "0.1"
tokio-codec = "0.1"
tokio = { version = "0.3", features = ["full"] }

[lib]
crate-type = ["lib"]
Expand Down
5 changes: 3 additions & 2 deletions faucet/src/bin/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use std::{
thread,
};

fn main() {
#[tokio::main]
async fn main() {
let default_keypair = solana_cli_config::Config::default().keypair_path;

solana_logger::setup_with_default("solana=info");
Expand Down Expand Up @@ -76,5 +77,5 @@ fn main() {
faucet1.lock().unwrap().clear_request_count();
});

run_faucet(faucet, faucet_addr, None);
run_faucet(faucet, faucet_addr, None).await;
}
110 changes: 60 additions & 50 deletions faucet/src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
//! checking requests against a request cap for a given time time_slice
//! and (to come) an IP rate limit.

use bincode::{deserialize, serialize};
use bincode::{deserialize, serialize, serialized_size};
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use log::*;
use serde_derive::{Deserialize, Serialize};
use solana_metrics::datapoint_info;
Expand All @@ -20,18 +19,17 @@ use solana_sdk::{
transaction::Transaction,
};
use std::{
io::{self, Error, ErrorKind},
io::{self, Error, ErrorKind, Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::{mpsc::Sender, Arc, Mutex},
thread,
time::Duration,
};
use tokio::{
self,
net::TcpListener,
prelude::{Future, Read, Sink, Stream, Write},
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream as TokioTcpStream},
runtime::Runtime,
};
use tokio_codec::{BytesCodec, Decoder};

#[macro_export]
macro_rules! socketaddr {
Expand All @@ -58,6 +56,16 @@ pub enum FaucetRequest {
},
}

impl Default for FaucetRequest {
fn default() -> Self {
Self::GetAirdrop {
lamports: u64::default(),
to: Pubkey::default(),
blockhash: Hash::default(),
}
}
}

pub struct Faucet {
faucet_keypair: Keypair,
ip_cache: Vec<IpAddr>,
Expand Down Expand Up @@ -154,7 +162,7 @@ impl Faucet {
}
}
}
pub fn process_faucet_request(&mut self, bytes: &BytesMut) -> Result<Bytes, io::Error> {
pub fn process_faucet_request(&mut self, bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
let req: FaucetRequest = deserialize(bytes).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
Expand All @@ -177,9 +185,8 @@ impl Faucet {
LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
response_vec_with_length.extend_from_slice(&response_vec);

let response_bytes = Bytes::from(response_vec_with_length);
info!("Airdrop transaction granted");
Ok(response_bytes)
Ok(response_vec_with_length)
}
Err(err) => {
warn!("Airdrop transaction failed: {:?}", err);
Expand Down Expand Up @@ -270,7 +277,8 @@ pub fn run_local_faucet_with_port(
per_time_cap,
None,
)));
run_faucet(faucet, faucet_addr, Some(sender));
let runtime = Runtime::new().unwrap();
runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
});
}

Expand All @@ -283,58 +291,63 @@ pub fn run_local_faucet(
run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0)
}

pub fn run_faucet(
pub async fn run_faucet(
faucet: Arc<Mutex<Faucet>>,
faucet_addr: SocketAddr,
send_addr: Option<Sender<SocketAddr>>,
) {
let socket = TcpListener::bind(&faucet_addr).unwrap();
let listener = TcpListener::bind(&faucet_addr).await.unwrap();
if let Some(send_addr) = send_addr {
send_addr.send(socket.local_addr().unwrap()).unwrap();
send_addr.send(listener.local_addr().unwrap()).unwrap();
}
info!("Faucet started. Listening on: {}", faucet_addr);
info!(
"Faucet account address: {}",
faucet.lock().unwrap().faucet_keypair.pubkey()
);

let done = socket
.incoming()
.map_err(|e| debug!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let faucet2 = faucet.clone();
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();

let processor = reader.and_then(move |bytes| {
match faucet2.lock().unwrap().process_faucet_request(&bytes) {
Ok(response_bytes) => {
trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec());
Ok(response_bytes)
}
Err(e) => {
info!("Error in request: {:?}", e);
Ok(Bytes::from(0u16.to_le_bytes().to_vec()))
loop {
let _faucet = faucet.clone();
match listener.accept().await {
Ok((stream, _)) => {
tokio::spawn(async move {
if let Err(e) = process(stream, _faucet).await {
info!("failed to process request; error = {:?}", e);
}
}
});
let server = writer
.send_all(processor.or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Faucet response: {:?}", err),
))
}))
.then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);
});
}
Err(e) => debug!("failed to accept socket; error = {:?}", e),
}
}
}

async fn process(
mut stream: TokioTcpStream,
faucet: Arc<Mutex<Faucet>>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut request = vec![0u8; serialized_size(&FaucetRequest::default()).unwrap() as usize];
while stream.read_exact(&mut request).await.is_ok() {
trace!("{:?}", request);

let response = match faucet.lock().unwrap().process_faucet_request(&request) {
Ok(response_bytes) => {
trace!("Airdrop response_bytes: {:?}", response_bytes);
response_bytes
}
Err(e) => {
info!("Error in request: {:?}", e);
0u16.to_le_bytes().to_vec()
}
};
stream.write_all(&response).await?;
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::BufMut;
use solana_sdk::system_instruction::SystemInstruction;
use std::time::Duration;

Expand Down Expand Up @@ -446,8 +459,6 @@ mod tests {
to,
};
let req = serialize(&req).unwrap();
let mut bytes = BytesMut::with_capacity(req.len());
bytes.put(&req[..]);

let keypair = Keypair::new();
let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
Expand All @@ -459,12 +470,11 @@ mod tests {
expected_vec_with_length.extend_from_slice(&expected_bytes);

let mut faucet = Faucet::new(keypair, None, None, None);
let response = faucet.process_faucet_request(&bytes);
let response = faucet.process_faucet_request(&req);
let response_vec = response.unwrap().to_vec();
assert_eq!(expected_vec_with_length, response_vec);

let mut bad_bytes = BytesMut::with_capacity(9);
bad_bytes.put("bad bytes");
let bad_bytes = "bad bytes".as_bytes();
assert!(faucet.process_faucet_request(&bad_bytes).is_err());
}
}