Commit 559eaa3
io: Correctly align async closure contexts
This fixes package fetching on Windows. Previously, `Async/GroupClosure` allocations were only aligned for the closure struct type, which resulted in panics when `context_alignment` (or `result_alignment` for that matter) had a greater alignment.
Parent commit: adcc7cd
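Why this panicked: the old code computed the context offset as `context_alignment.forward(@sizeOf(AsyncClosure))`, an offset relative to the allocation base, but `alignedAlloc` only guaranteed that the base was aligned to `@alignOf(AsyncClosure)`. Whenever the context or result required stronger alignment than the closure struct itself, the computed pointer could be misaligned. The fix below over-allocates by the worst possible base misalignment and derives the real offset from the runtime address. A minimal sketch of the invariant, with invented sizes (a 48-byte header with 8-byte alignment) rather than the real `AsyncClosure` layout:

```zig
const std = @import("std");

test "worst-case padding leaves room to forward-align the context" {
    // Invented stand-ins for @sizeOf(AsyncClosure) and @alignOf(AsyncClosure).
    const header_size: usize = 48;
    const header_align: usize = 8;
    const context_alignment: std.mem.Alignment = .@"64";

    // Same arithmetic as the new AsyncClosure.init: pad by the worst
    // misalignment the base can have, then round up to the context alignment.
    const max_misalignment = context_alignment.toByteUnits() -| header_align;
    const worst_case_offset = context_alignment.forward(header_size + max_misalignment);

    // For every base address alignedAlloc may legally return (any multiple of
    // header_align), the offset derived from the runtime address fits within
    // the worst-case budget, so the context never overruns the allocation.
    var base: usize = 0x1000;
    while (base < 0x1000 + context_alignment.toByteUnits()) : (base += header_align) {
        const actual_offset = context_alignment.forward(base + header_size) - base;
        try std.testing.expect(actual_offset <= worst_case_offset);
    }
}
```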


lib/std/Io/Threaded.zig (106 additions, 85 deletions)
@@ -387,8 +387,9 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     reset_event: ResetEvent,
     select_condition: ?*ResetEvent,
-    context_alignment: std.mem.Alignment,
+    context_offset: usize,
     result_offset: usize,
+    alloc_len: usize,

     const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

@@ -425,18 +426,57 @@ const AsyncClosure = struct {

     fn contextPointer(ac: *AsyncClosure) [*]u8 {
         const base: [*]u8 = @ptrCast(ac);
-        return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
+        return base + ac.context_offset;
+    }
+
+    fn init(
+        gpa: Allocator,
+        mode: enum { async, concurrent },
+        result_len: usize,
+        result_alignment: std.mem.Alignment,
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        func: *const fn (context: *const anyopaque, result: *anyopaque) void,
+    ) Allocator.Error!*AsyncClosure {
+        const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
+        const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment);
+        const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len);
+        const alloc_len = worst_case_result_offset + result_len;
+
+        const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), alloc_len)));
+        errdefer comptime unreachable;
+
+        const actual_context_offset = context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)) - @intFromPtr(ac);
+        const actual_result_offset = result_alignment.forward(actual_context_offset + context.len);
+        ac.* = .{
+            .closure = .{
+                .cancel_tid = .none,
+                .start = start,
+                .is_concurrent = switch (mode) {
+                    .async => false,
+                    .concurrent => true,
+                },
+            },
+            .func = func,
+            .context_offset = actual_context_offset,
+            .result_offset = actual_result_offset,
+            .alloc_len = alloc_len,
+            .reset_event = .unset,
+            .select_condition = null,
+        };
+        @memcpy(ac.contextPointer()[0..context.len], context);
+        return ac;
     }

-    fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
+    fn waitAndDeinit(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
         ac.reset_event.waitUncancelable();
         @memcpy(result, ac.resultPointer()[0..result.len]);
-        free(ac, gpa, result.len);
+        ac.deinit(gpa);
     }

-    fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
+    fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
         const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
-        gpa.free(base[0 .. ac.result_offset + result_len]);
+        gpa.free(base[0..ac.alloc_len]);
     }
 };

@@ -452,44 +492,28 @@ fn async(
         start(context.ptr, result.ptr);
         return null;
     }
+
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch {
         return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
             start(context.ptr, result.ptr);
             return null;
         };
     };
+
     const gpa = t.allocator;
-    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
-    const result_offset = result_alignment.forward(context_offset + context.len);
-    const n = result_offset + result.len;
-    const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
+    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
-    }));
-
-    ac.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = AsyncClosure.start,
-            .is_concurrent = false,
-        },
-        .func = start,
-        .context_alignment = context_alignment,
-        .result_offset = result_offset,
-        .reset_event = .unset,
-        .select_condition = null,
     };

-    @memcpy(ac.contextPointer()[0..context.len], context);
-
     t.mutex.lock();

     const thread_capacity = cpu_count - 1 + t.concurrent_count;

     t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        ac.free(gpa, result.len);
+        ac.deinit(gpa);
         start(context.ptr, result.ptr);
         return null;
     };
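To make the `AsyncClosure.init` arithmetic concrete, here is the same computation with invented numbers; the real sizes depend on the target and the struct layout. Assume a 48-byte header aligned to 8, a 20-byte context aligned to 16, and an 8-byte result aligned to 32:

```zig
const std = @import("std");

test "alloc_len arithmetic from AsyncClosure.init, with invented sizes" {
    const header_size: usize = 48; // pretend @sizeOf(AsyncClosure)
    const header_align: usize = 8; // pretend @alignOf(AsyncClosure)
    const context_alignment: std.mem.Alignment = .@"16";
    const result_alignment: std.mem.Alignment = .@"32";
    const context_len: usize = 20;
    const result_len: usize = 8;

    const max_context_misalignment = context_alignment.toByteUnits() -| header_align; // 16 - 8 = 8
    const worst_case_context_offset = context_alignment.forward(header_size + max_context_misalignment); // forward(56) = 64
    const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context_len); // forward(84) = 96
    const alloc_len = worst_case_result_offset + result_len; // 96 + 8 = 104

    try std.testing.expectEqual(@as(usize, 104), alloc_len);
}
```

This also shows why `deinit` now frees `base[0..alloc_len]` instead of recomputing the extent: the actual offsets depend on the runtime base address, while `alloc_len` is the fixed worst-case length recorded at `init` time.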
@@ -501,7 +525,7 @@ fn async(
     if (t.threads.items.len == 0) {
         assert(t.run_queue.popFirst() == &ac.closure.node);
         t.mutex.unlock();
-        ac.free(gpa, result.len);
+        ac.deinit(gpa);
         start(context.ptr, result.ptr);
         return null;
     }
@@ -530,27 +554,11 @@ fn concurrent(

     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch 1;
+
     const gpa = t.allocator;
-    const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
-    const result_offset = result_alignment.forward(context_offset + context.len);
-    const n = result_offset + result_len;
-    const ac_bytes = gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch
+    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
         return error.ConcurrencyUnavailable;
-    const ac: *AsyncClosure = @ptrCast(@alignCast(ac_bytes));
-
-    ac.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = AsyncClosure.start,
-            .is_concurrent = true,
-        },
-        .func = start,
-        .context_alignment = context_alignment,
-        .result_offset = result_offset,
-        .reset_event = .unset,
-        .select_condition = null,
     };
-    @memcpy(ac.contextPointer()[0..context.len], context);

     t.mutex.lock();

@@ -559,7 +567,7 @@ fn concurrent(

     t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        ac.free(gpa, result_len);
+        ac.deinit(gpa);
         return error.ConcurrencyUnavailable;
     };

@@ -569,7 +577,7 @@ fn concurrent(
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         assert(t.run_queue.popFirst() == &ac.closure.node);
         t.mutex.unlock();
-        ac.free(gpa, result_len);
+        ac.deinit(gpa);
         return error.ConcurrencyUnavailable;
     };
     t.threads.appendAssumeCapacity(thread);
@@ -587,8 +595,8 @@ const GroupClosure = struct {
     /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
     node: std.SinglyLinkedList.Node,
     func: *const fn (*Io.Group, context: *anyopaque) void,
-    context_alignment: std.mem.Alignment,
-    context_len: usize,
+    context_offset: usize,
+    alloc_len: usize,

     fn start(closure: *Closure) void {
         const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
@@ -616,22 +624,48 @@ const GroupClosure = struct {
         if (prev_state == (sync_one_pending | sync_is_waiting)) reset_event.set();
     }

-    fn free(gc: *GroupClosure, gpa: Allocator) void {
-        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
-        gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
-    }
-
-    fn contextOffset(context_alignment: std.mem.Alignment) usize {
-        return context_alignment.forward(@sizeOf(GroupClosure));
-    }
-
-    fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
-        return contextOffset(context_alignment) + context_len;
-    }
-
     fn contextPointer(gc: *GroupClosure) [*]u8 {
         const base: [*]u8 = @ptrCast(gc);
-        return base + contextOffset(gc.context_alignment);
+        return base + gc.context_offset;
+    }
+
+    /// Does not initialize the `node` field.
+    fn init(
+        gpa: Allocator,
+        t: *Threaded,
+        group: *Io.Group,
+        context: []const u8,
+        context_alignment: std.mem.Alignment,
+        func: *const fn (*Io.Group, context: *const anyopaque) void,
+    ) Allocator.Error!*GroupClosure {
+        const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
+        const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment);
+        const alloc_len = worst_case_context_offset + context.len;
+
+        const gc: *GroupClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(GroupClosure), alloc_len)));
+        errdefer comptime unreachable;
+
+        const actual_context_offset = context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc);
+        gc.* = .{
+            .closure = .{
+                .cancel_tid = .none,
+                .start = start,
+                .is_concurrent = false,
+            },
+            .t = t,
+            .group = group,
+            .node = undefined,
+            .func = func,
+            .context_offset = actual_context_offset,
+            .alloc_len = alloc_len,
+        };
+        @memcpy(gc.contextPointer()[0..context.len], context);
+        return gc;
+    }
+
+    fn deinit(gc: *GroupClosure, gpa: Allocator) void {
+        const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
+        gpa.free(base[0..gc.alloc_len]);
     }

     const sync_is_waiting: usize = 1 << 0;
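Both `init` functions guard the fresh allocation with `errdefer comptime unreachable`, a compile-time assertion that nothing after the allocation can return an error: if it could, that error path would analyze the `comptime unreachable` and the build would fail, so the block provably cannot leak. A minimal illustration of the idiom, using a hypothetical function that is not part of this file:

```zig
const std = @import("std");

// Hypothetical demonstration of the idiom used by AsyncClosure.init
// and GroupClosure.init.
fn makeZeroedBuffer(gpa: std.mem.Allocator) std.mem.Allocator.Error![]u8 {
    const buf = try gpa.alloc(u8, 64);
    // From here on, any statement that could return an error would pull
    // this errdefer into its error path, and `comptime unreachable` would
    // then fail the build; the allocation therefore cannot leak.
    errdefer comptime unreachable;

    @memset(buf, 0); // infallible, so the assertion above holds
    return buf;
}

test makeZeroedBuffer {
    const buf = try makeZeroedBuffer(std.testing.allocator);
    defer std.testing.allocator.free(buf);
    try std.testing.expectEqual(@as(u8, 0), buf[0]);
}
```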
@@ -646,27 +680,14 @@ fn groupAsync(
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
     if (builtin.single_threaded) return start(group, context.ptr);
+
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const cpu_count = t.cpu_count catch 1;
+
     const gpa = t.allocator;
-    const n = GroupClosure.contextEnd(context_alignment, context.len);
-    const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
         return start(group, context.ptr);
-    }));
-    gc.* = .{
-        .closure = .{
-            .cancel_tid = .none,
-            .start = GroupClosure.start,
-            .is_concurrent = false,
-        },
-        .t = t,
-        .group = group,
-        .node = undefined,
-        .func = start,
-        .context_alignment = context_alignment,
-        .context_len = context.len,
     };
-    @memcpy(gc.contextPointer()[0..context.len], context);

     t.mutex.lock();

@@ -678,7 +699,7 @@ fn groupAsync(

     t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
         t.mutex.unlock();
-        gc.free(gpa);
+        gc.deinit(gpa);
         return start(group, context.ptr);
     };

@@ -688,7 +709,7 @@ fn groupAsync(
     const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
         assert(t.run_queue.popFirst() == &gc.closure.node);
         t.mutex.unlock();
-        gc.free(gpa);
+        gc.deinit(gpa);
         return start(group, context.ptr);
     };
     t.threads.appendAssumeCapacity(thread);
@@ -730,7 +751,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
     while (true) {
         const gc: *GroupClosure = @fieldParentPtr("node", node);
         const node_next = node.next;
-        gc.free(gpa);
+        gc.deinit(gpa);
         node = node_next orelse break;
     }
 }
@@ -761,7 +782,7 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
     while (true) {
         const gc: *GroupClosure = @fieldParentPtr("node", node);
         const node_next = node.next;
-        gc.free(gpa);
+        gc.deinit(gpa);
         node = node_next orelse break;
     }
 }
@@ -776,7 +797,7 @@ fn await(
     _ = result_alignment;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    closure.waitAndFree(t.allocator, result);
+    closure.waitAndDeinit(t.allocator, result);
 }

 fn cancel(
@@ -789,7 +810,7 @@ fn cancel(
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
     ac.closure.requestCancel();
-    ac.waitAndFree(t.allocator, result);
+    ac.waitAndDeinit(t.allocator, result);
 }

 fn cancelRequested(userdata: ?*anyopaque) bool {
