Skip to content

Commit

Permalink
Now saving torrent updates properly to the new db
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Aug 15, 2024
1 parent f29dccf commit d77d96b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 37 deletions.
13 changes: 8 additions & 5 deletions crates/librqbit/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,21 @@ impl Api {
.per_peer_stats_snapshot(filter))
}

pub fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
handle
.pause()
self.session()
.pause(&handle)
.await
.context("error pausing torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)?;
Ok(Default::default())
}

pub fn api_torrent_action_start(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
pub async fn api_torrent_action_start(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
self.session
.unpause(&handle)
.await
.context("error unpausing torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)?;
Ok(Default::default())
Expand All @@ -130,14 +132,15 @@ impl Api {
Ok(Default::default())
}

pub fn api_torrent_action_update_only_files(
pub async fn api_torrent_action_update_only_files(
&self,
idx: TorrentId,
only_files: &HashSet<usize>,
) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
self.session
.update_only_files(&handle, only_files)
.await
.context("error updating only_files")?;
Ok(Default::default())
}
Expand Down
5 changes: 3 additions & 2 deletions crates/librqbit/src/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,14 @@ impl HttpApi {
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_pause(idx).map(axum::Json)
state.api_torrent_action_pause(idx).await.map(axum::Json)
}

async fn torrent_action_start(
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_start(idx).map(axum::Json)
state.api_torrent_action_start(idx).await.map(axum::Json)
}

async fn torrent_action_forget(
Expand Down Expand Up @@ -404,6 +404,7 @@ impl HttpApi {
) -> Result<impl IntoResponse> {
state
.api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect())
.await
.map(axum::Json)
}

Expand Down
39 changes: 28 additions & 11 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,17 @@ impl Session {
}));
}

let id = if let Some(id) = opts.preferred_id {
id
} else if let Some(p) = self.persistence.as_ref() {
p.next_id().await?
} else {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
};

let mut builder = ManagedTorrentBuilder::new(
id,
info,
info_hash,
torrent_bytes,
Expand Down Expand Up @@ -1029,15 +1039,6 @@ impl Session {
builder.peer_read_write_timeout(t);
}

let id = if let Some(id) = opts.preferred_id {
id
} else if let Some(p) = self.persistence.as_ref() {
p.next_id().await?
} else {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
};

let managed_torrent = {
let mut g = self.db.write();
if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| {
Expand Down Expand Up @@ -1175,23 +1176,39 @@ impl Session {
Ok(merge_two_optional_streams(dht_rx, peer_rx))
}

pub fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) {
if let Some(p) = self.persistence.as_ref() {
if let Err(e) = p.update_metadata(handle.id(), handle).await {
warn!(storage=?p, error=?e, "error updating metadata")
}
}
}

pub async fn pause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
handle.pause()?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}

pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.info().trackers.clone().into_iter().collect(),
self.tcp_listen_port,
handle.info().options.force_tracker_interval,
)?;
handle.start(peer_rx, false, self.cancellation_token.child_token())?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}

