
Sequential async/await calls to AsyncWriteExt::write_all result in reordered futures #2978

Open
alexmaco opened this issue Oct 16, 2020 · 5 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. E-help-wanted Call for participation: Help is requested to fix this issue. M-io Module: tokio/io

Comments

@alexmaco

alexmaco commented Oct 16, 2020

Version
All versions from 0.2.20 to 0.3.0 inclusive; older versions were not tested.

Platform
Linux $HOSTNAME 4.15.0-118-generic #119-Ubuntu SMP Tue Sep 8 12:30:01 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
Linux $HOSTNAME 4.4.132+ #8 SMP Thu Aug 13 08:51:08 UTC 2020 armv7l GNU/Linux

Description

In a function that makes two sequential, awaited calls to AsyncWriteExt::write_all on stdout, I would expect the data to appear on stdout in the same order.

However, the contents of data and \n in the write_out function below are sometimes reordered.

I tried this code:

use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
use tokio::runtime;

fn main() -> Result<(), String> {
    let rt = runtime::Builder::new_current_thread()
        .build()
        .map_err(|e| format!("Failed to create runtime: {:?}", e))?;

    rt.block_on(async {
        let stdin = io::stdin();
        let buf_read = io::BufReader::new(stdin);
        let mut lines = buf_read.lines();
        while let Some(line) = lines
            .next_line()
            .await
            .map_err(|e| format!("Error reading from stdin: {:?}", e))?
        {
            match write_out(line.as_bytes()).await {
                Ok(()) => {}
                Err(e) => return Err(format!("Error printing output: {}", e)),
            };
        }

        Ok(())
    })
}

// The problem seems to be in this function
async fn write_out(data: &[u8]) -> io::Result<()> {
    let mut out = io::stdout();
    out.write_all(data).await?;
    out.write_all(b"\n").await?;
    Ok(())
}

Enabled features: features = ["io-std", "io-util", "rt"]
I've also tried enabling the "process" feature and calling enable_io on the Builder, but the reordering issue persists.

I invoke this as:

$ seq 1 10000 > test_lines
$ cat test_lines | target/release/test_bin > output_lines
$ diff -u test_lines output_lines

I would expect test_lines and output_lines to be identical. However, the diff produces multiple deltas like this:

...
@@ -8801,8 +8801,8 @@
 8801
 8802
 8803
-8804
-8805
+88048805
+
 8806
 8807
 8808
@@ -8815,8 +8815,8 @@
 8815
 8816
 8817
-8818
-8819
+88188819
+
 8820
 8821
 8822
...

Running a debug build produces few or no differences, but a release build usually produces tens of reorders over the 10k-line sequence.

I've run into this issue while working on https://github.com/alexmaco/fpipe/ , and this bug is making that tool unreliable.

Please let me know how I can assist in debugging this, since the io-related tokio source I've read seems correct at first glance.

@alexmaco alexmaco added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Oct 16, 2020
@Darksonn Darksonn added the M-io Module: tokio/io label Oct 16, 2020
@Darksonn
Contributor

Thank you for the bug report. It appears to work if you add a flush:

async fn write_out(data: &[u8]) -> io::Result<()> {
    let mut out = io::stdout();
    out.write_all(data).await?;
    out.write_all(b"\n").await?;
    out.flush().await?;
    Ok(())
}

It also appears to work if you reuse the io::stdout() between iterations.

use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
use tokio::runtime;

fn main() -> Result<(), String> {
    let rt = runtime::Builder::new_current_thread()
        .build()
        .map_err(|e| format!("Failed to create runtime: {:?}", e))?;

    rt.block_on(async {
        let mut out = io::stdout();
        let stdin = io::stdin();
        let buf_read = io::BufReader::new(stdin);
        let mut lines = buf_read.lines();
        while let Some(line) = lines
            .next_line()
            .await
            .map_err(|e| format!("Error reading from stdin: {:?}", e))?
        {
            match write_out(&mut out, line.as_bytes()).await {
                Ok(()) => {}
                Err(e) => return Err(format!("Error printing output: {}", e)),
            };
        }

        Ok(())
    })
}

// The problem seems to be in this function
async fn write_out(out: &mut io::Stdout, data: &[u8]) -> io::Result<()> {
    out.write_all(data).await?;
    out.write_all(b"\n").await?;
    Ok(())
}

alexmaco added a commit to alexmaco/fpipe that referenced this issue Oct 17, 2020
@carllerche carllerche added the E-help-wanted Call for participation: Help is requested to fix this issue. label Nov 12, 2020
@blasrodri
Contributor

blasrodri commented Dec 25, 2020

Is this a case of what the docs describe as:

be aware that writes using write_all are not guaranteed to occur as a single write, so multiple threads writing data with write_all may result in interleaved output

?

Do you have any pointers on how to fix this?

@Darksonn
Contributor

In this particular case, the problem is that multiple handles to stdout do not coordinate properly: if the previous handle still has a write running on the blocking thread pool, the new handle will not wait for that operation to finish.

How exactly this should be approached is not obvious.
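The failure mode described above can be sketched with plain threads (a hedged illustration only; tokio's actual internals differ): each short-lived "handle" queues its buffered data on a background flush and is dropped without waiting for it, so a slow first flush can land after a fast second one.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Simulated short-lived handle: it queues a background "flush" and
// returns immediately, without waiting for the flush to complete.
fn fire_and_forget_write(
    log: &Arc<Mutex<Vec<&'static str>>>,
    msg: &'static str,
    flush_delay_ms: u64,
) -> JoinHandle<()> {
    let log = Arc::clone(log);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(flush_delay_ms));
        log.lock().unwrap().push(msg);
    })
}

fn demo() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    // Two sequential-looking writes, but nothing makes the second
    // wait for the first handle's flush to finish.
    let first = fire_and_forget_write(&log, "8804", 50); // slow flush
    let second = fire_and_forget_write(&log, "newline", 0); // fast flush
    first.join().unwrap();
    second.join().unwrap();
    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    // The newline marker overtakes the data, mirroring the diff above.
    println!("{:?}", demo());
}
```

Here the "newline" entry lands before "8804", which is the same shape as the reordered lines in the reported diff.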

@alexmaco
Author

What I find surprising is not, strictly speaking, the lack of coordination between different handles, but the fact that sequential-looking code has non-sequential effects. If I create two different handles that are alive simultaneously and write to them separately, I would not assume any automatic coordination between them; the absence of that coordination is not a usability problem.

But in this case, two events that are specified sequentially seem to overlap or be reordered, even though the user code contains no intention or indication of parallelism.

So IMHO, the solution would involve ensuring that write effects are sequential, first per async context and second per stdout handle, so the effects always appear "as locally written".

Perhaps a naive approach would be to make the io::stdout() function async and put an async mutex inside the handle. When dropped, the handle would have to ensure that the write operations associated with it have completed first. However, this would be API-breaking.

Maybe this kind of ordering could be enforced by setting a flag on an internal stdout singleton, and using that to ensure writes issued by new handles always wait for writes issued by previous handles from the same context to complete. In principle a single atomic integer would suffice: it is incremented when each handle is created and then associated with that handle's write operations. However, I don't know whether something like this is compatible with tokio's internals.
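The atomic-counter idea can be sketched with std primitives (hypothetical code, not tokio's API): each handle draws a ticket from a shared counter at creation time, and its write proceeds only when a shared "turn" value reaches that ticket, so writes complete in handle-creation order regardless of which thread runs them.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// A handle stamped with the ticket it drew at creation time.
struct TicketedWriter {
    ticket: usize,
    turn: Arc<(Mutex<usize>, Condvar)>,
}

impl TicketedWriter {
    fn new(counter: &AtomicUsize, turn: Arc<(Mutex<usize>, Condvar)>) -> Self {
        TicketedWriter {
            ticket: counter.fetch_add(1, Ordering::SeqCst),
            turn,
        }
    }

    // Block until it is this handle's turn, then perform the write.
    fn write(&self, log: &Mutex<Vec<String>>, msg: &str) {
        let (lock, cvar) = &*self.turn;
        let mut current = lock.lock().unwrap();
        while *current != self.ticket {
            current = cvar.wait(current).unwrap();
        }
        log.lock().unwrap().push(msg.to_string());
        *current += 1;
        cvar.notify_all();
    }
}

fn demo() -> Vec<String> {
    let counter = AtomicUsize::new(0);
    let turn = Arc::new((Mutex::new(0usize), Condvar::new()));
    let log = Arc::new(Mutex::new(Vec::new()));

    // Create three handles in order, then run their writes on
    // threads spawned in reverse order.
    let writers: Vec<_> = (0..3)
        .map(|_| TicketedWriter::new(&counter, Arc::clone(&turn)))
        .collect();

    let mut joins = Vec::new();
    for w in writers.into_iter().rev() {
        let log = Arc::clone(&log);
        joins.push(thread::spawn(move || {
            let msg = format!("write {}", w.ticket);
            w.write(&log, &msg);
        }));
    }
    for j in joins {
        j.join().unwrap();
    }
    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", demo()); // writes appear in ticket order
}
```

Even though the threads are spawned in reverse, the log ends up in ticket order ("write 0", "write 1", "write 2"), which is the ordering property proposed above.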

@Darksonn
Contributor

We do not necessarily need to make the io::stdout() function async. We could have the returned Stdout type hook up with the operation started by the previous Stdout, making it wait for that operation to finish before anything can be written.
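That hand-off can be sketched with std threads (a hypothetical illustration, not tokio's actual implementation): a shared slot remembers the in-flight operation of the previously created handle, and each new handle waits for it before starting its own write.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Shared slot holding the previous handle's in-flight operation.
type Pending = Arc<Mutex<Option<JoinHandle<()>>>>;

// A fresh "handle" first waits for the previous handle's operation,
// then queues its own write on a background thread.
fn write_via_new_handle(
    pending: &Pending,
    log: &Arc<Mutex<Vec<&'static str>>>,
    msg: &'static str,
    flush_delay_ms: u64,
) {
    if let Some(prev) = pending.lock().unwrap().take() {
        prev.join().unwrap(); // hook up with the previous operation
    }
    let log = Arc::clone(log);
    let op = thread::spawn(move || {
        thread::sleep(Duration::from_millis(flush_delay_ms));
        log.lock().unwrap().push(msg);
    });
    *pending.lock().unwrap() = Some(op);
}

fn demo() -> Vec<&'static str> {
    let pending: Pending = Arc::new(Mutex::new(None));
    let log = Arc::new(Mutex::new(Vec::new()));
    // The first write has a slow flush; without the hand-off the
    // second write would overtake it, as in this issue.
    write_via_new_handle(&pending, &log, "data", 50);
    write_via_new_handle(&pending, &log, "newline", 0);
    if let Some(last) = pending.lock().unwrap().take() {
        last.join().unwrap();
    }
    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", demo()); // ["data", "newline"]
}
```

Because the second "handle" joins the first one's pending operation before queueing its own write, the log comes out in program order.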
