
Commit 2992df7

lorddoskias authored and kdave committed
btrfs: Implement DREW lock
A (D)ouble (R)eader (W)riter (E)xclusion lock is a locking primitive that allows multiple readers or multiple writers, but not both readers and writers, to hold it concurrently. The code is factored out from the existing open-coded locking scheme used to exclude pending snapshots from nocow writers and vice versa. The current implementation favors readers (that is, snapshot creators) over writers (nocow writers of the filesystem).

The API provides lock/unlock for both reads and writes, plus trylock for writes.

A formal TLA+ specification, provided by Valentin Schneider, is at https://lore.kernel.org/linux-btrfs/2dcaf81c-f0d3-409e-cb29-733d8b3b4cc9@arm.com/

Signed-off-by: Nikolay Borisov <nborisov@suse.com>
Reviewed-by: David Sterba <dsterba@suse.com>
Signed-off-by: David Sterba <dsterba@suse.com>
1 parent fd8efa8 commit 2992df7
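To make the intended pairing concrete, here is a hedged usage sketch of the API this commit adds. Per the commit message, the read side maps to snapshot creation and the write side to nocow writers; everything except the btrfs_drew_* calls (the function names and the fallback policy shown) is illustrative, not part of this commit.

	/* Illustrative callers; only the btrfs_drew_* API is from this commit. */
	static void snapshot_side(struct btrfs_drew_lock *lock)
	{
		btrfs_drew_read_lock(lock);	/* blocks until no active nocow writers */
		/* ... create the snapshot ... */
		btrfs_drew_read_unlock(lock);
	}

	static bool nocow_side(struct btrfs_drew_lock *lock)
	{
		/* Writers yield to readers: back off rather than block a snapshot */
		if (!btrfs_drew_try_write_lock(lock))
			return false;	/* caller would fall back to COW */
		/* ... perform the nocow write ... */
		btrfs_drew_write_unlock(lock);
		return true;
	}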

File tree

3 files changed: +114 -0 lines changed


fs/btrfs/ctree.h

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@
 #include "extent_map.h"
 #include "async-thread.h"
 #include "block-rsv.h"
+#include "locking.h"
 
 struct btrfs_trans_handle;
 struct btrfs_transaction;

fs/btrfs/locking.c

Lines changed: 93 additions & 0 deletions
@@ -565,3 +565,96 @@ struct extent_buffer *btrfs_read_lock_root_node(struct btrfs_root *root)
 	}
 	return eb;
 }
+
+/*
+ * DREW locks
+ * ==========
+ *
+ * DREW stands for double-reader-writer-exclusion lock. It's used in situations
+ * where you want to provide A-B exclusion but not AA or BB.
+ *
+ * The current implementation gives more priority to readers. If a reader and a
+ * writer both race to acquire their respective sides of the lock, the writer
+ * yields its lock as soon as it detects a concurrent reader. Additionally, if
+ * there are pending readers, no new writers are allowed to come in and
+ * acquire the lock.
+ */
+
+int btrfs_drew_lock_init(struct btrfs_drew_lock *lock)
+{
+	int ret;
+
+	ret = percpu_counter_init(&lock->writers, 0, GFP_KERNEL);
+	if (ret)
+		return ret;
+
+	atomic_set(&lock->readers, 0);
+	init_waitqueue_head(&lock->pending_readers);
+	init_waitqueue_head(&lock->pending_writers);
+
+	return 0;
+}
+
+void btrfs_drew_lock_destroy(struct btrfs_drew_lock *lock)
+{
+	percpu_counter_destroy(&lock->writers);
+}
+
+/* Return true if acquisition is successful, false otherwise */
+bool btrfs_drew_try_write_lock(struct btrfs_drew_lock *lock)
+{
+	if (atomic_read(&lock->readers))
+		return false;
+
+	percpu_counter_inc(&lock->writers);
+
+	/* Ensure writers count is updated before we check for pending readers */
+	smp_mb();
+	if (atomic_read(&lock->readers)) {
+		btrfs_drew_write_unlock(lock);
+		return false;
+	}
+
+	return true;
+}
+
+void btrfs_drew_write_lock(struct btrfs_drew_lock *lock)
+{
+	while (true) {
+		if (btrfs_drew_try_write_lock(lock))
+			return;
+		wait_event(lock->pending_writers, !atomic_read(&lock->readers));
+	}
+}
+
+void btrfs_drew_write_unlock(struct btrfs_drew_lock *lock)
+{
+	percpu_counter_dec(&lock->writers);
+	cond_wake_up(&lock->pending_readers);
+}
+
+void btrfs_drew_read_lock(struct btrfs_drew_lock *lock)
+{
+	atomic_inc(&lock->readers);
+
+	/*
+	 * Ensure the pending reader count is perceived BEFORE this reader
+	 * goes to sleep in case of active writers. This guarantees new writers
+	 * won't be allowed and that the current reader will be woken up when
+	 * the last active writer finishes its job.
+	 */
+	smp_mb__after_atomic();
+
+	wait_event(lock->pending_readers,
+		   percpu_counter_sum(&lock->writers) == 0);
+}
+
+void btrfs_drew_read_unlock(struct btrfs_drew_lock *lock)
+{
+	/*
+	 * atomic_dec_and_test() implies a full barrier, so woken up writers
+	 * are guaranteed to see the decrement.
+	 */
+	if (atomic_dec_and_test(&lock->readers))
+		wake_up(&lock->pending_writers);
+}
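The correctness of the writer's check/increment/re-check dance hinges on barrier pairing: the smp_mb() after percpu_counter_inc() pairs with the reader's smp_mb__after_atomic(), so at least one side always observes the other. A minimal userspace sketch of the same protocol, assuming C11 atomics: busy-waiting stands in for the kernel waitqueues, a single atomic for the per-cpu writer counter, and all names are illustrative, not how the kernel implements it.

	#include <stdatomic.h>
	#include <stdbool.h>

	struct drew_lock {
		atomic_int readers;
		atomic_int writers;
	};

	static void drew_write_unlock(struct drew_lock *lock)
	{
		atomic_fetch_sub(&lock->writers, 1);
	}

	static bool drew_try_write_lock(struct drew_lock *lock)
	{
		if (atomic_load(&lock->readers))
			return false;

		atomic_fetch_add(&lock->writers, 1);
		/*
		 * The seq_cst RMW above orders the increment before the
		 * re-check, standing in for the kernel's smp_mb().
		 */
		if (atomic_load(&lock->readers)) {
			drew_write_unlock(lock);
			return false;
		}
		return true;
	}

	static void drew_read_lock(struct drew_lock *lock)
	{
		atomic_fetch_add(&lock->readers, 1);
		/*
		 * The increment is a seq_cst RMW, so it is globally visible
		 * before we start polling writers (kernel:
		 * smp_mb__after_atomic()).
		 */
		while (atomic_load(&lock->writers))
			;	/* spin until the last active writer drops out */
	}

	static void drew_read_unlock(struct drew_lock *lock)
	{
		atomic_fetch_sub(&lock->readers, 1);
	}

Because all of these operations are seq_cst, they fall into one total order: either the writer's re-check sees the reader's increment and backs off, or the reader's poll sees the writer's increment and waits for it to drop, so A-B exclusion holds in every interleaving.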

fs/btrfs/locking.h

Lines changed: 20 additions & 0 deletions
@@ -6,13 +6,18 @@
 #ifndef BTRFS_LOCKING_H
 #define BTRFS_LOCKING_H
 
+#include <linux/atomic.h>
+#include <linux/wait.h>
+#include <linux/percpu_counter.h>
 #include "extent_io.h"
 
 #define BTRFS_WRITE_LOCK 1
 #define BTRFS_READ_LOCK 2
 #define BTRFS_WRITE_LOCK_BLOCKING 3
 #define BTRFS_READ_LOCK_BLOCKING 4
 
+struct btrfs_path;
+
 void btrfs_tree_lock(struct extent_buffer *eb);
 void btrfs_tree_unlock(struct extent_buffer *eb);
 
@@ -48,4 +53,19 @@ static inline void btrfs_tree_unlock_rw(struct extent_buffer *eb, int rw)
 	BUG();
 }
 
+struct btrfs_drew_lock {
+	atomic_t readers;
+	struct percpu_counter writers;
+	wait_queue_head_t pending_writers;
+	wait_queue_head_t pending_readers;
+};
+
+int btrfs_drew_lock_init(struct btrfs_drew_lock *lock);
+void btrfs_drew_lock_destroy(struct btrfs_drew_lock *lock);
+void btrfs_drew_write_lock(struct btrfs_drew_lock *lock);
+bool btrfs_drew_try_write_lock(struct btrfs_drew_lock *lock);
+void btrfs_drew_write_unlock(struct btrfs_drew_lock *lock);
+void btrfs_drew_read_lock(struct btrfs_drew_lock *lock);
+void btrfs_drew_read_unlock(struct btrfs_drew_lock *lock);
+
 #endif
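Note that btrfs_drew_lock_init() can fail, since percpu_counter_init() allocates memory, so embedding code must check its return value. A hedged sketch of the lifecycle (the containing struct and function names below are illustrative, not part of this commit):

	/* Illustrative embedding; only the btrfs_drew_* calls are from this commit. */
	struct my_root_ctx {
		struct btrfs_drew_lock snapshot_lock;
		/* ... other fields ... */
	};

	static int my_root_ctx_init(struct my_root_ctx *ctx)
	{
		/* May fail: the per-cpu writer counter allocates memory */
		return btrfs_drew_lock_init(&ctx->snapshot_lock);
	}

	static void my_root_ctx_free(struct my_root_ctx *ctx)
	{
		btrfs_drew_lock_destroy(&ctx->snapshot_lock);
	}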
