srcu: Make Tree SRCU able to operate without snp_node array
This commit makes Tree SRCU able to operate without an snp_node array, that is, when the srcu_data structures' ->mynode pointers are NULL. This can result in high contention on the srcu_struct structure's ->lock, but only when there are lots of call_srcu(), synchronize_srcu(), and synchronize_srcu_expedited() calls.

Note that when there is no snp_node array, all SRCU callbacks use CPU 0's callback queue. This is optimal in the common case of low update-side load because it removes the need to search each CPU for the single callback that made the grace period happen.

Co-developed-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>
Signed-off-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
1 parent 7b9e9b5 commit 994f706
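For orientation before the diffs: every update-side path this commit touches (srcu_gp_start(), srcu_gp_end(), srcu_gp_start_if_needed(), and srcu_barrier()) uses the same gating idiom: read ->srcu_size_state and, while it is still below the relevant SRCU_SIZE_WAIT_* threshold, operate on CPU 0's srcu_data structure instead of the local CPU's. Below is a minimal userspace sketch of that idiom; the names fake_sdp, pick_sdp, and the simplified state values are invented for illustration and are not part of the kernel code.

/* Userspace sketch (not kernel code) of the small-mode gating idiom. */
#include <stdio.h>

#define NR_CPUS 4

/* Simplified stand-ins for a few of the new ->srcu_size_state values. */
enum { SIZE_SMALL = 0, SIZE_WAIT_BARRIER = 2, SIZE_BIG = 8 };

struct fake_sdp { int nr_cbs; };        /* stand-in for struct srcu_data */

static struct fake_sdp sda[NR_CPUS];    /* stand-in for the per-CPU ->sda array */
static int size_state = SIZE_SMALL;     /* stand-in for ->srcu_size_state */

/* Pick a callback queue the way the patched update-side paths do. */
static struct fake_sdp *pick_sdp(int this_cpu)
{
        if (size_state < SIZE_WAIT_BARRIER)     /* no srcu_node tree yet */
                return &sda[0];                 /* everything funnels to CPU 0 */
        return &sda[this_cpu];                  /* tree present: stay local */
}

int main(void)
{
        pick_sdp(3)->nr_cbs++;          /* small mode: lands on CPU 0 */
        size_state = SIZE_BIG;          /* pretend the transition completed */
        pick_sdp(3)->nr_cbs++;          /* big mode: stays on CPU 3 */
        printf("cpu0=%d cpu3=%d\n", sda[0].nr_cbs, sda[3].nr_cbs);     /* cpu0=1 cpu3=1 */
        return 0;
}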

File tree

2 files changed, +124 −93 lines changed


include/linux/srcutree.h

Lines changed: 13 additions & 1 deletion
@@ -63,8 +63,9 @@ struct srcu_struct {
         struct srcu_node node[NUM_RCU_NODES];  /* Combining tree. */
         struct srcu_node *level[RCU_NUM_LVLS + 1];
                                                /* First node at each level. */
+        int srcu_size_state;                   /* Small-to-big transition state. */
         struct mutex srcu_cb_mutex;            /* Serialize CB preparation. */
-        spinlock_t __private lock;             /* Protect counters */
+        spinlock_t __private lock;             /* Protect counters and size state. */
         struct mutex srcu_gp_mutex;            /* Serialize GP work. */
         unsigned int srcu_idx;                 /* Current rdr array element. */
         unsigned long srcu_gp_seq;             /* Grace-period seq #. */
@@ -83,6 +84,17 @@ struct srcu_struct {
         struct lockdep_map dep_map;
 };
 
+/* Values for size state variable (->srcu_size_state). */
+#define SRCU_SIZE_SMALL         0
+#define SRCU_SIZE_ALLOC         1
+#define SRCU_SIZE_WAIT_BARRIER  2
+#define SRCU_SIZE_WAIT_CALL     3
+#define SRCU_SIZE_WAIT_CBS1     4
+#define SRCU_SIZE_WAIT_CBS2     5
+#define SRCU_SIZE_WAIT_CBS3     6
+#define SRCU_SIZE_WAIT_CBS4     7
+#define SRCU_SIZE_BIG           8
+
 /* Values for state variable (bottom bits of ->srcu_gp_seq). */
 #define SRCU_STATE_IDLE         0
 #define SRCU_STATE_SCAN1        1

kernel/rcu/srcutree.c

Lines changed: 111 additions & 92 deletions
@@ -152,16 +152,17 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp)
                 sdp->ssp = ssp;
                 sdp->grpmask = 1 << (cpu - sdp->mynode->grplo);
         }
+        smp_store_release(&ssp->srcu_size_state, SRCU_SIZE_WAIT_BARRIER);
 }
 
 /*
  * Initialize non-compile-time initialized fields, including the
- * associated srcu_node and srcu_data structures. The is_static
- * parameter is passed through to init_srcu_struct_nodes(), and
- * also tells us that ->sda has already been wired up to srcu_data.
+ * associated srcu_node and srcu_data structures. The is_static parameter
+ * tells us that ->sda has already been wired up to srcu_data.
  */
 static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static)
 {
+        ssp->srcu_size_state = SRCU_SIZE_SMALL;
         mutex_init(&ssp->srcu_cb_mutex);
         mutex_init(&ssp->srcu_gp_mutex);
         ssp->srcu_idx = 0;
@@ -175,6 +176,7 @@ static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static)
         if (!ssp->sda)
                 return -ENOMEM;
         init_srcu_struct_nodes(ssp);
+        ssp->srcu_size_state = SRCU_SIZE_BIG;
         ssp->srcu_gp_seq_needed_exp = 0;
         ssp->srcu_last_gp_end = ktime_get_mono_fast_ns();
         smp_store_release(&ssp->srcu_gp_seq_needed, 0); /* Init done. */
@@ -391,6 +393,7 @@ void cleanup_srcu_struct(struct srcu_struct *ssp)
         }
         free_percpu(ssp->sda);
         ssp->sda = NULL;
+        ssp->srcu_size_state = SRCU_SIZE_SMALL;
 }
 EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
 
@@ -439,6 +442,10 @@ static void srcu_gp_start(struct srcu_struct *ssp)
         struct srcu_data *sdp = this_cpu_ptr(ssp->sda);
         int state;
 
+        if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER)
+                sdp = per_cpu_ptr(ssp->sda, 0);
+        else
+                sdp = this_cpu_ptr(ssp->sda);
         lockdep_assert_held(&ACCESS_PRIVATE(ssp, lock));
         WARN_ON_ONCE(ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed));
         spin_lock_rcu_node(sdp); /* Interrupts already disabled. */
@@ -539,38 +546,40 @@ static void srcu_gp_end(struct srcu_struct *ssp)
         /* A new grace period can start at this point. But only one. */
 
         /* Initiate callback invocation as needed. */
-        idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
-        srcu_for_each_node_breadth_first(ssp, snp) {
-                spin_lock_irq_rcu_node(snp);
-                cbs = false;
-                last_lvl = snp >= ssp->level[rcu_num_lvls - 1];
-                if (last_lvl)
-                        cbs = snp->srcu_have_cbs[idx] == gpseq;
-                snp->srcu_have_cbs[idx] = gpseq;
-                rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
-                if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
-                        WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq);
-                mask = snp->srcu_data_have_cbs[idx];
-                snp->srcu_data_have_cbs[idx] = 0;
-                spin_unlock_irq_rcu_node(snp);
-                if (cbs)
-                        srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay);
-
-                /* Occasionally prevent srcu_data counter wrap. */
-                if (!(gpseq & counter_wrap_check) && last_lvl)
-                        for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
-                                sdp = per_cpu_ptr(ssp->sda, cpu);
-                                spin_lock_irqsave_rcu_node(sdp, flags);
-                                if (ULONG_CMP_GE(gpseq,
-                                                 sdp->srcu_gp_seq_needed + 100))
-                                        sdp->srcu_gp_seq_needed = gpseq;
-                                if (ULONG_CMP_GE(gpseq,
-                                                 sdp->srcu_gp_seq_needed_exp + 100))
-                                        sdp->srcu_gp_seq_needed_exp = gpseq;
-                                spin_unlock_irqrestore_rcu_node(sdp, flags);
-                        }
+        if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER) {
+                srcu_schedule_cbs_sdp(per_cpu_ptr(ssp->sda, 0), cbdelay);
+        } else {
+                idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
+                srcu_for_each_node_breadth_first(ssp, snp) {
+                        spin_lock_irq_rcu_node(snp);
+                        cbs = false;
+                        last_lvl = snp >= ssp->level[rcu_num_lvls - 1];
+                        if (last_lvl)
+                                cbs = snp->srcu_have_cbs[idx] == gpseq;
+                        snp->srcu_have_cbs[idx] = gpseq;
+                        rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
+                        if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
+                                WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq);
+                        mask = snp->srcu_data_have_cbs[idx];
+                        snp->srcu_data_have_cbs[idx] = 0;
+                        spin_unlock_irq_rcu_node(snp);
+                        if (cbs)
+                                srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay);
+                }
         }
 
+        /* Occasionally prevent srcu_data counter wrap. */
+        if (!(gpseq & counter_wrap_check))
+                for_each_possible_cpu(cpu) {
+                        sdp = per_cpu_ptr(ssp->sda, cpu);
+                        spin_lock_irqsave_rcu_node(sdp, flags);
+                        if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed + 100))
+                                sdp->srcu_gp_seq_needed = gpseq;
+                        if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed_exp + 100))
+                                sdp->srcu_gp_seq_needed_exp = gpseq;
+                        spin_unlock_irqrestore_rcu_node(sdp, flags);
+                }
+
         /* Callback initiation done, allow grace periods after next. */
         mutex_unlock(&ssp->srcu_cb_mutex);
 
@@ -599,18 +608,19 @@ static void srcu_funnel_exp_start(struct srcu_struct *ssp, struct srcu_node *snp
 {
         unsigned long flags;
 
-        for (; snp != NULL; snp = snp->srcu_parent) {
-                if (rcu_seq_done(&ssp->srcu_gp_seq, s) ||
-                    ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
-                        return;
-                spin_lock_irqsave_rcu_node(snp, flags);
-                if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+        if (snp)
+                for (; snp != NULL; snp = snp->srcu_parent) {
+                        if (rcu_seq_done(&ssp->srcu_gp_seq, s) ||
+                            ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
+                                return;
+                        spin_lock_irqsave_rcu_node(snp, flags);
+                        if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+                                spin_unlock_irqrestore_rcu_node(snp, flags);
+                                return;
+                        }
+                        WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
                        spin_unlock_irqrestore_rcu_node(snp, flags);
-                        return;
                 }
-                WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
-                spin_unlock_irqrestore_rcu_node(snp, flags);
-        }
         spin_lock_irqsave_rcu_node(ssp, flags);
         if (ULONG_CMP_LT(ssp->srcu_gp_seq_needed_exp, s))
                 WRITE_ONCE(ssp->srcu_gp_seq_needed_exp, s);
@@ -633,36 +643,37 @@ static void srcu_funnel_gp_start(struct srcu_struct *ssp, struct srcu_data *sdp,
         unsigned long flags;
         int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
         struct srcu_node *snp;
-        struct srcu_node *snp_leaf = sdp->mynode;
+        struct srcu_node *snp_leaf = smp_load_acquire(&sdp->mynode);
         unsigned long snp_seq;
 
-        /* Each pass through the loop does one level of the srcu_node tree. */
-        for (snp = snp_leaf; snp != NULL; snp = snp->srcu_parent) {
-                if (rcu_seq_done(&ssp->srcu_gp_seq, s) && snp != snp_leaf)
-                        return; /* GP already done and CBs recorded. */
-                spin_lock_irqsave_rcu_node(snp, flags);
-                if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
-                        snp_seq = snp->srcu_have_cbs[idx];
-                        if (snp == snp_leaf && snp_seq == s)
-                                snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
-                        spin_unlock_irqrestore_rcu_node(snp, flags);
-                        if (snp == snp_leaf && snp_seq != s) {
-                                srcu_schedule_cbs_sdp(sdp, do_norm
-                                                      ? SRCU_INTERVAL
-                                                      : 0);
+        if (snp_leaf)
+                /* Each pass through the loop does one level of the srcu_node tree. */
+                for (snp = snp_leaf; snp != NULL; snp = snp->srcu_parent) {
+                        if (rcu_seq_done(&ssp->srcu_gp_seq, s) && snp != snp_leaf)
+                                return; /* GP already done and CBs recorded. */
+                        spin_lock_irqsave_rcu_node(snp, flags);
+                        if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
+                                snp_seq = snp->srcu_have_cbs[idx];
+                                if (snp == snp_leaf && snp_seq == s)
+                                        snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+                                spin_unlock_irqrestore_rcu_node(snp, flags);
+                                if (snp == snp_leaf && snp_seq != s) {
+                                        srcu_schedule_cbs_sdp(sdp, do_norm
+                                                              ? SRCU_INTERVAL
+                                                              : 0);
+                                        return;
+                                }
+                                if (!do_norm)
+                                        srcu_funnel_exp_start(ssp, snp, s);
                                 return;
                         }
-                        if (!do_norm)
-                                srcu_funnel_exp_start(ssp, snp, s);
-                        return;
+                        snp->srcu_have_cbs[idx] = s;
+                        if (snp == snp_leaf)
+                                snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+                        if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
+                                WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
+                        spin_unlock_irqrestore_rcu_node(snp, flags);
                 }
-                snp->srcu_have_cbs[idx] = s;
-                if (snp == snp_leaf)
-                        snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
-                if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
-                        WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
-                spin_unlock_irqrestore_rcu_node(snp, flags);
-        }
 
         /* Top of tree, must ensure the grace period will be started. */
         spin_lock_irqsave_rcu_node(ssp, flags);
@@ -820,7 +831,10 @@ static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
 
         check_init_srcu_struct(ssp);
         idx = srcu_read_lock(ssp);
-        sdp = raw_cpu_ptr(ssp->sda);
+        if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_CALL)
+                sdp = per_cpu_ptr(ssp->sda, 0);
+        else
+                sdp = raw_cpu_ptr(ssp->sda);
         spin_lock_irqsave_rcu_node(sdp, flags);
         if (rhp)
                 rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp);
@@ -840,7 +854,7 @@ static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
         if (needgp)
                 srcu_funnel_gp_start(ssp, sdp, s, do_norm);
         else if (needexp)
-                srcu_funnel_exp_start(ssp, sdp->mynode, s);
+                srcu_funnel_exp_start(ssp, smp_load_acquire(&sdp->mynode), s);
         srcu_read_unlock(ssp, idx);
         return s;
 }
@@ -1100,14 +1114,35 @@ static void srcu_barrier_cb(struct rcu_head *rhp)
         complete(&ssp->srcu_barrier_completion);
 }
 
+/*
+ * Enqueue an srcu_barrier() callback on the specified srcu_data
+ * structure's ->cblist. but only if that ->cblist already has at least one
+ * callback enqueued. Note that if a CPU already has callbacks enqueue,
+ * it must have already registered the need for a future grace period,
+ * so all we need do is enqueue a callback that will use the same grace
+ * period as the last callback already in the queue.
+ */
+static void srcu_barrier_one_cpu(struct srcu_struct *ssp, struct srcu_data *sdp)
+{
+        spin_lock_irq_rcu_node(sdp);
+        atomic_inc(&ssp->srcu_barrier_cpu_cnt);
+        sdp->srcu_barrier_head.func = srcu_barrier_cb;
+        debug_rcu_head_queue(&sdp->srcu_barrier_head);
+        if (!rcu_segcblist_entrain(&sdp->srcu_cblist,
+                                   &sdp->srcu_barrier_head)) {
+                debug_rcu_head_unqueue(&sdp->srcu_barrier_head);
+                atomic_dec(&ssp->srcu_barrier_cpu_cnt);
+        }
+        spin_unlock_irq_rcu_node(sdp);
+}
+
 /**
  * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
  * @ssp: srcu_struct on which to wait for in-flight callbacks.
  */
 void srcu_barrier(struct srcu_struct *ssp)
 {
         int cpu;
-        struct srcu_data *sdp;
         unsigned long s = rcu_seq_snap(&ssp->srcu_barrier_seq);
 
         check_init_srcu_struct(ssp);
@@ -1123,27 +1158,11 @@ void srcu_barrier(struct srcu_struct *ssp)
         /* Initial count prevents reaching zero until all CBs are posted. */
         atomic_set(&ssp->srcu_barrier_cpu_cnt, 1);
 
-        /*
-         * Each pass through this loop enqueues a callback, but only
-         * on CPUs already having callbacks enqueued. Note that if
-         * a CPU already has callbacks enqueue, it must have already
-         * registered the need for a future grace period, so all we
-         * need do is enqueue a callback that will use the same
-         * grace period as the last callback already in the queue.
-         */
-        for_each_possible_cpu(cpu) {
-                sdp = per_cpu_ptr(ssp->sda, cpu);
-                spin_lock_irq_rcu_node(sdp);
-                atomic_inc(&ssp->srcu_barrier_cpu_cnt);
-                sdp->srcu_barrier_head.func = srcu_barrier_cb;
-                debug_rcu_head_queue(&sdp->srcu_barrier_head);
-                if (!rcu_segcblist_entrain(&sdp->srcu_cblist,
-                                           &sdp->srcu_barrier_head)) {
-                        debug_rcu_head_unqueue(&sdp->srcu_barrier_head);
-                        atomic_dec(&ssp->srcu_barrier_cpu_cnt);
-                }
-                spin_unlock_irq_rcu_node(sdp);
-        }
+        if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER)
+                srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, 0));
+        else
+                for_each_possible_cpu(cpu)
+                        srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, cpu));
 
         /* Remove the initial count, at which point reaching zero can happen. */
         if (atomic_dec_and_test(&ssp->srcu_barrier_cpu_cnt))
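
One ordering detail worth noting about the diff above: init_srcu_struct_nodes() publishes SRCU_SIZE_WAIT_BARRIER with smp_store_release() only after the srcu_node tree and the srcu_data structures' ->mynode pointers have been set up, and the readers added by this commit observe the state (or ->mynode itself) with smp_load_acquire() before touching the tree, so seeing the larger state also guarantees that the tree initialization is visible. Below is a minimal userspace analogue of that publish/observe pairing using C11 atomics rather than the kernel primitives; it is an illustrative sketch only, and the names (init_nodes, reader, tree, mynode, size_state) are invented for the example.

/* Userspace analogue (C11 atomics) of the release/acquire publication. */
#include <stdatomic.h>
#include <stdio.h>

#define SIZE_SMALL        0
#define SIZE_WAIT_BARRIER 2

struct node { int grplo; };

static struct node tree[4];                 /* stand-in for the snp_node array */
static struct node *mynode;                 /* stand-in for sdp->mynode */
static atomic_int size_state = SIZE_SMALL;  /* stand-in for ->srcu_size_state */

static void init_nodes(void)
{
        mynode = &tree[0];              /* wire up the tree first... */
        /* ...then publish: the release orders the stores above before the flag. */
        atomic_store_explicit(&size_state, SIZE_WAIT_BARRIER, memory_order_release);
}

static void reader(void)
{
        /*
         * The acquire pairs with the release: if we see WAIT_BARRIER or
         * later, the tree initialization done before the store is visible.
         */
        if (atomic_load_explicit(&size_state, memory_order_acquire) >= SIZE_WAIT_BARRIER)
                printf("tree visible, mynode=%p\n", (void *)mynode);
        else
                printf("still small, use slot 0\n");
}

int main(void)
{
        reader();       /* "still small, use slot 0" */
        init_nodes();
        reader();       /* "tree visible, ..." */
        return 0;
}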
