From a88f0eed688a8b7fc5b599ea6bcc7beef5bc9c8c Mon Sep 17 00:00:00 2001 From: atomgardner Date: Wed, 2 Aug 2023 17:15:02 +1000 Subject: [PATCH] Add a `torrent from-link` subcommand (#511) It is now possible to create a torrent file from a magnet link. The search is performed with minimal concurrency, and so may take a while to complete. type: added --- Cargo.lock | 1 + Cargo.toml | 1 + bin/gen/config.yaml | 4 + src/common.rs | 10 +- src/error.rs | 32 ++ src/infohash.rs | 8 + src/lib.rs | 1 + src/magnet_link.rs | 12 +- src/peer.rs | 6 + src/peer/client.rs | 675 +++++++++++++++++++++++ src/peer/connection.rs | 93 ++++ src/peer/extended.rs | 9 + src/peer/handshake.rs | 63 +++ src/peer/message.rs | 109 ++++ src/peer/message/extended/handshake.rs | 164 ++++++ src/peer/message/extended/id.rs | 38 ++ src/peer/message/extended/mod.rs | 9 + src/peer/message/extended/ut_metadata.rs | 82 +++ src/peer/message/flavour.rs | 58 ++ src/subcommand/torrent.rs | 3 + src/subcommand/torrent/announce.rs | 4 +- src/subcommand/torrent/from_link.rs | 328 +++++++++++ src/tracker.rs | 4 + src/tracker/client.rs | 36 +- src/tracker/daemon.rs | 153 +++++ 25 files changed, 1879 insertions(+), 24 deletions(-) create mode 100644 src/peer.rs create mode 100644 src/peer/client.rs create mode 100644 src/peer/connection.rs create mode 100644 src/peer/extended.rs create mode 100644 src/peer/handshake.rs create mode 100644 src/peer/message.rs create mode 100644 src/peer/message/extended/handshake.rs create mode 100644 src/peer/message/extended/id.rs create mode 100644 src/peer/message/extended/mod.rs create mode 100644 src/peer/message/extended/ut_metadata.rs create mode 100644 src/peer/message/flavour.rs create mode 100644 src/subcommand/torrent/from_link.rs create mode 100644 src/tracker/daemon.rs diff --git a/Cargo.lock b/Cargo.lock index de4b566e..8eaa504d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,6 +674,7 @@ dependencies = [ "pretty_assertions", "pretty_env_logger", "rand", + "rayon", "regex", "serde", "serde-hex", diff --git a/Cargo.toml b/Cargo.toml index b5b734f1..df7f8d69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ rand = "0.7.3" open = "1.4.0" pretty_assertions = "0.6.0" pretty_env_logger = "0.4.0" +rayon = "<=1.6.0" regex = "1.0.0" serde-hex = "0.1.0" serde_bytes = "0.11.0" diff --git a/bin/gen/config.yaml b/bin/gen/config.yaml index 2d958b07..3ad9e938 100644 --- a/bin/gen/config.yaml +++ b/bin/gen/config.yaml @@ -19,6 +19,10 @@ examples: text: "Intermodal can be used to create `.torrent` files:" code: "imdl torrent create --input foo" +- command: imdl torrent from-link + text: "Intermodal can be used to create a `.torrent` file from a magnet link:" + code: "imdl torrent from-link magnet:?foo" + - command: imdl torrent show text: "Print information about existing `.torrent` files:" code: "imdl torrent show --input foo.torrent" diff --git a/src/common.rs b/src/common.rs index 891e868b..2d159826 100644 --- a/src/common.rs +++ b/src/common.rs @@ -12,14 +12,14 @@ pub(crate) use std::{ hash::Hash, io::{self, BufRead, BufReader, Cursor, Read, Write}, iter::{self, Sum}, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs, UdpSocket}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpStream, ToSocketAddrs, UdpSocket}, num::{ParseFloatError, ParseIntError, TryFromIntError}, ops::{AddAssign, Div, DivAssign, Mul, MulAssign, Sub, SubAssign}, path::{self, Path, PathBuf}, process::ExitStatus, str::{self, FromStr}, string::FromUtf8Error, - sync::Once, + sync::{mpsc::channel, Once}, time::{Duration, SystemTime, SystemTimeError}, usize, }; @@ -54,7 +54,9 @@ pub(crate) use url::{Host, Url}; pub(crate) use log::trace; // modules -pub(crate) use crate::{consts, error, host_port_parse_error, magnet_link_parse_error, tracker}; +pub(crate) use crate::{ + consts, error, host_port_parse_error, magnet_link_parse_error, peer, tracker, +}; // functions pub(crate) use crate::xor_args::xor_args; @@ -88,9 +90,11 @@ mod test { // test stdlib types pub(crate) use std::{ cell::RefCell, + net::TcpListener, ops::{Deref, DerefMut}, process::Command, rc::Rc, + thread, }; // test dependencies diff --git a/src/error.rs b/src/error.rs index e59539de..7dc174e4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -30,6 +30,8 @@ pub(crate) enum Error { Filesystem { source: io::Error, path: PathBuf }, #[snafu(display("Error searching for files: {}", source))] FileSearch { source: ignore::Error }, + #[snafu(display("Failed to fetch infodict from accessible peers"))] + FromLinkNoInfo, #[snafu(display("Invalid glob: {}", source))] GlobParse { source: globset::Error }, #[snafu(display("Failed to serialize torrent info dictionary: {}", source))] @@ -68,6 +70,8 @@ pub(crate) enum Error { input: InputTarget, source: MetainfoError, }, + #[snafu(display("Network error: {}", source))] + Network { source: io::Error }, #[snafu(display("Failed to invoke opener: {}", source))] OpenerInvoke { source: io::Error }, #[snafu(display("Opener failed: {}", exit_status))] @@ -114,6 +118,34 @@ pub(crate) enum Error { bytes: Bytes, source: TryFromIntError, }, + #[snafu(display("Received peer handshake with the wrong infohash"))] + PeerHandshakeInfohash, + #[snafu(display("Received peer handshake with the wrong protocol header"))] + PeerHandshakeHeader, + #[snafu(display("Bencoding error: `{}`", source))] + PeerMessageBencode { source: bendy::serde::Error }, + #[snafu(display("Peer extended message payload is malformed"))] + PeerMessageExtendedPayload, + #[snafu(display("Failed to decode bencoded message: `{}`", source))] + PeerMessageFromBencode { source: bendy::serde::Error }, + #[snafu(display("Peer message payload is too large"))] + PeerMessagePayload { source: TryFromIntError }, + #[snafu(display("Extended handshake has not been received from peer"))] + PeerNoExtendedHandshake, + #[snafu(display("Received UtMetadata info dict that's failed to deserialize"))] + PeerUtMetadataInfoDeserialize { source: bendy::serde::Error }, + #[snafu(display("Received UtMetadata info dict that's too long"))] + PeerUtMetadataInfoLength, + #[snafu(display("Received UtMetadata data message that's too long"))] + PeerUtMetadataPieceLength, + #[snafu(display("Peer doesn't know metadata size"))] + PeerUtMetadataMetadataSizeNotKnown, + #[snafu(display("Peer doesn't support UtMetadata extension"))] + PeerUtMetadataNotSupported, + #[snafu(display("Hash of received info dict does not match"))] + PeerUtMetadataWrongInfohash, + #[snafu(display("Received the wrong UtMetadata piece"))] + PeerUtMetadataWrongPiece, #[snafu(display("Piece length `{}` is not an even power of two", bytes))] PieceLengthUneven { bytes: Bytes }, #[snafu(display("Piece length must be at least 16 KiB"))] diff --git a/src/infohash.rs b/src/infohash.rs index 9425787b..f4888fea 100644 --- a/src/infohash.rs +++ b/src/infohash.rs @@ -62,6 +62,14 @@ impl From for Infohash { } } +impl From<[u8; 20]> for Infohash { + fn from(bytes: [u8; 20]) -> Self { + Infohash { + inner: Sha1Digest::from_bytes(bytes), + } + } +} + impl From for [u8; 20] { fn from(infohash: Infohash) -> Self { infohash.inner.bytes() diff --git a/src/lib.rs b/src/lib.rs index a812a269..b1860e56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,6 +71,7 @@ mod mode; mod options; mod output_stream; mod output_target; +mod peer; mod piece_length_picker; mod piece_list; mod platform; diff --git a/src/magnet_link.rs b/src/magnet_link.rs index a9bb3d7a..0717b740 100644 --- a/src/magnet_link.rs +++ b/src/magnet_link.rs @@ -1,12 +1,12 @@ use crate::common::*; -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct MagnetLink { - infohash: Infohash, - name: Option, - peers: Vec, - trackers: Vec, - indices: BTreeSet, + pub(crate) infohash: Infohash, + pub(crate) name: Option, + pub(crate) peers: Vec, + pub(crate) trackers: Vec, + pub(crate) indices: BTreeSet, } impl MagnetLink { diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 00000000..0db8f3c3 --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,6 @@ +pub(crate) use client::Client; + +pub(crate) mod client; +pub(crate) mod connection; +pub(crate) mod handshake; +pub(crate) mod message; diff --git a/src/peer/client.rs b/src/peer/client.rs new file mode 100644 index 00000000..4ea21e8a --- /dev/null +++ b/src/peer/client.rs @@ -0,0 +1,675 @@ +use crate::common::*; + +use message::extended; +use message::Message; +use peer::connection::Connection; +use peer::message; + +#[derive(Debug)] +pub(crate) struct Client { + infohash: Infohash, + conn: Connection, + state: State, + info: Option, + extension_handshake: Option, +} + +#[derive(Debug, PartialEq)] +pub(crate) enum State { + Idle, + WantInfo(Vec), +} + +impl Client { + pub(crate) fn connect(addr: &SocketAddr, infohash: Infohash) -> Result { + let conn = Connection::new(addr, infohash)?; + + if !conn.supports_extension_protocol() { + return Err(Error::PeerUtMetadataNotSupported); + } + + Ok(Client { + infohash, + conn, + state: State::Idle, + info: None, + extension_handshake: None, + }) + } + + fn send_extension_handshake(&mut self) -> Result<()> { + let mut handshake = extended::Handshake::default(); + if let Some(info) = &self.info { + let info_dict = bendy::serde::ser::to_bytes(&info).context(error::InfoSerialize)?; + handshake.with_metadata_size(info_dict.len()); + } + self.conn.send(&Message::new_extended( + extended::Id::Handshake.into(), + handshake, + )?) + } + + pub(crate) fn fetch_info_dict(mut self) -> Result { + self.state = State::WantInfo(Vec::new()); + + self.send_extension_handshake()?; + + loop { + if let Some(info) = self.info { + return Ok(info); + } + + let msg = self.conn.recv()?; + if msg.flavour != message::Flavour::Extended { + continue; + } + + self.handle_msg(&msg)?; + } + } + + fn handle_msg(&mut self, msg: &Message) -> Result<()> { + match msg.flavour { + message::Flavour::Extended => self.handle_extended(msg), + _ => Ok(()), + } + } + + fn handle_extended(&mut self, msg: &Message) -> Result<()> { + let (id, payload) = msg.parse_extended_payload()?; + match id { + extended::Id::Handshake => self.handle_extension_handshake(payload), + extended::Id::UtMetadata => self.handle_ut_metadata(payload), + extended::Id::NotImplemented(_) => Ok(()), + } + } + + fn handle_extension_handshake(&mut self, payload: &[u8]) -> Result<()> { + let handshake: extended::Handshake = Message::from_bencode(payload)?; + + // Drop the peer if we want info and the peer can't give it to us. + if let State::WantInfo(_) = self.state { + if handshake.metadata_size.is_none() { + return Err(Error::PeerUtMetadataMetadataSizeNotKnown); + } else if handshake + .message_ids + .get(extended::UtMetadata::NAME) + .is_none() + { + return Err(Error::PeerUtMetadataNotSupported); + } + }; + + self.extension_handshake.replace(handshake); + + if let State::WantInfo(_) = self.state { + self.send_ut_metadata_request(0)?; + } + + Ok(()) + } + + fn handle_ut_metadata(&mut self, payload: &[u8]) -> Result<()> { + let metadata_size = self.ut_metadata_size()?; + + let msg: extended::UtMetadata = Message::from_bencode(payload)?; + match msg.msg_type.into() { + extended::ut_metadata::MsgType::Data => (), + extended::ut_metadata::MsgType::Request | extended::ut_metadata::MsgType::Reject => { + return Ok(()) + } + }; + + if let State::WantInfo(info_buf) = &mut self.state { + let piece = info_buf.len() / extended::UtMetadata::PIECE_LENGTH; + if msg.piece != piece { + return Err(Error::PeerUtMetadataWrongPiece); + } + + // The ut_metadata::MsgType::Data payload splits into two parts, + // 1. a bencoded UtMetadata message, + // 2. the binary info_dict peice data. + // Their boundary is not delimited. Bencode the message to find the piece offset. + let piece_offset = bendy::serde::ser::to_bytes(&msg) + .context(error::PeerMessageBencode)? + .len(); + if payload[piece_offset..].len() > extended::UtMetadata::PIECE_LENGTH { + return Err(Error::PeerUtMetadataPieceLength); + } + info_buf.extend_from_slice(&payload[piece_offset..]); + + return match info_buf.len().cmp(&metadata_size) { + Ordering::Equal => { + let info = Self::verify_info_dict(info_buf, self.infohash)?; + self.info = Some(info); + self.state = State::Idle; + Ok(()) + } + Ordering::Less => self.send_ut_metadata_request(piece + 1), + Ordering::Greater => Err(Error::PeerUtMetadataInfoLength), + }; + } + + Ok(()) + } + + pub(crate) fn send_ut_metadata_request(&mut self, piece: usize) -> Result<()> { + let id = self.ut_metadata_msg_id()?; + let msg = Message::new_extended(id, extended::UtMetadata::request(piece))?; + + self.conn.send(&msg) + } + + fn verify_info_dict(buf: &[u8], target: Infohash) -> Result { + let info = + bendy::serde::de::from_bytes::(buf).context(error::PeerUtMetadataInfoDeserialize)?; + + let infohash = Infohash::from_bencoded_info_dict( + &bendy::serde::ser::to_bytes(&info).context(error::InfoSerialize)?, + ); + + if infohash == target { + Ok(info) + } else { + Err(Error::PeerUtMetadataWrongInfohash) + } + } + + fn ut_metadata_size(&self) -> Result { + match &self.extension_handshake { + Some(handshake) => match handshake.metadata_size { + Some(size) => Ok(size), + None => Err(Error::PeerUtMetadataMetadataSizeNotKnown), + }, + None => Err(Error::PeerNoExtendedHandshake), + } + } + + fn ut_metadata_msg_id(&self) -> Result { + match &self.extension_handshake { + Some(handshake) => match handshake.message_ids.get(extended::UtMetadata::NAME) { + Some(id) => Ok(*id), + None => Err(Error::PeerUtMetadataNotSupported), + }, + None => Err(Error::PeerNoExtendedHandshake), + } + } + + #[cfg(test)] + pub(crate) fn listen(listener: &TcpListener, infohash: Infohash) -> Result { + let (conn, _) = listener.accept().context(error::Network)?; + conn + .set_read_timeout(Some(Duration::new(3, 0))) + .context(error::Network)?; + + Ok(Client { + infohash, + conn: Connection::from(conn, infohash)?, + state: State::Idle, + extension_handshake: None, + info: None, + }) + } + + #[cfg(test)] + pub(crate) fn send_ut_metadata_data( + &mut self, + piece: usize, + total_size: usize, + data: &[u8], + ) -> Result<()> { + let message_id = self.ut_metadata_msg_id()?; + + let msg = Message::new_extended_with_trailer( + message_id, + extended::UtMetadata::data(piece, total_size), + data, + )?; + + self.conn.send(&msg) + } + + #[cfg(test)] + pub(crate) fn spawn_info_dict_seeder(info: &Info) -> (thread::JoinHandle<()>, SocketAddr) { + let info_dict = bendy::serde::ser::to_bytes(info).unwrap(); + let infohash = Infohash::from_bencoded_info_dict(&info_dict); + let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap(); + let addr = (Ipv4Addr::LOCALHOST, listener.local_addr().unwrap().port()).into(); + let seeder = thread::spawn(move || { + let mut seeder = Client::listen(&listener, infohash).unwrap(); + let handshake = extended::Handshake { + metadata_size: Some(info_dict.len()), + ..extended::Handshake::default() + }; + let msg = Message::new_extended(extended::Id::Handshake.into(), handshake).unwrap(); + seeder.conn.send(&msg).unwrap(); + + // The first message from the fetcher is an extension handshake. + let msg = seeder.conn.recv().unwrap(); + assert_eq!(msg.flavour, message::Flavour::Extended); + let (id, _) = msg.parse_extended_payload().unwrap(); + assert_eq!(id, extended::Id::Handshake); + seeder.handle_msg(&msg).unwrap(); + + let mut pieces = info_dict.len() / extended::UtMetadata::PIECE_LENGTH; + if info_dict.len() % extended::UtMetadata::PIECE_LENGTH > 0 { + pieces += 1; + } + + // Respond to any serviceable ut_metadata request. Ignore errors. + loop { + let msg = match seeder.conn.recv() { + Ok(msg) => msg, + Err(_) => continue, + }; + + let payload = match msg.parse_extended_payload() { + Ok((_, payload)) => payload, + Err(_) => continue, + }; + + let req: extended::UtMetadata = match Message::from_bencode(payload) { + Ok(req) => req, + Err(_) => continue, + }; + if req.piece > pieces { + continue; + } + + let range = std::ops::Range { + start: extended::UtMetadata::PIECE_LENGTH * req.piece, + end: if pieces == 1 { + info_dict.len() + } else { + extended::UtMetadata::PIECE_LENGTH * (req.piece + 1) + }, + }; + + seeder + .send_ut_metadata_data(req.piece, info_dict.len(), &info_dict[range]) + .unwrap(); + } + }); + + (seeder, addr) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_one_piece_info() -> Info { + Info { + private: Some(true), + piece_length: Bytes(9001), + name: "foo".into(), + source: None, + pieces: PieceList::new(), + mode: Mode::Single { + md5sum: None, + length: Bytes(1), + }, + update_url: None, + } + } + + fn new_two_piece_info() -> Info { + Info { + private: Some(true), + piece_length: Bytes(9001), + name: "a".repeat(extended::UtMetadata::PIECE_LENGTH), + source: None, + pieces: PieceList::from_pieces(["hello", "cargo", "test"]), + mode: Mode::Single { + md5sum: None, + length: Bytes(1), + }, + update_url: None, + } + } + + /// Start a remote client that listens on the returned `SocketAddr`. The client responds to the + /// first received handshake. The client performs `work` if the handshake is successful. + fn spawn_peer(infohash: Infohash, work: W) -> (thread::JoinHandle>, SocketAddr) + where + W: Fn(Client) -> Result + Send + 'static, + T: Send + 'static, + { + let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap(); + let addr = listener.local_addr().unwrap(); + let join_handle = thread::spawn(move || work(Client::listen(&listener, infohash)?)); + (join_handle, (Ipv4Addr::LOCALHOST, addr.port()).into()) + } + + fn spawn_idle_peer(infohash: Infohash) -> (thread::JoinHandle>, SocketAddr) { + spawn_peer(infohash, |_| Ok(())) + } + + fn spawn_info_dict_fetcher(infohash: Infohash) -> (thread::JoinHandle>, SocketAddr) { + spawn_peer(infohash, Client::fetch_info_dict) + } + + fn new_client_ready_to_send_ut_metadata_data( + addr: SocketAddr, + infohash: Infohash, + info: Info, + ) -> peer::Client { + let mut c = Client::connect(&addr, infohash).unwrap(); + c.info = Some(info); + c.send_extension_handshake().unwrap(); + expect_extended_handshake(&mut c); + expect_ut_metadata_request(&mut c, 0); + c + } + + fn expect_extended_handshake(c: &mut Client) { + let msg = c.conn.recv().unwrap(); + assert_eq!(msg.flavour, message::Flavour::Extended); + let (id, _) = msg.parse_extended_payload().unwrap(); + assert_eq!(id, extended::Id::Handshake); + c.handle_msg(&msg).unwrap(); + assert_matches!(&c.extension_handshake, Some(..)); + assert_matches!(c.ut_metadata_msg_id(), Ok(..)); + } + + fn expect_ut_metadata_request(c: &mut Client, piece: usize) { + let msg = c.conn.recv().unwrap(); + assert_eq!(msg.flavour, message::Flavour::Extended); + let (id, payload) = msg.parse_extended_payload().unwrap(); + assert_eq!(id, extended::Id::UtMetadata); + let ut_metadata_request: extended::UtMetadata = Message::from_bencode(payload).unwrap(); + assert_eq!( + ut_metadata_request.msg_type, + u8::from(extended::ut_metadata::MsgType::Request) + ); + assert_eq!(ut_metadata_request.piece, piece); + } + + #[test] + fn handshake() { + let info = Info { + private: Some(true), + piece_length: Bytes(9001), + name: "test info".into(), + source: None, + pieces: PieceList::new(), + mode: Mode::Single { + md5sum: None, + length: Bytes(1), + }, + update_url: None, + }; + let infohash = info.infohash_lossy().unwrap(); + + let (remote_handle, addr) = spawn_idle_peer(infohash); + assert_matches!(Client::connect(&addr, infohash), Ok(..)); + assert_matches!(remote_handle.join(), Ok(..)); + } + + #[test] + fn handshake_bad_bt_header() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let (handle, addr) = spawn_idle_peer(infohash); + + let mut stream = TcpStream::connect_timeout(&addr, Duration::new(3, 0)).unwrap(); + let mut payload = peer::handshake::Handshake::new(infohash).serialize(); + // Deface the handshake. + payload[0] = b'i'; + payload[1] = b'm'; + payload[2] = b'd'; + payload[3] = b'l'; + stream.write_all(&payload[..]).unwrap(); + + assert_matches!(handle.join().unwrap(), Err(Error::PeerHandshakeHeader)); + } + + #[test] + fn handshake_infohash_mismatch() { + let info = new_one_piece_info(); + let mut info2 = new_one_piece_info(); + info2.name = String::from("bar"); + let infohash = info.infohash_lossy().unwrap(); + let infohash2 = info2.infohash_lossy().unwrap(); + let (handle, addr) = spawn_idle_peer(infohash); + + assert_matches!( + Client::connect(&addr, infohash2), + Err(Error::Network { .. }) + ); + assert_matches!(handle.join().unwrap(), Err(Error::PeerHandshakeInfohash)); + } + + #[test] + fn connection_timeout() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let listener = TcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).unwrap(); + let addr = (Ipv4Addr::LOCALHOST, listener.local_addr().unwrap().port()).into(); + + assert_matches!(Client::connect(&addr, infohash), Err(Error::Network { .. })); + } + + #[test] + fn extension_handshake() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + + let (handle, addr) = spawn_peer(infohash, |mut c| { + c.send_extension_handshake()?; + let msg = c.conn.recv()?; + c.handle_msg(&msg) + }); + + let mut c = Client::connect(&addr, infohash).unwrap(); + c.send_extension_handshake().unwrap(); + c.conn.recv().and_then(|msg| c.handle_msg(&msg)).unwrap(); + let handshake = c.extension_handshake.unwrap(); + assert_eq!( + handshake.version.unwrap(), + format!("intermodal {}", consts::VERSION) + ); + assert_matches!(handle.join().unwrap(), Ok(())); + } + + #[test] + fn extension_handshake_errors() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let (handle, addr) = spawn_peer(infohash, |mut c| { + let msg = c.conn.recv()?; + c.handle_msg(&msg)?; + c.send_extension_handshake() + }); + + let mut local = Client::connect(&addr, infohash).unwrap(); + assert_matches!( + local.ut_metadata_size(), + Err(Error::PeerNoExtendedHandshake) + ); + assert_matches!( + local.ut_metadata_msg_id(), + Err(Error::PeerNoExtendedHandshake) + ); + local.send_extension_handshake().unwrap(); + local + .conn + .recv() + .and_then(|msg| local.handle_msg(&msg)) + .unwrap(); + assert_matches!( + local.ut_metadata_size(), + Err(Error::PeerUtMetadataMetadataSizeNotKnown) + ); + assert_matches!(handle.join().unwrap(), Ok(())); + } + + #[test] + fn ut_metadata_not_supported() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = Client::connect(&addr, infohash).unwrap(); + let mut extended_handshake = extended::Handshake::new(); // no ut_metadata message id set + extended_handshake.metadata_size = Some(1); + c.conn + .send(&Message::new_extended(extended::Id::Handshake.into(), extended_handshake).unwrap()) + .unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataNotSupported) + ); + } + + #[test] + fn metadata_size_not_known() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = Client::connect(&addr, infohash).unwrap(); + c.send_extension_handshake().unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataMetadataSizeNotKnown) + ); + } + + #[test] + fn fetch_info_one_piece() { + let info = new_one_piece_info(); + let info_dict = bendy::serde::ser::to_bytes(&info).unwrap(); + let infohash = info.infohash_lossy().unwrap(); + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info.clone()); + + c.send_ut_metadata_data(0, info_dict.len(), &info_dict[..]) + .unwrap(); + + assert_eq!(join_handle.join().unwrap().unwrap(), info); + } + + #[test] + fn fetch_info_two_pieces() { + let info = new_two_piece_info(); + let info_dict = bendy::serde::ser::to_bytes(&info).unwrap(); + let infohash = info.infohash_lossy().unwrap(); + let (fetcher, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info.clone()); + + c.send_ut_metadata_data( + 0, + info_dict.len(), + &info_dict[..extended::UtMetadata::PIECE_LENGTH], + ) + .unwrap(); + expect_ut_metadata_request(&mut c, 1); + c.send_ut_metadata_data( + 1, + info_dict.len(), + &info_dict[extended::UtMetadata::PIECE_LENGTH..], + ) + .unwrap(); + + assert_eq!(fetcher.join().unwrap().unwrap(), info); + } + + #[test] + fn ut_metadata_wrong_piece_length() { + let info = new_two_piece_info(); + let info_dict = bendy::serde::ser::to_bytes(&info).unwrap(); + let infohash = info.infohash_lossy().unwrap(); + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info); + + c.send_ut_metadata_data(0, info_dict.len(), &info_dict[..]) + .unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataPieceLength) + ); + } + + #[test] + fn ut_metadata_receive_wrong_piece() { + let info = new_two_piece_info(); + let info_dict = bendy::serde::ser::to_bytes(&info).unwrap(); + let infohash = info.infohash_lossy().unwrap(); + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info); + c.send_ut_metadata_data( + 1, // wrong piece + info_dict.len(), + &info_dict[extended::UtMetadata::PIECE_LENGTH..], + ) + .unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataWrongPiece) + ); + } + + #[test] + fn receive_info_dict_with_wrong_infohash() { + let info = new_one_piece_info(); + let mut wrong_info = new_one_piece_info(); + wrong_info.name = String::from("bar"); + let infohash = info.infohash_lossy().unwrap(); + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info); + + let wrong_info_dict = bendy::serde::ser::to_bytes(&wrong_info).unwrap(); + c.send_ut_metadata_data(0, wrong_info_dict.len(), &wrong_info_dict[..]) + .expect("ut_metadata data send succeeds"); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataWrongInfohash) + ); + } + + #[test] + fn receive_info_dict_that_fails_to_deserialize() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let info_dict_len = bendy::serde::ser::to_bytes(&info).unwrap().len(); + let wrong_info_dict = vec![0; info_dict_len]; + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info); + c.send_ut_metadata_data(0, wrong_info_dict.len(), &wrong_info_dict[..]) + .unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataInfoDeserialize { .. }) + ); + } + + #[test] + fn receive_info_dict_with_wrong_metadata_size() { + let info = new_one_piece_info(); + let infohash = info.infohash_lossy().unwrap(); + let info_dict_len = bendy::serde::ser::to_bytes(&info).unwrap().len(); + let wrong_info_dict = vec![0; info_dict_len + 1]; + + let (join_handle, addr) = spawn_info_dict_fetcher(infohash); + let mut c = new_client_ready_to_send_ut_metadata_data(addr, infohash, info); + c.send_ut_metadata_data(0, wrong_info_dict.len(), &wrong_info_dict[..]) + .unwrap(); + + assert_matches!( + join_handle.join().unwrap(), + Err(Error::PeerUtMetadataInfoLength { .. }) + ); + } +} diff --git a/src/peer/connection.rs b/src/peer/connection.rs new file mode 100644 index 00000000..70372cf1 --- /dev/null +++ b/src/peer/connection.rs @@ -0,0 +1,93 @@ +use crate::common::*; + +use message::Message; +use peer::handshake::Handshake; +use peer::message; + +#[derive(Debug)] +pub struct Connection { + pub(crate) stream: TcpStream, + pub(crate) handshake: Handshake, +} + +impl Connection { + pub(crate) fn new(addr: &SocketAddr, infohash: Infohash) -> Result { + let mut stream = + TcpStream::connect_timeout(addr, Duration::new(3, 0)).context(error::Network)?; + stream + .set_read_timeout(Some(Duration::new(3, 0))) + .context(error::Network)?; + + Self::send_handshake(&mut stream, infohash)?; + let handshake = Self::recv_handshake(&mut stream, infohash)?; + + Ok(Self { stream, handshake }) + } + + fn recv_handshake(stream: &mut TcpStream, infohash: Infohash) -> Result { + let mut buf = [0u8; Handshake::LENGTH]; + stream.read_exact(&mut buf).context(error::Network)?; + let handshake = Handshake::try_from(buf)?; + if Infohash::from(handshake.infohash) != infohash { + return Err(error::Error::PeerHandshakeInfohash); + } + Ok(handshake) + } + + fn send_handshake(stream: &mut TcpStream, infohash: Infohash) -> Result { + let handshake = Handshake::new(infohash); + stream + .write_all(&handshake.serialize()[..]) + .context(error::Network)?; + Ok(handshake) + } + + pub(crate) fn recv(&mut self) -> Result { + // The message header is four bytes of message length, plus a one byte instruction. + let mut header = [0u8; 5]; + self + .stream + .read_exact(&mut header) + .context(error::Network)?; + + let length = u32::from_be_bytes( + header[..4] + .try_into() + .invariant_unwrap("bound is checked by read_exact and the length of buf"), + ); + + let payload = if length > 1 { + let mut payload = Vec::new(); + (&self.stream) + .take((length - 1).into()) + .read_to_end(&mut payload) + .context(error::Network)?; + Some(payload) + } else { + None + }; + + Ok(Message { + flavour: message::Flavour::from(header[4]), + payload, + }) + } + + pub(crate) fn send(&mut self, msg: &message::Message) -> Result<()> { + self + .stream + .write_all(&msg.serialize()?) + .context(error::Network) + } + + pub(crate) fn supports_extension_protocol(&self) -> bool { + self.handshake.supports_extension_protocol() + } + + #[cfg(test)] + pub(crate) fn from(mut stream: TcpStream, infohash: Infohash) -> Result { + let handshake = Self::recv_handshake(&mut stream, infohash)?; + Self::send_handshake(&mut stream, infohash)?; + Ok(Self { stream, handshake }) + } +} diff --git a/src/peer/extended.rs b/src/peer/extended.rs new file mode 100644 index 00000000..aa69bfc4 --- /dev/null +++ b/src/peer/extended.rs @@ -0,0 +1,9 @@ +use super::*; + +pub(crate) use handshake::Handshake; +pub(crate) use id::Id; +pub(crate) use ut_metadata::UtMetadata; + +pub(crate) mod handshake; +pub(crate) mod id; +pub(crate) mod ut_metadata; diff --git a/src/peer/handshake.rs b/src/peer/handshake.rs new file mode 100644 index 00000000..87ecbe36 --- /dev/null +++ b/src/peer/handshake.rs @@ -0,0 +1,63 @@ +use crate::common::*; + +pub(crate) const HEADER: &[u8; 20] = b"\x13BitTorrent protocol"; +pub(crate) const SUPPORTS_EXTENSION_PROTOCOL: u8 = 0b0001_0000; +pub(crate) const IMDL_RESERVED_BYTES: [u8; 8] = [0, 0, 0, 0, 0, SUPPORTS_EXTENSION_PROTOCOL, 0, 0]; + +#[derive(Debug)] +pub(crate) struct Handshake { + pub(crate) peer_id: [u8; 20], + pub(crate) infohash: [u8; 20], + pub(crate) reserved: [u8; 8], +} + +impl Handshake { + pub(crate) const LENGTH: usize = 68; + + pub(crate) fn new(infohash: Infohash) -> Self { + Handshake { + peer_id: rand::thread_rng().gen(), + infohash: infohash.into(), + reserved: IMDL_RESERVED_BYTES, + } + } + + pub(crate) fn serialize(&self) -> [u8; Handshake::LENGTH] { + let mut msg = [0u8; Handshake::LENGTH]; + + msg[0..20].copy_from_slice(HEADER); + msg[20..28].copy_from_slice(&self.reserved); + msg[28..48].copy_from_slice(&self.infohash); + msg[48..68].copy_from_slice(&self.peer_id); + + msg + } + + pub fn supports_extension_protocol(&self) -> bool { + self.reserved[5] & SUPPORTS_EXTENSION_PROTOCOL > 0 + } +} + +impl TryFrom<[u8; Handshake::LENGTH]> for Handshake { + type Error = Error; + + fn try_from(buf: [u8; Handshake::LENGTH]) -> Result { + if &buf[0..20] != HEADER { + return Err(error::Error::PeerHandshakeHeader); + } + + let mut infohash = [0u8; 20]; + let mut reserved = [0u8; 8]; + let mut peer_id = [0u8; 20]; + + reserved.clone_from_slice(&buf[20..28]); + infohash.clone_from_slice(&buf[28..48]); + peer_id.clone_from_slice(&buf[48..68]); + + Ok(Handshake { + peer_id, + infohash, + reserved, + }) + } +} diff --git a/src/peer/message.rs b/src/peer/message.rs new file mode 100644 index 00000000..72342b46 --- /dev/null +++ b/src/peer/message.rs @@ -0,0 +1,109 @@ +use crate::common::*; + +pub mod extended; +pub mod flavour; + +pub(crate) use flavour::Flavour; + +#[derive(Debug)] +pub(crate) struct Message { + pub(crate) flavour: Flavour, + pub(crate) payload: Option>, +} + +impl Message { + pub(crate) fn new(flavour: Flavour, payload: Option>) -> Self { + Message { flavour, payload } + } + + /// Create a new extended message. Since extended message ids for the same extension protocol may + /// vary between peers, the id parameter must be determined from an extended handshake or prior + /// knowledge. + pub(crate) fn new_extended(id: u8, p: T) -> Result { + let mut payload = vec![id]; + payload.extend_from_slice(&Self::bencode(p)?); + Ok(Self::new(Flavour::Extended, Some(payload))) + } + + #[cfg(test)] + // Create a new extended message but append `buf` to the bencoded message payload. + pub(crate) fn new_extended_with_trailer( + id: u8, + payload: T, + buf: &[u8], + ) -> Result { + let mut m = Self::new_extended(id, payload)?; + if let Some(p) = &mut m.payload { + p.extend_from_slice(buf); + } + Ok(m) + } + + pub(crate) fn len(&self) -> usize { + 1 + self.payload.as_ref().map_or(0, std::vec::Vec::len) + } + + // Serialize the message to the BitTorrent wire format: + // + // prefix 4 bytes + // flavour 1 byte + // payload x bytes + // + // where prefix is the network byte order encoding of `x + 1` into a u32. + pub(crate) fn serialize(&self) -> Result> { + // TODO: find a way to test this without blowing the stack. + let message_length = self.len().try_into().context(error::PeerMessagePayload)?; + let mut buf: Vec = u32::to_be_bytes(message_length).to_vec(); + buf.push(self.flavour.into()); + if let Some(ref p) = &self.payload { + buf.extend(p); + } + + Ok(buf) + } + + pub(crate) fn parse_extended_payload(&self) -> Result<(extended::Id, &[u8])> { + if let Some(p) = &self.payload { + if !p.is_empty() { + return Ok((p[0].into(), &p[1..])); + } + } + Err(Error::PeerMessageExtendedPayload) + } + + pub fn bencode(msg: T) -> Result> { + bendy::serde::ser::to_bytes(&msg).context(error::PeerMessageBencode) + } + + pub fn from_bencode<'a, T: serde::Deserialize<'a>>(buf: &'a [u8]) -> Result { + bendy::serde::de::from_bytes(buf).context(error::PeerMessageFromBencode) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_extended_payload() { + let m = Message { + flavour: Flavour::Extended, + payload: Some(vec![1, 2, 3, 4]), + }; + let (i, b) = m.parse_extended_payload().unwrap(); + assert_eq!(i, extended::Id::UtMetadata); + assert_eq!(b, vec![2, 3, 4]); + } + + #[test] + fn parse_extended_payload_fail() { + let m = Message { + flavour: Flavour::Extended, + payload: None, + }; + assert_matches!( + m.parse_extended_payload(), + Err(Error::PeerMessageExtendedPayload) + ); + } +} diff --git a/src/peer/message/extended/handshake.rs b/src/peer/message/extended/handshake.rs new file mode 100644 index 00000000..4be84372 --- /dev/null +++ b/src/peer/message/extended/handshake.rs @@ -0,0 +1,164 @@ +use crate::common::*; + +use extended::ut_metadata; +use peer::message::extended; + +// From BEP10: +// +// Handshake is a dictionary of supported extension messages which maps names of extensions to an +// extended message ID for each extension message. The only requirement on these IDs is that no +// extension message share the same one. Setting an extension number to zero means that the +// extension is not supported/disabled. The client should ignore any extension names it doesn't +// recognize. +// +// The extension message IDs are the IDs used to send the extension messages to the peer sending +// this handshake. i.e. The IDs are local to this particular peer. +#[derive(Debug, Deserialize, Serialize)] +pub(crate) struct Handshake { + #[serde(rename = "m")] + pub(crate) message_ids: HashMap, + // Sent with the handshake if ut_metadata is supported. + #[serde( + default, + skip_serializing_if = "Option::is_none", + with = "unwrap_or_skip" + )] + pub(crate) metadata_size: Option, + // Local TCP listen port. Allows each side to learn about the TCP port number + // of the other side. Note that there is no need for the receiving side of + // the connection to send this extension message, since its port number is + // already known. + #[serde( + default, + rename = "p", + skip_serializing_if = "Option::is_none", + with = "unwrap_or_skip" + )] + pub(crate) port: Option, + // Client name and version (as a utf-8 string). This is a much more reliable + // way of identifying the client than relying on the peer id encoding. + #[serde( + default, + rename = "v", + skip_serializing_if = "Option::is_none", + with = "unwrap_or_skip" + )] + pub(crate) version: Option, + // A string containing the compact representation of the ip address this peer + // sees you as. i.e. this is the receiver's external ip address (no port is + // included). This may be either an IPv4 (4 bytes) or an IPv6 (16 bytes) + // address. + #[serde(default, skip_serializing_if = "Vec::is_empty", with = "serde_bytes")] + pub(crate) yourip: Vec, + // If this peer has an IPv6 interface, this is the compact representation of + // that address (16 bytes). The client may prefer to connect back via the IPv6 + // address. + #[serde(default, skip_serializing_if = "Vec::is_empty", with = "serde_bytes")] + pub(crate) ipv6: Vec, + // If this peer has an IPv4 interface, this is the compact representation of + // that address (4 bytes). The client may prefer to connect back via this + // interface. + #[serde(default, skip_serializing_if = "Vec::is_empty", with = "serde_bytes")] + pub(crate) ipv4: Vec, + // An integer, the number of outstanding request messages this client supports + // without dropping any. The default in in libtorrent is 250. + #[serde( + default, + rename = "reqq", + skip_serializing_if = "Option::is_none", + with = "unwrap_or_skip" + )] + pub(crate) request_queue_size: Option, +} + +impl Handshake { + pub(crate) fn new() -> Self { + Handshake { + message_ids: HashMap::new(), + metadata_size: None, + port: None, + ipv4: vec![], + ipv6: vec![], + request_queue_size: None, + yourip: vec![], + version: Some(format!("intermodal {}", consts::VERSION)), + } + } + + pub(crate) fn with_message(&mut self, name: String, id: u8) { + self.message_ids.insert(name, id); + } + + pub(crate) fn with_metadata_size(&mut self, size: usize) { + self.metadata_size = Some(size); + } +} + +impl Default for Handshake { + fn default() -> Self { + let mut handshake = Handshake::new(); + handshake.with_message(String::from(ut_metadata::UtMetadata::NAME), 1); + handshake + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handshake_from_bencoded_handshake() { + let payload = b"d1:ei0e1:md11:ut_metadatai255ee13:metadata_sizei1337e1:pi12345e4:reqqi2048e1:v18:intermodal v0.1.12e"; + let handshake: Handshake = bendy::serde::de::from_bytes(payload).unwrap(); + assert_eq!( + handshake + .message_ids + .get(&String::from(extended::Id::UtMetadata)), + Some(255).as_ref() + ); + assert_eq!(handshake.metadata_size, Some(1337)); + assert_eq!(handshake.port, Some(12345)); + assert_eq!(handshake.version, Some("intermodal v0.1.12".into())); + } + + #[test] + fn handshake_with_unsupported_extensions() { + let payload = b"d12:complete_agoi6e1:md11:lt_donthavei7e10:share_modei8e11:upload_onlyi3e12:ut_holepunchi4e11:ut_metadatai2e6:ut_pexi1eee"; + let handshake: Handshake = bendy::serde::de::from_bytes(payload).unwrap(); + assert_eq!( + handshake + .message_ids + .get(&String::from(extended::Id::UtMetadata)), + Some(2).as_ref() + ); + assert_eq!(handshake.metadata_size, None); + } + + #[test] + fn handshake_with_yourip() { + let payload = b"d1:ei0e1:md11:ut_metadatai255ee13:metadata_sizei1337e1:pi12345e4:reqqi2048e1:v18:intermodal v0.1.126:yourip4:z\xc7%\xcfee"; + let handshake: Handshake = bendy::serde::de::from_bytes(payload).unwrap(); + assert_eq!( + handshake + .message_ids + .get(&String::from(extended::Id::UtMetadata)), + Some(255).as_ref() + ); + assert_eq!(handshake.yourip, vec![0x7a, 0xc7, 0x25, 0xcf]); + } + + #[test] + fn handshake_ser_with_yourip() { + let handshake = Handshake { + ipv4: vec![0x13, 0x37, 0x13, 0x37], + ..Handshake::default() + }; + bendy::serde::ser::to_bytes(&handshake).unwrap(); + } + + #[test] + fn extension_handshake_default_ok() { + let handshake = Handshake::default(); + bendy::serde::ser::to_bytes(&handshake).unwrap(); + } +} diff --git a/src/peer/message/extended/id.rs b/src/peer/message/extended/id.rs new file mode 100644 index 00000000..ff61029a --- /dev/null +++ b/src/peer/message/extended/id.rs @@ -0,0 +1,38 @@ +use super::*; + +#[derive(Debug, Eq, Hash, PartialEq)] +pub(crate) enum Id { + Handshake, + UtMetadata, + NotImplemented(u8), +} + +impl From for String { + fn from(i: Id) -> Self { + match i { + Id::Handshake => "handshake".to_string(), + Id::UtMetadata => "ut_metadata".to_string(), + Id::NotImplemented(_) => "not supported".to_string(), + } + } +} + +impl From for Id { + fn from(ins: u8) -> Self { + match ins { + 0x00 => Id::Handshake, + 0x01 => Id::UtMetadata, + _ => Id::NotImplemented(ins), + } + } +} + +impl From for u8 { + fn from(ins: Id) -> Self { + match ins { + Id::Handshake => 0x00, + Id::UtMetadata => 0x01, + Id::NotImplemented(n) => n, + } + } +} diff --git a/src/peer/message/extended/mod.rs b/src/peer/message/extended/mod.rs new file mode 100644 index 00000000..015c0c0f --- /dev/null +++ b/src/peer/message/extended/mod.rs @@ -0,0 +1,9 @@ +use crate::common::*; + +pub(crate) use handshake::Handshake; +pub(crate) use id::Id; +pub(crate) use ut_metadata::UtMetadata; + +pub(crate) mod handshake; +pub(crate) mod id; +pub(crate) mod ut_metadata; diff --git a/src/peer/message/extended/ut_metadata.rs b/src/peer/message/extended/ut_metadata.rs new file mode 100644 index 00000000..2c323335 --- /dev/null +++ b/src/peer/message/extended/ut_metadata.rs @@ -0,0 +1,82 @@ +use crate::common::*; + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +pub(crate) struct UtMetadata { + pub(crate) msg_type: u8, + pub(crate) piece: usize, + #[serde( + skip_serializing_if = "Option::is_none", + default, + with = "unwrap_or_skip" + )] + pub(crate) total_size: Option, +} + +#[derive(Debug, PartialEq)] +pub(crate) enum MsgType { + Request, + Data, + Reject, +} + +impl UtMetadata { + pub(crate) const NAME: &'static str = "ut_metadata"; + pub(crate) const PIECE_LENGTH: usize = 16 * (1 << 10); + + pub(crate) fn request(piece: usize) -> Self { + Self { + msg_type: MsgType::Request.into(), + piece, + total_size: None, + } + } + + #[cfg(test)] + pub(crate) fn data(piece: usize, total_size: usize) -> Self { + Self { + msg_type: MsgType::Data.into(), + piece, + total_size: Some(total_size), + } + } +} + +impl From for u8 { + fn from(m: MsgType) -> u8 { + match m { + MsgType::Request => 0, + MsgType::Data => 1, + MsgType::Reject => 2, + } + } +} + +impl From for MsgType { + fn from(x: u8) -> Self { + match x { + 0 => Self::Request, + 1 => Self::Data, + _ => MsgType::Reject, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn bencode_extended_metadata_message() { + let req = UtMetadata { + msg_type: MsgType::Data.into(), + piece: 1, + total_size: None, + }; + let mut msg = bendy::serde::ser::to_bytes(&req).unwrap(); + let benc = b"d8:msg_typei1e5:piecei1ee"; + assert_eq!(benc, &*msg); + msg.extend_from_slice(b"piece data goes here xxxx..."); + let req2 = bendy::serde::de::from_bytes(&msg).unwrap(); + assert_eq!(req, req2); + } +} diff --git a/src/peer/message/flavour.rs b/src/peer/message/flavour.rs new file mode 100644 index 00000000..a53e4f52 --- /dev/null +++ b/src/peer/message/flavour.rs @@ -0,0 +1,58 @@ +#[derive(Clone, Copy, Debug, PartialEq)] +pub(crate) enum Flavour { + Choke = 0, + Unchoke, + Interested, + NotInterested, + Have, + Bitfield, + Request, + Piece, + Cancel, + + HaveAll = 14, + HaveNone = 15, + Extended = 20, + + Bad = 255, +} + +impl From for Flavour { + fn from(i: u8) -> Self { + match i { + 0x00 => Flavour::Choke, + 0x01 => Flavour::Unchoke, + 0x02 => Flavour::Interested, + 0x03 => Flavour::NotInterested, + 0x04 => Flavour::Have, + 0x05 => Flavour::Bitfield, + 0x06 => Flavour::Request, + 0x07 => Flavour::Piece, + 0x08 => Flavour::Cancel, + 0x0e => Flavour::HaveAll, + 0x0f => Flavour::HaveNone, + 0x14 => Flavour::Extended, + _ => Flavour::Bad, + } + } +} + +impl From for u8 { + fn from(i: Flavour) -> Self { + match i { + Flavour::Choke => 0x00, + Flavour::Unchoke => 0x01, + Flavour::Interested => 0x02, + Flavour::NotInterested => 0x03, + Flavour::Have => 0x04, + Flavour::Bitfield => 0x05, + Flavour::Request => 0x06, + Flavour::Piece => 0x07, + Flavour::Cancel => 0x08, + Flavour::HaveAll => 0x0e, + Flavour::HaveNone => 0x0f, + Flavour::Extended => 0x14, + Flavour::Bad => 0xff, + } + } +} diff --git a/src/subcommand/torrent.rs b/src/subcommand/torrent.rs index 49775409..efca33dc 100644 --- a/src/subcommand/torrent.rs +++ b/src/subcommand/torrent.rs @@ -2,6 +2,7 @@ use crate::common::*; mod announce; mod create; +mod from_link; mod link; mod piece_length; mod show; @@ -17,6 +18,7 @@ mod verify; pub(crate) enum Torrent { Announce(announce::Announce), Create(create::Create), + FromLink(from_link::FromLink), Link(link::Link), #[structopt(alias = "piece-size")] PieceLength(piece_length::PieceLength), @@ -30,6 +32,7 @@ impl Torrent { match self { Self::Announce(announce) => announce.run(env), Self::Create(create) => create.run(env, options), + Self::FromLink(from_link) => from_link.run(env, options), Self::Link(link) => link.run(env), Self::PieceLength(piece_length) => piece_length.run(env), Self::Show(show) => show.run(env), diff --git a/src/subcommand/torrent/announce.rs b/src/subcommand/torrent/announce.rs index 0ef14047..b3cb0e95 100644 --- a/src/subcommand/torrent/announce.rs +++ b/src/subcommand/torrent/announce.rs @@ -60,7 +60,7 @@ impl Announce { } }; - let client = match tracker::Client::from_url(tracker_url) { + let client = match tracker::Client::from_url(&tracker_url) { Ok(client) => client, Err(err) => { errln!(env, "Couldn't build tracker client. {}", err)?; @@ -69,7 +69,7 @@ impl Announce { }; usable_trackers += 1; - match client.announce_exchange(infohash) { + match client.announce_exchange(&infohash) { Ok(peer_list) => peers.extend(peer_list), Err(err) => errln!(env, "Announce failed: {}", err)?, } diff --git a/src/subcommand/torrent/from_link.rs b/src/subcommand/torrent/from_link.rs new file mode 100644 index 00000000..fb9bf340 --- /dev/null +++ b/src/subcommand/torrent/from_link.rs @@ -0,0 +1,328 @@ +use crate::common::*; +use rayon::prelude::*; + +const URI_HELP: &str = "Generate a torrent file from a magnet URI"; + +const INPUT_FLAG: &str = "input-flag"; +const INPUT_POSITIONAL: &str = ""; +const INPUT_HELP: &str = "The magnet URI."; + +#[derive(StructOpt)] +#[structopt( + help_message(consts::HELP_MESSAGE), + version_message(consts::VERSION_MESSAGE), + about(URI_HELP) +)] +pub(crate) struct FromLink { + #[structopt( + name = INPUT_FLAG, + long = "input", + short = "i", + value_name = "INPUT", + empty_values = false, + help = INPUT_HELP, + )] + input_flag: Option, + #[structopt( + name = INPUT_POSITIONAL, + value_name = "INPUT", + empty_values = false, + required_unless = INPUT_FLAG, + conflicts_with = INPUT_FLAG, + help = INPUT_HELP, + )] + input_positional: Option, + #[structopt( + long = "output", + short = "o", + value_name = "TARGET", + empty_values(false), + required_if(INPUT_FLAG, "-"), + required_if(INPUT_POSITIONAL, "-"), + help = "Save `.torrent` file to `TARGET`; if omitted, the parameter is set to `./${INFOHASH}.torrent`." + )] + output: Option, +} + +impl FromLink { + pub(crate) fn run(self, env: &mut Env, options: &Options) -> Result<()> { + let link = xor_args( + "input_flag", + &self.input_flag, + "input_positional", + &self.input_positional, + )?; + + let infohash = link.infohash; + + if !options.quiet { + errln!(env, "Sending announce to all trackers.")?; + } + + let (tx, rx) = channel(); + link.trackers.par_iter().for_each_with(tx, |s, x| { + let c = match tracker::Client::from_url(x) { + Ok(c) => c, + Err(_) => return, + }; + if let Ok(list) = c.announce_exchange(&infohash) { + for p in list { + s.send(p).ok(); + } + } + }); + + let peers: HashSet<_> = rx.iter().collect(); + + if !options.quiet { + errln!(env, "Trackers returned {} peers.", peers.len())?; + } + + let info = peers.par_iter().find_map_any(|addr| { + peer::Client::connect(addr, infohash) + .ok() + .and_then(|c| c.fetch_info_dict().ok()) + }); + + let metainfo = match info { + Some(info) => Metainfo { + announce: None, + announce_list: Some(vec![link.trackers.iter().map(Url::to_string).collect()]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info, + }, + None => return Err(Error::FromLinkNoInfo), + }; + + if !options.quiet { + errln!(env, "Received info dict.")?; + } + + let mut path = self.output.unwrap_or_else(|| { + let mut path = PathBuf::new(); + path.push(infohash.to_string()); + path.set_extension("torrent"); + path + }); + path = env.resolve(path)?; + + fs::File::create(&path) + .context(error::Filesystem { path: path.clone() }) + .and_then(|mut f| { + f.write_all(&metainfo.serialize()?) + .context(error::Filesystem { path: path.clone() }) + })?; + + if !options.quiet { + errln!(env, "Torrent file written to `{}`.", path.display())?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn input_required() { + test_env! { + args: [ + "torrent", + "from-link", + ], + tree: { + }, + matches: Err(Error::Clap { .. }), + }; + } + + #[test] + fn test_no_info() { + let tracker_url = "udp://1.2.3.4:1337"; + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![tracker_url.into()]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info: Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }, + }; + let link = MagnetLink::from_metainfo_lossy(&metainfo).unwrap(); + let mut env = test_env! { + args: [ + "torrent", + "from-link", + link.to_url().as_str(), + ], + tree: {}, + }; + assert_matches!(env.run(), Err(Error::FromLinkNoInfo)); + } + + #[test] + fn with_one_good_seeder() { + let info = Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }; + let infohash = info.infohash_lossy().unwrap(); + let (_, addr_s) = peer::Client::spawn_info_dict_seeder(&info); + let records = HashMap::from([(infohash.into(), HashSet::from([addr_s]))]); + let (_, addr_d) = tracker::Daemon::spawn_with_records(records); + let tracker_url = addr_d.to_string(); + + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![format!("udp://{}", tracker_url)]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info, + }; + let link = MagnetLink::from_metainfo_lossy(&metainfo) + .unwrap() + .to_url() + .to_string(); + + let mut env = test_env! { + args: [ + "torrent", + "from-link", + link, + ], + tree: {}, + }; + env.assert_ok(); + assert_eq!(metainfo, env.load_metainfo(format!("{}.torrent", infohash))); + } + + #[test] + fn with_one_good_seeder_and_output_flag() { + let info = Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }; + let infohash = info.infohash_lossy().unwrap(); + let (_, addr_s) = peer::Client::spawn_info_dict_seeder(&info); + let records = HashMap::from([(infohash.into(), HashSet::from([addr_s]))]); + let (_, addr_d) = tracker::Daemon::spawn_with_records(records); + let tracker_url = addr_d.to_string(); + + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![format!("udp://{}", tracker_url)]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info, + }; + let link = MagnetLink::from_metainfo_lossy(&metainfo) + .unwrap() + .to_url() + .to_string(); + + let mut env = test_env! { + args: [ + "torrent", + "from-link", + link, + "-o", + "foo.torrent", + ], + tree: {}, + }; + env.assert_ok(); + assert_eq!(metainfo, env.load_metainfo("foo.torrent")); + } + + #[test] + fn with_one_good_seeder_many_bad_seeders() { + let info = Info { + private: None, + piece_length: Bytes(16 * 1024), + source: None, + name: "testing".into(), + pieces: PieceList::from_pieces(["test", "data"]), + mode: Mode::Single { + length: Bytes(2 * 16 * 1024), + md5sum: None, + }, + update_url: None, + }; + let (_, addr_s) = peer::Client::spawn_info_dict_seeder(&info); + let mut set = HashSet::from([addr_s]); + for p in [1337, 12345, 54321] { + set.insert((Ipv4Addr::LOCALHOST, p).into()); + } + let infohash = info.infohash_lossy().unwrap(); + let records = HashMap::from([(infohash.into(), set)]); + let (_, addr_d) = tracker::Daemon::spawn_with_records(records); + let tracker_url = addr_d.to_string(); + let metainfo = Metainfo { + announce: None, + announce_list: Some(vec![vec![format!("udp://{}", tracker_url)]]), + nodes: None, + comment: None, + created_by: None, + creation_date: None, + encoding: None, + info, + }; + let link = MagnetLink::from_metainfo_lossy(&metainfo) + .unwrap() + .to_url() + .to_string(); + + let mut env = test_env! { + args: [ + "torrent", + "from-link", + link, + "-o", + "foo.torrent", + ], + tree: {}, + }; + env.assert_ok(); + assert_eq!(metainfo, env.load_metainfo("foo.torrent")); + } +} diff --git a/src/tracker.rs b/src/tracker.rs index c000cb1f..35f75bdb 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -3,8 +3,12 @@ use response::Response; pub(crate) use action::Action; pub(crate) use client::Client; +#[cfg(test)] +pub(crate) use daemon::Daemon; mod client; +#[cfg(test)] +pub mod daemon; mod request; mod response; diff --git a/src/tracker/client.rs b/src/tracker/client.rs index 6d518cef..a3f52316 100644 --- a/src/tracker/client.rs +++ b/src/tracker/client.rs @@ -55,11 +55,17 @@ impl Client { Ok(sock) } - pub fn from_url(tracker_url: Url) -> Result { + pub fn from_url(tracker_url: &Url) -> Result { if tracker_url.scheme() != "udp" { - return Err(Error::TrackerUdpOnly { tracker_url }); + return Err(Error::TrackerUdpOnly { + tracker_url: tracker_url.clone(), + }); } - Self::connect(HostPort::try_from(&tracker_url).context(error::TrackerHostPort { tracker_url })?) + Self::connect( + HostPort::try_from(tracker_url).context(error::TrackerHostPort { + tracker_url: tracker_url.clone(), + })?, + ) } fn connect_exchange(&mut self) -> Result<()> { @@ -70,7 +76,7 @@ impl Client { Ok(()) } - pub fn announce_exchange(&self, btinh: Infohash) -> Result> { + pub fn announce_exchange(&self, btinh: &Infohash) -> Result> { let connection_id = match self.connection_id { Some(id) => id, None => return Err(Error::TrackerNoConnectionId), @@ -80,7 +86,7 @@ impl Client { .sock .local_addr() .context(error::UdpSocketLocalAddress)?; - let req = announce::Request::new(connection_id, btinh, self.peer_id, local_addr.port()); + let req = announce::Request::new(connection_id, *btinh, self.peer_id, local_addr.port()); let mut buf = [0u8; Self::RX_BUF_LEN]; let (_, payload) = self.exchange(&req, &mut buf)?; @@ -148,12 +154,16 @@ impl Client { Ok(peer_list) } + + #[cfg(test)] + pub fn local_addr(&self) -> SocketAddr { + (Ipv4Addr::LOCALHOST, self.sock.local_addr().unwrap().port()).into() + } } #[cfg(test)] mod tests { use super::*; - use std::thread; struct TestServer { sock: UdpSocket, @@ -232,7 +242,7 @@ mod tests { fn client_from_url_no_port() { let tracker_url = Url::parse("udp://intermodal.io/announce").unwrap(); assert_matches!( - Client::from_url(tracker_url), + Client::from_url(&tracker_url), Err(Error::TrackerHostPort { .. }) ); } @@ -241,7 +251,7 @@ mod tests { fn client_from_url_no_host() { let tracker_url = Url::parse("udp://magnet:?announce=no_host").unwrap(); assert_matches!( - Client::from_url(tracker_url), + Client::from_url(&tracker_url), Err(Error::TrackerHostPort { .. }) ); } @@ -250,7 +260,7 @@ mod tests { fn client_from_url_not_udp() { let tracker_url = Url::parse("https://intermodal.io:100/announce").unwrap(); assert_matches!( - Client::from_url(tracker_url), + Client::from_url(&tracker_url), Err(Error::TrackerUdpOnly { .. }) ); } @@ -296,7 +306,7 @@ mod tests { }); let c = Client::connect(addr).unwrap(); - let addrs = c.announce_exchange(Sha1Digest::from_bytes([0u8; 20]).into()); + let addrs = c.announce_exchange(&Sha1Digest::from_bytes([0u8; 20]).into()); assert_matches!(addrs, Err(Error::TrackerExchange { .. })); } @@ -308,7 +318,7 @@ mod tests { }); let c = Client::connect(addr).unwrap(); - let addrs = c.announce_exchange(Sha1Digest::from_bytes([0u8; 20]).into()); + let addrs = c.announce_exchange(&Sha1Digest::from_bytes([0u8; 20]).into()); assert_matches!(addrs, Err(Error::TrackerExchange { .. })); } @@ -322,7 +332,7 @@ mod tests { let c = Client::connect(addr).unwrap(); let addrs = c - .announce_exchange(Sha1Digest::from_bytes([0u8; 20]).into()) + .announce_exchange(&Sha1Digest::from_bytes([0u8; 20]).into()) .unwrap(); assert_eq!( addrs, @@ -340,7 +350,7 @@ mod tests { let c = Client::connect(addr).unwrap(); let addrs = c - .announce_exchange(Sha1Digest::from_bytes([0u8; 20]).into()) + .announce_exchange(&Sha1Digest::from_bytes([0u8; 20]).into()) .unwrap(); assert_eq!( addrs, diff --git a/src/tracker/daemon.rs b/src/tracker/daemon.rs new file mode 100644 index 00000000..6ef53277 --- /dev/null +++ b/src/tracker/daemon.rs @@ -0,0 +1,153 @@ +use super::*; +use crate::common::*; + +#[cfg(test)] +pub(crate) struct Daemon { + pub(crate) sock: UdpSocket, + pub(crate) records: HashMap<[u8; 20], HashSet>, +} + +impl Daemon { + pub fn spawn() -> (thread::JoinHandle<()>, SocketAddr) { + Self::spawn_with_records(HashMap::new()) + } + + pub fn spawn_with_records( + records: HashMap<[u8; 20], HashSet>, + ) -> (thread::JoinHandle<()>, SocketAddr) { + let sock = UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)).unwrap(); + let addr = match sock.local_addr().unwrap() { + SocketAddr::V4(a) => (Ipv4Addr::LOCALHOST, a.port()).into(), + SocketAddr::V6(a) => (Ipv6Addr::LOCALHOST, a.port()).into(), + }; + let mut d = Daemon { sock, records }; + let handle = thread::spawn(move || d.run()); + (handle, addr) + } + + fn run(&mut self) { + let mut rng = rand::thread_rng(); + let mut buf = [0u8; 8192]; + loop { + if let Ok((n, peer)) = self.sock.recv_from(&mut buf) { + if let Ok((req, _)) = connect::Request::deserialize(&buf[..n]) { + let resp = connect::Response { + action: Action::Connect.into(), + transaction_id: req.transaction_id, + connection_id: rng.gen(), + } + .serialize(); + self.sock.send_to(&resp, peer).unwrap(); + continue; + } + + if let Ok((req, _)) = announce::Request::deserialize(&buf[..n]) { + let mut resp: Vec = announce::Response { + action: Action::Announce.into(), + transaction_id: req.transaction_id, + interval: 0x1337_1337, + leechers: 0xcafe_babe, + seeders: 0xdead_beef, + } + .serialize(); + resp.extend_from_slice(&self.peer_list(&req.infohash)); + self.sock.send_to(&resp, peer).unwrap(); + + self.insert(req.infohash, peer); + } + } + } + } + + pub fn insert(&mut self, infohash: [u8; 20], addr: SocketAddr) { + if let Some(set) = self.records.get_mut(&infohash) { + set.insert(addr); + } else { + let mut set = HashSet::new(); + set.insert(addr); + self.records.insert(infohash, set); + } + } + + fn peer_list(&self, infohash: &[u8; 20]) -> Vec { + match self.records.get(infohash) { + None => vec![], + Some(set) => Self::compact_peer_list(set), + } + } + + fn compact_peer_list(set: &HashSet) -> Vec { + let mut v = Vec::new(); + for p in set { + match p.ip() { + IpAddr::V4(ip) => v.extend_from_slice(&ip.octets()), + IpAddr::V6(ip) => v.extend_from_slice(&ip.octets()), + } + v.extend_from_slice(&p.port().to_be_bytes()); + } + v + } +} + +mod tests { + use super::*; + + #[test] + fn run() { + let (_, addr) = Daemon::spawn(); + + let mut c = Client::connect(addr).unwrap(); + let mut a = c.local_addr(); + let mut resp = c.announce_exchange(&[0u8; 20].into()).unwrap(); + + for i in 0..4 { + assert_eq!(resp.len(), i); + c = Client::connect(addr).unwrap(); + resp = c.announce_exchange(&[0u8; 20].into()).unwrap(); + assert!(resp.contains(&a)); + a = c.local_addr(); + } + } + + #[test] + fn separate_infohashes() { + let (_, addr) = Daemon::spawn(); + let c1 = Client::connect(addr).unwrap(); + let c2 = Client::connect(addr).unwrap(); + let a1 = c1.local_addr(); + let a2 = c2.local_addr(); + + let infohash1 = Infohash::from(rand::thread_rng().gen::<[u8; 20]>()); + let infohash2 = Infohash::from(rand::thread_rng().gen::<[u8; 20]>()); + let resp1 = c1.announce_exchange(&infohash1).unwrap(); + let resp2 = c2.announce_exchange(&infohash2).unwrap(); + assert_eq!(resp2.len(), 0); + assert_eq!(resp1.len(), 0); + + let resp1 = c1.announce_exchange(&infohash1).unwrap(); + let resp2 = c2.announce_exchange(&infohash2).unwrap(); + assert_eq!(resp1.len(), 1); + assert_eq!(resp2.len(), 1); + assert!(resp1.contains(&a1)); + assert!(resp2.contains(&a2)); + } + + #[test] + fn reannounce() { + let (_, addr) = Daemon::spawn(); + let infohash = Infohash::from(rand::thread_rng().gen::<[u8; 20]>()); + let c1 = Client::connect(addr).unwrap(); + let c2 = Client::connect(addr).unwrap(); + let a1 = c1.local_addr(); + let a2 = c2.local_addr(); + let resp1 = c1.announce_exchange(&infohash).unwrap(); + let resp2 = c2.announce_exchange(&infohash).unwrap(); + let resp3 = c1.announce_exchange(&infohash).unwrap(); + + assert_eq!(resp1.len(), 0); + assert_eq!(resp2.len(), 1); + assert_eq!(resp3.len(), 2); + assert!(resp3.contains(&a1)); + assert!(resp3.contains(&a2)); + } +}