
Commit 21ab47a

Author: limingxiang02

HDFS-15180. Split FsDatasetImpl lock to block pool level

1 parent: 034dc8d

30 files changed: +1035 −438 lines

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LightWeightResizableGSet.java

Lines changed: 37 additions & 5 deletions

@@ -20,6 +20,9 @@
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import java.util.Iterator;
+import java.util.function.Consumer;
+
 /**
  * A low memory footprint {@link GSet} implementation,
  * which uses an array for storing the elements
@@ -87,16 +90,45 @@ public LightWeightResizableGSet(int initCapacity) {
 
   @Override
   public E put(final E element) {
-    E existing = super.put(element);
-    expandIfNecessary();
-    return existing;
+    synchronized (this) {
+      E existing = super.put(element);
+      expandIfNecessary();
+      return existing;
+    }
+  }
+
+  @Override
+  public E get(K key) {
+    synchronized (this) {
+      return super.get(key);
+    }
+  }
+
+  @Override
+  public E remove(K key) {
+    synchronized (this) {
+      return super.remove(key);
+    }
+  }
+
+  @Override
+  public int size() {
+    synchronized (this) {
+      return super.size();
+    }
+  }
+
+  public void getIterator(Consumer<Iterator<E>> consumer) {
+    synchronized (this) {
+      consumer.accept(super.values().iterator());
+    }
   }
 
   /**
    * Resize the internal table to given capacity.
    */
   @SuppressWarnings("unchecked")
-  protected void resize(int cap) {
+  protected synchronized void resize(int cap) {
     int newCapacity = actualArrayLength(cap);
     if (newCapacity == this.capacity) {
       return;
@@ -121,7 +153,7 @@ protected void resize(int cap) {
   /**
    * Checks if we need to expand, and expands if necessary.
    */
-  protected void expandIfNecessary() {
+  protected synchronized void expandIfNecessary() {
     if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
       resize(capacity * 2);
     }
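
The new getIterator(Consumer) runs the caller's code under the set's own monitor, so a traversal cannot race with the now-synchronized put/get/remove. A minimal sketch of the intended calling pattern; the helper class, method name, and counting logic are illustrative, not part of the commit:

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.util.LightWeightResizableGSet;

    public final class GSetIterationSketch {
      // Count entries without racing concurrent mutators: the lambda runs
      // inside getIterator()'s synchronized block, so keep it short and
      // never call back into the same set (its monitor is already held).
      static <K, E extends K> long safeCount(LightWeightResizableGSet<K, E> set) {
        AtomicLong count = new AtomicLong();
        set.getIterator(iter -> {
          while (iter.hasNext()) {
            iter.next();
            count.incrementAndGet();
          }
        });
        return count.get();
      }
    }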

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 14 additions & 0 deletions

@@ -1658,6 +1658,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_NAMESERVICES_RESOLVER_IMPL =
       "dfs.datanode.nameservices.resolver.impl";
 
+  public static final String
+      DFS_DATANODE_BLOCKPOOL_LOCK_FAIR =
+      "dfs.blockpool.lock.fair";
+
+  public static final boolean
+      DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT = false;
+
+  public static final String
+      DFS_DATANODE_BLOCKPOOL_LOCK_TRACE =
+      "dfs.blockpool.lock.trace";
+
+  public static final boolean
+      DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
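
These two keys control fairness and leak tracing for the new per-block-pool locks; both default to false. A sketch of how a datanode component would read them, assuming only the standard Configuration API and the constants added above (the wiring comment is an inference, not shown in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    boolean fair = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR,
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT);
    boolean trace = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE,
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT);
    // Presumably `fair` feeds new ReentrantReadWriteLock(fair) and `trace`
    // enables stack-trace capture for lock-leak detection in the commit's
    // DataSetLockManager; the exact wiring is an assumption here.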

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/AutoCloseLock.java

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.concurrent.locks.Lock;
+
+import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG;
+
+/**
+ * Auto-release the lock when exiting the try scope.
+ */
+public class AutoCloseLock extends AutoCloseableLock {
+  private Lock lock;
+  private AutoCloseLock parentLock;
+  private LockManager lockManager;
+
+  public AutoCloseLock(Lock lock) {
+    this.lock = lock;
+  }
+
+  @Override
+  public void close() {
+    if (lock != null) {
+      lock.unlock();
+      if (lockManager != null) {
+        lockManager.hook();
+      }
+    } else {
+      LOG.error("Try to unlock null lock" +
+          StringUtils.getStackTrace(Thread.currentThread()));
+    }
+    if (parentLock != null) {
+      parentLock.close();
+    }
+  }
+
+  public void lock() {
+    if (lock != null) {
+      lock.lock();
+      return;
+    }
+    LOG.error("Try to lock null lock" +
+        StringUtils.getStackTrace(Thread.currentThread()));
+  }
+
+  public void setParentLock(AutoCloseLock parent) {
+    if (parentLock == null) {
+      this.parentLock = parent;
+    }
+  }
+
+  public void setLockManager(LockManager lockManager) {
+    this.lockManager = lockManager;
+  }
+}
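
Because AutoCloseLock extends AutoCloseableLock, callers hold it in try-with-resources: close() unlocks, fires the manager's hook(), then closes the parent lock, which is how a child lock releases its enclosing block pool lock. A usage sketch, assuming `lockManager` is a DataSetLockManager instance and `blockPoolId` is illustrative:

    try (AutoCloseLock l = lockManager.readLock(
        LockManager.LockLevel.BLOCK_POOl, blockPoolId)) {
      // read replica state for this block pool
    }   // exiting the try scope unlocks and invokes lockManager.hook()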

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/LockManager.java

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+/**
+ * Used to manage a set of locks for the datanode.
+ */
+public interface LockManager<T> {
+
+  enum LockLevel {
+    BLOCK_POOl,
+    VOLUME
+  }
+
+  /**
+   * Acquire the read lock and lock it.
+   */
+  T readLock(LockLevel level, String... resources);
+
+  /**
+   * Acquire the write lock and lock it.
+   */
+  T writeLock(LockLevel level, String... resources);
+
+  /**
+   * Add a lock to the LockManager.
+   */
+  void addLock(LockLevel level, String... resources);
+
+  /**
+   * Remove a lock from the LockManager.
+   */
+  void removeLock(LockLevel level, String... resources);
+
+  /**
+   * Callback hook, invoked when a managed lock is released.
+   */
+  void hook();
+}
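
The varargs `resources` identify the lock: one name at BLOCK_POOl level and, by this commit's design, a block pool id plus a volume id at VOLUME level, with the volume lock taken under its block pool lock (inferred from AutoCloseLock.setParentLock above). A hedged sketch of the calling convention; `manager`, `bpid`, and `volumeId` are illustrative:

    // Block pool level: a single resource name.
    try (AutoCloseLock bp = manager.writeLock(
        LockManager.LockLevel.BLOCK_POOl, bpid)) {
      // mutate state shared across this block pool
    }

    // Volume level: the implementation is expected to acquire the block
    // pool lock first (as the parent), then the volume lock, and to
    // release both on close().
    try (AutoCloseLock vol = manager.readLock(
        LockManager.LockLevel.VOLUME, bpid, volumeId)) {
      // read per-volume replica state
    }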

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/NoLockManager.java

Lines changed: 68 additions & 0 deletions

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Some unit tests or temporary ReplicaMaps do not need to lock through
+ * the DataSetLockManager.
+ */
+public class NoLockManager implements LockManager<AutoCloseLock> {
+  private NoLock lock = new NoLock(null);
+
+  private final class NoLock extends AutoCloseLock {
+
+    private NoLock(Lock lock) {
+      super(lock);
+    }
+
+    @Override
+    public void lock() {
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  public NoLockManager() {
+  }
+
+  @Override
+  public AutoCloseLock readLock(LockLevel level, String... resources) {
+    return lock;
+  }
+
+  @Override
+  public AutoCloseLock writeLock(LockLevel level, String... resources) {
+    return lock;
+  }
+
+  @Override
+  public void addLock(LockLevel level, String... resources) {
+  }
+
+  @Override
+  public void removeLock(LockLevel level, String... resources) {
+  }
+
+  @Override
+  public void hook() {
+  }
+}
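
NoLockManager is a null-object implementation: every call is a no-op, which lets unit tests and temporary ReplicaMaps exercise lock-guarded code paths without taking real locks. A sketch using only the interface shown above:

    // All lock operations complete immediately and hold nothing.
    LockManager<AutoCloseLock> noop = new NoLockManager();
    try (AutoCloseLock l = noop.readLock(
        LockManager.LockLevel.BLOCK_POOl, "any-bpid")) {
      // body runs unguarded; close() is likewise a no-op
    }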

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 5 additions & 0 deletions

@@ -53,6 +53,7 @@
 import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.LockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -307,6 +308,10 @@ private void connectToNNAndHandshake() throws IOException {
     // info.
     NamespaceInfo nsInfo = retrieveNamespaceInfo();
 
+    // Initialize the block pool lock during handshake setup.
+    dn.getDataSetLockManager().addLock(LockManager.LockLevel.BLOCK_POOl,
+        nsInfo.getBlockPoolID());
+
     // Verify that this matches the other NN in this HA pair.
     // This also initializes our block pool in the DN if we are
     // the first NN connection for this BP.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 3 additions & 1 deletion

@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.LockManager;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
     // the append write.
     ChunkChecksum chunkChecksum = null;
     final long replicaVisibleLength;
-    try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
+    try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
+        LockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
       replica = getReplica(block, datanode);
       replicaVisibleLength = replica.getVisibleLength();
     }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 12 additions & 1 deletion

@@ -121,6 +121,7 @@
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.common.LockManager;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -396,6 +397,7 @@ public static InetSocketAddress createSocketAddr(String target) {
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
+  private DataSetLockManager dataSetLockManager;
 
   private final ExecutorService xferService;
 
@@ -437,6 +439,7 @@ private static Tracer createTracer(Configuration conf) {
     this.pipelineSupportSlownode = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -455,6 +458,7 @@ private static Tracer createTracer(Configuration conf) {
     super(conf);
     this.tracer = createTracer(conf);
     this.fileIoProvider = new FileIoProvider(conf, this);
+    this.dataSetLockManager = new DataSetLockManager(conf);
     this.blockScanner = new BlockScanner(this);
     this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -2265,6 +2269,7 @@ public void shutdown() {
       notifyAll();
     }
     tracer.close();
+    dataSetLockManager.lockLeakCheck();
   }
 
   /**
@@ -3171,7 +3176,9 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;
 
     //get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
+
+    try (AutoCloseableLock lock = dataSetLockManager.writeLock(LockManager.
+        LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
@@ -3888,6 +3895,10 @@ private static boolean isWrite(BlockConstructionStage stage) {
         || stage == PIPELINE_SETUP_APPEND_RECOVERY);
   }
 
+  public DataSetLockManager getDataSetLockManager() {
+    return dataSetLockManager;
+  }
+
   boolean isSlownodeByNameserviceId(String nsId) {
     return blockPoolManager.isSlownodeByNameserviceId(nsId);
   }
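
The new getter is what BPServiceActor and BlockSender (above) use to reach the per-block-pool locks; the acquisition pattern they now share looks like this, where `dn` and `block` name the surrounding component's own references:

    try (AutoCloseableLock lock = dn.getDataSetLockManager().readLock(
        LockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
      // per-block-pool critical section; other block pools are not blocked
    }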
