Skip to content

Commit 1dfea31

Browse files
committed
socket: harden concurrency model and fix data races
Rewrite pool_get() as the core thread-safety mechanism using atomic acquire/release on versioned socket IDs. This prevents use-after-free and ensures worker threads get either NULL or a valid socket pointer. Simplify the API by replacing complex racy checks with safe NULL tests in functions.
1 parent 387f61d commit 1dfea31

File tree

3 files changed

+114
-90
lines changed

3 files changed

+114
-90
lines changed

lualib-src/lualib-metrics.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,13 @@ static inline void table_set_str(lua_State *L, int table, const char *k,
117117

118118
static int lnetstat(lua_State *L)
119119
{
120-
struct silly_netstat *stat;
121-
stat = silly_socket_netstat();
122-
lua_pushinteger(L, stat->connecting);
123-
lua_pushinteger(L, stat->tcpclient);
120+
struct silly_netstat stat;
121+
silly_socket_netstat(&stat);
122+
lua_pushinteger(L, stat.connecting);
123+
lua_pushinteger(L, stat.tcpclient);
124124
lua_pushinteger(L, silly_socket_ctrlcount());
125-
lua_pushinteger(L, stat->sendsize);
126-
lua_pushinteger(L, stat->recvsize);
125+
lua_pushinteger(L, stat.sendsize);
126+
lua_pushinteger(L, stat.recvsize);
127127
return 5;
128128
}
129129

silly-src/silly_socket.c

Lines changed: 102 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,50 @@
1616
#include "silly_malloc.h"
1717
#include "silly_socket.h"
1818

19-
//STYPE == socket type
19+
/*
20+
* === Socket Field Concurrency Rules ===
21+
*
22+
* [!!WARNING!!]
23+
* This socket pool allows lock-free reading **only of the `sid` field**.
24+
* Direct access by Worker threads to other `struct socket` fields can
25+
* race with concurrent modifications from the Socket thread.
26+
* Read the rules below carefully to avoid subtle bugs.
27+
*
28+
* --- The Concurrency Model: Optimistic Reads with `sid` Verification ---
29+
*
30+
* A `struct socket *s` obtained from `pool_get()` is NOT locked.
31+
* The Socket thread may free or reuse the socket at any time.
32+
*
33+
* Reading any field other than `s->sid` is a data race.
34+
* For non-pointer fields, this can result in stale or torn reads.
35+
*
36+
* Reads by Worker threads are "optimistic" — correctness is ensured
37+
* because the Socket thread validates `sid` before performing any
38+
* state-changing operation. The versioned `sid` acts as an optimistic lock.
39+
*
40+
* --- The Rule of Safe Interaction ---
41+
*
42+
* Worker thread logic must never depend on values from fields other than `s->sid`.
43+
* Such dependencies are racy and can cause operations to be silently dropped
44+
* by the Socket thread during `sid` verification.
45+
*
46+
* --- Correct Workflow ---
47+
*
48+
* To ensure correctness, send commands (with `sid`) to the Socket thread,
49+
* which safely accesses all socket fields on behalf of Worker threads.
50+
*
51+
* --- Optional Optimization (Not Recommended) ---
52+
*
53+
* Making socket fields atomic and reading them with sid re-verification
54+
* might seem attractive, but it cannot guarantee a consistent snapshot
55+
* of the socket's state.
56+
*
57+
* Since `sid` only changes on free, it does not protect against concurrent
58+
* updates to other fields. Strict correctness requires a full seqlock,
59+
* which is not implemented here.
60+
*
61+
* Therefore, Worker threads should avoid relying on multi-field atomic reads.
62+
*/
2063

2164
#if EAGAIN == EWOULDBLOCK
2265
#define ETRYAGAIN EAGAIN
@@ -82,13 +125,14 @@ struct wlist {
82125
};
83126

84127
struct socket {
85-
socket_id_t sid; //socket descriptor
128+
_Atomic(socket_id_t) sid; //socket descriptor
86129
fd_t fd;
87130
uint16_t version;
88131
unsigned char protocol;
89132
unsigned char reading;
90133
enum stype type;
91-
size_t wloffset;
134+
atomic_uint_least32_t wlbytes;
135+
uint32_t wloffset;
92136
struct wlist *wlhead;
93137
struct wlist **wltail;
94138
struct socket *next;
@@ -212,6 +256,7 @@ static inline void wlist_append(struct socket *s, uint8_t *buf, size_t size,
212256
w->udpaddress = NULL;
213257
*s->wltail = w;
214258
s->wltail = &w->next;
259+
atomic_fetch_add_explicit(&s->wlbytes, size-s->wloffset, memory_order_relaxed);
215260
return;
216261
}
217262

@@ -235,6 +280,7 @@ static inline void wlist_appendudp(struct socket *s, uint8_t *buf, size_t size,
235280
}
236281
*s->wltail = w;
237282
s->wltail = &w->next;
283+
atomic_fetch_add_explicit(&s->wlbytes, size, memory_order_relaxed);
238284
return;
239285
}
240286

@@ -252,6 +298,7 @@ static void wlist_free(struct socket *s)
252298
}
253299
s->wlhead = NULL;
254300
s->wltail = &s->wlhead;
301+
atomic_store_explicit(&s->wlbytes, 0, memory_order_relaxed);
255302
return;
256303
}
257304

