A dynamic DAG scheduler for async futures with a Stream-based API.
This crate executes futures according to a directed acyclic graph (DAG) of dependencies and yields their results as a stream. A future starts executing only after all of its dependencies have completed.
The graph can be extended while it is running, and dependencies that have not been inserted yet are tracked as placeholders until they arrive.
- Dependency-aware execution: Futures run only when all declared dependencies are satisfied.
- Dynamic graph: Nodes and dependencies can be inserted while the DAG is already running.
- Lazy by design: No future is polled until the stream itself is polled.
- Concurrent execution: Independent nodes run concurrently using `FuturesUnordered`.
- Stream-based API: Results are yielded as `(Key, Output)` pairs as soon as futures complete.
- Placeholder dependencies: Dependencies may be referenced before their futures are inserted.
- Cycle detection: If execution stalls and the graph contains a cycle, polling the stream panics.
- Thread-safe insertion: A mutex-backed wrapper allows inserting nodes from multiple tasks or threads.
```rust
use futures::StreamExt;
use futures_dag::FuturesDag;
use std::future::ready;

#[tokio::main]
async fn main() {
    let mut dag = FuturesDag::new();

    // Arguments: key, dependencies, future.
    dag.insert(1, [2, 3].into(), ready("node 1")).unwrap();
    dag.insert(2, [3].into(), ready("node 2")).unwrap();
    dag.insert(3, [].into(), ready("node 3")).unwrap();

    // Node 3 has no dependencies, so it completes first; 2 and 1 follow.
    let results = dag.collect::<Vec<_>>().await;
    assert_eq!(
        results,
        vec![
            (3, "node 3"),
            (2, "node 2"),
            (1, "node 1"),
        ]
    );
}
```

The order of yielded items reflects completion order, not insertion order.
Nodes may depend on keys that are not yet present in the DAG.
```rust
use futures::{FutureExt, StreamExt};
use futures_dag::BoxFuturesDag;

#[tokio::main]
async fn main() {
    let mut dag = BoxFuturesDag::default();

    // Node 1 depends on key 2, which has not been inserted yet.
    dag.insert_box(1, [2].into(), async {}).unwrap();
    // Nothing can complete, so the stream is pending rather than finished.
    assert!(dag.next().now_or_never().is_none());

    // Inserting node 2 fills the placeholder and unblocks node 1.
    dag.insert_box(2, [].into(), async {}).unwrap();
    assert_eq!(dag.next().await.unwrap().0, 2);
    assert_eq!(dag.next().await.unwrap().0, 1);
    assert_eq!(dag.next().await, None);
}
```

Missing dependencies are tracked as placeholders and automatically resolved once their futures are inserted and completed.
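To make the placeholder idea concrete, here is a minimal bookkeeping sketch that uses only the standard library. It is not the crate's internal data structure; `PendingGraph` and its methods are hypothetical names chosen for illustration. Each inserted node keeps the set of dependencies it still waits for, and a key that nobody has inserted yet simply stays in that set until it completes.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical bookkeeping, not the crate's actual implementation.
#[derive(Default)]
struct PendingGraph {
    /// Inserted nodes that have not completed, with their unmet dependencies.
    waiting: HashMap<u32, HashSet<u32>>,
    /// Keys whose nodes have completed.
    done: HashSet<u32>,
}

impl PendingGraph {
    /// Registers a node. Dependencies that already completed are dropped;
    /// keys that were never inserted stay in the set and act as placeholders.
    fn insert(&mut self, key: u32, deps: &[u32]) {
        let unmet: HashSet<u32> = deps
            .iter()
            .copied()
            .filter(|d| !self.done.contains(d))
            .collect();
        self.waiting.insert(key, unmet);
    }

    /// Inserted keys with no unmet dependencies, in ascending order.
    fn ready(&self) -> Vec<u32> {
        let mut keys: Vec<u32> = self
            .waiting
            .iter()
            .filter(|(_, unmet)| unmet.is_empty())
            .map(|(key, _)| *key)
            .collect();
        keys.sort_unstable();
        keys
    }

    /// Marks a node as completed and releases every node that waited on it.
    fn complete(&mut self, key: u32) {
        self.waiting.remove(&key);
        self.done.insert(key);
        for unmet in self.waiting.values_mut() {
            unmet.remove(&key);
        }
    }
}
```

With this sketch, inserting node 1 with dependency 2 leaves `ready()` empty; inserting and completing node 2 makes node 1 ready, which mirrors the sequence in the example above.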
If nodes need to be inserted from multiple tasks or threads while another task
drives the stream, use FuturesMutexDag.
```rust
use futures::StreamExt;
use futures_dag::{BoxFuturesDag, FuturesMutexDag};

#[tokio::main]
async fn main() {
    let dag = FuturesMutexDag::new(BoxFuturesDag::default());
    let dag_clone = dag.clone();

    // A separate task inserts node 2 through a cloned handle.
    tokio::spawn(async move {
        dag_clone.insert_box(2, [].into(), async {}).unwrap();
    });

    // Node 1 waits for node 2, whichever insertion happens first.
    dag.insert_box(1, [2].into(), async {}).unwrap();

    let results = dag.map(|v| v.0).collect::<Vec<_>>().await;
    assert_eq!(results, vec![2, 1]);
}
```

`FuturesMutexDag` stores the DAG in `Arc<Mutex<_>>`.
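The `Arc<Mutex<_>>` layout is why the example can clone the handle and insert through a non-`mut` binding. The sketch below shows the same pattern with the standard library only; `SharedKeys` is a hypothetical stand-in that stores plain keys, not the crate's wrapper, and it uses OS threads instead of async tasks to stay dependency-free.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a mutex-backed wrapper; not the crate's type.
#[derive(Clone, Default)]
struct SharedKeys {
    inner: Arc<Mutex<Vec<u32>>>,
}

impl SharedKeys {
    /// Takes `&self`: the lock, not the binding, provides mutable access.
    fn insert(&self, key: u32) {
        self.inner.lock().unwrap().push(key);
    }

    /// Returns a sorted copy of the stored keys.
    fn sorted(&self) -> Vec<u32> {
        let mut keys = self.inner.lock().unwrap().clone();
        keys.sort_unstable();
        keys
    }
}

/// Inserts from a spawned thread and from the caller through two handles
/// that point at the same underlying storage.
fn insert_from_two_threads() -> Vec<u32> {
    let shared = SharedKeys::default();
    let handle = shared.clone();
    let worker = thread::spawn(move || handle.insert(2));
    shared.insert(1);
    worker.join().unwrap();
    shared.sorted()
}
```

Cloning copies only the `Arc`, so both handles see every insertion regardless of which thread performs it.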
- Futures are not spawned on a runtime.
- All polling happens when the stream is polled.
- Ready futures are driven concurrently via `FuturesUnordered`.
- The DAG completes when all nodes have completed and yielded their outputs.
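The first two points follow from how Rust futures work in general: a future does nothing until something polls it. The sketch below demonstrates this with the standard library only. `CountPolls`, `NoopWaker`, and `polls_before_and_after` are hypothetical names for illustration and are not part of this crate.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that counts how often it is polled, then completes.
struct CountPolls<'a> {
    polls: &'a Cell<u32>,
}

impl<'a> Future for CountPolls<'a> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.polls.set(self.polls.get() + 1);
        Poll::Ready(())
    }
}

/// A waker that does nothing; enough for polling a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns the poll count right after construction and after one manual poll.
fn polls_before_and_after() -> (u32, u32) {
    let polls = Cell::new(0);
    // Constructing the future performs no work.
    let mut fut = CountPolls { polls: &polls };
    let before = polls.get();

    // Work happens only when a caller polls, here with a no-op waker.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let _ = Pin::new(&mut fut).poll(&mut cx);

    (before, polls.get())
}
```

In the DAG, the stream plays the role of the manual `poll` call: nodes make progress only while a consumer polls the stream.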
If the DAG cannot make progress and not all nodes are completed, the graph is checked for cycles and polling panics if a cycle is found.
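A cycle check of this kind can be sketched as a depth-first search over the dependency edges. This is an illustration under assumed types (`u32` keys, dependencies as a `HashMap`), not the crate's implementation; `has_cycle` is a hypothetical name. Keys that appear only as dependencies are treated as placeholders with no outgoing edges, so a missing dependency alone does not count as a cycle.

```rust
use std::collections::{HashMap, HashSet};

/// Returns true if following dependency edges from some node leads back to
/// a node that is still on the current search path.
fn has_cycle(deps: &HashMap<u32, Vec<u32>>) -> bool {
    fn visit(
        node: u32,
        deps: &HashMap<u32, Vec<u32>>,
        on_path: &mut HashSet<u32>,
        finished: &mut HashSet<u32>,
    ) -> bool {
        if finished.contains(&node) {
            return false; // fully explored earlier, no cycle through here
        }
        if !on_path.insert(node) {
            return true; // already on the current path: back edge found
        }
        // Keys without an entry are placeholders and have no edges to follow.
        let found = deps.get(&node).map_or(false, |ds| {
            ds.iter().any(|&d| visit(d, deps, on_path, finished))
        });
        on_path.remove(&node);
        finished.insert(node);
        found
    }

    let mut on_path = HashSet::new();
    let mut finished = HashSet::new();
    deps.keys()
        .any(|&key| visit(key, deps, &mut on_path, &mut finished))
}
```

For the three-node graph from the first example this returns `false`, while two nodes that depend on each other return `true`.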
This crate is a good fit when you need:
- dependency-aware async execution,
- incremental graph construction,
- streaming of completion results,
- a lightweight alternative to a full task scheduler.
License: MIT