24
24
import java .util .concurrent .ConcurrentHashMap ;
25
25
import java .util .concurrent .ConcurrentMap ;
26
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import java .util .concurrent .locks .Condition ;
27
28
import java .util .concurrent .locks .Lock ;
29
+ import java .util .concurrent .locks .ReentrantLock ;
28
30
import java .util .function .BiPredicate ;
29
31
import java .util .regex .Matcher ;
30
32
import java .util .regex .Pattern ;
@@ -87,6 +89,15 @@ public abstract class AbstractWALProvider implements WALProvider, PeerActionList
87
89
88
90
private final KeyLocker <String > createLock = new KeyLocker <>();
89
91
92
+ // in getWALs we can not throw any exceptions out, so we use lock and condition here as it
93
+ // supports awaitUninterruptibly which will not throw a InterruptedException
94
+ private final Lock numRemoteWALUnderCreationLock = new ReentrantLock ();
95
+ private final Condition noRemoteWALUnderCreationCond =
96
+ numRemoteWALUnderCreationLock .newCondition ();
97
+ // record the number of remote WALs which are under creation. This is very important to not
98
+ // missing a WAL instance in getWALs method. See HBASE-28140 and related issues for more details.
99
+ private int numRemoteWALUnderCreation ;
100
+
90
101
// we need to have this because when getting meta wal, there is no peer info provider yet.
91
102
private SyncReplicationPeerInfoProvider peerInfoProvider = new SyncReplicationPeerInfoProvider () {
92
103
@@ -150,11 +161,26 @@ private WAL getRemoteWAL(RegionInfo region, String peerId, String remoteWALDir)
150
161
WAL wal = createRemoteWAL (region , ReplicationUtils .getRemoteWALFileSystem (conf , remoteWALDir ),
151
162
ReplicationUtils .getPeerRemoteWALDir (remoteWALDir , peerId ), getRemoteWALPrefix (peerId ),
152
163
ReplicationUtils .SYNC_WAL_SUFFIX );
164
+ numRemoteWALUnderCreationLock .lock ();
165
+ try {
166
+ numRemoteWALUnderCreation ++;
167
+ } finally {
168
+ numRemoteWALUnderCreationLock .unlock ();
169
+ }
153
170
initWAL (wal );
154
171
peerId2WAL .put (peerId , Optional .of (wal ));
155
172
return wal ;
156
173
} finally {
157
174
lock .unlock ();
175
+ numRemoteWALUnderCreationLock .lock ();
176
+ try {
177
+ numRemoteWALUnderCreation --;
178
+ if (numRemoteWALUnderCreation == 0 ) {
179
+ noRemoteWALUnderCreationCond .signalAll ();
180
+ }
181
+ } finally {
182
+ numRemoteWALUnderCreationLock .unlock ();
183
+ }
158
184
}
159
185
}
160
186
@@ -179,6 +205,17 @@ public final WAL getWAL(RegionInfo region) throws IOException {
179
205
180
206
@ Override
181
207
public final List <WAL > getWALs () {
208
+ List <WAL > wals = new ArrayList <WAL >();
209
+ numRemoteWALUnderCreationLock .lock ();
210
+ try {
211
+ while (numRemoteWALUnderCreation > 0 ) {
212
+ noRemoteWALUnderCreationCond .awaitUninterruptibly ();
213
+ }
214
+ peerId2WAL .values ().stream ().filter (Optional ::isPresent ).map (Optional ::get )
215
+ .forEach (wals ::add );
216
+ } finally {
217
+ numRemoteWALUnderCreationLock .unlock ();
218
+ }
182
219
return Streams
183
220
.concat (peerId2WAL .values ().stream ().filter (Optional ::isPresent ).map (Optional ::get ),
184
221
getWALs0 ().stream ())
0 commit comments