Skip to content

Commit 11c10b4

Browse files
committed
os/windows/named-pipe: New accept() method, formatting and small refactor
The new accept() method supports async_op and allows you to have a fully callback based named pipe, advancing state whenever the callback is run.
1 parent c1e39f4 commit 11c10b4

File tree

2 files changed

+62
-41
lines changed

2 files changed

+62
-41
lines changed

source/os/windows/named-pipe.cpp

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,13 @@ inline void open_logic(HANDLE &handle, std::wstring name, os::windows::pipe_read
112112
throw std::runtime_error(msg.data());
113113
}
114114

115-
DWORD pipe_read_mode = 0;
115+
DWORD pipe_read_mode = PIPE_WAIT;
116116
switch (mode) {
117117
case os::windows::pipe_read_mode::Message:
118-
pipe_read_mode = PIPE_READMODE_MESSAGE;
118+
pipe_read_mode |= PIPE_READMODE_MESSAGE;
119119
break;
120120
default:
121-
pipe_read_mode = PIPE_READMODE_BYTE;
121+
pipe_read_mode |= PIPE_READMODE_BYTE;
122122
break;
123123
}
124124

@@ -255,52 +255,58 @@ os::error os::windows::named_pipe::write(std::unique_ptr<os::windows::async_requ
255255

256256
os::error os::windows::named_pipe::read(char *buffer, size_t buffer_length, std::shared_ptr<os::async_op> &op,
257257
os::async_op_cb_t cb) {
258+
os::error ec;
259+
258260
std::shared_ptr<os::windows::async_request> ar = std::static_pointer_cast<os::windows::async_request>(op);
259261
if (!ar) {
260262
ar = std::make_shared<os::windows::async_request>();
261263
}
264+
op = std::static_pointer_cast<os::async_op>(ar);
262265
ar->set_callback(cb);
263266
ar->set_handle(handle);
264267

265268
SetLastError(ERROR_SUCCESS);
266-
if (!ReadFileEx(handle, buffer, DWORD(buffer_length), ar->get_overlapped_pointer(),
267-
os::windows::async_request::completion_routine)
268-
|| (GetLastError() != ERROR_SUCCESS)) {
269-
os::error error = utility::translate_error(GetLastError());
270-
if (error != os::error::Success) {
271-
ar->cancel();
272-
}
273-
return error;
269+
BOOL suc = ReadFileEx(handle, buffer, DWORD(buffer_length), ar->get_overlapped_pointer(),
270+
os::windows::async_request::completion_routine);
271+
DWORD error = GetLastError();
272+
ec = utility::translate_error(error);
273+
274+
if (suc == 0) {
275+
ar->call_callback(ec, buffer_length);
276+
ar->cancel();
277+
return ec;
274278
}
275279

276280
ar->set_valid(true);
277-
op = std::static_pointer_cast<os::async_op>(ar);
278-
return os::error::Success;
281+
return ec;
279282
}
280283

281284
os::error os::windows::named_pipe::write(const char *buffer, size_t buffer_length, std::shared_ptr<os::async_op> &op,
282285
os::async_op_cb_t cb) {
286+
os::error ec;
287+
283288
std::shared_ptr<os::windows::async_request> ar = std::static_pointer_cast<os::windows::async_request>(op);
284289
if (!ar) {
285290
ar = std::make_shared<os::windows::async_request>();
286291
}
292+
op = std::static_pointer_cast<os::async_op>(ar);
287293
ar->set_callback(cb);
288294
ar->set_handle(handle);
289295

290296
SetLastError(ERROR_SUCCESS);
291-
if (!WriteFileEx(handle, buffer, DWORD(buffer_length), ar->get_overlapped_pointer(),
292-
os::windows::async_request::completion_routine)
293-
|| (GetLastError() != ERROR_SUCCESS)) {
294-
os::error error = utility::translate_error(GetLastError());
295-
if (error != os::error::Success) {
296-
ar->cancel();
297-
}
298-
return error;
297+
BOOL suc = WriteFileEx(handle, buffer, DWORD(buffer_length), ar->get_overlapped_pointer(),
298+
os::windows::async_request::completion_routine);
299+
DWORD error = GetLastError();
300+
ec = utility::translate_error(error);
301+
302+
if (suc == 0) {
303+
ar->call_callback(ec, buffer_length);
304+
ar->cancel();
305+
return ec;
299306
}
300307

301308
ar->set_valid(true);
302-
op = std::static_pointer_cast<os::async_op>(ar);
303-
return os::error::Success;
309+
return ec;
304310
}
305311

306312
bool os::windows::named_pipe::is_created() {
@@ -313,29 +319,41 @@ bool os::windows::named_pipe::is_connected() {
313319
}
314320

315321
os::error os::windows::named_pipe::accept(std::unique_ptr<os::windows::async_request> &request) {
322+
std::shared_ptr<os::windows::async_request> ars = std::move(request);
323+
os::error ec = accept(std::static_pointer_cast<os::async_op>(ars), nullptr);
324+
request.reset(ars.get());
325+
return ec;
326+
}
327+
328+
os::error os::windows::named_pipe::accept(std::shared_ptr<os::async_op> &op, os::async_op_cb_t cb) {
329+
os::error ec;
330+
316331
if (!is_created()) {
317332
return os::error::Error;
318333
}
319334

320-
if (!request) {
321-
request = std::make_unique<os::windows::async_request>();
335+
std::shared_ptr<os::windows::async_request> ar = std::static_pointer_cast<os::windows::async_request>(op);
336+
if (!ar) {
337+
ar = std::make_shared<os::windows::async_request>();
322338
}
323-
request->set_handle(handle);
339+
op = std::static_pointer_cast<os::async_op>(ar);
340+
ar->set_callback(cb);
341+
ar->set_handle(handle);
324342

325343
SetLastError(ERROR_SUCCESS);
326-
if (!ConnectNamedPipe(handle, request->get_overlapped_pointer()) || (GetLastError() != ERROR_SUCCESS)) {
327-
DWORD error = GetLastError();
328-
if (error == ERROR_MORE_DATA) {
329-
return os::error::MoreData;
330-
} else if (error == ERROR_BROKEN_PIPE) {
331-
return os::error::Disconnected;
332-
} else if (error == ERROR_PIPE_CONNECTED) {
333-
return os::error::Connected;
334-
} else if (error != ERROR_IO_PENDING) {
335-
return os::error::Error;
336-
}
344+
BOOL suc = ConnectNamedPipe(handle, ar->get_overlapped_pointer());
345+
ec = utility::translate_error(GetLastError());
346+
347+
if (ec != os::error::Pending && ec != os::error::Connected) {
348+
ar->call_callback(ec, 0);
349+
ar->cancel();
350+
return ec;
337351
}
338352

339-
request->set_valid(true);
340-
return os::error::Success;
353+
ar->set_valid(true);
354+
if (ec == os::error::Connected) {
355+
ar->call_callback(ec, 0);
356+
}
357+
358+
return ec;
341359
}

source/os/windows/named-pipe.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ namespace os {
6464

6565
os::error total_available(size_t &avail);
6666

67-
[[deprecated("Use read(char*, size_t, std::shared_ptr<os::async_op>&, os::async_op_cb_t).")]]
67+
[[deprecated("This method is deprecated, use read(char*, size_t, std::shared_ptr<os::async_op>&, os::async_op_cb_t) instead.")]]
6868
os::error read(std::unique_ptr<os::windows::async_request> &request, char *buffer, size_t buffer_length);
6969

70-
[[deprecated("Use write(const char*, size_t, std::shared_ptr<os::async_op>&, os::async_op_cb_t).")]]
70+
[[deprecated("This method is deprecated, use write(const char*, size_t, std::shared_ptr<os::async_op>&, os::async_op_cb_t) instead.")]]
7171
os::error write(std::unique_ptr<os::windows::async_request> &request, const char *buffer,
7272
size_t buffer_length);
7373

@@ -81,7 +81,10 @@ namespace os {
8181
bool is_connected();
8282

8383
public: // created only
84+
[[deprecated("This method is deprecated, use accept(std::shared_ptr<os::async_op>&, os::async_op_cb_t) instead.")]]
8485
os::error accept(std::unique_ptr<os::windows::async_request> &request);
86+
87+
os::error accept(std::shared_ptr<os::async_op> &op, os::async_op_cb_t cb);
8588
};
8689
} // namespace windows
8790
} // namespace os

0 commit comments

Comments
 (0)