Skip to content

Commit bd539d0

Browse files
committed
Acquire the GIL only per chunk
1 parent 161c3c2 commit bd539d0

File tree

1 file changed

+9
-7
lines changed

1 file changed

+9
-7
lines changed

libs/opsqueue_python/src/producer.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,17 +217,19 @@ impl ProducerClient {
217217
py.allow_threads(|| {
218218
let prefix = uuid::Uuid::now_v7().to_string();
219219
tracing::debug!("Uploading submission chunks to object store subfolder {prefix}...");
220-
let chunk_count = Python::with_gil(|py| {
221-
self.block_unless_interrupted(async {
222-
let chunk_contents = chunk_contents.bind(py);
223-
let stream = futures::stream::iter(chunk_contents)
224-
.map(|item| item.and_then(|item| item.extract()).map_err(Into::into));
220+
let chunk_count = self.block_unless_interrupted(async {
221+
let chunk_contents = std::iter::from_fn(move || {
222+
Python::with_gil(|py|
223+
chunk_contents.bind(py).clone().next()
224+
.map(|item| item.and_then(
225+
|item| item.extract()).map_err(Into::into)))
226+
});
227+
let stream = futures::stream::iter(chunk_contents);
225228
self.object_store_client
226229
.store_chunks(&prefix, ChunkType::Input, stream)
227230
.await
228231
.map_err(|e| CError(R(L(e))))
229-
})
230-
})?;
232+
})?;
231233
let chunk_count = chunk::ChunkIndex::from(chunk_count);
232234
tracing::debug!("Finished uploading to object store. {prefix} contains {chunk_count} chunks");
233235

0 commit comments

Comments
 (0)