@@ -30,43 +30,29 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3030 path. as_ref ( ) . encode_wide ( ) . chain ( Some ( 0 ) ) . collect ( )
3131}
3232
33- // The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34- const GC_LOCK_INTERVAL : usize = 25 ;
35-
3633/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
3734pub struct FilesystemStore {
3835 data_dir : PathBuf ,
3936 tmp_file_counter : AtomicUsize ,
40- gc_counter : AtomicUsize ,
41- locks : Mutex < HashMap < PathBuf , Arc < RwLock < ( ) > > > > ,
37+
38+ // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
39+ // latest written version per key.
40+ locks : Mutex < HashMap < PathBuf , Arc < RwLock < u64 > > > > ,
4241}
4342
4443impl FilesystemStore {
4544 /// Constructs a new [`FilesystemStore`].
4645 pub fn new ( data_dir : PathBuf ) -> Self {
4746 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
4847 let tmp_file_counter = AtomicUsize :: new ( 0 ) ;
49- let gc_counter = AtomicUsize :: new ( 1 ) ;
50- Self { data_dir, tmp_file_counter, gc_counter, locks }
48+ Self { data_dir, tmp_file_counter, locks }
5149 }
5250
5351 /// Returns the data directory.
5452 pub fn get_data_dir ( & self ) -> PathBuf {
5553 self . data_dir . clone ( )
5654 }
5755
58- fn garbage_collect_locks ( & self ) {
59- let gc_counter = self . gc_counter . fetch_add ( 1 , Ordering :: AcqRel ) ;
60-
61- if gc_counter % GC_LOCK_INTERVAL == 0 {
62- // Take outer lock for the cleanup.
63- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
64-
65- // Garbage collect all lock entries that are not referenced anymore.
66- outer_lock. retain ( |_, v| Arc :: strong_count ( & v) > 1 ) ;
67- }
68- }
69-
7056 fn get_dest_dir_path (
7157 & self , primary_namespace : & str , secondary_namespace : & str ,
7258 ) -> std:: io:: Result < PathBuf > {
@@ -90,36 +76,12 @@ impl FilesystemStore {
9076
9177 Ok ( dest_dir_path)
9278 }
93- }
94-
95- impl KVStoreSync for FilesystemStore {
96- fn read (
97- & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
98- ) -> lightning:: io:: Result < Vec < u8 > > {
99- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "read" ) ?;
100-
101- let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
102- dest_file_path. push ( key) ;
103-
104- let mut buf = Vec :: new ( ) ;
105- {
106- let inner_lock_ref = {
107- let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
108- Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
109- } ;
110- let _guard = inner_lock_ref. read ( ) . unwrap ( ) ;
111-
112- let mut f = fs:: File :: open ( dest_file_path) ?;
113- f. read_to_end ( & mut buf) ?;
114- }
115-
116- self . garbage_collect_locks ( ) ;
117-
118- Ok ( buf)
119- }
12079
121- fn write (
80+ /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
81+ /// returns early without writing.
82+ pub ( crate ) fn write_version (
12283 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
84+ version : Option < u64 > ,
12385 ) -> lightning:: io:: Result < ( ) > {
12486 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
12587
@@ -153,7 +115,18 @@ impl KVStoreSync for FilesystemStore {
153115 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
154116 Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
155117 } ;
156- let _guard = inner_lock_ref. write ( ) . unwrap ( ) ;
118+ let mut last_written_version = inner_lock_ref. write ( ) . unwrap ( ) ;
119+
120+ // If a version is provided, we check if we already have a newer version written. This is used in async
121+ // contexts to realize eventual consistency.
122+ if let Some ( version) = version {
123+ if version <= * last_written_version {
124+ // If the version is not greater, we don't write the file.
125+ return Ok ( ( ) ) ;
126+ }
127+
128+ * last_written_version = version;
129+ }
157130
158131 #[ cfg( not( target_os = "windows" ) ) ]
159132 {
@@ -200,10 +173,39 @@ impl KVStoreSync for FilesystemStore {
200173 }
201174 } ;
202175
203- self . garbage_collect_locks ( ) ;
204-
205176 res
206177 }
178+ }
179+
180+ impl KVStoreSync for FilesystemStore {
181+ fn read (
182+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
183+ ) -> lightning:: io:: Result < Vec < u8 > > {
184+ check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "read" ) ?;
185+
186+ let mut dest_file_path = self . get_dest_dir_path ( primary_namespace, secondary_namespace) ?;
187+ dest_file_path. push ( key) ;
188+
189+ let mut buf = Vec :: new ( ) ;
190+ {
191+ let inner_lock_ref = {
192+ let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
193+ Arc :: clone ( & outer_lock. entry ( dest_file_path. clone ( ) ) . or_default ( ) )
194+ } ;
195+ let _guard = inner_lock_ref. read ( ) . unwrap ( ) ;
196+
197+ let mut f = fs:: File :: open ( dest_file_path) ?;
198+ f. read_to_end ( & mut buf) ?;
199+ }
200+
201+ Ok ( buf)
202+ }
203+
204+ fn write (
205+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : & [ u8 ] ,
206+ ) -> lightning:: io:: Result < ( ) > {
207+ self . write_version ( primary_namespace, secondary_namespace, key, buf, None )
208+ }
207209
208210 fn remove (
209211 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
@@ -295,8 +297,6 @@ impl KVStoreSync for FilesystemStore {
295297 }
296298 }
297299
298- self . garbage_collect_locks ( ) ;
299-
300300 Ok ( ( ) )
301301 }
302302
@@ -325,8 +325,6 @@ impl KVStoreSync for FilesystemStore {
325325 keys. push ( key) ;
326326 }
327327
328- self . garbage_collect_locks ( ) ;
329-
330328 Ok ( keys)
331329 }
332330}
0 commit comments