Skip to content

Commit de6e2cc

Browse files
committed
Add thread-based RCU
Add the Linux Kernel style thread-based simple RCU. It supports sparse checking [1]. With sparse, it will report: main.c: note: in included file: thrd_rcu.h:95:42: warning: Using plain integer as NULL pointer thrd_rcu.h:95:42: warning: Using plain integer as NULL pointer It is from the pthread_mutex_t initializer, we can ignore it. [1] https://www.kernel.org/doc/html/latest/dev-tools/sparse.html
1 parent 5737c46 commit de6e2cc

File tree

4 files changed

+308
-0
lines changed

4 files changed

+308
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ purpose of these programs is to be illustrative and educational.
3030
- [qsbr](qsbr/): An implementation of Quiescent state based reclamation (QSBR).
3131
- [list-move](list-move/): Evaluation of two concurrent linked lists: QSBR and lock-based.
3232
- [rcu\_queue](rcu_queue/): An efficient concurrent queue based on QSBR.
33+
- [thread-rcu](thread-rcu/): A Linux Kernel style thread-based simple RCU.
3334
* Applications
3435
- [httpd](httpd/): A multi-threaded web server.
3536
- [map-reduce](map-reduce/): word counting using MapReduce.

thread-rcu/Makefile

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
CFLAGS = -Wall
2+
CFLAGS += -g
3+
CFLAGS += -std=c99
4+
CFLAGS += -D'N_READERS=10'
5+
LDFLAGS += -lpthread
6+
7+
all:
8+
$(CC) -o main main.c $(CFLAGS) $(LDFLAGS)
9+
10+
# Semantic Checker
11+
# https://www.kernel.org/doc/html/latest/dev-tools/sparse.html
12+
sparse:
13+
sparse main.c $(CFLAGS) $(LDFLAGS)
14+
15+
indent:
16+
clang-format -i *.[ch]
17+
18+
clean:
19+
rm -f main

thread-rcu/main.c

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include <pthread.h>
2+
#include <stdio.h>
3+
#include <stdlib.h>
4+
5+
#include "rcu.h"
6+
7+
struct test {
8+
unsigned int count;
9+
};
10+
11+
static struct test __rcu *dut;
12+
13+
static void *reader_side(void *argv)
14+
{
15+
struct test *tmp;
16+
17+
if (rcu_init())
18+
abort();
19+
20+
rcu_read_lock();
21+
22+
tmp = rcu_dereference(dut);
23+
24+
printf("[reader %u] %u\n", current_tid(), tmp->count);
25+
26+
rcu_read_unlock();
27+
28+
pthread_exit(NULL);
29+
}
30+
31+
static void *updater_side(void *argv)
32+
{
33+
struct test *oldp;
34+
struct test *newval = malloc(sizeof(struct test));
35+
36+
newval->count = current_tid();
37+
38+
printf("[updater %u]\n", newval->count);
39+
40+
oldp = rcu_assign_pointer(dut, newval);
41+
42+
synchronize_rcu();
43+
free(oldp);
44+
45+
pthread_exit(NULL);
46+
}
47+
48+
int main(int argc, char *argv[])
49+
{
50+
pthread_t reader[N_READERS];
51+
pthread_t updater;
52+
int i;
53+
54+
dut = (struct test __rcu *) malloc(sizeof(struct test));
55+
rcu_uncheck(dut)->count = 0;
56+
57+
for (i = 0; i < N_READERS / 2; i++)
58+
pthread_create(&reader[i], NULL, reader_side, NULL);
59+
60+
pthread_create(&updater, NULL, updater_side, NULL);
61+
62+
for (i = N_READERS / 2; i < N_READERS; i++)
63+
pthread_create(&reader[i], NULL, reader_side, NULL);
64+
65+
for (i = 0; i < N_READERS; i++)
66+
pthread_join(reader[i], NULL);
67+
68+
pthread_join(updater, NULL);
69+
70+
free(rcu_uncheck(dut));
71+
rcu_clean();
72+
73+
return 0;
74+
}

