HDFS-15180. Split FsDatasetImpl lock to block Pool level #3889

Closed · wants to merge 1 commit
LightWeightResizableGSet.java
@@ -20,6 +20,9 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Iterator;
import java.util.function.Consumer;

/**
* A low memory footprint {@link GSet} implementation,
* which uses an array for storing the elements
@@ -87,16 +90,45 @@ public LightWeightResizableGSet(int initCapacity) {

@Override
public E put(final E element) {
-    E existing = super.put(element);
-    expandIfNecessary();
-    return existing;
+    synchronized (this) {
+      E existing = super.put(element);
+      expandIfNecessary();
+      return existing;
+    }
}

@Override
public E get(K key) {
synchronized (this) {
return super.get(key);
}
}

@Override
public E remove(K key) {
synchronized (this) {
return super.remove(key);
}
}

@Override
public int size() {
synchronized (this) {
return super.size();
}
}

public void getIterator(Consumer<Iterator<E>> consumer) {
synchronized (this) {
consumer.accept(super.values().iterator());
}
}

/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
-  protected void resize(int cap) {
+  protected synchronized void resize(int cap) {
int newCapacity = actualArrayLength(cap);
if (newCapacity == this.capacity) {
return;
@@ -121,7 +153,7 @@ protected void resize(int cap) {
/**
* Checks if we need to expand, and expands if necessary.
*/
-  protected void expandIfNecessary() {
+  protected synchronized void expandIfNecessary() {
if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
resize(capacity * 2);
}
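The new get/remove/size overrides and the put change above serialize all access on the set's own monitor, and getIterator(Consumer) hands the iterator to a callback instead of returning it, so the whole traversal also runs while the monitor is held. A standalone sketch of that callback-iteration pattern (the SynchronizedBag class below is illustrative and not part of this patch):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class SynchronizedBag<E> {
  private final List<E> elements = new ArrayList<>();

  public synchronized void add(E e) {
    elements.add(e);
  }

  // Mirrors LightWeightResizableGSet#getIterator(Consumer): the caller's code
  // runs while 'this' is locked, so a concurrent add cannot invalidate the
  // iterator mid-traversal.
  public void getIterator(Consumer<Iterator<E>> consumer) {
    synchronized (this) {
      consumer.accept(elements.iterator());
    }
  }

  public static void main(String[] args) {
    SynchronizedBag<String> bag = new SynchronizedBag<>();
    bag.add("replica-1");
    bag.add("replica-2");
    bag.getIterator(it -> {
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    });
  }
}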
DFSConfigKeys.java
@@ -1658,6 +1658,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_NAMESERVICES_RESOLVER_IMPL =
"dfs.datanode.nameservices.resolver.impl";

public static final String
DFS_DATANODE_BLOCKPOOL_LOCK_FAIR =
"dfs.blockpool.lock.fair";

public static final boolean
DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT = false;

public static final String
DFS_DATANODE_BLOCKPOOL_LOCK_TRACE =
"dfs.blockpool.lock.trace";

public static final boolean
DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT = false;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
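The two keys above are plain booleans; a minimal sketch of reading them (the assumption that DataSetLockManager is the component that consumes them is not visible in this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class BlockPoolLockConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // dfs.blockpool.lock.fair: presumably controls fairness of the block pool lock.
    boolean fair = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR,
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_FAIR_DEFAULT);
    // dfs.blockpool.lock.trace: presumably enables tracing of lock acquire/release.
    boolean trace = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE,
        DFSConfigKeys.DFS_DATANODE_BLOCKPOOL_LOCK_TRACE_DEFAULT);
    System.out.println("blockpool lock fair=" + fair + ", trace=" + trace);
  }
}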
AutoCloseLock.java (new file)
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;

import java.util.concurrent.locks.Lock;

import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG;

/**
 * Auto-release lock that is unlocked when its try-with-resources scope exits.
*/
public class AutoCloseLock extends AutoCloseableLock {
private Lock lock;
private AutoCloseLock parentLock;
private LockManager lockManager;

public AutoCloseLock(Lock lock) {
this.lock = lock;
}

@Override
public void close() {
if (lock != null) {
lock.unlock();
if (lockManager != null) {
lockManager.hook();
}
} else {
LOG.error("Try to unlock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}
if (parentLock != null) {
parentLock.close();
}
}

public void lock() {
if (lock != null) {
lock.lock();
return;
}
LOG.error("Try to lock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}

public void setParentLock(AutoCloseLock parent) {
if (parentLock == null) {
this.parentLock = parent;
}
}

public void setLockManager(LockManager lockManager) {
this.lockManager = lockManager;
}
}
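A minimal sketch of how these pieces are meant to compose (the helper method and the two ReentrantReadWriteLock fields are illustrative, not part of the patch): a volume-level lock is chained to its block-pool parent with setParentLock(), and close() releases the child first and then the parent, so try-with-resources unwinds both in the right order.

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.common.AutoCloseLock;

public class AutoCloseLockDemo {
  private static final ReentrantReadWriteLock BP_LOCK = new ReentrantReadWriteLock();
  private static final ReentrantReadWriteLock VOLUME_LOCK = new ReentrantReadWriteLock();

  // Acquire the block pool lock, then the volume lock, and chain them so
  // that closing the returned lock releases both.
  static AutoCloseLock acquireVolumeReadLock() {
    AutoCloseLock parent = new AutoCloseLock(BP_LOCK.readLock());
    AutoCloseLock child = new AutoCloseLock(VOLUME_LOCK.readLock());
    parent.lock();
    child.lock();
    child.setParentLock(parent);
    return child;
  }

  public static void main(String[] args) {
    try (AutoCloseLock lock = acquireVolumeReadLock()) {
      // critical section guarded by both the block pool and the volume lock
    } // close() unlocks the volume lock, then the block pool lock
  }
}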
LockManager.java (new file)
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

/**
 * Used to manage the set of locks for the datanode.
*/
public interface LockManager<T> {

enum LockLevel {
BLOCK_POOl,
VOLUME
}

/**
   * Acquire the read lock for the given resources and lock it.
*/
T readLock(LockLevel level, String... resources);

/**
   * Acquire the write lock for the given resources and lock it.
*/
T writeLock(LockLevel level, String... resources);

/**
* Add a lock to LockManager.
*/
void addLock(LockLevel level, String... resources);

/**
* Remove a lock from LockManager.
*/
void removeLock(LockLevel level, String... resources);

/**
   * Hook called back by a managed lock after it is released, so the LockManager can do bookkeeping.
*/
void hook();
}
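DataSetLockManager (constructed and used elsewhere in this patch, but not shown in this diff) is the main implementation behind this interface; a hedged sketch of the expected call pattern, assuming it implements LockManager<AutoCloseLock> the same way NoLockManager below does, and assuming lockLeakCheck() reports locks that were never released (the block pool id string is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.LockManager;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
import org.apache.hadoop.util.AutoCloseableLock;

public class LockManagerUsageDemo {
  public static void main(String[] args) {
    DataSetLockManager lockManager = new DataSetLockManager(new Configuration());
    String bpid = "BP-1-127.0.0.1-1"; // illustrative block pool id
    // Register the block pool lock once, as BPServiceActor does further down.
    lockManager.addLock(LockManager.LockLevel.BLOCK_POOl, bpid);
    // Guard a read on that block pool, as BlockSender does further down.
    try (AutoCloseableLock lock =
        lockManager.readLock(LockManager.LockLevel.BLOCK_POOl, bpid)) {
      // read replica state for this block pool
    }
    // Assumed semantics: report any lock that was never released
    // (DataNode#shutdown calls this).
    lockManager.lockLeakCheck();
  }
}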
NoLockManager.java (new file)
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import java.util.concurrent.locks.Lock;

/**
 * For unit tests or temporary ReplicaMap instances that do not need to lock through DataSetLockManager.
*/
public class NoLockManager implements LockManager<AutoCloseLock> {
private NoLock lock = new NoLock(null);

private final class NoLock extends AutoCloseLock {

private NoLock(Lock lock) {
super(lock);
}

@Override
public void lock() {
}

@Override
public void close() {
}
}

public NoLockManager() {
}

@Override
public AutoCloseLock readLock(LockLevel level, String... resources) {
return lock;
}

@Override
public AutoCloseLock writeLock(LockLevel level, String... resources) {
return lock;
}

@Override
public void addLock(LockLevel level, String... resources) {
}

@Override
public void removeLock(LockLevel level, String... resources) {
}

@Override
public void hook() {
}
}
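Code written against the LockManager interface keeps working unchanged with this no-op implementation; a short sketch of the test-side usage (the block pool id is illustrative):

import org.apache.hadoop.hdfs.server.common.AutoCloseLock;
import org.apache.hadoop.hdfs.server.common.LockManager;
import org.apache.hadoop.hdfs.server.common.NoLockManager;

public class NoLockManagerDemo {
  public static void main(String[] args) {
    LockManager<AutoCloseLock> lockManager = new NoLockManager();
    // writeLock() returns the shared no-op lock; its lock() and close() do nothing.
    try (AutoCloseLock lock =
        lockManager.writeLock(LockManager.LockLevel.BLOCK_POOl, "bp-test")) {
      // mutate a temporary ReplicaMap here without taking any real lock
    }
  }
}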
BPServiceActor.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.LockManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -307,6 +308,10 @@ private void connectToNNAndHandshake() throws IOException {
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();

    // Initialize the lock for this block pool during initialization.
dn.getDataSetLockManager().addLock(LockManager.LockLevel.BLOCK_POOl,
nsInfo.getBlockPoolID());

// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
BlockSender.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.LockManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -256,7 +257,8 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
-    try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
+    try (AutoCloseableLock lock = datanode.getDataSetLockManager().readLock(
+        LockManager.LockLevel.BLOCK_POOl, block.getBlockPoolId())) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}
DataNode.java
@@ -121,6 +121,7 @@
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.LockManager;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -396,6 +397,7 @@ public static InetSocketAddress createSocketAddr(String target) {
.availableProcessors();
private static final double CONGESTION_RATIO = 1.5;
private DiskBalancer diskBalancer;
private DataSetLockManager dataSetLockManager;

private final ExecutorService xferService;

@@ -437,6 +439,7 @@ private static Tracer createTracer(Configuration conf) {
this.pipelineSupportSlownode = false;
this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
this.dnConf = new DNConf(this);
this.dataSetLockManager = new DataSetLockManager(conf);
initOOBTimeout();
storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
@@ -455,6 +458,7 @@ private static Tracer createTracer(Configuration conf) {
super(conf);
this.tracer = createTracer(conf);
this.fileIoProvider = new FileIoProvider(conf, this);
this.dataSetLockManager = new DataSetLockManager(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -2265,6 +2269,7 @@ public void shutdown() {
notifyAll();
}
tracer.close();
dataSetLockManager.lockLeakCheck();
}

/**
@@ -3171,7 +3176,9 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;

//get replica information
-    try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
+
+    try(AutoCloseableLock lock = dataSetLockManager.writeLock(LockManager.
+        LockLevel.BLOCK_POOl, b.getBlockPoolId())) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
Expand Down Expand Up @@ -3888,6 +3895,10 @@ private static boolean isWrite(BlockConstructionStage stage) {
|| stage == PIPELINE_SETUP_APPEND_RECOVERY);
}

public DataSetLockManager getDataSetLockManager() {
return dataSetLockManager;
}

boolean isSlownodeByNameserviceId(String nsId) {
return blockPoolManager.isSlownodeByNameserviceId(nsId);
}