@@ -262,7 +309,6 @@ static inline int wlist_empty(struct socket *s)
262309

263310
static void socket_default(struct socket *s)
264311
{
265-
s->sid = -1;
266312
s->fd = -1;
267313
s->type = STYPE_RESERVE;
268314
s->wloffset = 0;
@@ -271,6 +317,8 @@ static void socket_default(struct socket *s)
271317
s->wlhead = NULL;
272318
s->wltail = &s->wlhead;
273319
s->next = NULL;
320+
atomic_store_explicit(&s->wlbytes, 0, memory_order_relaxed);
321+
atomic_store_explicit(&s->sid, -1, memory_order_relaxed);
274322
}
275323

276324
static void pool_init(struct socket_pool *p)
@@ -290,15 +338,13 @@ static void pool_init(struct socket_pool *p)
290338
*p->free_tail = s;
291339
p->free_tail = &s->next;
292340
}
293-
p->free_head = p->free_head->next; // 0 is reserved
294-
p->slots[0].next = NULL;
295-
p->slots[0].sid = 0;
296341
return;
297342
}
298343

299344
static struct socket *pool_alloc(struct socket_pool *p, fd_t fd,
300345
enum stype type, unsigned char protocol)
301346
{
347+
socket_id_t id;
302348
assert(protocol == PROTOCOL_TCP || protocol == PROTOCOL_UDP ||
303349
protocol == PROTOCOL_PIPE);
304350
spinlock_lock(&p->lock);
@@ -316,8 +362,9 @@ static struct socket *pool_alloc(struct socket_pool *p, fd_t fd,
316362
s->fd = fd;
317363
s->type = type;
318364
s->protocol = protocol;
319-
s->sid = ((socket_id_t)s->version << SOCKET_POOL_EXP) | (s-&p->slots[0]);
320365
s->reading = 1;
366+
id = ((socket_id_t)s->version << SOCKET_POOL_EXP) | (s-&p->slots[0]);
367+
atomic_store_explicit(&s->sid, id, memory_order_release);
321368
return s;
322369
}
323370

@@ -335,6 +382,8 @@ static void pool_free(struct socket_pool *p, struct socket *s)
335382
static inline struct socket *pool_get(struct socket_pool *p, socket_id_t id)
336383
{
337384
struct socket *s = &p->slots[HASH(id)];
385+
if (unlikely(atomic_load_explicit(&s->sid, memory_order_acquire) != id))
386+
return NULL;
338387
return s;
339388
}
340389

@@ -759,7 +808,7 @@ void silly_socket_readctrl(socket_id_t sid, int flag)
759808
struct socket *s;
760809
struct op_readenable op = {0};
761810
s = pool_get(&SSOCKET->pool, sid);
762-
if (s->type != STYPE_SOCKET)
811+
if (unlikely(s == NULL))
763812
return;
764813
op.hdr.op = OP_READ_ENABLE;
765814
op.hdr.sid = sid;
@@ -784,17 +833,11 @@ static void op_read_enable(struct silly_socket *ss, struct op_readenable *op, st
784833

785834
int silly_socket_sendsize(socket_id_t sid)
786835
{
787-
int size = 0;
788-
struct wlist *w;
789836
struct socket *s;
790837
s = pool_get(&SSOCKET->pool, sid);
791-
if (s->type != STYPE_SOCKET)
792-
return size;
793-
//TODO: race access
794-
for (w = s->wlhead; w != NULL; w = w->next)
795-
size += w->size;
796-
size -= s->wloffset;
797-
return size;
838+
if (unlikely(s == NULL))
839+
return 0;
840+
return atomic_load_explicit(&s->wlbytes, memory_order_relaxed);
798841
}
799842

800843
static int send_msg_tcp(struct silly_socket *ss, struct socket *s)
@@ -812,6 +855,7 @@ static int send_msg_tcp(struct silly_socket *ss, struct socket *s)
812855
return -1;
813856
}
814857
s->wloffset += sz;
858+
atomic_fetch_sub_explicit(&s->wlbytes, sz, memory_order_relaxed);
815859
if (s->wloffset < w->size) //send some
816860
break;
817861
assert((size_t)s->wloffset == w->size);
@@ -842,6 +886,7 @@ static int send_msg_udp(struct silly_socket *ss, struct socket *s)
842886
sz = sendudp(s->fd, w->buf, w->size, w->udpaddress);
843887
if (sz == -2) //EAGAIN, so block it
844888
break;
889+
atomic_fetch_sub_explicit(&s->wlbytes, sz, memory_order_relaxed);
845890
assert(sz == -1 || (size_t)sz == w->size);
846891
//send fail && send ok will clear
847892
s->wlhead = w->next;
@@ -1026,8 +1071,6 @@ socket_id_t silly_socket_udpbind(const char *ip, const char *port)
10261071
return -1;
10271072
}
10281073

