@andrewrk andrewrk commented Oct 15, 2025

This patchset adds std.Io and provides two implementations for it:

  • std.Io.Threaded - based on a thread pool.
    • -fno-single-threaded - supports concurrency and cancellation.
    • -fsingle-threaded - does not support concurrency or cancellation.
  • std.Io.Evented - work-in-progress, experimental. This API is not ready to be used yet, but it serves to inform the evolution of the std.Io API.
    • IoUring implementation for Linux proof of concept. This backend has really nice properties but it's not finished yet.
    • KQueue implementation, implemented enough to prove the concept, including fixing a common bug in other async runtimes.

std.Io.Threaded has networking and file-system operations implemented.
Cancellation works beautifully, except for a known race condition that has a
couple of competing solutions already in mind.

All of std.net has been deleted in favor of std.Io.net.

std.fs has been partially updated to use std.Io - only as required so that
std.Io.Writer.sendFile could use *std.Io.File.Reader rather than
*std.fs.File.Reader.

closes #8224

Laundry List of Io Features

  • async/await - these primitives express that operations can be done
    independently, which makes them infallible and lets them execute on limited
    Io implementations that lack a concurrency mechanism.
  • concurrent - same as async except communicates that the operation
    must be done concurrently for correctness. Requires memory allocation.
  • cancel - equivalent to await except also requests the Io implementation
    to interrupt the operation and return error.Canceled. std.Io.Threaded
    supports cancellation by sending a signal to a thread, causing blocking
    syscalls to return EINTR, giving a chance to notice the cancellation request.
  • select - API for blocking on multiple futures using switch syntax
  • Group - efficiently manages many async tasks. Supports waiting for and
    cancelling all tasks in the group together.
  • Queue(T) - Many producer, many consumer, thread-safe, runtime configurable
    buffer size. When buffer is empty, consumers suspend and are resumed by
    producers. When buffer is full, producers suspend and are resumed by consumers.
    • Avoids code bloat using a type safe wrapper around TypeErasedQueue.
  • Select - for blocking on runtime-known number of tasks and handling a
    subset of them.
  • Clock, Duration, Timestamp, Timeout - type safety for units of measurement
  • Mutex, Condition - synchronization primitives
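
The difference between async and concurrent in the list above can be sketched with a bounded Queue. This is an illustrative sketch, not code from the patchset: produce and sumAll are hypothetical names, and putOne is assumed to take (io, item) and fail only with error.Canceled, by analogy with the putOneUncancelable and getOne calls shown elsewhere in this description. Eight items pass through a two-slot buffer, so the producer can only finish while the consumer is draining, which is why the task is started with io.concurrent.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical producer: pushes `count` integers, suspending while the queue is full.
fn produce(io: Io, queue: *Io.Queue(u32), count: u32) void {
    var i: u32 = 0;
    while (i < count) : (i += 1) {
        // Assumed signature: putOne(io, item). Stop producing if canceled.
        queue.putOne(io, i) catch return;
    }
}

/// Hypothetical consumer: sums everything the producer sends.
fn sumAll(io: Io) !u32 {
    var buffer: [2]u32 = undefined;
    var queue: Io.Queue(u32) = .init(&buffer);

    // `concurrent` rather than `async`: correctness depends on the producer
    // making progress while this function is blocked in getOne.
    var producer = try io.concurrent(produce, .{ io, &queue, 8 });
    defer producer.cancel(io);

    var sum: u32 = 0;
    for (0..8) |_| sum += try queue.getOne(io);
    return sum;
}
```

With io.async instead, an Io implementation that lacks a concurrency mechanism could run produce inline, where it would presumably block once the buffer fills.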

Demo

Here is an example that makes an HTTP request to a domain:

const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Io = std.Io;
const HostName = std.Io.net.HostName;

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    const gpa = debug_allocator.allocator();

    var threaded: std.Io.Threaded = .init(gpa);
    defer threaded.deinit();

    const io = threaded.io();

    const args = try std.process.argsAlloc(gpa);

    const host_name: HostName = try .init(args[1]);

    var http_client: std.http.Client = .{ .allocator = gpa, .io = io };
    defer http_client.deinit();

    var request = try http_client.request(.HEAD, .{
        .scheme = "http",
        .host = .{ .percent_encoded = host_name.bytes },
        .port = 80,
        .path = .{ .percent_encoded = "/" },
    }, .{});
    defer request.deinit();

    try request.sendBodiless();

    var redirect_buffer: [1024]u8 = undefined;
    var response = try request.receiveHead(&redirect_buffer);
    std.log.info("received {d} {s}", .{ response.head.status, response.head.reason });
}

Thanks to the fact that networking is now taking advantage of the new std.Io interface,
this code has the following properties:

  • It asynchronously sends out DNS queries to each configured nameserver
  • As each response comes in, it immediately, asynchronously tries to TCP connect to the
    returned IP address.
  • Upon the first successful TCP connection, all other in-flight connection
    attempts are canceled, including DNS queries.
  • The code also works when compiled with -fsingle-threaded even though the
    operations happen sequentially.
  • No heap allocation.

You can see how this is implemented in std.Io.net.HostName.connect:

pub fn connect(
    host_name: HostName,
    io: Io,
    port: u16,
    options: IpAddress.ConnectOptions,
) ConnectError!Stream {
    var connect_many_buffer: [32]ConnectManyResult = undefined;
    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);

    var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
    var saw_end = false;
    defer {
        connect_many.cancel(io);
        if (!saw_end) while (true) switch (connect_many_queue.getOneUncancelable(io)) {
            .connection => |loser| if (loser) |s| s.closeConst(io) else |_| continue,
            .end => break,
        };
    }

    var aggregate_error: ConnectError = error.UnknownHostName;

    while (connect_many_queue.getOne(io)) |result| switch (result) {
        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
            error.SystemResources,
            error.OptionUnsupported,
            error.ProcessFdQuotaExceeded,
            error.SystemFdQuotaExceeded,
            error.Canceled,
            => |e| return e,

            error.WouldBlock => return error.Unexpected,

            else => |e| aggregate_error = e,
        },
        .end => |end| {
            saw_end = true;
            try end;
            return aggregate_error;
        },
    } else |err| switch (err) {
        error.Canceled => |e| return e,
    }
}

pub const ConnectManyResult = union(enum) {
    connection: IpAddress.ConnectError!Stream,
    end: ConnectError!void,
};

/// Asynchronously establishes a connection to all IP addresses associated with
/// a host name, adding them to a results queue upon completion.
pub fn connectMany(
    host_name: HostName,
    io: Io,
    port: u16,
    results: *Io.Queue(ConnectManyResult),
    options: IpAddress.ConnectOptions,
) void {
    var canonical_name_buffer: [max_len]u8 = undefined;
    var lookup_buffer: [32]HostName.LookupResult = undefined;
    var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);

    host_name.lookup(io, &lookup_queue, .{
        .port = port,
        .canonical_name_buffer = &canonical_name_buffer,
    });

    var group: Io.Group = .init;

    while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
        .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
        .canonical_name => continue,
        .end => |lookup_result| {
            group.waitUncancelable(io);
            results.putOneUncancelable(io, .{ .end = lookup_result });
            return;
        },
    } else |err| switch (err) {
        error.Canceled => |e| {
            group.cancel(io);
            results.putOneUncancelable(io, .{ .end = e });
        },
    }
}

Upgrade Guide

Missing io Parameter

If you need an io parameter, and you don't have one, you can get one like this:

var threaded: Io.Threaded = .init_single_threaded;
const io = threaded.io();

This is legal as long as these functions are not called:

  • Io.VTable.concurrent

This is a non-ideal workaround - like reaching for std.heap.page_allocator when
you need an Allocator and do not have one. Instead, it is better to accept an
Io parameter if you need one (or store one on a context struct for convenience).
The point is that the application's main function should generally be responsible for
constructing the Io instance used throughout.
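
A minimal sketch of that arrangement, assuming a hypothetical App context struct and helper functions (App, double, and run are illustrative names, not part of std): main constructs the Io once, and everything else receives it through the context.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical application context: created once in main, passed by pointer.
const App = struct {
    gpa: std.mem.Allocator,
    io: Io,
};

fn double(n: u32) u32 {
    return n * 2;
}

/// Library-style code takes its Io from the context instead of constructing one.
fn run(app: *const App) u32 {
    var future = app.io.async(double, .{21});
    return future.await(app.io);
}

pub fn main() !void {
    var debug_allocator: std.heap.DebugAllocator(.{}) = .init;
    defer _ = debug_allocator.deinit();
    const gpa = debug_allocator.allocator();

    // The only place in the program that chooses an Io implementation.
    var threaded: Io.Threaded = .init(gpa);
    defer threaded.deinit();

    const app: App = .{ .gpa = gpa, .io = threaded.io() };
    std.debug.print("{d}\n", .{run(&app)});
}
```

Swapping in a different Io implementation then only touches main.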

When you're testing you can use std.testing.io (much like std.testing.allocator).
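
For instance, a test might look like the following sketch. The add function is a hypothetical stand-in for real work; the io.async and await calls follow the usage shown in this description.

```zig
const std = @import("std");

/// Hypothetical function under test.
fn add(a: u32, b: u32) u32 {
    return a + b;
}

test "async result is delivered by await" {
    // No Io.Threaded setup needed: the test runner provides an Io.
    const io = std.testing.io;

    var future = io.async(add, .{ 2, 3 });
    try std.testing.expectEqual(@as(u32, 5), future.await(io));
}
```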

How to use async/await

Use this pattern to avoid resource leaks and handle cancellation gracefully:

var foo_future = io.async(foo, .{args});
defer if (foo_future.cancel(io)) |resource| resource.deinit() else |_| {}

var bar_future = io.async(bar, .{args});
defer if (bar_future.cancel(io)) |resource| resource.deinit() else |_| {}

const foo_result = try foo_future.await(io);
const bar_result = try bar_future.await(io);

If the foo or bar function does not return a resource that must be freed, then the if can be simplified to _ = foo_future.cancel(io) catch {}, and if the function returns void, then the discard can also be removed. The cancel is necessary, however, because it releases the async task resource when errors (including error.Canceled) are returned.
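
As a sketch of the simplest case, here is the same pattern for tasks that return an error union with a void payload, so there is nothing to free and nothing to discard. checkEven and checkBoth are hypothetical names used only for illustration.

```zig
const std = @import("std");
const Io = std.Io;

/// Hypothetical task: fails on odd input and returns no resource.
fn checkEven(n: u32) error{Odd}!void {
    if (n % 2 != 0) return error.Odd;
}

fn checkBoth(io: Io, a: u32, b: u32) !void {
    var a_future = io.async(checkEven, .{a});
    // The payload is void, so the deferred cancel needs neither the if nor a discard.
    defer a_future.cancel(io) catch {};

    var b_future = io.async(checkEven, .{b});
    defer b_future.cancel(io) catch {};

    // If the first await fails, the deferred cancels still release both tasks.
    try a_future.await(io);
    try b_future.await(io);
}
```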

Related

Followup Issues

@andrewrk andrewrk added the breaking and release notes labels Oct 16, 2025
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) void,
Member

Is there a particular reason for passing the *Group as a separate parameter to start (which is not needed by the majority of call sites) instead of just having Select(...).async add it to the args in context?

Member Author

maybe I'm not understanding the suggestion, but that kind of sounds like a pain in the ass and unclear that it will generate better code

lib/std/Io.zig Outdated

/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
/// and can be different for each field.
pub fn select(io: Io, s: anytype) SelectUnion(@TypeOf(s)) {
Contributor

Suggested change
- pub fn select(io: Io, s: anytype) SelectUnion(@TypeOf(s)) {
+ pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {

The doc comment for select should also probably be updated to describe when error.Canceled is returned. Based on my own understanding of how select should behave, maybe something like /// Returns `error.Canceled` if all futures in `s` are canceled. would work?

@andrewrk andrewrk force-pushed the init-std.Io branch 2 times, most recently from 9c20f4f to 477c939 Compare October 18, 2025 12:28
@ValorZard

What happened to std.Io.AsyncDetached/goroutines? I remember that being a core feature back when this design was first proposed.

@andrewrk
Member Author

> What happened to std.Io.AsyncDetached/goroutines? I remember that being a core feature back when this design was first proposed.

Io.Group.async handles that use case. It's better because then you have a way to cancel and wait for those tasks, and you can get the equivalent "detached" properties by having a global instance of Group. With asyncDetached/goroutines you leak the resource. So Io.Group is a more powerful generalization.

@ValorZard

ValorZard commented Oct 19, 2025

> > What happened to std.Io.AsyncDetached/goroutines? I remember that being a core feature back when this design was first proposed.
>
> Io.Group.async handles that use case. It's better because then you have a way to cancel and wait for those tasks, and you can get the equivalent "detached" properties by having a global instance of Group. With asyncDetached/goroutines you leak the resource. So Io.Group is a more powerful generalization.

So what would be the equivalent Zig code for

package main

import (
	"fmt"
)

func fibonacci(n int, c chan int) {
	x, y := 0, 1
	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	go fibonacci(cap(c), c)
	for i := range c {
		fmt.Println(i)
	}
}

(Taken from: https://go.dev/tour/concurrency/4 )

@mlugg
Member

mlugg commented Oct 19, 2025

@ValorZard that's actually a great example of a flaw with Go. In that code, you're handing management of the fibonacci coroutine's lifetime off to the Go runtime. Here's the thing, though: why are you doing that? You know the lifetime of that coroutine; it finishes precisely when main does. Handing that task off to the runtime is only making it do completely avoidable (though admittedly trivial, at least in this case) work.

So, a "direct" port of your code would look like this:

//! This example omits the creation of the `Io` instance for brevity.

fn fibonacci(n: usize, q: *Io.Queue(u64), io: Io) !void {
    var x: u64 = 0;
    var y: u64 = 1;
    for (0..n) |_| {
        try q.putOne(io, x);
        const new = x + y;
        x = y;
        y = new;
    }
}

pub fn main(io: Io) !void {
    // This is the "detach" group; you run tasks in this group if you just want them to clean
    // themselves up upon completion. There is just one of these across the entire application.
    var group: Io.Group = .init;
    errdefer group.cancel(io);
    try mainInner(io, &group);
    try group.wait(io);
}
fn mainInner(io: Io, group: *Io.Group) !void {
    const n = 10;

    var q_buf: [n]u64 = undefined;
    var q: Io.Queue(u64) = .init(&q_buf);
    
    try group.concurrent(io, fibonacci, .{ n, &q, io });
    
    for (0..n) |_| {
        std.debug.print("{d}\n", .{try q.getOne(io)});
    }
}

But looking at that, the redundant work I was talking about becomes blindingly obvious: I mean, we're making a group with only one task, and we obviously know when it finishes! So, we could rewrite it as follows:

fn fibonacci(n: usize, q: *Io.Queue(u64), io: Io) !void {
    var x: u64 = 0;
    var y: u64 = 1;
    for (0..n) |_| {
        try q.putOne(io, x);
        const new = x + y;
        x = y;
        y = new;
    }
}

pub fn main(io: Io) !void {
    const n = 10;

    var q_buf: [n]u64 = undefined;
    var q: Io.Queue(u64) = .init(&q_buf);
    
    var fib = try io.concurrent(fibonacci, .{ n, &q, io });
    defer fib.cancel(io) catch {};
    
    for (0..n) |_| {
        std.debug.print("{d}\n", .{try q.getOne(io)});
    }
}

This code is strictly better: it's logically simpler, with obvious lifetimes, which both helps you read the code and helps the Io implementation manage resources more efficiently. In Go, writing the code anything like this would probably seem a bit odd; in Zig, the optimal solution is the most natural one.

@andrewrk
Member Author

Disabling wasm32 backend coverage for test-cases due to this failure:

andy@Andrews-MacBook-Pro build-release % stage4/bin/zig test ../test/cases/fn_typeinfo_passed_to_comptime_fn.zig -target wasm32-wasi -fno-llvm
Compiler crash context:
Generating function 'Io.TypeErasedQueue.putLocked'

thread 10348055 panic: reached unreachable code
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1034:18: 0x10d7ebafb in emitWValue (zig)
        .dead => unreachable, // reference to free'd `WValue` (missing reuseOperand?)
                 ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2483:22: 0x10db7c57f in load (zig)
    try cg.emitWValue(operand);
                     ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2450:39: 0x10d807473 in airLoad (zig)
            const loaded = try cg.load(operand, ty, 0);
                                      ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1899:28: 0x10d43c39f in genInst (zig)
        .load => cg.airLoad(inst),
                           ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2045:23: 0x10cfc9d43 in genBody (zig)
        try cg.genInst(inst);
                      ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:3354:19: 0x10db856bf in lowerBlock (zig)
    try cg.genBody(body);
                  ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:3335:22: 0x10d8003f3 in airBlock (zig)
    try cg.lowerBlock(inst, ty_pl.ty.toType(), @ptrCast(cg.air.extra.items[extra.end..][0..extra.data.body_len]));
                     ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1858:30: 0x10d43bfa7 in genInst (zig)
        .block => cg.airBlock(inst),
                             ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2045:23: 0x10cfc9d43 in genBody (zig)
        try cg.genInst(inst);
                      ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:3354:19: 0x10db856bf in lowerBlock (zig)
    try cg.genBody(body);
                  ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:3335:22: 0x10d8003f3 in airBlock (zig)
    try cg.lowerBlock(inst, ty_pl.ty.toType(), @ptrCast(cg.air.extra.items[extra.end..][0..extra.data.body_len]));
                     ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1858:30: 0x10d43bfa7 in genInst (zig)
        .block => cg.airBlock(inst),
                             ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2045:23: 0x10cfc9d43 in genBody (zig)
        try cg.genInst(inst);
                      ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:3390:19: 0x10d807ccb in airLoop (zig)
    try cg.genBody(body);
                  ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1900:28: 0x10d43c3b7 in genInst (zig)
        .loop => cg.airLoop(inst),
                           ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:2045:23: 0x10cfc9d43 in genBody (zig)
        try cg.genInst(inst);
                      ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1240:19: 0x10caeaaab in generateInner (zig)
    try cg.genBody(cg.air.getMainBody());
                  ^
/Users/andy/dev/zig/src/codegen/wasm/CodeGen.zig:1221:25: 0x10c4ed153 in generate (zig)
    return generateInner(&code_gen, any_returns) catch |err| switch (err) {
                        ^
/Users/andy/dev/zig/src/codegen.zig:163:45: 0x10bf461c7 in generateFunction (zig)
            const mir = try CodeGen.generate(lf, pt, src_loc, func_index, air, liveness);
                                            ^
/Users/andy/dev/zig/src/Zcu/PerThread.zig:4522:36: 0x10bbcda43 in runCodegenInner (zig)
    return codegen.generateFunction(lf, pt, zcu.navSrcLoc(nav), func_index, air, &liveness) catch |err| switch (err) {
                                   ^
/Users/andy/dev/zig/src/Zcu/PerThread.zig:4400:46: 0x10b9b6a9f in runCodegen (zig)
    const success: bool = if (runCodegenInner(pt, func_index, air)) |mir| success: {
                                             ^
/Users/andy/dev/zig/src/Compilation.zig:5906:18: 0x10bbcb613 in workerZcuCodegen (zig)
    pt.runCodegen(func_index, &air, out);
                 ^
/Users/andy/dev/zig/lib/std/Thread/Pool.zig:180:50: 0x10bbcb7ab in runFn (zig)
            @call(.auto, func, .{id.?} ++ closure.arguments);
                                                 ^
/Users/andy/dev/zig/lib/std/Thread/Pool.zig:293:27: 0x10b961f7f in worker (zig)
            runnable.runFn(runnable, id);
                          ^
/Users/andy/dev/zig/lib/std/Thread.zig:558:13: 0x10b860f23 in callFn__anon_192763 (zig)
            @call(.auto, f, args);
            ^
/Users/andy/dev/zig/lib/std/Thread.zig:830:30: 0x10b78d6bb in entryFn (zig)
                return callFn(f, args_ptr.*);
                             ^
???:?:?: 0x191a03bc7 in __pthread_cond_wait (/usr/lib/system/libsystem_pthread.dylib)
???:?:?: 0x1919feb7f in _pthread_cond_broadcast (/usr/lib/system/libsystem_pthread.dylib)
zsh: abort      stage4/bin/zig test ../test/cases/fn_typeinfo_passed_to_comptime_fn.zig

Coverage can be reinstated when wasm backend is further along. cc @pavelverigo

@andrewrk andrewrk disabled auto-merge October 29, 2025 20:51
@andrewrk andrewrk merged commit a072d82 into master Oct 29, 2025
@andrewrk andrewrk deleted the init-std.Io branch October 29, 2025 20:51
@MrKrot792

Yay! New IO!

@He-Pin

He-Pin commented Oct 30, 2025

This needs a dedicated design page on zig website.

Tomcat-42 added a commit to Tomcat-42/known-folders that referenced this pull request Oct 31, 2025
Introduced in ziglang/zig#25592

Signed-off-by: Pablo Alessandro Santos Hugen <phugen@redhat.com>
chrboesch pushed a commit to ziglings-org/exercises that referenced this pull request Nov 1, 2025
`test/tests.zig` fails after ziglang/zig#25592 was
merged in. This just ensures that Io is passed, it might not be the
ideal solution.
`build.zig` was also failing due to new color parameter.

Development

Successfully merging this pull request may close these issues.

Proposal: Event loop redesign