Increase the scale example from book runs away grabbing memory when input exceeds 2^10 #524

Closed
jacg opened this issue Oct 15, 2024 · 15 comments · Fixed by #529

Comments

@jacg

jacg commented Oct 15, 2024

I am trying to follow the examples in the Differential Dataflow book, and run into a problem in the Increase the scale section.

With CLI arguments up to 1024, the code completes in under a millisecond. As soon as the argument exceeds 1024, the program maxes out one CPU core and keeps grabbing memory until I kill it by hand. I have observed it grab as much as 15 gigabytes.

Here is a flamegraph of stack samples when this happens:

[Image: flamegraph-1025]

As there seems to be no complete listing of the code at that stage, and I had to derive it by applying the changes suggested in the text to earlier versions of the code, I cannot be sure that I haven't made a mistake. Nevertheless, such a drastic step-function change in behaviour seems unlikely to be caused by my misinterpreting the instructions.

I am using Rust 1.81.0, on Linux (NixOS).

@frankmcsherry
Member

I'll take a peek. A funny thing does happen as you cross over some thresholds (e.g. 1024) involving how we package up the data, and it's possible that we've changed something and missed some consequences.
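To make the threshold idea concrete, here is a minimal, std-only sketch of how a buffered input might package records into fixed-capacity batches. The capacity of 1024 (`BATCH_CAPACITY`) and the helper `into_batches` are hypothetical, chosen only to mirror the 1024/1025 boundary in this report; they are not taken from timely's or differential-dataflow's source. It shows that 1025 records is the first size that produces a second, partially filled batch, so a code path that only runs for multi-batch input could first be exercised there.

```rust
// Hypothetical batch capacity, picked to mirror the 1024/1025 boundary in this issue;
// it is NOT taken from timely's or differential-dataflow's source.
const BATCH_CAPACITY: usize = 1024;

/// Split `records` into consecutive batches holding at most `BATCH_CAPACITY` elements each,
/// roughly the way a buffered input might package data before sending it downstream.
fn into_batches<T: Clone>(records: &[T]) -> Vec<Vec<T>> {
    records
        .chunks(BATCH_CAPACITY)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    for size in [1024u32, 1025] {
        // Same binary-tree input as the example: `person` reports to `person / 2`.
        let records: Vec<(u32, u32)> = (0..size).map(|person| (person / 2, person)).collect();
        let batches = into_batches(&records);
        let lengths: Vec<usize> = batches.iter().map(Vec::len).collect();
        println!("size {size}: {} batch(es), lengths {:?}", batches.len(), lengths);
    }
}
```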

@frankmcsherry
Member

Would you mind sharing your code? I just copied the example code from 1.1, and then added the loop from 1.2, and removed the inspect operator that prints out results. It seems to work up to 1M:

mcsherry@gallustrate differential-dataflow % time cargo run --release --example manages -- 1000000
    Finished `release` profile [optimized + debuginfo] target(s) in 0.03s
     Running `target/release/examples/manages 1000000`
cargo run --release --example manages -- 1000000  0.65s user 0.06s system 77% cpu 0.911 total
mcsherry@gallustrate differential-dataflow % 

@jacg
Author

jacg commented Oct 15, 2024

Sure

use differential_dataflow::{
    input::InputSession,
    operators::Join,
};

fn main() {
    // Define a timely dataflow computation
    timely::execute_from_args(std::env::args(), move |worker| {
        // Create an input collection of data
        let mut input = InputSession::new();
        // Define a new computation
        worker.dataflow(|scope| {
            // Create a new collection from our input
            let manages = input.to_collection(scope);
            // If (m2, m1) and (m1, p) then output (m1, (m2, p))
            manages
                .map(|(m2, m1)| (m1, m2))
                .join(&manages)
                //.inspect(|x| println!("{x:?}"))
                ;
        });
        // Set a size for our organization from the input
        let size = std::env::args().nth(1).and_then(|s| {s.parse::<u32>().ok()}).unwrap_or(10);
        // Load input (a binary tree)
        input.advance_to(0);
        for person in 0..size {
            input.insert((person/2, person));
        }

        // for person in 1..size {
        //     input.advance_to(person);
        //     input.remove((person/2, person));
        //     input.insert((person/3, person));
        // }

    }).expect("Computation terminated abnormally");
}

I just repeated it again from scratch, copy-pasting the code from the book according to the process you mention above, and the behaviour is identical.

@frankmcsherry
Member

Huh. I just copy/pasted your code into my environment, and .. it runs great. I was on nightly, but went back to 1.81 stable and still good. I'm running this in a check-out of the repo; I can't tell whether that might matter. I did a cargo update to make sure I had the most recent things; still good.

Let me ponder this a bit, and see if I can get some other eyes on it. What you are seeing from the stacks is that it is spending a massive amount of time in data processing, which means it had to make a massive amount of data for some reason. But .. nothing in the code as written would produce lots of data (for an input of 1025) barring a bug somewhere, but .. I'm not sure how to track down that bug. =/
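One way to sanity-check the point that nothing in this dataflow should produce much data is to compute the same join with plain Rust collections. The sketch below is std-only; `manages` and `join_reference` are hypothetical helper names, not differential-dataflow APIs. It builds the same binary-tree input and emits `(m1, (m2, p))` for every pair of edges `(m2, m1)` and `(m1, p)`. Because every person appears as the report in exactly one edge, each edge `(m1, p)` pairs with exactly one `(m2, m1)`, so the output has exactly `size` records: an input of 1025 should yield 1025 results, not gigabytes.

```rust
use std::collections::HashMap;

/// Same input as the example: a binary tree where `person` reports to `person / 2`.
fn manages(size: u32) -> Vec<(u32, u32)> {
    (0..size).map(|person| (person / 2, person)).collect()
}

/// Reference version of the example's join: for every (m2, m1) and (m1, p), emit (m1, (m2, p)).
fn join_reference(edges: &[(u32, u32)]) -> Vec<(u32, (u32, u32))> {
    // Right-hand side: index edges (m1, p) by their first element, the manager m1.
    let mut by_manager: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(m1, p) in edges {
        by_manager.entry(m1).or_default().push(p);
    }
    // Left-hand side: each edge (m2, m1) is keyed by m1, mirroring `.map(|(m2, m1)| (m1, m2))`.
    let mut output = Vec::new();
    for &(m2, m1) in edges {
        if let Some(reports) = by_manager.get(&m1) {
            for &p in reports {
                output.push((m1, (m2, p)));
            }
        }
    }
    output
}

fn main() {
    for size in [1024, 1025] {
        let out = join_reference(&manages(size));
        println!("size {size}: {} output records", out.len());
    }
}
```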

@jacg
Author

jacg commented Oct 15, 2024

In case it's any use to you, my environment is defined, pretty strictly, by this Nix flake:

{
  description = "Playing with timely dataflow";

  inputs = {

    # Version pinning is managed in flake.lock. Upgrading can be done with
    # something like
    #
    #    nix flake lock --update-input nixpkgs

    nixpkgs     .url = "github:nixos/nixpkgs/nixos-24.05";
    rust-overlay.url = "github:oxalica/rust-overlay";
    flake-utils .url = "github:numtide/flake-utils";
    flake-compat = {
      url = "github:edolstra/flake-compat";
      flake = false;
    };

  };

  outputs = { self, nixpkgs, rust-overlay, flake-utils, ... }:

    # Option 1: try to support each default system
    flake-utils.lib.eachDefaultSystem # NB Some packages in nixpkgs are not supported on some systems

    # Option 2: try to support selected systems
    # flake-utils.lib.eachSystem ["x86_64-linux" "i686-linux" "aarch64-linux" "x86_64-darwin"]
      (system:
        let
          pkgs = import nixpkgs {
              inherit system;
              overlays = [
                # ===== Specification of the rust toolchain to be used ====================
                rust-overlay.overlays.default (final: prev:
                  { rust-tools = final.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; }
                )
              ];
            };
        in
          {
            devShell = pkgs.mkShell {
              name = "my-rust-project";
              buildInputs = [
                pkgs.rust-tools
                pkgs.cargo-nextest
                pkgs.cargo-flamegraph
                pkgs.bacon
                pkgs.just
                pkgs.cowsay
              ];
              shellHook =
                ''
                  export PS1="timely book devshell> "
                '';
              # Requires "rust-src" to be present in components in ./rust-toolchain.toml
              RUST_SRC_PATH = "${pkgs.rust-tools}/lib/rustlib/src/rust/library";
            };
          }
      );
}

Which depends on this rust-toolchain.toml:

# https://rust-lang.github.io/rustup/overrides.html#the-toolchain-file

[toolchain]

# You may need to run
#
#   nix flake lock --update-input rust-overlay
#
# when changing these to more recent versions
channel = "1.81.0"
#channel = "nightly-2024-07-21"

# see https://rust-lang.github.io/rustup/concepts/profiles.html
profile = "default"
#profile = "minimal"


# see https://rust-lang.github.io/rustup/concepts/components.html
components = [ "rust-analyzer", "clippy", "rust-src", "rustfmt" ]

#targets = [ "wasm32-unknown-unknown" ]

In theory I should also give you my flake.lock, but it's noisy and unlikely to make a difference, so I won't bother unless you have the means and interest to try to go down this route.

@antiguru
Member

Interesting! I tried running your example with various parameters but couldn't observe the behavior you mentioned. I'm not using Nix, but Debian, so that seems like the remaining difference in how we set up the experiment. Can you share the exact invocation that causes the runaway memory utilization?

@jacg
Author

jacg commented Oct 15, 2024

Can you share the exact invocation that causes the runaway memory utilization?

cargo run --release -- 1025

Change that 1025 to 1024 and it completes in under a millisecond.

@jacg
Author

jacg commented Oct 16, 2024

I've tried it in a VM running Debian 12, hosted on the same machine as before. Essentially the same result: 1024 completes; 1025 gets killed by oom-killer.

So it looks like Debian/NixOS is not the crucial difference. Could it be my hardware? I'll try to do some more tests ...

@antiguru
Member

If you can share a VM image I'd be happy to take a look.

@jacg
Author

jacg commented Oct 16, 2024

It works fine both in my NixOS+flake and in my virtual machine, if I downgrade the rust toolchain to 1.75.0. From 1.76.0 onwards, both fail. This seems at odds with what @frankmcsherry reported earlier.
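A 1.75.0/1.76.0 boundary like this is what a bisection over toolchain versions produces. A minimal sketch, assuming the versions are listed oldest to newest and that once a version fails every later one fails too. `first_failing` is a hypothetical helper, and the predicate below only encodes the versions reported in this thread rather than building anything; in practice it might build the repro with `cargo +<version> build --release` and check whether the 1025 case blows up.

```rust
/// Return the first version in `versions` for which `fails` is true.
/// Assumes `versions` is ordered oldest to newest and that failures are monotone
/// (once a version fails, every later one fails too), which is what makes bisection valid.
fn first_failing<'a>(versions: &[&'a str], mut fails: impl FnMut(&str) -> bool) -> Option<&'a str> {
    // `partition_point` binary-searches for the first index where the predicate turns false,
    // i.e. the first version that does not pass.
    let idx = versions.partition_point(|v| !fails(*v));
    versions.get(idx).copied()
}

fn main() {
    // Only the versions actually reported in this issue, oldest to newest.
    let versions = ["1.75.0", "1.76.0", "1.81.0"];
    // Stand-in predicate (hypothetical): a real one would build and run the repro
    // with that toolchain and report whether input 1025 runs away.
    let fails = |v: &str| v != "1.75.0";
    println!("first failing toolchain: {:?}", first_failing(&versions, fails));
}
```

Bisection only pays off with many candidate versions, but it needs just O(log n) builds, which matters when each build of the dependency tree takes a while.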

My observations in the (Debian 12) VM are based on sourcing this shell script:

sudo apt-get update --assume-yes
sudo apt install build-essential --assume-yes
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
. "$HOME/.cargo/env"
cargo new hmm
cd hmm
cargo add timely differential-dataflow
cat <<THE-END > src/main.rs
use differential_dataflow::{
    input::InputSession,
    operators::Join,
};

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input = InputSession::new();
        worker.dataflow(|scope| {
            let manages = input.to_collection(scope);
            manages
                .map(|(m2, m1)| (m1, m2))
                .join(&manages)
                //.inspect(|x| println!("{x:?}"))
                ;
        });
        let size = std::env::args().nth(1).and_then(|s| {s.parse::<u32>().ok()}).unwrap_or(10);
        input.advance_to(0);
        for person in 0..size {
            input.insert((person/2, person));
        }

        // for person in 1..size {
        //     input.advance_to(person);
        //     input.remove((person/2, person));
        //     input.insert((person/3, person));
        // }

    }).expect("Computation terminated abnormally");
}
THE-END

rustup default 1.75.0
cargo build --release
echo It works fine with Rust 1.75.0
time target/release/hmm 1024
time target/release/hmm 1025

rustup default 1.76.0
cargo build --release
echo 1025 goes OOM with Rust 1.76.0
time target/release/hmm 1024
time target/release/hmm 1025

@jacg
Author

jacg commented Oct 16, 2024

I have now managed to reproduce these observations (works with 1.75.0, fails with 1.76.0 and 1.81.0) on a machine that has no Nix on it whatsoever, so it looks like the problem is not related to my local configuration.

@frankmcsherry
Member

cargo add timely differential-dataflow

I bet this is it! If so, I think we are using different versions of the code. The crates.io version is .. years out of date at this point (and includes various bits of unsafe code that have since been removed, and which, with different Rust versions, might result in new horrible UB).

If you point your Cargo.toml dependencies at

timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" }

instead, it might work!

@frankmcsherry
Member

If so, there are still the docs to clean up (they were written at a point when 0.12 "worked" with what was then current Rust, as much as UB ever "works"). Potentially also a clean version of the stack to push up to crates.io (probably double-checking for ~zero unsafe blocks, and shipping on that basis).
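For a "~zero unsafe blocks" check, Rust's built-in `unsafe_code` lint can enforce it at the crate level: `#![forbid(unsafe_code)]` turns any `unsafe` block into a compile error and, unlike `deny`, cannot be re-allowed further down. The sketch below is a generic illustration, not code from timely or differential-dataflow; `decode_u32s` is a hypothetical helper that decodes little-endian `u32`s with `u32::from_le_bytes` instead of the kind of pointer casts an unsafe serializer might use.

```rust
// Crate-level lint: any `unsafe` block in this crate is now a hard compile error.
// Unlike `deny`, `forbid` cannot be re-allowed later with `#[allow(unsafe_code)]`.
#![forbid(unsafe_code)]

/// Decode little-endian `u32` values from a byte buffer using only safe std APIs
/// (hypothetical helper for illustration). Returns `None` if the length is not a multiple of 4.
fn decode_u32s(bytes: &[u8]) -> Option<Vec<u32>> {
    if bytes.len() % 4 != 0 {
        return None;
    }
    let values: Vec<u32> = bytes
        .chunks_exact(4)
        .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect();
    Some(values)
}

fn main() {
    // Encode a couple of values, then decode them again without any pointer casts.
    let bytes: Vec<u8> = [1u32, 1025].iter().flat_map(|x| x.to_le_bytes()).collect();
    println!("{:?}", decode_u32s(&bytes));
}
```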

@jacg
Author

jacg commented Oct 16, 2024

timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" }

That works.

Thank you.

@frankmcsherry
Member

I'll get on fixing this up so that the next poor soul who tries to follow the instructions has a less horrible time than you did. I apologize for that!


3 participants