thread-rcu/rcu.h

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#ifndef __RCU_H__
2+
#define __RCU_H__
3+
4+
/* lock primitives from pthread and compiler primitives */
5+
6+
#include <pthread.h>
7+
8+
typedef pthread_mutex_t spinlock_t;
9+
10+
#define SPINLOCK_INIT PTHREAD_MUTEX_INITIALIZER
11+
static inline void spin_lock(spinlock_t *sp)
12+
{
13+
int ret;
14+
15+
ret = pthread_mutex_lock(sp);
16+
if (ret != 0) {
17+
fprintf(stderr, "spin_lock:pthread_mutex_lock %d\n", ret);
18+
abort();
19+
}
20+
}
21+
22+
static inline void spin_unlock(spinlock_t *sp)
23+
{
24+
int ret;
25+
26+
ret = pthread_mutex_unlock(sp);
27+
if (ret != 0) {
28+
fprintf(stderr, "spin_unlock:pthread_mutex_unlock %d\n", ret);
29+
abort();
30+
}
31+
}
32+
33+
#define current_tid() (unsigned int) pthread_self()
34+
35+
#define ACCESS_ONCE(x) (*(volatile __typeof__(x) *) &(x))
36+
#define READ_ONCE(x) \
37+
({ \
38+
__typeof__(x) ___x = ACCESS_ONCE(x); \
39+
___x; \
40+
})
41+
#define WRITE_ONCE(x, val) \
42+
do { \
43+
ACCESS_ONCE(x) = (val); \
44+
} while (0)
45+
#define barrier() __asm__ __volatile__("" : : : "memory")
46+
#define smp_mb() __asm__ __volatile__("mfence" : : : "memory")
47+
48+
#include <errno.h>
49+
#include <stdio.h>
50+
#include <stdlib.h>
51+
#include <unistd.h>
52+
53+
#ifdef __CHECKER__
54+
#define __rcu __attribute__((noderef, address_space(__rcu)))
55+
#define rcu_check_sparse(p, space) ((void) (((typeof(*p) space *) p) == p))
56+
#define __force __attribute__((force))
57+
#define rcu_uncheck(p) ((__typeof__(*p) __force *) p)
58+
#define rcu_check(p) ((__typeof__(*p) __force __rcu *) p)
59+
#else
60+
#define __rcu
61+
#define rcu_check_sparse(p, space)
62+
#define __force
63+
#define rcu_uncheck(p) p
64+
#define rcu_check(p) p
65+
#endif /* __CHECKER__ */
66+
67+
/* Avoid false sharing */
68+
#define __rcu_aligned __attribute__((aligned(128)))
69+
70+
struct rcu_node {
71+
unsigned int tid;
72+
int rcu_nesting[2];
73+
struct rcu_node *next;
74+
} __rcu_aligned;
75+
76+
struct rcu_data {
77+
unsigned int nr_thread;
78+
struct rcu_node *head;
79+
unsigned int rcu_thread_nesting_idx;
80+
spinlock_t sp;
81+
};
82+
83+
/* Easy to use */
84+
#define __rcu_thread_idx rcu_data.rcu_thread_nesting_idx
85+
#define __rcu_thread_nesting(ptr) \
86+
ptr->rcu_nesting[READ_ONCE(__rcu_thread_idx) & 0x01]
87+
#define rcu_thread_nesting __rcu_thread_nesting(__rcu_per_thread_ptr)
88+
89+
static struct rcu_data rcu_data = {
90+
.nr_thread = 0,
91+
.head = NULL,
92+
.rcu_thread_nesting_idx = 0,
93+
.sp = SPINLOCK_INIT,
94+
};
95+
static __thread struct rcu_node *__rcu_per_thread_ptr;
96+
97+
static inline struct rcu_node *__rcu_node_add(unsigned int tid)
98+
{
99+
struct rcu_node **indirect = &rcu_data.head;
100+
struct rcu_node *node = malloc(sizeof(struct rcu_node));
101+
102+
if (!node) {
103+
fprintf(stderr, "__rcu_node_add: malloc failed\n");
104+
abort();
105+
}
106+
107+
node->tid = tid;
108+
node->rcu_nesting[0] = 0;
109+
node->rcu_nesting[1] = 0;
110+
node->next = NULL;
111+
112+
spin_lock(&rcu_data.sp);
113+
114+
while (*indirect) {
115+
if ((*indirect)->tid == node->tid) {
116+
spin_unlock(&rcu_data.sp);
117+
free(node);
118+
return NULL;
119+
}
120+
indirect = &(*indirect)->next;
121+
}
122+
123+
*indirect = node;
124+
rcu_data.nr_thread++;
125+
126+
spin_unlock(&rcu_data.sp);
127+
128+
smp_mb();
129+
130+
return node;
131+
}
132+
133+
static inline int rcu_init(void)
134+
{
135+
unsigned int tid = current_tid();
136+
137+
__rcu_per_thread_ptr = __rcu_node_add(tid);
138+
139+
return (__rcu_per_thread_ptr == NULL) ? -ENOMEM : 0;
140+
}
141+
142+
static inline void rcu_clean(void)
143+
{
144+
struct rcu_node *node, *tmp;
145+
146+
spin_lock(&rcu_data.sp);
147+
148+
for (node = rcu_data.head; node; node = tmp) {
149+
tmp = node->next;
150+
free(node);
151+
}
152+
153+
rcu_data.head = NULL;
154+
rcu_data.nr_thread = 0;
155+
156+
spin_unlock(&rcu_data.sp);
157+
}
158+
159+
/* The per-thread reference count will only modified by their owner
160+
* thread but will read by other threads. So here we use WRITE_ONCE().
161+
*/
162+
static inline void rcu_read_lock(void)
163+
{
164+
WRITE_ONCE(rcu_thread_nesting, 1);
165+
}
166+
167+
static inline void rcu_read_unlock(void)
168+
{
169+
WRITE_ONCE(rcu_thread_nesting, 0);
170+
}
171+
172+
static inline void synchronize_rcu(void)
173+
{
174+
struct rcu_node *node;
175+
176+
smp_mb();
177+
178+
spin_lock(&rcu_data.sp);
179+
180+
/* When the rcu_thread_nesting is odd, which means that the LSB set 1,
181+
* that thread is in the read-side critical section. Also, we need
182+
* to skip the read side when it is in the new grace period.
183+
*/
184+
for (node = rcu_data.head; node; node = node->next) {
185+
while (READ_ONCE(__rcu_thread_nesting(node)) & 0x1) {
186+
barrier();
187+
}
188+
}
189+
190+
/* Going to next grace period */
191+
__atomic_fetch_add(&__rcu_thread_idx, 1, __ATOMIC_RELEASE);
192+
193+
spin_unlock(&rcu_data.sp);
194+
195+
smp_mb();
196+
}
197+
198+
#define rcu_dereference(p) \
199+
({ \
200+
__typeof__(*p) *__r_d_p = (__typeof__(*p) __force *) READ_ONCE(p); \
201+
rcu_check_sparse(p, __rcu); \
202+
__r_d_p; \
203+
})
204+
205+
#define rcu_assign_pointer(p, v) \
206+
({ \
207+
__typeof__(*p) *__r_a_p = \
208+
(__typeof__(*p) __force *) __atomic_exchange_n( \
209+
&(p), (__typeof__(*(p)) __force __rcu *) v, __ATOMIC_RELEASE); \
210+
rcu_check_sparse(p, __rcu); \
211+
__r_a_p; \
212+
})
213+
214+
#endif /* __RCU_H__ */

0 commit comments

Comments
 (0)