[QDP] Add a Quantum Data Loader and API refactor#1000

Merged
guan404ming merged 5 commits into apache:main from rich7420:dataloader-api
Feb 2, 2026

Conversation

@rich7420
Contributor

Purpose of PR

Introduces a stateful Rust iterator (Quantum Data Loader) that yields one encoded batch (DLPack) per step, so Python can drive encoding with for qt in loader: instead of a closed benchmark loop. The same Rust core supports both benchmark (full pipeline, stats only) and data loader (batch-by-batch iteration). The public Python API is moved out of benchmark/ into a package qumat_qdp; benchmark scripts import from qumat_qdp and remain the primary consumers.

qdp-core (Rust)

  • pipeline_runner.rs (new): PipelineConfig, PipelineRunResult, run_throughput_pipeline, run_latency_pipeline; shared vector_len, generate_batch, fill_sample. Adds DataSource enum (Synthetic only; File reserved for Phase 2) and PipelineIterator with new_synthetic(engine, config) and next_batch(&mut self) -> Result<Option<*mut DLManagedTensor>> reusing encode_batch.
  • lib.rs: QdpEngine implements Clone; re-exports DataSource, PipelineIterator, PipelineConfig, PipelineRunResult, run_throughput_pipeline, run_latency_pipeline (Linux only).
  • gpu/encodings/amplitude.rs: run_amplitude_dual_stream_pipeline and exposure for dual-stream encode path used by benchmarks.
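The next_batch contract above (one encoded batch per call, None when the configured total is reached) can be illustrated with a pure-Python analogue. This is a hypothetical sketch, not the qdp-core Rust implementation; all names and fields are illustrative.

```python
# Hypothetical pure-Python analogue of PipelineIterator's next_batch
# contract: one batch per call, None once total_batches are produced
# (mirroring Rust's Result<Option<*mut DLManagedTensor>>).
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineConfigSketch:
    num_qubits: int = 16
    batch_size: int = 64
    total_batches: int = 100
    seed: Optional[int] = None


class PipelineIteratorSketch:
    def __init__(self, config: PipelineConfigSketch):
        self.config = config
        self._produced = 0

    def next_batch(self) -> Optional[list]:
        if self._produced >= self.config.total_batches:
            return None  # exhausted, like Ok(None) on the Rust side
        self._produced += 1
        # Stand-in for generate_batch + encode_batch: a flat state vector
        # of batch_size * 2^num_qubits amplitudes.
        return [0.0] * (self.config.batch_size * (2 ** self.config.num_qubits))
```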

qdp-python (Rust bindings)

  • SendPtr wrapper: wraps *mut DLManagedTensor so the raw pointer can cross py.allow_threads (closure return must be Send). Used only to release GIL during encode.
  • PyQuantumLoader (#[pyclass]): holds Option<PipelineIterator>. Implements Python iterator protocol: __iter__ returns self; __next__ takes iterator out with take(), runs next_batch() inside py.allow_threads, then restores the iterator or returns a QuantumTensor. Raises StopIteration when exhausted. Stub on non-Linux.
  • QdpEngine::create_synthetic_loader(...): builds PipelineConfig, calls PipelineIterator::new_synthetic(engine.clone(), config), returns PyQuantumLoader. Stub on non-Linux.
  • run_throughput_pipeline_py: pipeline run is executed inside py.allow_threads so the full benchmark loop runs with GIL released (replacing previous detach/allow_threads usage as appropriate).

qumat_qdp (Python package)

  • qumat_qdp/ at project root: __init__.py (re-exports QdpEngine, QuantumTensor, QdpBenchmark, ThroughputResult, LatencyResult, QuantumDataLoader, run_throughput_pipeline_py), api.py (QdpBenchmark, ThroughputResult, LatencyResult; calls _qdp.run_throughput_pipeline_py), loader.py (QuantumDataLoader builder; __iter__ calls engine.create_synthetic_loader and returns the Rust iterator).
  • pyproject.toml: python-source = "." so the root qumat_qdp package is included in the wheel.
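The builder style described for loader.py can be sketched as follows. This is a hypothetical outline of the chained-setter shape, not the actual QuantumDataLoader implementation; field names are assumed from this PR's description.

```python
# Hypothetical sketch of the QuantumDataLoader builder style: each
# setter returns self so calls chain, and iteration would defer to
# engine.create_synthetic_loader(...) in the real class.
class QuantumDataLoaderSketch:
    def __init__(self, device_id=0):
        self.device_id = device_id
        self.num_qubits = None
        self.total_batches = None
        self.batch_size = None
        self.synthetic = False

    def qubits(self, n):
        self.num_qubits = n
        return self

    def batches(self, total, size):
        self.total_batches = total
        self.batch_size = size
        return self

    def source_synthetic(self):
        self.synthetic = True
        return self
```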

Benchmark scripts

  • Imports: All benchmark scripts now use from qumat_qdp import ... (or re-export). benchmark/api.py and benchmark/loader.py only re-export from qumat_qdp for backward compatibility.
  • Import path: Each script inserts the project root into sys.path before importing qumat_qdp, so uv run python benchmark/run_pipeline_baseline.py (and similar) works without extra PYTHONPATH.
  • benchmark_loader_throughput.py (new): runs throughput by iterating QuantumDataLoader (for qt in loader) and compares with QdpBenchmark.run_throughput() (full Rust pipeline).
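The sys.path shim is roughly the following shape; the actual scripts may differ. The helper name and its argument are illustrative.

```python
# Roughly the import-path shim each benchmark script needs: insert the
# project root (parent of benchmark/) at the front of sys.path so
# `import qumat_qdp` resolves without setting PYTHONPATH.
import sys
from pathlib import Path


def add_project_root(script_file: str) -> str:
    """Insert the grandparent directory of script_file into sys.path."""
    root = str(Path(script_file).resolve().parent.parent)
    if root not in sys.path:
        sys.path.insert(0, root)
    return root
```

In a script this would be called as `add_project_root(__file__)` before the `from qumat_qdp import ...` line.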

Behaviour

  • Benchmark path: Unchanged from caller’s perspective. QdpBenchmark(device_id=0).qubits(16).batches(100, 64).run_throughput() still runs the full pipeline in Rust with GIL released via run_throughput_pipeline_py.
  • Loader path: QuantumDataLoader(device_id=0).qubits(16).batches(100, 64).source_synthetic() then for qt in loader: yields one QuantumTensor (batch) per iteration; GIL is released during each next_batch() in Rust.
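Conceptually, the loader-throughput comparison boils down to timing batch-by-batch iteration. A minimal, hypothetical sketch of that measurement (a plain iterable stands in for the real loader):

```python
# Minimal sketch of what a loader-throughput measurement does: iterate
# batch by batch and divide total vectors by wall-clock time. With the
# real API the loop body would be `for qt in loader:`.
import time


def measure_throughput(batches, vectors_per_batch):
    start = time.perf_counter()
    total_vectors = 0
    for _ in batches:  # each step yields one encoded batch
        total_vectors += vectors_per_batch
    elapsed = time.perf_counter() - start
    return total_vectors / elapsed if elapsed > 0 else float("inf")
```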

Testing

  • Existing benchmark flow: uv run python benchmark/run_pipeline_baseline.py, benchmark_throughput.py, benchmark_latency.py.
  • Loader throughput: uv run python benchmark/benchmark_loader_throughput.py (compares loader iteration vs full pipeline).

Related Issues or PRs

Related to #969

Changes Made

  • Bug fix
  • New feature
  • Refactoring
  • Documentation
  • Test
  • CI/CD pipeline
  • Other

Breaking Changes

  • Yes
  • No

Checklist

  • Added or updated unit tests for all changes
  • Added or updated documentation for all changes
  • Successfully built and ran all unit tests or manual tests locally
  • PR title follows "MAHOUT-XXX: Brief Description" format (if related to an issue)
  • Code follows ASF guidelines

@rich7420
Contributor Author

Before
Running throughput: 5 trials (qubits=16, batch_size=64, prefetch=16, batches=200)
Throughput: median=1153.5 vec/s, p95=1279.0 vec/s
Running latency: 5 trials
Latency: median=0.853 ms/vec, p95=0.874 ms/vec

After
Running throughput: 5 trials (qubits=16, batch_size=64, prefetch=16, batches=200)
Throughput: median=2541.9 vec/s, p95=3000.6 vec/s
Running latency: 5 trials
Latency: median=0.373 ms/vec, p95=0.401 ms/vec

@rich7420
Contributor Author

cc @viiccwen

@ryankert01
Member

ryankert01 commented Jan 31, 2026

Love the idea! When will the 0.6.0 release be? I think we should land this sooner.

Btw, can you share the benchmark result? Does it increase or decrease speed? I might be able to help with that.
edit: sorry, I didn't see it at first!

@guan404ming
Member

We could prepare it soon, I think, and iterate on it quickly.

Member

@guan404ming guan404ming left a comment


Design looks nice, left some questions about the implementation. Thanks!

_qdp: Optional[object] = None


def _get_qdp():
Member


I'm not really sure if we need this. Could you explain more?

Contributor Author


yeah, you're right, we don't need it in this part

Contributor Author

@rich7420 rich7420 Feb 1, 2026


I had intended the lazy import to defer loading the Rust extension until first use, but with the current design it doesn’t actually achieve that

Member


After some investigation, I found this piece of code that seems to work for this issue. Could you help test it?

from functools import lru_cache

@lru_cache(maxsize=1)
def get_qdp():
    import _qdp
    return _qdp

Member

@guan404ming guan404ming Feb 1, 2026


I tested it and it doesn't seem to work. Maybe we could also try moving the import _qdp into if TYPE_CHECKING: and rewriting the type annotations as def func(x: "_qdp.Type") to see if that works.
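For reference, a minimal sketch of the TYPE_CHECKING approach described here; the function name and types are illustrative, assumed from this PR's module names:

```python
# Sketch of the TYPE_CHECKING pattern: the Rust extension is imported
# only for static type checkers, and annotations that refer to it are
# strings, so importing this module never loads _qdp at runtime.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import _qdp  # seen by type checkers only, never executed


def encode(engine: "_qdp.QdpEngine") -> "_qdp.QuantumTensor":
    """Illustrative stub; the real code would call into the extension."""
    raise NotImplementedError
```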

Contributor Author


got it! thanks for the advice!

@guan404ming
Member

Please help resolve the CI error, thanks!

@viiccwen
Contributor

viiccwen commented Feb 1, 2026

It fails in testing (CI didn't trigger these tests due to the lack of a CUDA environment).

testing/qdp/test_bindings.py::test_encode_cuda_tensor[data_shape0-expected_shape0-1] FAILED                                                                [  3%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor[data_shape1-expected_shape1-3] FAILED                                                                [  3%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_preserves_input[data_shape0-False] FAILED                                                            [  3%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_preserves_input[data_shape1-True] FAILED                                                             [  4%]
testing/qdp/test_bindings.py::test_encode_3d_rejected[cuda_tensor-Unsupported CUDA tensor shape: 3D] FAILED                                                [  4%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_non_finite_values[<lambda>-zeros] FAILED                                                             [  4%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_non_finite_values[<lambda>-NaN] FAILED                                                               [  5%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_non_finite_values[<lambda>-Inf] FAILED                                                               [  5%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_output_dtype[float32-expected_dtype0] FAILED                                                         [  5%]
testing/qdp/test_bindings.py::test_encode_cuda_tensor_output_dtype[float64-expected_dtype1] FAILED                                                         [  5%]

Comment on lines 344 to 348
Contributor


We don't need this since PyTorch's default stream can report cuda_stream as 0.
I don't know whether your change caused this or not; just noticed it.
Then we can pass the tests. :D

Contributor

@viiccwen viiccwen left a comment


thanks for contributing! left some comments :D


Comment on lines 1132 to 1158
fn create_synthetic_loader(
    &self,
    total_batches: usize,
    batch_size: usize,
    num_qubits: u32,
    encoding_method: &str,
    seed: Option<u64>,
) -> PyResult<PyQuantumLoader> {
    let config = PipelineConfig {
        device_id: 0,
        num_qubits,
        batch_size,
        total_batches,
        encoding_method: encoding_method.to_string(),
        seed,
        warmup_batches: 0,
    };
Contributor

@viiccwen viiccwen Feb 1, 2026


device_id is hard-coded to 0 here; the config should use the device_id actually passed in.

Comment on lines 88 to 94
@requires_qdp
def test_qdp_benchmark_validation():
"""QdpBenchmark.run_throughput() raises if qubits/batches not set."""
import api

with pytest.raises(ValueError, match="qubits and batches"):
api.QdpBenchmark(device_id=0).run_throughput()
Contributor


This lacks a test of run_latency() when qubits/batches are not set.

Member


In the test we also don't cover device_id propagation.

batch_size: int = 64,
total_batches: int = 100,
encoding_method: str = "amplitude",
seed: Optional[int] = None,
Contributor


seed in the Rust interface is seed: Option<u64>, but in Python it is seed: Optional[int] = None, and a Python int can be negative or exceed u64's range.

Maybe we can add a validation guard that raises ValueError with a clear description.

Member


Validating every argument the user enters and raising a proper error might be a good approach.

Contributor Author


updated! please take another look

@ryankert01
Member

Overall lgtm

Member

@guan404ming guan404ming left a comment


Looks nice, we could follow up to refine this! This is a great starting point.

@guan404ming guan404ming merged commit 769f08b into apache:main Feb 2, 2026
6 checks passed
@rich7420
Contributor Author

rich7420 commented Feb 2, 2026

thanks for the review @guan404ming , @ryankert01 and @viiccwen !
