Skip to content

Commit

Permalink
fix: get_ranges is not spawned in io-runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Jan 9, 2024
1 parent 61b123a commit cb7fc2b
Showing 1 changed file with 66 additions and 6 deletions.
72 changes: 66 additions & 6 deletions components/object_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,16 @@ impl ObjectStore for StoreWithMetrics {
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.put
.observe(bytes.len() as f64);
self.store.put(location, bytes).await

let loc = location.clone();
let store = self.store.clone();
self.runtime
.spawn(async move { store.put(&loc, bytes).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
source: Box::new(source),
})?
}

async fn put_multipart(
Expand Down Expand Up @@ -225,11 +234,22 @@ impl ObjectStore for StoreWithMetrics {

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_ranges.start_timer();
let result = self.store.get_ranges(location, ranges).await?;
let store = self.store.clone();
let loc = location.clone();
let ranges = ranges.to_vec();
let result = self
.runtime
.spawn(async move { store.get_ranges(&loc, &ranges).await })
.await
.map_err(|e| StoreError::Generic {
store: METRICS,
source: Box::new(e),
})??;
let len: usize = result.iter().map(|v| v.len()).sum();
OBJECT_STORE_THROUGHPUT_HISTOGRAM
.get_ranges
.observe(len as f64);

Ok(result)
}

Expand Down Expand Up @@ -283,25 +303,65 @@ impl ObjectStore for StoreWithMetrics {

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.copy.start_timer();
self.store.copy(from, to).await

let store = self.store.clone();
let from = from.clone();
let to = to.clone();
self.runtime
.spawn(async move { store.copy(&from, &to).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
source: Box::new(source),
})?
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM.rename.start_timer();
self.store.rename(from, to).await

let store = self.store.clone();
let from = from.clone();
let to = to.clone();
self.runtime
.spawn(async move { store.rename(&from, &to).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
source: Box::new(source),
})?
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.copy_if_not_exists
.start_timer();
self.store.copy_if_not_exists(from, to).await

let store = self.store.clone();
let from = from.clone();
let to = to.clone();
self.runtime
.spawn(async move { store.copy_if_not_exists(&from, &to).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
source: Box::new(source),
})?
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let _timer = OBJECT_STORE_DURATION_HISTOGRAM
.rename_if_not_exists
.start_timer();
self.store.rename_if_not_exists(from, to).await

let store = self.store.clone();
let from = from.clone();
let to = to.clone();
self.runtime
.spawn(async move { store.rename_if_not_exists(&from, &to).await })
.await
.map_err(|source| StoreError::Generic {
store: METRICS,
source: Box::new(source),
})?
}
}

0 comments on commit cb7fc2b

Please sign in to comment.