Skip to content

Commit 7b5ae9a

Browse files
authored
CXXCBC-285: write to sockets from IO threads (#348)
The writing procedure should be invoked from the IO thread and bound to IO executor. Otherwise when main thread invokes ASIO write directly, the OpenSSL structures are not guaranteed to be thread-safe.
1 parent ebb15f1 commit 7b5ae9a

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

core/io/http_session.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ class http_session : public std::enable_shared_from_this<http_session>
293293
if (stopped_) {
294294
return;
295295
}
296-
do_write();
296+
asio::post(asio::bind_executor(ctx_, [self = shared_from_this()]() { self->do_write(); }));
297297
}
298298

299299
template<typename Handler>

core/io/mcbp_session.cxx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ class mcbp_session_impl
851851
if (stopped_) {
852852
return;
853853
}
854-
do_write();
854+
asio::post(asio::bind_executor(ctx_, [self = shared_from_this()]() { self->do_write(); }));
855855
}
856856

857857
void write_and_flush(std::vector<std::byte>&& buf)
@@ -1397,8 +1397,10 @@ class mcbp_session_impl
13971397
std::scoped_lock inner_lock(self->writing_buffer_mutex_);
13981398
self->writing_buffer_.clear();
13991399
}
1400-
self->do_write();
1401-
self->do_read();
1400+
asio::post(asio::bind_executor(self->ctx_, [self]() {
1401+
self->do_write();
1402+
self->do_read();
1403+
}));
14021404
});
14031405
}
14041406

test/tools/tool_kv_loader.cxx

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,15 @@ enum class operation {
7676

7777
namespace
7878
{
79-
volatile std::sig_atomic_t running{ 1 };
79+
std::atomic_flag running{ 1 };
8080
std::size_t operations_limit{ 0 };
8181

8282
} // namespace
8383

8484
void
8585
sigint_handler(int /* signal */)
8686
{
87-
running = 0;
87+
running.test_and_set();
8888
}
8989

9090
static void
@@ -207,7 +207,7 @@ main()
207207
std::vector<std::thread> io_pool{};
208208
io_pool.reserve(number_of_io_threads);
209209
for (std::size_t i = 0; i < number_of_io_threads; ++i) {
210-
io_pool.emplace_back(std::thread([&io]() { io.run(); }));
210+
io_pool.emplace_back([&io]() { io.run(); });
211211
}
212212

213213
test::utils::open_cluster(cluster, origin);
@@ -251,7 +251,7 @@ main()
251251

252252
asio::steady_timer stats_timer(io);
253253
dump_stats(stats_timer, start_time, total);
254-
while (running != 0) {
254+
while (running.test_and_set()) {
255255
auto opcode = dist(gen) <= chance_of_get ? operation::get : operation::upsert;
256256
if (opcode == operation::get && known_keys.empty()) {
257257
opcode = operation::upsert;
@@ -284,7 +284,7 @@ main()
284284
++errors[resp.ctx.ec()];
285285
}
286286
if (operations_limit > 0 && total >= operations_limit) {
287-
running = 0;
287+
running.clear();
288288
}
289289
});
290290
} break;
@@ -297,7 +297,7 @@ main()
297297
++errors[resp.ctx.ec()];
298298
}
299299
if (operations_limit > 0 && total >= operations_limit) {
300-
running = 0;
300+
running.clear();
301301
}
302302
});
303303
} break;
@@ -312,7 +312,7 @@ main()
312312
++errors[resp.ctx.ec];
313313
}
314314
if (operations_limit > 0 && total > operations_limit) {
315-
running = 0;
315+
running.clear();
316316
}
317317
});
318318
}

0 commit comments

Comments
 (0)