[ENH] Add dedupe closure to storage #5141

Merged
merged 8 commits into from
Jul 24, 2025
Conversation

HammadB
Collaborator

@HammadB HammadB commented Jul 24, 2025

Description of changes

Summarize the changes made by this PR.

  • Adds a dedupe closure API to the storage interface under fetch
  • Fetch allows providing an async closure to be deduplicated
  • This moves deduplication off of a Shared future, since the dispatcher runtimes and the main runtime may share that future, leading to undefined behavior.
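
As a rough illustration of the idea in the bullets above, here is a minimal sketch of closure-based deduplication in which every waiting caller gets its own channel rather than polling one shared future. It is not the PR's implementation: `Deduper`, `fetch`'s signature, and `waiters` are hypothetical names, and the sketch assumes blocking threads with `std::sync::mpsc` in place of the async closure and tokio oneshot channels so that it runs on the standard library alone.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;

/// Hypothetical stand-in for the storage-side dedupe state (not the PR's type).
pub struct Deduper<T> {
    // key -> senders for the callers waiting on the in-flight request
    inflight: Mutex<HashMap<String, Vec<Sender<T>>>>,
}

impl<T: Clone> Deduper<T> {
    pub fn new() -> Self {
        Deduper { inflight: Mutex::new(HashMap::new()) }
    }

    /// Number of followers currently parked on `key` (0 if nothing is in flight).
    pub fn waiters(&self, key: &str) -> usize {
        self.inflight.lock().unwrap().get(key).map_or(0, |w| w.len())
    }

    /// Runs `load` at most once per key among overlapping callers.
    /// Assumption: `load` does not panic; a real version needs error handling,
    /// otherwise followers would wait forever.
    pub fn fetch<F: FnOnce() -> T>(&self, key: &str, load: F) -> T {
        let mut requests = self.inflight.lock().unwrap();
        if let Some(waiters) = requests.get_mut(key) {
            // Follower: register a private channel, release the lock, then wait.
            let (tx, rx) = channel();
            waiters.push(tx);
            drop(requests);
            return rx.recv().expect("leader dropped without sending");
        }
        // Leader: mark the key as in flight before releasing the lock.
        requests.insert(key.to_string(), Vec::new());
        drop(requests);

        let value = load();

        // Remove the entry first so later callers start a fresh request.
        let waiters = self.inflight.lock().unwrap().remove(key).unwrap_or_default();
        for tx in waiters {
            let _ = tx.send(value.clone()); // a waiter may have gone away
        }
        value
    }
}
```

In this sketch the first caller for a key runs its closure and fans the result out; callers that arrive while it is running only register a sender and wait, so no future or task is shared between runtimes.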

Test plan

How are these changes tested?
Existing tests cover this

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

Observability plan

What is the plan to instrument and monitor this change?
Existing telemetry monitors this change

Documentation Changes

None

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality (Readability, Modularity, Intuitiveness)?

Collaborator Author

HammadB commented Jul 24, 2025

Summary: 1 successful workflow, 1 pending workflow

Last updated: 2025-07-24 18:55:27 UTC

&self,
key: &str,
options: GetOptions,
// TODO: remove is_parallel and move it into GetOptions, refactor all callers
Collaborator Author

will do in another PR

let (output_tx, output_rx) = tokio::sync::oneshot::channel();
// Add the new sender to the existing request.
inflight_req.senders.push(output_tx);
drop(requests);
Collaborator Author

I tried to structure all of this to avoid manual drop() calls, but I actually think it's cleaner this way.
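
For readers weighing the trade-off in that comment, the following sketch contrasts the two usual ways of releasing a lock guard early: an explicit `drop` and a nested block. The function names and the `u32` ids are hypothetical and stand in for the PR's request map and senders; the sketch also assumes a `std::sync::Mutex`, which may not be the lock type the PR uses.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Registers `id` under `key`, releasing the lock with an explicit drop.
fn register_with_drop(map: &Mutex<HashMap<String, Vec<u32>>>, key: &str, id: u32) -> usize {
    let mut requests = map.lock().unwrap();
    let entry = requests.entry(key.to_string()).or_default();
    entry.push(id);
    let n = entry.len();
    drop(requests); // lock released here, before any slow work that follows
    n
}

/// Same effect, with the guard confined to a nested block instead.
fn register_with_scope(map: &Mutex<HashMap<String, Vec<u32>>>, key: &str, id: u32) -> usize {
    let n = {
        let mut requests = map.lock().unwrap();
        let entry = requests.entry(key.to_string()).or_default();
        entry.push(id);
        entry.len()
    }; // guard dropped at the end of the block
    n
}
```

Both forms release the lock at the same point. The explicit `drop` keeps the code flat and marks the release visibly, while the block form makes it impossible to touch the guard afterwards, at the cost of extra nesting when there are several early-return paths.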

@sanketkedia
Contributor

  1. Should we retest the closure approach with the same workload on staging? Semantically it is a bit different from what we tested earlier
  2. Should we split this into two separate PRs? That would help with independent validation. Also, the shared future replacement is new functionality and introduces new dynamics that we should let bake for a bit.

@HammadB
Collaborator Author

HammadB commented Jul 24, 2025

  1. Yeah we will
  2. Unfortunately, the reason I did it in one PR is that the minute you introduce the closure, the shared future approach becomes difficult to do.

@HammadB HammadB marked this pull request as ready for review July 24, 2025 18:34
@HammadB HammadB enabled auto-merge (squash) July 24, 2025 18:34
Contributor

Add Deduplicated Async Closure API (fetch) to Storage Layer

This PR introduces a new deduplicated request mechanism in the storage layer by adding a fetch API that accepts an async closure, enabling deduplication with per-request closures instead of shared futures. The deduplication logic has been rethought to avoid undefined behavior from cross-runtime use, and the storage interface is updated throughout the codebase (including in blockstore) to support closure-based deduplication. Additional cleanups and minor lock/naming improvements accompany the new API, and extensive integration testing validates correct deduplication behavior.

Key Changes

• Introduces Storage::fetch() API to all major storage backends, supporting deduplicated concurrent requests using an async closure.
• Refactors AdmissionControlledS3Storage internal request deduplication from Shared to a Vec of async oneshot channel senders, fixing cross-runtime issues.
• Adapts blockstore BlockManager to utilize the new fetch API with a closure for block loads.
• Removes deprecated code around inflight request coalescing and mutexes.
• Expands test coverage with a new integration test validating dedupe under high concurrency.
• Cargo.toml: Expands serde features to include 'rc' for compatibility with new code patterns.

Affected Areas

• rust/storage/src/admissioncontrolleds3.rs
• rust/blockstore/src/arrow/provider.rs
• rust/storage/src/lib.rs
• Cargo.toml

This summary was automatically generated by @propel-code-bot

@HammadB HammadB merged commit 1fea47b into main Jul 24, 2025
57 checks passed
Inventrohyder pushed a commit to Inventrohyder/chroma that referenced this pull request Aug 5, 2025
2 participants