Skip to content

Commit b778754

Browse files
maxkozlovskyjhunsaker
authored andcommitted
Improve statesync shutdown and fix protocol corruption
This commit improves the statesync shutdown mechanism and fixes a protocol corruption issue and busy recv loop. Key changes: 1. Modified statesync_server_recv() to guarantee complete reads: - Now always returns exactly the requested number of bytes or -1 - Accumulates data internally until the full message is received - Prevents protocol corruption from partial reads + reconnection - Uses poll() instead of busy-waiting, eliminating CPU spinning 2. Removed retry loop in monad_statesync_server_run_once(): - Simplified to two simple recv() calls (type + request) - No longer needed since recv() guarantees complete reads 3. Fixed shutdown handling during reconnect: - connect() now polls shutdown_eventfd during retry loop - Prevents infinite hang when connection keeps failing - recv() reconnects and returns error on connection loss This code was generated using Claude Sonnet 4.5
1 parent d790b1c commit b778754

File tree

8 files changed

+455
-59
lines changed

8 files changed

+455
-59
lines changed

category/statesync/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ add_library(
3434
"statesync_server.h"
3535
"statesync_server_context.cpp"
3636
"statesync_server_context.hpp"
37-
"statesync_server_context.hpp"
37+
"statesync_thread.cpp"
38+
"statesync_thread.hpp"
3839
"statesync_version.cpp"
3940
"statesync_version.h")
4041

category/statesync/statesync_server.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -517,15 +517,10 @@ void monad_statesync_server_run_once(struct monad_statesync_server *const sync)
517517
return;
518518
}
519519
MONAD_ASSERT(buf[0] == SYNC_TYPE_REQUEST);
520-
unsigned char *ptr = buf;
521-
uint64_t n = sizeof(monad_sync_request);
522-
while (n != 0) {
523-
auto const res = sync->statesync_server_recv(sync->net, ptr, n);
524-
if (res == -1) {
525-
continue;
526-
}
527-
ptr += res;
528-
n -= static_cast<size_t>(res);
520+
if (sync->statesync_server_recv(
521+
sync->net, buf, sizeof(monad_sync_request)) !=
522+
static_cast<ssize_t>(sizeof(monad_sync_request))) {
523+
return;
529524
}
530525
auto const &rq = unaligned_load<monad_sync_request>(buf);
531526
monad_statesync_server_handle_request(sync, rq);

category/statesync/statesync_server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include <category/statesync/statesync_messages.h>
1919

20+
#include <sys/types.h>
21+
2022
struct monad_statesync_server;
2123
struct monad_statesync_server_context;
2224
struct monad_statesync_server_network;

category/statesync/statesync_server_network.hpp

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222
#include <quill/Quill.h>
2323

24+
#include <array>
2425
#include <chrono>
26+
#include <poll.h>
27+
#include <sys/eventfd.h>
2528
#include <sys/socket.h>
2629
#include <sys/un.h>
2730
#include <thread>
@@ -30,6 +33,7 @@
3033
struct monad_statesync_server_network
3134
{
3235
int fd;
36+
int shutdown_eventfd;
3337
monad::byte_string obuf;
3438
std::string path;
3539

@@ -42,19 +46,50 @@ struct monad_statesync_server_network
4246
memset(&addr, 0, sizeof(addr));
4347
addr.sun_family = AF_UNIX;
4448
strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
49+
4550
while (::connect(fd, (sockaddr *)&addr, sizeof(addr)) != 0) {
46-
std::this_thread::sleep_for(std::chrono::milliseconds(1));
51+
// Check if shutdown was requested before sleeping
52+
std::array<pollfd, 1> pfds{};
53+
pfds[0].fd = shutdown_eventfd;
54+
pfds[0].events = POLLIN;
55+
56+
// Poll with short timeout to check for shutdown signal
57+
int const poll_ret = poll(pfds.data(), pfds.size(), 1);
58+
59+
if (poll_ret > 0 && (pfds[0].revents & POLLIN)) {
60+
// Shutdown requested, abort connection attempt
61+
LOG_WARNING("Connect aborted due to shutdown request");
62+
return;
63+
}
64+
}
65+
}
66+
67+
void signal_shutdown()
68+
{
69+
uint64_t const val = 1;
70+
ssize_t const res = write(shutdown_eventfd, &val, sizeof(val));
71+
if (res != sizeof(val)) {
72+
LOG_WARNING(
73+
"Failed to signal shutdown eventfd: {}", strerror(errno));
4774
}
4875
}
4976

5077
monad_statesync_server_network(char const *const path)
5178
: path{path}
5279
{
80+
shutdown_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
81+
MONAD_ASSERT_PRINTF(
82+
shutdown_eventfd >= 0,
83+
"failed to create eventfd: %s",
84+
strerror(errno));
5385
connect();
5486
}
5587

5688
~monad_statesync_server_network()
5789
{
90+
if (shutdown_eventfd >= 0) {
91+
close(shutdown_eventfd);
92+
}
5893
if (fd >= 0) {
5994
close(fd);
6095
}
@@ -92,33 +127,76 @@ namespace
92127
}
93128
}
94129

