futures-dag

A dynamic DAG scheduler for async futures with a Stream-based API.

This crate executes futures according to a directed acyclic graph (DAG) of dependencies and yields their results as a stream. A future starts executing only after all of its dependencies have completed.

The graph can be extended while it is running. A dependency that has not been inserted yet is tracked as a placeholder and resolved once its future is inserted and completes.


Features

  • Dependency-aware execution: Futures run only when all declared dependencies are satisfied.

  • Dynamic graph: Nodes and dependencies can be inserted while the DAG is already running.

  • Lazy by design: No future is polled until the stream itself is polled.

  • Concurrent execution: Independent nodes run concurrently using FuturesUnordered.

  • Stream-based API: Results are yielded as (Key, Output) pairs as soon as futures complete.

  • Placeholder dependencies: Dependencies may be referenced before their futures are inserted.

  • Cycle detection: If execution stalls and the graph contains a cycle, polling the stream panics.

  • Thread-safe insertion: A mutex-backed wrapper allows inserting nodes from multiple tasks or threads.


Basic usage

use futures::StreamExt;
use std::future::ready;
use futures_dag::FuturesDag;

let mut dag = FuturesDag::new();

dag.insert(1, [2, 3].into(), ready("node 1")).unwrap();
dag.insert(2, [3].into(), ready("node 2")).unwrap();
dag.insert(3, [].into(), ready("node 3")).unwrap();

let results = dag.collect::<Vec<_>>().await;

assert_eq!(
    results,
    vec![
        (3, "node 3"),
        (2, "node 2"),
        (1, "node 1"),
    ]
);

Items are yielded in completion order, not insertion order. Here the dependencies force the order 3, 2, 1 even though node 1 was inserted first.
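
To make the ordering rule concrete, here is a minimal standard-library sketch that models which nodes become runnable at each step. It is not the crate's implementation: `completion_order` is a hypothetical helper, it assumes every future finishes the first time it is polled (as `ready(...)` does), and it breaks ties between simultaneously ready nodes by ascending key, whereas the crate makes no such promise.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical helper (not part of futures-dag): returns the order in which
/// nodes would finish if every future completed on its first poll.
fn completion_order(nodes: &[(u32, Vec<u32>)]) -> Vec<u32> {
    // key -> set of dependencies that have not finished yet
    let mut pending: BTreeMap<u32, BTreeSet<u32>> = nodes
        .iter()
        .map(|(key, deps)| (*key, deps.iter().copied().collect()))
        .collect();
    let mut order = Vec::new();

    loop {
        // A node is runnable once its set of unfinished dependencies is empty.
        let ready: Vec<u32> = pending
            .iter()
            .filter(|(_, deps)| deps.is_empty())
            .map(|(key, _)| *key)
            .collect();
        if ready.is_empty() {
            // Either everything finished or the remaining nodes are stalled.
            break;
        }
        for key in ready {
            pending.remove(&key);
            // Finishing `key` releases it from every node still waiting on it.
            for deps in pending.values_mut() {
                deps.remove(&key);
            }
            order.push(key);
        }
    }
    order
}
```

Called with the graph from the example above, `completion_order(&[(1, vec![2, 3]), (2, vec![3]), (3, vec![])])` releases node 3 first, then 2, then 1. A node whose dependency never appears is simply left out of the result.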


Dynamic insertion

Nodes may depend on keys that are not yet present in the DAG.

use futures::{StreamExt, FutureExt};
use futures_dag::BoxFuturesDag;

let mut dag = BoxFuturesDag::default();

dag.insert_box(1, [2].into(), async {}).unwrap();

assert!(dag.next().now_or_never().is_none());

dag.insert_box(2, [].into(), async {}).unwrap();

assert_eq!(dag.next().await.unwrap().0, 2);
assert_eq!(dag.next().await.unwrap().0, 1);
assert_eq!(dag.next().await, None);

Missing dependencies are tracked as placeholders and automatically resolved once their futures are inserted and completed.
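
One way to picture placeholder tracking is the standard-library sketch below. The `Slot` and `Tracker` types and their methods are assumptions made for illustration; the crate's internal bookkeeping may look quite different.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical state of one key in this sketch.
#[derive(Debug, PartialEq)]
enum Slot {
    /// Referenced as a dependency, but no future has been inserted yet.
    Placeholder,
    /// Inserted and waiting on the listed unfinished dependencies.
    Waiting(HashSet<u32>),
    /// Finished.
    Done,
}

/// Hypothetical bookkeeping structure (not the crate's API).
#[derive(Default)]
struct Tracker {
    slots: HashMap<u32, Slot>,
}

impl Tracker {
    /// Registers `key`; dependencies that are unknown become placeholders.
    fn insert(&mut self, key: u32, deps: &[u32]) {
        let mut unfinished = HashSet::new();
        for dep in deps {
            let slot = self.slots.entry(*dep).or_insert(Slot::Placeholder);
            if *slot != Slot::Done {
                unfinished.insert(*dep);
            }
        }
        // Replaces a placeholder for `key`, if one existed.
        self.slots.insert(key, Slot::Waiting(unfinished));
    }

    /// Keys that are inserted and have no unfinished dependencies, sorted.
    fn ready(&self) -> Vec<u32> {
        let mut keys: Vec<u32> = self
            .slots
            .iter()
            .filter(|(_, slot)| matches!(slot, Slot::Waiting(deps) if deps.is_empty()))
            .map(|(key, _)| *key)
            .collect();
        keys.sort();
        keys
    }

    /// Marks `key` as done and releases it from every waiting node.
    fn complete(&mut self, key: u32) {
        self.slots.insert(key, Slot::Done);
        for slot in self.slots.values_mut() {
            if let Slot::Waiting(deps) = slot {
                deps.remove(&key);
            }
        }
    }
}
```

Replaying the example above against this sketch: after `insert(1, &[2])` nothing is ready and key 2 is a placeholder; after `insert(2, &[])` key 2 is ready; completing 2 makes key 1 ready.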


Thread-safe insertion

If nodes need to be inserted from multiple tasks or threads while another task drives the stream, use FuturesMutexDag.

use futures::StreamExt;
use futures_dag::{BoxFuturesDag, FuturesMutexDag};

let dag = FuturesMutexDag::new(BoxFuturesDag::default());

let dag_clone = dag.clone();
tokio::spawn(async move {
    dag_clone.insert_box(2, [].into(), async {}).unwrap();
});

dag.insert_box(1, [2].into(), async {}).unwrap();

let results = dag.map(|v| v.0).collect::<Vec<_>>().await;
assert_eq!(results, vec![2, 1]);

FuturesMutexDag stores the DAG in Arc<Mutex<_>>, so every clone refers to the same underlying graph.
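
The shared-handle pattern itself can be sketched with the standard library alone. `SharedGraph`, `Graph`, and `insert_from_two_threads` are hypothetical names used only for this illustration, and a plain map stands in for the wrapped DAG; the sketch shows the Arc<Mutex<_>> idea, not how FuturesMutexDag is actually written.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the wrapped DAG: key -> dependency list.
type Graph = BTreeMap<u32, Vec<u32>>;

/// Cloneable handle; every clone points at the same graph.
#[derive(Clone, Default)]
struct SharedGraph {
    inner: Arc<Mutex<Graph>>,
}

impl SharedGraph {
    /// Takes `&self` rather than `&mut self`: the mutex provides exclusivity.
    fn insert(&self, key: u32, deps: Vec<u32>) {
        self.inner.lock().unwrap().insert(key, deps);
    }

    /// Snapshot of the keys currently in the graph, in ascending order.
    fn keys(&self) -> Vec<u32> {
        self.inner.lock().unwrap().keys().copied().collect()
    }
}

/// Inserts one node from a second thread and one from the calling thread.
fn insert_from_two_threads() -> Vec<u32> {
    let graph = SharedGraph::default();
    let clone = graph.clone();
    // The clone moves into the worker thread and inserts into the same graph.
    let worker = thread::spawn(move || clone.insert(2, vec![]));
    graph.insert(1, vec![2]);
    worker.join().unwrap();
    graph.keys()
}
```

Because both handles lock the same mutex, the two insertions can happen in either order and both keys end up in the one shared map.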


Execution model

  • Futures are not spawned on a runtime.
  • All polling happens when the stream is polled.
  • Ready futures are driven concurrently via FuturesUnordered.
  • The DAG completes when all nodes have completed and yielded their outputs.

If the DAG cannot make progress and not all nodes are completed, the graph is checked for cycles and polling panics if a cycle is found.
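
A cycle check of this kind is commonly done with a depth-first search that flags any edge leading back to a node still on the current path. The sketch below shows that general technique using only the standard library; `has_cycle` and `Mark` are hypothetical names, and the crate may detect cycles in a different way.

```rust
use std::collections::HashMap;

/// DFS state of a visited key in this sketch.
enum Mark {
    /// The key is on the current DFS path.
    InProgress,
    /// The key and everything reachable from it were explored without a cycle.
    Finished,
}

/// Returns true if following dependency edges from some key leads back to a
/// key that is still on the current DFS path.
fn has_cycle(deps: &HashMap<u32, Vec<u32>>) -> bool {
    fn visit(
        key: u32,
        deps: &HashMap<u32, Vec<u32>>,
        marks: &mut HashMap<u32, Mark>,
    ) -> bool {
        match marks.get(&key) {
            Some(Mark::InProgress) => return true, // back edge: cycle found
            Some(Mark::Finished) => return false,  // already explored
            None => {}
        }
        marks.insert(key, Mark::InProgress);
        // A key without an entry is a missing dependency and has no edges.
        for dep in deps.get(&key).into_iter().flatten() {
            if visit(*dep, deps, marks) {
                return true;
            }
        }
        marks.insert(key, Mark::Finished);
        false
    }

    let mut marks = HashMap::new();
    deps.keys().any(|key| visit(*key, deps, &mut marks))
}
```

In this sketch a graph that is merely waiting on a missing dependency, such as node 1 depending on an absent node 2, is reported as acyclic, which matches the rule above that a stall only leads to a panic when a cycle is found.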


When to use this crate

This crate is a good fit when you need:

  • dependency-aware async execution,
  • incremental graph construction,
  • streaming of completion results,
  • a lightweight alternative to a full task scheduler.

License

MIT
