Skip to content

Commit

Permalink
Move directory traversal I/O to a special thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ysaito1001 committed Oct 24, 2024
1 parent 5ec574d commit 2148831
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
1 change: 1 addition & 0 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
blocking = "1.6.0"
bytes = "1"
futures-util = "0.3.30"
path-clean = "1.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::sync::atomic::Ordering;
use super::{UploadObjectsContext, UploadObjectsInput};
use async_channel::{Receiver, Sender};
use aws_smithy_types::error::operation::BuildError;
use blocking::Unblock;
use futures_util::StreamExt;
use walkdir::WalkDir;

use crate::io::InputStream;
Expand All @@ -34,12 +36,13 @@ pub(super) async fn list_directory_contents(
ctx: UploadObjectsContext,
work_tx: Sender<Result<UploadObjectJob, error::Error>>,
) -> Result<(), error::Error> {
let walker = walker(&ctx.state.input);
// Move a blocking I/O to a dedicated thread pool
let mut walker = Unblock::new(walker(&ctx.state.input).into_iter());

let default_filter = &UploadFilter::default();
let filter = ctx.state.input.filter().unwrap_or(default_filter);

for entry in walker {
while let Some(entry) = walker.next().await {
let job = match entry {
Ok(entry) => {
if !(filter.predicate)(&UploadFilterItem::new(
Expand Down

0 comments on commit 2148831

Please sign in to comment.