example from book runs away grabbing memory when input exceeds 2^10
#524
I'll take a peek. A funny thing does happen as you cross over some thresholds (e.g. 1024) involving how we package up the data, and it's possible that we've changed something and missed some consequences.
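For what it's worth, one quick way to bracket the threshold empirically is a loop like the one below. This is only a sketch: it assumes the example has already been built in release mode as target/release/hmm (matching the reproduction script later in the thread) and uses an arbitrary 10-second cutoff to decide that a run has gone off the rails.

```bash
# Probe sizes around 2^10; anything that fails to finish quickly is
# presumably the runaway case.
for n in 1022 1023 1024 1025 1026; do
    echo "size = $n"
    timeout 10 ./target/release/hmm "$n" || echo "  did not finish within 10s (or exited abnormally)"
done
```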
Would you mind sharing your code? I just copied the example code from 1.1, and then added the loop from 1.2, and removed the
Sure:

use differential_dataflow::{
    input::InputSession,
    operators::Join,
};

fn main() {
    // Define a timely dataflow computation
    timely::execute_from_args(std::env::args(), move |worker| {

        // Create an input collection of data
        let mut input = InputSession::new();

        // Define a new computation
        worker.dataflow(|scope| {
            // Create a new collection from our input
            let manages = input.to_collection(scope);
            // If (m2, m1) and (m1, p) then output (m1, (m2, p))
            manages
                .map(|(m2, m1)| (m1, m2))
                .join(&manages)
                //.inspect(|x| println!("{x:?}"))
                ;
        });

        // Set a size for our organization from the input
        let size = std::env::args().nth(1).and_then(|s| {s.parse::<u32>().ok()}).unwrap_or(10);

        // Load input (a binary tree)
        input.advance_to(0);
        for person in 0..size {
            input.insert((person/2, person));
        }
        // for person in 1..size {
        //     input.advance_to(person);
        //     input.remove((person/2, person));
        //     input.insert((person/3, person));
        // }

    }).expect("Computation terminated abnormally");
}

I just repeated it again from scratch, copy-pasting the code from the book according to the process you mention above, and the behaviour is identical.
Huh. I just copy/pasted your code into my environment, and .. it runs great. I was on nightly, but went back to 1.81 stable and still good. I'm running this in a check-out of the repo; I don't know whether that matters.

Let me ponder this a bit, and see if I can get some other eyes on it. What you are seeing from the stacks is that it is spending a massive amount of time in data processing, which means it had to make a massive amount of data for some reason. But nothing in the code as written should produce lots of data (for an input of 1025), barring a bug somewhere, and I'm not sure how to track down that bug. =/
In case it's any use to you, my environment is defined, pretty strictly, by this Nix flake:

{
  description = "Playing with timely dataflow";

  inputs = {
    # Version pinning is managed in flake.lock. Upgrading can be done with
    # something like
    #
    #    nix flake lock --update-input nixpkgs
    nixpkgs     .url = "github:nixos/nixpkgs/nixos-24.05";
    rust-overlay.url = "github:oxalica/rust-overlay";
    flake-utils .url = "github:numtide/flake-utils";
    flake-compat = {
      url = "github:edolstra/flake-compat";
      flake = false;
    };
  };

  outputs = { self, nixpkgs, rust-overlay, flake-utils, ... }:
    # Option 1: try to support each default system
    flake-utils.lib.eachDefaultSystem # NB Some packages in nixpkgs are not supported on some systems
    # Option 2: try to support selected systems
    # flake-utils.lib.eachSystem ["x86_64-linux" "i686-linux" "aarch64-linux" "x86_64-darwin"]
      (system:
        let
          pkgs = import nixpkgs {
            inherit system;
            overlays = [
              # ===== Specification of the rust toolchain to be used ====================
              rust-overlay.overlays.default
              (final: prev:
                { rust-tools = final.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; }
              )
            ];
          };
        in
          {
            devShell = pkgs.mkShell {
              name = "my-rust-project";
              buildInputs = [
                pkgs.rust-tools
                pkgs.cargo-nextest
                pkgs.cargo-flamegraph
                pkgs.bacon
                pkgs.just
                pkgs.cowsay
              ];
              shellHook = ''
                export PS1="timely book devshell> "
              '';
              # Requires "rust-src" to be present in components in ./rust-toolchain.toml
              RUST_SRC_PATH = "${pkgs.rust-tools}/lib/rustlib/src/rust/library";
            };
          }
      );
}

Which depends on this rust-toolchain.toml:

# https://rust-lang.github.io/rustup/overrides.html#the-toolchain-file
[toolchain]
# You may need to run
#
#    nix flake lock --update-input rust-overlay
#
# when changing these to more recent versions
channel = "1.81.0"
#channel = "nightly-2024-07-21"

# see https://rust-lang.github.io/rustup/concepts/profiles.html
profile = "default"
#profile = "minimal"

# see https://rust-lang.github.io/rustup/concepts/components.html
components = [ "rust-analyzer", "clippy", "rust-src", "rustfmt" ]
#targets = [ "wasm32-unknown-unknown" ]

In theory I should also give you my
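For anyone wanting to recreate that environment: with those two files (plus the flake.lock) in the project root, something like the following should enter the dev shell and build the example. This is only a sketch; it assumes Nix with flakes enabled and a crate laid out like the hmm project in the reproduction script later in the thread.

```bash
nix develop               # enter the dev shell defined by flake.nix
cargo build --release
cargo run --release -- 1025
```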
Interesting! I tried running your example with various parameters but couldn't observe the behavior you mentioned. I'm not using Nix but Debian, so that seems like the remaining difference in how we set up the experiment. Can you share the exact invocation that causes the runaway memory utilization?
Change that 1025 to 1024 and it completes in under a millisecond.
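For concreteness, the invocation is presumably along these lines (a sketch; the crate name hmm and the release build are taken from the full reproduction script later in the thread):

```bash
cargo build --release
time target/release/hmm 1025   # runs away, grabbing memory
time target/release/hmm 1024   # completes almost immediately
```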
I've tried it in a VM running Debian 12, hosted on the same machine as before. Essentially the same result: 1024 completes; 1025 gets killed by the oom-killer. So it looks like Debian/NixOS is not the crucial difference. Could it be my hardware? I'll try to do some more tests ...
If you can share a VM image I'd be happy to take a look.
It works fine both in my NixOS+flake setup and in my virtual machine if I downgrade the Rust toolchain to 1.75.0. From 1.76.0 onwards, both fail. This seems at odds with what @frankmcsherry reported earlier.

My observations in the (Debian 12) VM are based on sourcing this shell script:

sudo apt-get update --assume-yes
sudo apt install build-essential --assume-yes
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
. "$HOME/.cargo/env"

cargo new hmm
cd hmm
cargo add timely differential-dataflow

cat <<THE-END > src/main.rs
use differential_dataflow::{
    input::InputSession,
    operators::Join,
};

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        let mut input = InputSession::new();
        worker.dataflow(|scope| {
            let manages = input.to_collection(scope);
            manages
                .map(|(m2, m1)| (m1, m2))
                .join(&manages)
                //.inspect(|x| println!("{x:?}"))
                ;
        });
        let size = std::env::args().nth(1).and_then(|s| {s.parse::<u32>().ok()}).unwrap_or(10);
        input.advance_to(0);
        for person in 0..size {
            input.insert((person/2, person));
        }
        // for person in 1..size {
        //     input.advance_to(person);
        //     input.remove((person/2, person));
        //     input.insert((person/3, person));
        // }
    }).expect("Computation terminated abnormally");
}
THE-END

rustup default 1.75.0
cargo build --release
echo It works fine with Rust 1.75.0
time target/release/hmm 1024
time target/release/hmm 1025

rustup default 1.76.0
cargo build --release
echo 1025 goes OOM with Rust 1.76.0
time target/release/hmm 1024
time target/release/hmm 1025
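As an aside, it is also possible to flip between toolchains without touching the machine-wide default by using a per-directory override. The following is only a sketch of standard rustup usage, not part of the original script:

```bash
rustup toolchain install 1.75.0 1.76.0
rustup override set 1.75.0 && cargo build --release && time target/release/hmm 1025
rustup override set 1.76.0 && cargo build --release && time target/release/hmm 1025   # expect this one to run away
```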
I have now managed to reproduce these observations (works with 1.75.0, fails with 1.76.0 and 1.81.0) on a machine that has no Nix on it whatsoever, so it looks like the problem is not related to my local configuration.
I bet this is it! If so, I think we are using different versions of the repository. The crates.io version is years out of date at this point (and includes various bits of unsafe code that have since been removed, and which with newer Rust versions might result in new horrible UB). If you point your dependencies at the git repository instead, it might work!
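For anyone landing here later, here is a minimal sketch of one way to do that, assuming a project laid out like the hmm crate above. The GitHub URLs are my reading of the TimelyDataflow organization's repositories rather than something stated in this thread, and depending on how the two crates track each other you may only need to switch differential-dataflow, or you may want to pin matching revisions of both:

```bash
# Swap the crates.io dependencies for the git versions.
cargo remove timely differential-dataflow
cargo add timely                --git https://github.com/TimelyDataflow/timely-dataflow
cargo add differential-dataflow --git https://github.com/TimelyDataflow/differential-dataflow
cargo build --release
```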
If so, there are still the docs to clean up (which were written at a point where 0.12 "worked" with what was then current Rust, insofar as UB ever "works"). Potentially also a clean version of the stack to push up to crates.io (probably after double-checking for ~zero unsafe).
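Assuming the check in question is about remaining unsafe blocks, a crude first pass over a checkout is a plain text search. This is only a sketch: it also counts matches in comments and strings, and is no substitute for review or a dedicated audit tool.

```bash
grep -rn --include='*.rs' 'unsafe' . | wc -l
```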
That works. Thank you.
I'll get on fixing this up so that the next poor soul who tries to follow the instructions has a less horrible time than you did. I apologize for that! |
I am trying to follow the examples in the Differential Dataflow book, and have run into a problem in the "Increase the scale" section.
With a CLI argument of up to 1024, the code completes in under a millisecond. As soon as the argument exceeds 1024, the program maxes out one CPU core and keeps grabbing memory until I kill it by hand; I have seen it take as much as 15 gigabytes.
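One way to poke at the runaway case without having to kill it by hand is to cap the process's address space first, so the allocator fails instead of eating all available RAM. A sketch (the 4 GiB cap is arbitrary; ulimit -v takes KiB, and the parentheses confine the limit to a subshell):

```bash
( ulimit -v $((4 * 1024 * 1024)); target/release/hmm 1025 )
```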
Here is a flamegraph of stack samples when this happens:
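For anyone wanting to produce a similar graph: the dev shell shown above includes cargo-flamegraph, so something along these lines should work. It is a sketch only; on Linux it relies on perf (so it may need a relaxed perf_event_paranoid setting), and my understanding is that interrupting the profiled run with Ctrl-C still lets it write flamegraph.svg from the samples gathered so far.

```bash
cargo flamegraph -- 1025   # let it run away for a few seconds, then hit Ctrl-C
```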
As the book seems to give no complete listing of the code at that stage, and I have to derive it by applying the changes suggested in the text to earlier versions of the code, I cannot be sure that I haven't made a mistake. Nevertheless, the drastic step-function change in behaviour seems unlikely to be caused by my misinterpreting the instructions.
I am using Rust 1.81.0, on Linux (NixOS).