95-
ssize_t statesync_server_recv(
130+
inline ssize_t statesync_server_recv(
96131
monad_statesync_server_network *const net, unsigned char *buf, size_t n)
97132
{
98-
while (true) {
99-
ssize_t ret = recv(net->fd, buf, n, MSG_DONTWAIT);
100-
if (ret == 0 ||
101-
(ret < 0 && (errno == ECONNRESET || errno == ENOTCONN))) {
102-
LOG_WARNING("connection closed, reconnecting");
133+
size_t total_received = 0;
134+
135+
while (total_received < n) {
136+
std::array<pollfd, 2> pfds{};
137+
pfds[0].fd = net->fd;
138+
pfds[0].events = POLLIN;
139+
140+
pfds[1].fd = net->shutdown_eventfd;
141+
pfds[1].events = POLLIN;
142+
143+
int const poll_ret = poll(pfds.data(), pfds.size(), -1);
144+
145+
if (poll_ret < 0) {
146+
if (errno == EINTR) {
147+
continue;
148+
}
149+
LOG_ERROR("poll error: {}", strerror(errno));
150+
return -1;
151+
}
152+
153+
if (pfds[1].revents & POLLIN) {
154+
return -1;
155+
}
156+
157+
if (pfds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) {
158+
LOG_WARNING("socket error event, reconnecting");
103159
if (close(net->fd) < 0) {
104160
LOG_ERROR("failed to close socket: {}", strerror(errno));
105161
}
106162
net->fd = -1;
107163
net->connect();
108-
}
109-
else if (
110-
ret < 0 &&
111-
(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) {
112-
LOG_ERROR("recv error: {}", strerror(errno));
113164
return -1;
114165
}
115-
else {
116-
return ret;
166+
167+
if (pfds[0].revents & POLLIN) {
168+
ssize_t const ret = recv(
169+
net->fd,
170+
buf + total_received,
171+
n - total_received,
172+
MSG_DONTWAIT);
173+
174+
if (ret == 0 ||
175+
(ret < 0 && (errno == ECONNRESET || errno == ENOTCONN))) {
176+
LOG_WARNING("connection closed, reconnecting");
177+
if (close(net->fd) < 0) {
178+
LOG_ERROR("failed to close socket: {}", strerror(errno));
179+
}
180+
net->fd = -1;
181+
net->connect();
182+
return -1;
183+
}
184+
else if (
185+
ret < 0 &&
186+
(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)) {
187+
LOG_ERROR("recv error: {}", strerror(errno));
188+
return -1;
189+
}
190+
else if (ret > 0) {
191+
total_received += static_cast<size_t>(ret);
192+
}
117193
}
118194
}
195+
196+
return static_cast<ssize_t>(n);
119197
}
120198

121-
void statesync_server_send_upsert(
199+
inline void statesync_server_send_upsert(
122200
monad_statesync_server_network *const net, monad_sync_type const type,
123201
unsigned char const *const v1, uint64_t const size1,
124202
unsigned char const *const v2, uint64_t const size2)
@@ -152,15 +230,15 @@ void statesync_server_send_upsert(
152230
LOG_DEBUG(
153231
"sending upsert type={} {} ns={}",
154232
std::to_underlying(type),
155-
fmt::format(
233+
fmtquill::format(
156234
"v1=0x{:02x} v2=0x{:02x}",
157-
fmt::join(std::as_bytes(std::span(v1, size1)), ""),
158-
fmt::join(std::as_bytes(std::span(v2, size2)), "")),
235+
fmtquill::join(std::as_bytes(std::span(v1, size1)), ""),
236+
fmtquill::join(std::as_bytes(std::span(v2, size2)), "")),
159237
std::chrono::duration_cast<std::chrono::nanoseconds>(
160238
std::chrono::steady_clock::now() - start));
161239
}
162240

163-
void statesync_server_send_done(
241+
inline void statesync_server_send_done(
164242
monad_statesync_server_network *const net, monad_sync_done const msg)
165243
{
166244
[[maybe_unused]] auto const start = std::chrono::steady_clock::now();
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (C) 2025 Category Labs, Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
#include <category/statesync/statesync_thread.hpp>
17+
18+
#include <category/mpt/ondisk_db_config.hpp>
19+
20+
#include <pthread.h>
21+
22+
MONAD_NAMESPACE_BEGIN
23+
24+
StateSyncServer::StateSyncServer(StateSyncServerConfig const &config)
25+
: ctx{std::make_unique<monad_statesync_server_context>(*config.triedb)}
26+
, server{monad_statesync_server_create(
27+
ctx.get(), config.network, &monad::statesync_server_recv,
28+
&monad::statesync_server_send_upsert,
29+
&monad::statesync_server_send_done)}
30+
, thread{[this, config](std::stop_token const token) {
31+
pthread_setname_np(pthread_self(), "statesync");
32+
33+
mpt::AsyncIOContext io_ctx{mpt::ReadOnlyOnDiskDbConfig{
34+
.sq_thread_cpu = config.ro_sq_thread_cpu,
35+
.dbname_paths = config.dbname_paths}};
36+
mpt::Db ro{io_ctx};
37+
ctx->ro = &ro;
38+
39+
std::stop_callback stop_cb(
40+
token, [config]() { config.network->signal_shutdown(); });
41+
42+
while (!token.stop_requested()) {
43+
monad_statesync_server_run_once(server.get());
44+
}
45+
46+
ctx->ro = nullptr;
47+
}}
48+
{
49+
}
50+
51+
std::unique_ptr<StateSyncServer>
52+
make_statesync_server(StateSyncServerConfig const &config)
53+
{
54+
return std::make_unique<StateSyncServer>(config);
55+
}
56+
57+
MONAD_NAMESPACE_END
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (C) 2025 Category Labs, Inc.
2+
//
3+
// This program is free software: you can redistribute it and/or modify
4+
// it under the terms of the GNU General Public License as published by
5+
// the Free Software Foundation, either version 3 of the License, or
6+
// (at your option) any later version.
7+
//
8+
// This program is distributed in the hope that it will be useful,
9+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
// GNU General Public License for more details.
12+
//
13+
// You should have received a copy of the GNU General Public License
14+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
15+
16+
#pragma once
17+
18+
#include <category/statesync/statesync_server.h>
19+
#include <category/statesync/statesync_server_context.hpp>
20+
#include <category/statesync/statesync_server_network.hpp>
21+
22+
#include <cstdint>
23+
#include <filesystem>
24+
#include <memory>
25+
#include <optional>
26+
#include <thread>
27+
#include <vector>
28+
29+
MONAD_NAMESPACE_BEGIN
30+
31+
struct StateSyncServerConfig
32+
{
33+
monad::TrieDb *triedb;
34+
monad_statesync_server_network *network;
35+
std::optional<int> ro_sq_thread_cpu;
36+
std::vector<std::filesystem::path> dbname_paths;
37+
};
38+
39+
struct monad_statesync_server_deleter
40+
{
41+
void operator()(monad_statesync_server *server) const
42+
{
43+
if (server != nullptr) {
44+
monad_statesync_server_destroy(server);
45+
}
46+
}
47+
};
48+
49+
struct StateSyncServer
50+
{
51+
std::unique_ptr<monad_statesync_server_context> ctx;
52+
std::unique_ptr<monad_statesync_server, monad_statesync_server_deleter>
53+
server;
54+
std::jthread thread;
55+
56+
explicit StateSyncServer(StateSyncServerConfig const &config);
57+
58+
StateSyncServer(StateSyncServer const &) = delete;
59+
StateSyncServer &operator=(StateSyncServer const &) = delete;
60+
StateSyncServer(StateSyncServer &&) = delete;
61+
StateSyncServer &operator=(StateSyncServer &&) = delete;
62+
};
63+
64+
std::unique_ptr<StateSyncServer>
65+
make_statesync_server(StateSyncServerConfig const &config);
66+
67+
MONAD_NAMESPACE_END

0 commit comments

Comments
 (0)