-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathio_windows.cpp
273 lines (217 loc) · 8.24 KB
/
io_windows.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
/**
* @author github.com/luncliff (luncliff@gmail.com)
*/
#include <cassert>
#include <chrono>
#include <coroutine/net.h>
using namespace std;
using namespace gsl;
namespace coro {
/**
 * @brief Sleep in an alertable state so that queued I/O completion routines
 *        (see `on_io_done`) can run on the calling thread.
 * @param nano Upper bound of the wait in nanoseconds. It is rounded down to
 *             whole milliseconds, so anything below 1ms becomes a zero wait.
 *
 * `SleepEx` takes a 32-bit millisecond count in which 0xFFFFFFFF means
 * "wait forever". The request is therefore clamped to the largest finite
 * value instead of being truncated, which could turn a long wait into a short
 * one or, by accident, into an infinite one.
 */
void poll_net_tasks(uint64_t nano) noexcept(false) {
    constexpr uint64_t nanos_per_milli = 1000000;
    constexpr uint64_t longest_finite_wait = 0xFFFFFFFEu; // INFINITE - 1
    // unsigned arithmetic: a `nano` above INT64_MAX must not turn negative
    const uint64_t requested = nano / nanos_per_milli;
    const uint64_t bounded =
        requested < longest_finite_wait ? requested : longest_finite_wait;
    // the value fits in 32 bits after the clamp, so the cast can't lose data
    SleepEx(static_cast<uint32_t>(bounded), true);
}
/**
 * @brief Check whether an error code only reports that an asynchronous
 *        operation has not finished yet.
 * @param ec Error code, for example the result of `WSAGetLastError`.
 * @return `true` for `WSAEWOULDBLOCK`, `EWOULDBLOCK`, `EINPROGRESS` and
 *         `ERROR_IO_PENDING`; `false` for every other value.
 */
bool is_async_pending(int ec) noexcept {
    const bool would_block = (ec == WSAEWOULDBLOCK) || (ec == EWOULDBLOCK);
    const bool in_flight = (ec == EINPROGRESS) || (ec == ERROR_IO_PENDING);
    return would_block || in_flight;
}
/**
 * @brief Completion routine passed to the overlapped `WSA*` calls in this file.
 * @param errc  Completion status of the operation.
 * @param sz    Number of bytes transferred.
 * @param pover The `OVERLAPPED` given to the I/O call. The `suspend` functions
 *              pass `this`, so it is treated as an `io_work_t`.
 * @param flags Unused.
 *
 * Stores the result in the work item and resumes the coroutine that is
 * waiting for it.
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(f .6)
void CALLBACK on_io_done(DWORD errc, DWORD sz, LPWSAOVERLAPPED pover,
                         DWORD flags) noexcept {
    UNREFERENCED_PARAMETER(flags);
    auto* const finished = reinterpret_cast<io_work_t*>(pover);
    // `Internal` / `InternalHigh` usually carry these very values already.
    // They are written again only to guarantee it.
    finished->InternalHigh = sz; // read back through `get_io_length`
    finished->Internal = errc;   // read back through `get_io_error`
    assert(static_cast<bool>(finished->task));
    finished->task.resume();
}
/**
 * @brief Report whether the awaiting coroutine can skip suspension.
 * @return Always `false`. With the Windows API the request is issued from
 *         `await_suspend`, so that step must always be reached.
 */
bool io_work_t::ready() const noexcept {
    constexpr bool already_done = false;
    return already_done;
}
/**
 * @brief Read the completion status kept in an `OVERLAPPED`.
 * @param target Structure to inspect; must not be null.
 * @return The `Internal` member narrowed to 32 bits.
 */
uint32_t get_io_error(const OVERLAPPED* target) noexcept {
    const auto status = target->Internal;
    return gsl::narrow_cast<uint32_t>(status);
}
/**
 * @brief Read the transferred byte count kept in an `OVERLAPPED`.
 * @param target Structure to inspect; must not be null.
 * @return The `InternalHigh` member converted to `int64_t`.
 */
int64_t get_io_length(const OVERLAPPED* target) noexcept {
    const auto transferred = target->InternalHigh;
    return gsl::narrow_cast<int64_t>(transferred);
}
// see also: `on_io_done`
uint32_t io_work_t::error() const noexcept {
return get_io_error(this);
}
/**
 * @brief Reset the `OVERLAPPED` part of a work item to all zeroes.
 * @param work Control block to clear; must not be null.
 * @return The same pointer, so the call can be used as an argument.
 */
auto zero_overlapped(io_control_block* work) noexcept -> io_control_block* {
    const OVERLAPPED blank{}; // value-initialized: every member is zero
    *work = blank;
    return work;
}
/**
 * @brief Describe a buffer in the form the `WSA*` functions expect.
 * @param v Buffer to expose. Only its address and byte size are used.
 * @return A `WSABUF` that refers to the memory of `v` without copying it.
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(f .4) // for clang, this is not constexpr function.
auto make_wsa_buf(io_buffer_t v) noexcept -> WSABUF {
    const auto length = gsl::narrow_cast<ULONG>(v.size_bytes());
    char* const memory = reinterpret_cast<char*>(v.data());
    WSABUF described{}; // expect NRVO
    described.len = length;
    described.buf = memory;
    return described;
}
// The functions below take the socket as `uint64_t`, keep it in the `hEvent`
// member (a `HANDLE`) and read it back as `SOCKET`. These size relations
// are what that round trip relies on.
static_assert(sizeof(uint64_t) >= sizeof(SOCKET));
static_assert(sizeof(SOCKET) == sizeof(HANDLE));
/**
 * @brief Prepare an awaitable datagram send to an IPv6 address.
 * @param sd     Socket descriptor.
 * @param remote Destination address. It is referenced, not copied, and is
 *               read again when the operation is actually issued.
 * @param buffer Bytes to send.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_send_to`.
 *
 * Nothing is sent here. The arguments are parked in the members of `work`
 * and `io_send_to::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(type .3)
auto send_to(uint64_t sd, const sockaddr_in6& remote, io_buffer_t buffer,
             io_work_t& work) noexcept(false) -> io_send_to& {
    auto* const peer = const_cast<sockaddr_in6*>(addressof(remote));
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    work.Pointer = reinterpret_cast<sockaddr*>(peer);
    work.InternalHigh = sizeof(sockaddr_in6); // address length
    work.Internal = DWORD{0};                 // flag
    // the `co_await` operator works on a reference to `io_send_to`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_send_to*>(request);
}
/**
 * @brief Prepare an awaitable datagram send to an IPv4 address.
 * @param sd     Socket descriptor.
 * @param remote Destination address. It is referenced, not copied, and is
 *               read again when the operation is actually issued.
 * @param buffer Bytes to send.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_send_to`.
 *
 * Nothing is sent here. The arguments are parked in the members of `work`
 * and `io_send_to::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(type .3)
auto send_to(uint64_t sd, const sockaddr_in& remote, io_buffer_t buffer,
             io_work_t& work) noexcept(false) -> io_send_to& {
    auto* const peer = const_cast<sockaddr_in*>(addressof(remote));
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    work.Pointer = reinterpret_cast<sockaddr*>(peer);
    work.InternalHigh = sizeof(sockaddr_in); // address length
    work.Internal = DWORD{0};                // flag
    // the `co_await` operator works on a reference to `io_send_to`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_send_to*>(request);
}
/**
 * @brief Issue the overlapped `WSASendTo` prepared by `send_to`.
 * @param t Coroutine to resume from `on_io_done`.
 * @throws std::system_error when `WSASendTo` fails with an error that
 *         `is_async_pending` does not classify as "still in progress".
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(bounds .3)
void io_send_to::suspend(coroutine_handle<void> t) noexcept(false) {
    task = t; // resumed by the overlapped callback
    // Take the parked arguments out before the OVERLAPPED part is cleared.
    const auto flag = gsl::narrow_cast<DWORD>(Internal);
    const auto addrlen = gsl::narrow_cast<socklen_t>(InternalHigh);
    const auto addr = reinterpret_cast<sockaddr*>(Pointer);
    const auto sd = reinterpret_cast<SOCKET>(hEvent);
    WSABUF bufs[1] = {make_wsa_buf(buffer)};
    OVERLAPPED* const cleared = zero_overlapped(this);

    const auto rc = ::WSASendTo(sd, bufs, 1, nullptr, flag, //
                                addr, addrlen,              //
                                cleared, on_io_done);
    if (rc == NO_ERROR)
        return;
    const auto ec = WSAGetLastError();
    if (ec == 0 || is_async_pending(ec))
        return;
    throw system_error{ec, system_category(), "WSASendTo"};
}
int64_t io_send_to::resume() noexcept {
return get_io_length(this);
}
/**
 * @brief Prepare an awaitable datagram receive that reports an IPv6 sender.
 * @param sd     Socket descriptor.
 * @param remote Receives the sender's address. It is referenced, not
 *               copied, and is used when the operation is actually issued.
 * @param buffer Destination for the received bytes.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_recv_from`.
 *
 * Nothing is received here. The arguments are parked in the members of
 * `work` and `io_recv_from::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
auto recv_from(uint64_t sd, sockaddr_in6& remote, io_buffer_t buffer,
               io_work_t& work) noexcept(false) -> io_recv_from& {
    sockaddr_in6* const peer = addressof(remote);
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    work.Pointer = reinterpret_cast<sockaddr*>(peer);
    work.InternalHigh = sizeof(sockaddr_in6); // address length
    work.Internal = DWORD{0};                 // flag
    // the `co_await` operator works on a reference to `io_recv_from`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_recv_from*>(request);
}
/**
 * @brief Prepare an awaitable datagram receive that reports an IPv4 sender.
 * @param sd     Socket descriptor.
 * @param remote Receives the sender's address. It is referenced, not
 *               copied, and is used when the operation is actually issued.
 * @param buffer Destination for the received bytes.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_recv_from`.
 *
 * Nothing is received here. The arguments are parked in the members of
 * `work` and `io_recv_from::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
auto recv_from(uint64_t sd, sockaddr_in& remote, io_buffer_t buffer,
               io_work_t& work) noexcept(false) -> io_recv_from& {
    sockaddr_in* const peer = addressof(remote);
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    work.Pointer = reinterpret_cast<sockaddr*>(peer);
    work.InternalHigh = sizeof(sockaddr_in); // address length
    work.Internal = DWORD{0};                // flag
    // the `co_await` operator works on a reference to `io_recv_from`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_recv_from*>(request);
}
/**
 * @brief Issue the overlapped `WSARecvFrom` prepared by `recv_from`.
 * @param t Coroutine to resume from `on_io_done`.
 * @throws std::system_error when `WSARecvFrom` fails with an error that
 *         `is_async_pending` does not classify as "still in progress".
 *
 * For an overlapped receive the provider writes the sender's address length
 * through `lpFromlen` when the operation completes. By then this function
 * has already returned, so the length must not live on this stack frame.
 * It is kept in the `hEvent` slot of this object's own `OVERLAPPED`
 * instead. That slot is left to the application when a completion routine
 * is supplied, and it lives exactly as long as the request.
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(bounds .3)
void io_recv_from::suspend(coroutine_handle<void> t) noexcept(false) {
    static_assert(sizeof(socklen_t) <= sizeof(hEvent));
    task = t; // coroutine will be resumed in overlapped callback
    // Take the parked arguments out before the OVERLAPPED part is cleared.
    const auto sd = reinterpret_cast<SOCKET>(hEvent);
    const auto addr = reinterpret_cast<sockaddr*>(Pointer);
    const auto capacity = gsl::narrow_cast<socklen_t>(InternalHigh);
    // `flag` may stay local: it is only updated when the call completes
    // immediately, that is before this function returns.
    auto flag = gsl::narrow_cast<DWORD>(Internal);
    WSABUF bufs[1] = {make_wsa_buf(buffer)};
    OVERLAPPED* p = zero_overlapped(this);
    // Storage for the in/out address length that outlives this call.
    auto* const addrlen = reinterpret_cast<socklen_t*>(addressof(hEvent));
    *addrlen = capacity;
    if (::WSARecvFrom(sd, bufs, 1, nullptr, &flag, //
                      addr, addrlen,               //
                      p, on_io_done) == NO_ERROR)
        return;
    if (const auto ec = WSAGetLastError()) {
        if (is_async_pending(ec))
            return;
        throw system_error{ec, system_category(), "WSARecvFrom"};
    }
}
int64_t io_recv_from::resume() noexcept {
return get_io_length(this);
}
/**
 * @brief Prepare an awaitable send on a socket.
 * @param sd     Socket descriptor.
 * @param buffer Bytes to send.
 * @param flag   Flags forwarded to `WSASend`.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_send`.
 *
 * Nothing is sent here. The arguments are parked in the members of `work`
 * and `io_send::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
auto send_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
                 io_work_t& work) noexcept(false) -> io_send& {
    work.Internal = flag;
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    // the `co_await` operator works on a reference to `io_send`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_send*>(request);
}
/**
 * @brief Issue the overlapped `WSASend` prepared by `send_stream`.
 * @param t Coroutine to resume from `on_io_done`.
 * @throws std::system_error when `WSASend` fails with an error that
 *         `is_async_pending` does not classify as "still in progress".
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(bounds .3)
void io_send::suspend(coroutine_handle<void> t) noexcept(false) {
    task = t; // resumed by the overlapped callback
    // Take the parked arguments out before the OVERLAPPED part is cleared.
    const auto flag = gsl::narrow_cast<DWORD>(Internal);
    const auto sd = reinterpret_cast<SOCKET>(hEvent);
    WSABUF bufs[1] = {make_wsa_buf(buffer)};
    OVERLAPPED* const cleared = zero_overlapped(this);

    const auto rc = ::WSASend(sd, bufs, 1, nullptr, flag, //
                              cleared, on_io_done);
    if (rc == NO_ERROR)
        return;
    const auto ec = WSAGetLastError();
    if (ec == 0 || is_async_pending(ec))
        return;
    throw system_error{ec, system_category(), "WSASend"};
}
int64_t io_send::resume() noexcept {
return get_io_length(this);
}
/**
 * @brief Prepare an awaitable receive on a socket.
 * @param sd     Socket descriptor.
 * @param buffer Destination for the received bytes.
 * @param flag   Flags forwarded to `WSARecv`.
 * @param work   Work item that carries the request.
 * @return `work`, viewed as `io_recv`.
 *
 * Nothing is received here. The arguments are parked in the members of
 * `work` and `io_recv::suspend` picks them up.
 */
GSL_SUPPRESS(type .1)
auto recv_stream(uint64_t sd, io_buffer_t buffer, uint32_t flag,
                 io_work_t& work) noexcept(false) -> io_recv& {
    work.Internal = flag;
    work.buffer = buffer;
    work.hEvent = reinterpret_cast<HANDLE>(sd);
    // the `co_await` operator works on a reference to `io_recv`
    io_work_t* const request = addressof(work);
    return *reinterpret_cast<io_recv*>(request);
}
/**
 * @brief Issue the overlapped `WSARecv` prepared by `recv_stream`.
 * @param t Coroutine to resume from `on_io_done`.
 * @throws std::system_error when `WSARecv` fails with an error that
 *         `is_async_pending` does not classify as "still in progress".
 */
GSL_SUPPRESS(type .1)
GSL_SUPPRESS(bounds .3)
void io_recv::suspend(coroutine_handle<void> t) noexcept(false) {
    task = t; // resumed by the overlapped callback
    // Take the parked arguments out before the OVERLAPPED part is cleared.
    auto flag = gsl::narrow_cast<DWORD>(Internal);
    const auto sd = reinterpret_cast<SOCKET>(hEvent);
    WSABUF bufs[1] = {make_wsa_buf(buffer)};
    OVERLAPPED* const cleared = zero_overlapped(this);

    const auto rc = ::WSARecv(sd, bufs, 1, nullptr, &flag, //
                              cleared, on_io_done);
    if (rc == NO_ERROR)
        return;
    const auto ec = WSAGetLastError();
    if (ec == 0 || is_async_pending(ec))
        return;
    throw system_error{ec, system_category(), "WSARecv"};
}
int64_t io_recv::resume() noexcept {
return get_io_length(this);
}
} // namespace coro