pub fn update_only_files(
pub async fn update_only_files(
self: &Arc<Self>,
handle: &ManagedTorrentHandle,
only_files: &HashSet<usize>,
) -> anyhow::Result<()> {
handle.update_only_files(only_files)?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}

Expand Down
53 changes: 35 additions & 18 deletions crates/librqbit/src/session_persistence/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,13 @@ impl JsonSessionPersistenceStore {
fn torrent_bytes_filename(&self, info_hash: &Id20) -> PathBuf {
self.output_folder.join(format!("{:?}.torrent", info_hash))
}
}

#[async_trait]
impl SessionPersistenceStore for JsonSessionPersistenceStore {
async fn next_id(&self) -> anyhow::Result<TorrentId> {
Ok(self
.db_content
.read()
.await
.torrents
.keys()
.copied()
.max()
.map(|max| max + 1)
.unwrap_or(0))
}

async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> {
async fn update_db(
&self,
id: TorrentId,
torrent: &ManagedTorrentHandle,
write_torrent_file: bool,
) -> anyhow::Result<()> {
if !torrent
.storage_factory
.is_type_id(TypeId::of::<FilesystemStorageFactory>())
Expand All @@ -132,7 +121,7 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore {
output_folder: torrent.info().options.output_folder.clone(),
};

if !torrent.info().torrent_bytes.is_empty() {
if write_torrent_file && !torrent.info().torrent_bytes.is_empty() {
let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash());
match tokio::fs::OpenOptions::new()
.create(true)
Expand All @@ -157,6 +146,22 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore {

Ok(())
}
}

#[async_trait]
impl SessionPersistenceStore for JsonSessionPersistenceStore {
async fn next_id(&self) -> anyhow::Result<TorrentId> {
Ok(self
.db_content
.read()
.await
.torrents
.keys()
.copied()
.max()
.map(|max| max + 1)
.unwrap_or(0))
}

async fn delete(&self, id: TorrentId) -> anyhow::Result<()> {
if let Some(t) = self.db_content.write().await.torrents.remove(&id) {
Expand Down Expand Up @@ -211,4 +216,16 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore {
.then(move |id| async move { self.get(id).await.map(move |st| (id, st)) })
.boxed())
}

async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> {
self.update_db(id, torrent, true).await
}

async fn update_metadata(
&self,
id: TorrentId,
torrent: &ManagedTorrentHandle,
) -> anyhow::Result<()> {
self.update_db(id, torrent, false).await
}
}
6 changes: 6 additions & 0 deletions crates/librqbit/src/session_persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ impl SerializedTorrent {
}
}

// TODO: make this info_hash first, ID-second.
#[async_trait]
pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync {
async fn next_id(&self) -> anyhow::Result<TorrentId>;
async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()>;
async fn delete(&self, id: TorrentId) -> anyhow::Result<()>;
async fn get(&self, id: TorrentId) -> anyhow::Result<SerializedTorrent>;
async fn update_metadata(
&self,
id: TorrentId,
torrent: &ManagedTorrentHandle,
) -> anyhow::Result<()>;
async fn stream_all(
&self,
) -> anyhow::Result<BoxStream<'_, anyhow::Result<(TorrentId, SerializedTorrent)>>>;
Expand Down
13 changes: 12 additions & 1 deletion crates/librqbit/src/torrent_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tracing::warn;

use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::session::TorrentId;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory;
use crate::stream_connect::StreamConnector;
Expand Down Expand Up @@ -114,6 +115,8 @@ pub struct ManagedTorrentInfo {
}

pub struct ManagedTorrent {
pub id: TorrentId,
// TODO: merge ManagedTorrent and ManagedTorrentInfo
pub info: Arc<ManagedTorrentInfo>,
pub(crate) storage_factory: BoxStorageFactory,

Expand All @@ -122,6 +125,10 @@ pub struct ManagedTorrent {
}

impl ManagedTorrent {
pub fn id(&self) -> TorrentId {
self.id
}

pub fn info(&self) -> &ManagedTorrentInfo {
&self.info
}
Expand Down Expand Up @@ -344,7 +351,7 @@ impl ManagedTorrent {
}

/// Pause the torrent if it's live.
pub fn pause(&self) -> anyhow::Result<()> {
pub(crate) fn pause(&self) -> anyhow::Result<()> {
let mut g = self.locked.write();
match &g.state {
ManagedTorrentState::Live(live) => {
Expand Down Expand Up @@ -501,6 +508,7 @@ impl ManagedTorrent {
}

pub(crate) struct ManagedTorrentBuilder {
id: TorrentId,
info: TorrentMetaV1Info<ByteBufOwned>,
output_folder: PathBuf,
info_hash: Id20,
Expand All @@ -521,6 +529,7 @@ pub(crate) struct ManagedTorrentBuilder {

impl ManagedTorrentBuilder {
pub fn new(
id: usize,
info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20,
torrent_bytes: Bytes,
Expand All @@ -529,6 +538,7 @@ impl ManagedTorrentBuilder {
storage_factory: BoxStorageFactory,
) -> Self {
Self {
id,
info,
info_hash,
torrent_bytes,
Expand Down Expand Up @@ -641,6 +651,7 @@ impl ManagedTorrentBuilder {
self.storage_factory.create_and_init(&info)?,
));
Ok(Arc::new(ManagedTorrent {
id: self.id,
locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing),
only_files: self.only_files,
Expand Down

0 comments on commit d77d96b

Please sign in to comment.