Skip to content

Commit 1ea4473

Browse files
authored
HBASE-27458 Use ReadWriteLock for region scanner readpoint map (#5068)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 3fab7b3 commit 1ea4473

File tree

2 files changed

+103
-9
lines changed

2 files changed

+103
-9
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ public void setRestoredRegion(boolean restoredRegion) {
396396
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
397397

398398
final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
399+
final ReadPointCalculationLock smallestReadPointCalcLock;
399400

400401
/**
401402
* The sequence ID that was enLongAddered when this region was opened.
@@ -434,19 +435,18 @@ public void setRestoredRegion(boolean restoredRegion) {
434435
* this readPoint, are included in every read operation.
435436
*/
436437
public long getSmallestReadPoint() {
437-
long minimumReadPoint;
438438
// We need to ensure that while we are calculating the smallestReadPoint
439439
// no new RegionScanners can grab a readPoint that we are unaware of.
440-
// We achieve this by synchronizing on the scannerReadPoints object.
441-
synchronized (scannerReadPoints) {
442-
minimumReadPoint = mvcc.getReadPoint();
440+
smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
441+
try {
442+
long minimumReadPoint = mvcc.getReadPoint();
443443
for (Long readPoint : this.scannerReadPoints.values()) {
444-
if (readPoint < minimumReadPoint) {
445-
minimumReadPoint = readPoint;
446-
}
444+
minimumReadPoint = Math.min(minimumReadPoint, readPoint);
447445
}
446+
return minimumReadPoint;
447+
} finally {
448+
smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.CALCULATION_LOCK);
448449
}
449-
return minimumReadPoint;
450450
}
451451

452452
/*
@@ -811,6 +811,7 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co
811811

812812
setHTableSpecificConf();
813813
this.scannerReadPoints = new ConcurrentHashMap<>();
814+
this.smallestReadPointCalcLock = new ReadPointCalculationLock(conf);
814815

815816
this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
816817
this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
@@ -6990,7 +6991,8 @@ public RegionInfo getRegionInfo() {
69906991
// getSmallestReadPoint, before scannerReadPoints is updated.
69916992
IsolationLevel isolationLevel = scan.getIsolationLevel();
69926993
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
6993-
synchronized (scannerReadPoints) {
6994+
region.smallestReadPointCalcLock.lock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
6995+
try {
69946996
if (mvccReadPoint > 0) {
69956997
this.readPt = mvccReadPoint;
69966998
} else if (
@@ -7001,6 +7003,8 @@ public RegionInfo getRegionInfo() {
70017003
this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
70027004
}
70037005
scannerReadPoints.put(this, this.readPt);
7006+
} finally {
7007+
region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
70047008
}
70057009
initializeScanners(scan, additionalScanners);
70067010
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver;
19+
20+
import java.util.concurrent.locks.Lock;
21+
import java.util.concurrent.locks.ReadWriteLock;
22+
import java.util.concurrent.locks.ReentrantLock;
23+
import java.util.concurrent.locks.ReentrantReadWriteLock;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.yetus.audience.InterfaceAudience;
26+
27+
/**
28+
* Lock to manage concurrency between {@link RegionScanner} and
29+
* {@link HRegion#getSmallestReadPoint()}. We need to ensure that while we are calculating the
30+
* smallest read point, no new scanners can modify the scannerReadPoints Map. We used to achieve
31+
* this by synchronizing on the scannerReadPoints object. But this may block the read thread and
32+
* reduce the read performance. Since the scannerReadPoints object is a
33+
* {@link java.util.concurrent.ConcurrentHashMap}, which is thread-safe, so the
34+
* {@link RegionScanner} can record their read points concurrently, what it needs to do is just
35+
* acquiring a shared lock. When we calculate the smallest read point, we need to acquire an
36+
* exclusive lock. This can improve read performance in most scenarios, only not when we have a lot
37+
* of delta operations, like {@link org.apache.hadoop.hbase.client.Append} or
38+
* {@link org.apache.hadoop.hbase.client.Increment}. So we introduce a flag to enable/disable this
39+
* feature.
40+
*/
41+
@InterfaceAudience.Private
42+
public class ReadPointCalculationLock {
43+
44+
public enum LockType {
45+
CALCULATION_LOCK,
46+
RECORDING_LOCK
47+
}
48+
49+
private final boolean useReadWriteLockForReadPoints;
50+
private Lock lock;
51+
private ReadWriteLock readWriteLock;
52+
53+
ReadPointCalculationLock(Configuration conf) {
54+
this.useReadWriteLockForReadPoints =
55+
conf.getBoolean("hbase.region.readpoints.read.write.lock.enable", false);
56+
if (useReadWriteLockForReadPoints) {
57+
readWriteLock = new ReentrantReadWriteLock();
58+
} else {
59+
lock = new ReentrantLock();
60+
}
61+
}
62+
63+
void lock(LockType lockType) {
64+
if (useReadWriteLockForReadPoints) {
65+
assert lock == null;
66+
if (lockType == LockType.CALCULATION_LOCK) {
67+
readWriteLock.writeLock().lock();
68+
} else {
69+
readWriteLock.readLock().lock();
70+
}
71+
} else {
72+
assert readWriteLock == null;
73+
lock.lock();
74+
}
75+
}
76+
77+
void unlock(LockType lockType) {
78+
if (useReadWriteLockForReadPoints) {
79+
assert lock == null;
80+
if (lockType == LockType.CALCULATION_LOCK) {
81+
readWriteLock.writeLock().unlock();
82+
} else {
83+
readWriteLock.readLock().unlock();
84+
}
85+
} else {
86+
assert readWriteLock == null;
87+
lock.unlock();
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)