Skip to content

feat(hydro_lang): add sim_atomic_input with scheduler support#2851

Open
shadaj wants to merge 1 commit into
mainfrom
sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c
Open

feat(hydro_lang): add sim_atomic_input with scheduler support#2851
shadaj wants to merge 1 commit into
mainfrom
sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c

Conversation

@shadaj
Copy link
Copy Markdown
Member

@shadaj shadaj commented May 6, 2026

Summary

Adds Process::sim_atomic_input() which returns a SimAtomicSender<T, O, R> and a Stream<T, Atomic<Process>, ...>. The sender's send_atomic method is synchronous — values are immediately available in the next atomic slice without requiring a separate tick or ack pattern.

The sim scheduler now properly explores both orderings when an atomic input and a regular input are sent together.

Implementation

  1. Source placement: The ExternalInput node is placed directly at the Atomic location (not at the top-level process with a batch into the tick).

  2. AtomicSourceHook: A new sim hook type that monitors the external input channel. When items are pending, can_make_nontrivial_decision() returns true, which forces the scheduler to consider the tick as ready. On release, all pending items are emitted (no batching choice).

  3. Scheduler integration: The hook is registered in create_external_source when the source location is Atomic. Items from the external channel are buffered at the process level and released into the tick graph via the hook.

Usage

let (sender, atomic_stream) = process.sim_atomic_input::<i32, _, _>();
// Use atomic_stream in your dataflow (fold, batch_atomic, etc.)

// At runtime — synchronous, no await needed:
sender.send_atomic(42);
sender.send_many_atomic([1, 2, 3]);

Tests

  • sim_atomic_input_api: verifies basic send + scheduler explores both orderings (2 instances)
  • sim_atomic_input_same_tick: verifies the scheduler explores the same-tick scenario
  • Ported 4 request_response tests in hydro_std to use the new API

Closes #2380

@shadaj shadaj requested a review from a team May 6, 2026 21:59
@shadaj shadaj force-pushed the sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c branch from d30aa7d to 0826e9f Compare May 6, 2026 22:04
@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages Bot commented May 6, 2026

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: 023ed3b
Status: ✅  Deploy successful!
Preview URL: https://8474d9ae.hydroflow.pages.dev
Branch Preview URL: https://sandbox-15952e72-fc63-4b8e-a.hydroflow.pages.dev

View logs

@shadaj shadaj force-pushed the sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c branch 5 times, most recently from dd3fe00 to 87ae805 Compare May 8, 2026 20:34
@shadaj shadaj changed the title feat(hydro_lang): add sim_atomic_input API feat(hydro_lang): add sim_atomic_input with scheduler support May 8, 2026
@shadaj shadaj force-pushed the sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c branch from 87ae805 to 0022399 Compare May 8, 2026 20:39
@shadaj shadaj requested review from Benjscho and luckyworkama May 8, 2026 20:42
@MingweiSamuel MingweiSamuel requested a review from Copilot May 15, 2026 22:21
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds Process::sim_atomic_input() for the Hydro simulator, returning a SimAtomicSender paired with a Stream whose source is placed directly inside an Atomic<Process> location. A new AtomicSourceHook lets the scheduler treat pending atomic-input items as a non-trivial reason to advance the tick, enabling exhaustive exploration of both same-tick and later-tick interleavings between the atomic input and other inputs. Existing request_response tests are ported off the ack pattern to the simpler synchronous API.

Changes:

  • New SimAtomicSender wrapper with synchronous send_atomic / send_many_atomic[_unordered] methods.
  • New AtomicSourceHook and dedicated create_external_source branch in the sim builder that buffers items at the root location and releases them into the tick graph via the hook.
  • New Process::sim_atomic_input constructor and sim_atomic_input_same_tick regression test; hydro_std::request_response tests migrated to the new API.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
hydro_lang/src/sim/mod.rs Declares the SimAtomicSender wrapper type.
hydro_lang/src/sim/compiled.rs Implements synchronous send_atomic / send_many*_atomic on SimAtomicSender.
hydro_lang/src/sim/runtime.rs Adds AtomicSourceHook that drains all pending items on each scheduler decision.
hydro_lang/src/sim/builder.rs Special-cases LocationId::Atomic in create_external_source to install the hook and bridge buffer→tick.
hydro_lang/src/location/process.rs Adds Process::sim_atomic_input constructing an ExternalInput directly at the Atomic location.
hydro_lang/src/location/tick.rs Adds sim_atomic_input_same_tick regression test asserting both interleavings are explored.
hydro_std/src/request_response.rs Ports four tests from the ack pattern to sim_atomic_input / send_atomic.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

input: #buffered_ident.clone(),
to_release: None,
output: #hoff_send_ident,
format_item_debug: |_| None,
///
/// The source is placed directly inside the atomic tick, so sent values
/// are immediately available in the next atomic slice without requiring
/// a separate tick to batch them in. `send_atomic` is synchronous.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this description would benefit from a motivating example. I think this will be a likely discovery point for people/agents. Why exactly would you use this over sim_input.

/// are immediately available in the next atomic slice without requiring
/// a separate tick to batch them in. `send_atomic` is synchronous.
#[expect(clippy::type_complexity, reason = "stream markers")]
pub fn sim_atomic_input<
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no cluster version?

Places the ExternalInput source directly at the Atomic location and adds
an AtomicSourceHook that signals readiness when items are pending. This
allows the sim scheduler to explore both orderings: the batch for the
regular input being non-empty on the first tick (same tick as atomic write)
or empty (read comes in a later tick).

send_atomic is synchronous — values are immediately available in the
next atomic slice without requiring a separate tick or ack pattern.

Closes #2380

PR: #2851
@shadaj shadaj force-pushed the sandbox-15952e72-fc63-4b8e-a9d1-7b7bd73be15c branch from 0022399 to 023ed3b Compare May 20, 2026 23:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for atomic simulator inputs that guarantee batching without triggering a tick

3 participants