Skip to content

Add thread-based RCU #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ purpose of these programs is to be illustrative and educational.
- [qsbr](qsbr/): An implementation of Quiescent state based reclamation (QSBR).
- [list-move](list-move/): Evaluation of two concurrent linked lists: QSBR and lock-based.
- [rcu\_queue](rcu_queue/): An efficient concurrent queue based on QSBR.
- [thread-rcu](thread-rcu/): A simple thread-based RCU in the style of the Linux kernel.
* Applications
- [httpd](httpd/): A multi-threaded web server.
- [map-reduce](map-reduce/): word counting using MapReduce.
Expand Down
31 changes: 31 additions & 0 deletions thread-rcu/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Build flags for the thread-based RCU demo.
# N_READERS and N_UPDATE_RUN are passed to main.c as preprocessor constants.
CFLAGS = -Wall
CFLAGS += -g
CFLAGS += -std=c11
CFLAGS += -D'N_READERS=100'
CFLAGS += -D'N_UPDATE_RUN=5'
CFLAGS += -fsanitize=thread
LDFLAGS += -lpthread

# sparse emits a warning for the pthread mutex initializer, for example:
#   thrd_rcu.h:95:42: warning: Using plain integer as NULL pointer
# The warning is harmless here, so it is suppressed.
SPARSE_FLAGS = -Wno-non-pointer-null

# Default goal: build the test program.
main: main.c rcu.h
	$(CC) -o $@ $< $(CFLAGS) $(LDFLAGS)

# None of these targets produces a file of the same name; declaring them
# phony keeps them working even if such a file happens to exist.
.PHONY: all clang sparse indent clean

# Build with clang instead of the default compiler.
clang: CC=clang
clang: main

all: main

# Semantic Checker
# https://www.kernel.org/doc/html/latest/dev-tools/sparse.html
sparse:
	sparse main.c $(CFLAGS) $(SPARSE_FLAGS)

# Reformat all C sources and headers in place.
indent:
	clang-format -i *.[ch]

clean:
	rm -f main
4 changes: 4 additions & 0 deletions thread-rcu/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Thread-Local Storage Based Read-Copy Update

A Linux-kernel-style implementation of Read-Copy Update (RCU).
It uses thread-local storage to reduce the overhead of the read-side lock.
147 changes: 147 additions & 0 deletions thread-rcu/main.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#include <pthread.h>
#include <stdatomic.h>

/*
 * Sense-reversing spin barrier.
 *
 * Each participating thread toggles its private sense on entry. The last
 * thread to arrive publishes that sense through `flag`; every other thread
 * spins until `flag` matches its own sense.
 */
struct barrier_struct {
    atomic_int flag;      /* sense published by the last arriving thread */
    int count;            /* arrivals in the current round; guarded by lock */
    pthread_mutex_t lock; /* serializes updates to count */
};

/*
 * Per-thread sense, flipped on every thread_barrier() call.
 * NOTE: it is shared by all barriers a thread uses, so a thread must call
 * every barrier the same number of times as its peers for the senses to agree.
 */
static __thread int local_sense = 0;

/* Static initializer for a barrier in its idle state. */
#define BARRIER_INIT                                             \
    {                                                            \
        .flag = 0, .count = 0, .lock = PTHREAD_MUTEX_INITIALIZER \
    }

/* Define and statically initialize a barrier object called `name`. */
#define DEFINE_BARRIER(name) struct barrier_struct name = BARRIER_INIT

/*
 * Block (by spinning) until `n` threads have called thread_barrier() on `b`.
 *
 * @b: barrier shared by all participants
 * @n: number of threads that must arrive before any of them is released
 *
 * The last arrival resets the arrival counter before releasing the waiters,
 * so the same barrier can be used for any number of consecutive rounds.
 */
static inline void thread_barrier(struct barrier_struct *b, size_t n)
{
    local_sense = !local_sense;

    pthread_mutex_lock(&b->lock);
    b->count++;
    if ((size_t) b->count == n) {
        /* Re-arm for the next round while still holding the lock and before
         * the flag flips: no waiter can re-enter until the flag is stored.
         */
        b->count = 0;
        pthread_mutex_unlock(&b->lock);
        /* Release pairs with the acquire load in the waiters' spin loop. */
        atomic_store_explicit(&b->flag, local_sense, memory_order_release);
    } else {
        pthread_mutex_unlock(&b->lock);
        while (atomic_load_explicit(&b->flag, memory_order_acquire) !=
               local_sense)
            ;
    }
}

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#include "rcu.h"

/*
 * Number of slots in grace_periods[]: one for the initial value plus one per
 * update run. Parenthesized so the macro expands safely inside larger
 * expressions (e.g. GP_IDX_MAX * 2).
 */
#define GP_IDX_MAX (N_UPDATE_RUN + 1)

/* Start-line barrier: the readers and the updater all wait on it with
 * N_READERS + 1 participants before touching the shared pointer.
 */
static DEFINE_BARRIER(test_barrier);