1029-
1030-
10311074
static int op_udp_listen(struct silly_socket *ss, struct op_listen *op, struct socket *s)
10321075
{
10331076
int err;
@@ -1193,24 +1236,10 @@ static void op_udp_connect(struct silly_socket *ss, struct op_connect *op, struc
11931236

11941237
int silly_socket_close(socket_id_t sid)
11951238
{
1196-
int type;
11971239
struct op_close op = {0};
11981240
struct socket *s = pool_get(&SSOCKET->pool, sid);
1199-
if (unlikely(s->sid != sid)) {
1200-
silly_log_error("[socket] silly_socket_close incorrect "
1201-
"socket %lld:%lld\n",
1202-
sid, s->sid);
1203-
return -1;
1204-
}
1205-
type = s->type;
1206-
if (unlikely(type == STYPE_CTRL)) {
1207-
silly_log_error("[socket] silly_socket_close ctrl socket:%d\n",
1208-
sid);
1209-
return -1;
1210-
}
1211-
if (unlikely(type == STYPE_RESERVE)) {
1212-
silly_log_warn(
1213-
"[socket] silly_socket_close reserve socket:%llu\n", sid);
1241+
if (unlikely(s == NULL)) {
1242+
silly_log_error("[socket] silly_socket_close invalid sid:%llu\n", sid);
12141243
return -1;
12151244
}
12161245
op.hdr.op = OP_CLOSE;
@@ -1241,26 +1270,16 @@ static int op_tcp_close(struct silly_socket *ss, struct op_close *op, struct soc
12411270
int silly_socket_send(socket_id_t sid, uint8_t *buf, size_t sz,
12421271
void (*freex)(void *))
12431272
{
1244-
int type;
12451273
struct op_tcpsend op = {0};
12461274
struct socket *s = pool_get(&SSOCKET->pool, sid);
12471275
if (freex == NULL)
12481276
freex = silly_free;
1249-
if (unlikely(s->sid != sid || s->protocol != PROTOCOL_TCP)) {
1277+
if (unlikely(s == NULL)) {
12501278
freex(buf);
12511279
silly_log_error("[socket] silly_socket_send invalid sid:%llu\n",
12521280
sid);
12531281
return -1;
12541282
}
1255-
type = s->type;
1256-
if (unlikely(!(type == STYPE_SOCKET || type == STYPE_CONNECTING ||
1257-
type == STYPE_ALLOCED))) {
1258-
freex(buf);
1259-
silly_log_error("[socket] silly_socket_send incorrect type "
1260-
"sid:%llu type:%d\n",
1261-
sid, type);
1262-
return -1;
1263-
}
12641283
if (unlikely(sz == 0)) {
12651284
freex(buf);
12661285
return -1;
@@ -1320,32 +1339,15 @@ static int op_tcp_send(struct silly_socket *ss, struct op_tcpsend *op, struct so
13201339
int silly_socket_udpsend(socket_id_t sid, uint8_t *buf, size_t sz, const uint8_t *addr,
13211340
size_t addrlen, void (*freex)(void *))
13221341
{
1323-
int type;
13241342
struct op_udpsend op = {0};
13251343
struct socket *s = pool_get(&SSOCKET->pool, sid);
13261344
freex = freex ? freex : silly_free;
1327-
if (unlikely(s->sid != sid || s->protocol != PROTOCOL_UDP)) {
1345+
if (unlikely(s == NULL)) {
13281346
freex(buf);
13291347
silly_log_error("[socket] silly_socket_send invalid sid:%llu\n",
13301348
sid);
13311349
return -1;
13321350
}
1333-
type = s->type;
1334-
if (unlikely(!(type == STYPE_SOCKET || type == STYPE_UDPBIND ||
1335-
type == STYPE_ALLOCED))) {
1336-
freex(buf);
1337-
silly_log_error("[socket] silly_socket_send incorrect type "
1338-
"sid:%llu type:%d\n",
1339-
sid, type);
1340-
return -1;
1341-
}
1342-
1343-
if (unlikely(type == STYPE_UDPBIND && addr == NULL)) {
1344-
freex(buf);
1345-
silly_log_error(
1346-
"[socket] udpsend udpbind must specify dest addr\n");
1347-
return -1;
1348-
}
13491351
op.hdr.op = OP_UDP_SEND;
13501352
op.hdr.sid = sid;
13511353
op.hdr.size = sizeof(op);
@@ -1443,9 +1445,9 @@ static int op_process(struct silly_socket *ss)
14431445
assert(op->hdr.size > 0);
14441446
ptr += op->hdr.size;
14451447
s = pool_get(&ss->pool, op->hdr.sid);
1446-
if (s->sid != op->hdr.sid) {
1447-
silly_log_error("[socket] op_process sid:%llu:%llu invalid\n",
1448-
op->hdr.sid, s->sid);
1448+
if (s == NULL) {
1449+
silly_log_error("[socket] op_process sid:%llu invalid\n",
1450+
op->hdr.sid);
14491451
continue;
14501452
}
14511453
switch (op->hdr.op) {
@@ -1671,32 +1673,52 @@ int silly_socket_ctrlcount()
16711673
return atomic_load_explicit(&SSOCKET->ctrlcount, memory_order_relaxed);
16721674
}
16731675

1674-
struct silly_netstat *silly_socket_netstat()
1676+
void silly_socket_netstat(struct silly_netstat *stat)
16751677
{
1676-
return &SSOCKET->netstat;
1678+
stat->connecting = atomic_load_explicit(&SSOCKET->netstat.connecting, memory_order_relaxed);
1679+
stat->tcpclient = atomic_load_explicit(&SSOCKET->netstat.tcpclient, memory_order_relaxed);
1680+
stat->recvsize = atomic_load_explicit(&SSOCKET->netstat.recvsize, memory_order_relaxed);
1681+
stat->sendsize = atomic_load_explicit(&SSOCKET->netstat.sendsize, memory_order_relaxed);
1682+
return;
16771683
}
16781684

1685+
// NOTE: This function uses an optimistic read pattern. It is not guaranteed
1686+
// to be fully consistent and may return a snapshot of fields read at slightly
1687+
// different moments. For its intended, non-critical monitoring purpose, this
1688+
// trade-off for lower latency is considered acceptable.
16791689
void silly_socket_socketstat(socket_id_t sid, struct silly_socketstat *info)
16801690
{
16811691
struct socket *s;
1682-
s = pool_get(&SSOCKET->pool, sid);
16831692
memset(info, 0, sizeof(*info));
1684-
info->sid = s->sid;
1685-
info->fd = s->fd;
1686-
info->type = stype_name[s->type];
1687-
info->protocol = protocol_name[s->protocol];
1688-
if (s->fd >= 0 && s->protocol != PROTOCOL_PIPE) {
1693+
s = pool_get(&SSOCKET->pool, sid);
1694+
if (s == NULL) {
1695+
silly_log_error("[socket] silly_socket_socketstat sid:%llu invalid\n", sid);
1696+
return;
1697+
}
1698+
int fd = s->fd;
1699+
int type = s->type;
1700+
int protocol = s->protocol;
1701+
s = pool_get(&SSOCKET->pool, sid);
1702+
if (s == NULL) {
1703+
silly_log_error("[socket] silly_socket_socketstat sid:%llu invalid\n", sid);
1704+
return;
1705+
}
1706+
info->sid = sid;
1707+
info->fd = fd;
1708+
info->type = stype_name[type];
1709+
info->protocol = protocol_name[protocol];
1710+
if (info->fd >= 0 && protocol != PROTOCOL_PIPE) {
16891711
int namelen;
16901712
socklen_t len;
16911713
union sockaddr_full addr;
16921714
char namebuf[SOCKET_NAMELEN];
16931715
len = sizeof(addr);
1694-
getsockname(s->fd, (struct sockaddr *)&addr, &len);
1716+
getsockname(info->fd, (struct sockaddr *)&addr, &len);
16951717
namelen = ntop(&addr, namebuf);
16961718
memcpy(info->localaddr, namebuf, namelen);
1697-
if (s->type != STYPE_LISTEN) {
1719+
if (type != STYPE_LISTEN) {
16981720
len = sizeof(addr);
1699-
getpeername(s->fd, (struct sockaddr *)&addr, &len);
1721+
getpeername(fd, (struct sockaddr *)&addr, &len);
17001722
namelen = ntop(&addr, namebuf);
17011723
memcpy((void *)info->remoteaddr, namebuf, namelen);
17021724
} else {
@@ -1707,3 +1729,4 @@ void silly_socket_socketstat(socket_id_t sid, struct silly_socketstat *info)
17071729
}
17081730
return;
17091731
}
1732+

0 commit comments

Comments
 (0)