@@ -79,8 +79,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
79
79
private int storeOffset = 0 ;
80
80
81
81
// Used to indicate that the scanner has closed (see HBASE-1107)
82
- // Do not need to be volatile because it's always accessed via synchronized methods
83
- private boolean closing = false ;
82
+ private volatile boolean closing = false ;
84
83
private final boolean get ;
85
84
private final boolean explicitColumnQuery ;
86
85
private final boolean useRowColBloom ;
@@ -157,6 +156,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
157
156
final List <KeyValueScanner > currentScanners = new ArrayList <>();
158
157
// flush update lock
159
158
private final ReentrantLock flushLock = new ReentrantLock ();
159
+ // lock for closing.
160
+ private final ReentrantLock closeLock = new ReentrantLock ();
160
161
161
162
protected final long readPt ;
162
163
private boolean topChanged = false ;
@@ -473,31 +474,38 @@ public void close() {
473
474
}
474
475
475
476
private void close (boolean withDelayedScannersClose ) {
476
- if (this .closing ) {
477
- return ;
478
- }
479
- if (withDelayedScannersClose ) {
480
- this .closing = true ;
481
- }
482
- // For mob compaction, we do not have a store.
483
- if (this .store != null ) {
484
- this .store .deleteChangedReaderObserver (this );
485
- }
486
- if (withDelayedScannersClose ) {
487
- clearAndClose (scannersForDelayedClose );
488
- clearAndClose (memStoreScannersAfterFlush );
489
- clearAndClose (flushedstoreFileScanners );
490
- if (this .heap != null ) {
491
- this .heap .close ();
492
- this .currentScanners .clear ();
493
- this .heap = null ; // CLOSED!
477
+ closeLock .lock ();
478
+ // If the closeLock is acquired then any subsequent updateReaders()
479
+ // call is ignored.
480
+ try {
481
+ if (this .closing ) {
482
+ return ;
494
483
}
495
- } else {
496
- if (this .heap != null ) {
497
- this .scannersForDelayedClose .add (this .heap );
498
- this .currentScanners .clear ();
499
- this .heap = null ;
484
+ if (withDelayedScannersClose ) {
485
+ this .closing = true ;
486
+ }
487
+ // For mob compaction, we do not have a store.
488
+ if (this .store != null ) {
489
+ this .store .deleteChangedReaderObserver (this );
500
490
}
491
+ if (withDelayedScannersClose ) {
492
+ clearAndClose (scannersForDelayedClose );
493
+ clearAndClose (memStoreScannersAfterFlush );
494
+ clearAndClose (flushedstoreFileScanners );
495
+ if (this .heap != null ) {
496
+ this .heap .close ();
497
+ this .currentScanners .clear ();
498
+ this .heap = null ; // CLOSED!
499
+ }
500
+ } else {
501
+ if (this .heap != null ) {
502
+ this .scannersForDelayedClose .add (this .heap );
503
+ this .currentScanners .clear ();
504
+ this .heap = null ;
505
+ }
506
+ }
507
+ } finally {
508
+ closeLock .unlock ();
501
509
}
502
510
}
503
511
@@ -876,8 +884,25 @@ public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreSc
876
884
if (CollectionUtils .isEmpty (sfs ) && CollectionUtils .isEmpty (memStoreScanners )) {
877
885
return ;
878
886
}
887
+ boolean updateReaders = false ;
879
888
flushLock .lock ();
880
889
try {
890
+ if (!closeLock .tryLock ()) {
891
+ // The reason for doing this is that when the current store scanner does not retrieve
892
+ // any new cells, then the scanner is considered to be done. The heap of this scanner
893
+ // is not closed till the shipped() call is completed. Hence in that case if at all
894
+ // the partial close (close (false)) has been called before updateReaders(), there is no
895
+ // need for the updateReaders() to happen.
896
+ LOG .debug ("StoreScanner already has the close lock. There is no need to updateReaders" );
897
+ // no lock acquired.
898
+ return ;
899
+ }
900
+ // lock acquired
901
+ updateReaders = true ;
902
+ if (this .closing ) {
903
+ LOG .debug ("StoreScanner already closing. There is no need to updateReaders" );
904
+ return ;
905
+ }
881
906
flushed = true ;
882
907
final boolean isCompaction = false ;
883
908
boolean usePread = get || scanUsePread ;
@@ -896,6 +921,9 @@ public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreSc
896
921
}
897
922
} finally {
898
923
flushLock .unlock ();
924
+ if (updateReaders ) {
925
+ closeLock .unlock ();
926
+ }
899
927
}
900
928
// Let the next() call handle re-creating and seeking
901
929
}
0 commit comments