/* Payload published through the RCU-protected pointer. */
struct test {
/* Set to 0 by main() and to the update-run number by the updater. */
unsigned int count;
};
/* RCU-protected pointer: read with rcu_dereference() by the readers and
 * replaced with rcu_assign_pointer() by the updater.
 */
static struct test __rcu *dut;
/* Index into grace_periods[]; a reader increments it when the count it
 * observed differs from the value it swapped out of prev_count.
 */
static atomic_uint gp_idx;
/* The count value most recently recorded by a reader. */
static atomic_uint prev_count;
/* Per-index tally of readers; each reader increments exactly one slot. */
static atomic_uint grace_periods[GP_IDX_MAX];

/*
 * Reader thread body.
 *
 * Registers with the RCU implementation, waits on the start barrier, then
 * inside a read-side critical section reads `dut->count`, records it in
 * `prev_count`, advances `gp_idx` when the recorded value changed, and finally
 * bumps the `grace_periods[]` slot selected by `gp_idx`.
 *
 * @argv: unused.
 *
 * Aborts the process if rcu_init() fails or if the grace-period index falls
 * outside grace_periods[] (valid indices are 0..N_UPDATE_RUN).
 * Does not return; terminates the thread with pthread_exit(NULL).
 */
static void *reader_func(void *argv)
{
    struct test *tmp;
    unsigned int old_prev_count;
    unsigned int idx;

    (void) argv;

    if (rcu_init())
        abort();

    thread_barrier(&test_barrier, N_READERS + 1);

    rcu_read_lock();

    tmp = rcu_dereference(dut);

    if (tmp->count != atomic_load_explicit(&prev_count, memory_order_acquire)) {
        old_prev_count = atomic_exchange_explicit(&prev_count, tmp->count,
                                                  memory_order_release);
        /* Only the reader that actually replaced a different value advances
         * the index; a concurrent reader may already have stored the same one.
         */
        if (tmp->count != old_prev_count)
            atomic_fetch_add_explicit(&gp_idx, 1, memory_order_release);
    }

    /* Take a single snapshot and validate exactly the value that is used as
     * the array index. The array has N_UPDATE_RUN + 1 slots, so any index
     * above N_UPDATE_RUN is out of bounds.
     */
    idx = atomic_load_explicit(&gp_idx, memory_order_acquire);
    if (idx > (unsigned int) (N_UPDATE_RUN)) {
        fprintf(stderr, "grace period index (%u) is over bound (%u).\n", idx,
                (unsigned int) (N_UPDATE_RUN));
        abort();
    }

    atomic_fetch_add_explicit(&grace_periods[idx], 1, memory_order_relaxed);

    rcu_read_unlock();

    pthread_exit(NULL);
}

/*
 * Updater thread body.
 *
 * Waits on the start barrier, then performs N_UPDATE_RUN updates. Each update
 * allocates a new `struct test` whose count is the 1-based run number,
 * publishes it with rcu_assign_pointer(), calls synchronize_rcu(), and frees
 * the pointer that rcu_assign_pointer() returned.
 *
 * @argv: unused.
 *
 * Aborts the process if an allocation fails.
 * Does not return; terminates the thread with pthread_exit(NULL).
 */
static void *updater_func(void *argv)
{
    struct test *oldp;
    struct test *newval;
    unsigned int i;

    (void) argv;

    thread_barrier(&test_barrier, N_READERS + 1);
    atomic_thread_fence(memory_order_seq_cst);

    for (i = 1; i <= N_UPDATE_RUN; i++) {
        newval = malloc(sizeof(*newval));
        if (!newval) {
            /* Nothing sensible to publish; fail loudly instead of
             * dereferencing NULL.
             */
            perror("malloc");
            abort();
        }
        newval->count = i;
        oldp = rcu_assign_pointer(dut, newval);
        /* NOTE(review): presumably waits for pre-existing readers before the
         * old object is freed; synchronize_rcu() is defined in rcu.h.
         */
        synchronize_rcu();
        free(oldp);
    }

    pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
pthread_t reader[N_READERS];
pthread_t updater;
unsigned int i, total = 0;

dut = (struct test __rcu *) malloc(sizeof(struct test));
rcu_uncheck(dut)->count = 0;

for (i = 0; i < N_READERS; i++)
pthread_create(&reader[i], NULL, reader_func, NULL);
pthread_create(&updater, NULL, updater_func, NULL);

for (i = 0; i < N_READERS; i++)
pthread_join(reader[i], NULL);
pthread_join(updater, NULL);

free(rcu_uncheck(dut));
rcu_clean();

atomic_thread_fence(memory_order_seq_cst);

printf("%u reader(s), %u update run(s), %u grace period(s)\n", N_READERS,
N_UPDATE_RUN, gp_idx + 1);
for (i = 0; i < gp_idx + 1; i++) {
printf("[grace period #%u] %4u reader(s)\n", i, grace_periods[i]);
total += grace_periods[i];
}

if (total != N_READERS)
fprintf(stderr,
"The Sum of records in the array of grace period(s) (%u) is "
"not the same with number of reader(s) (%u)\n",
total, N_READERS);

return 0;
}
Loading