Skip to content

HDFS-16429. Add DataSetLockManager to manage fine-grain locks for FsDataSetImpl #3900

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;

import java.util.Iterator;
import java.util.function.Consumer;

/**
* A low memory footprint {@link GSet} implementation,
* which uses an array for storing the elements
Expand Down Expand Up @@ -86,17 +89,36 @@ public LightWeightResizableGSet(int initCapacity) {
}

@Override
public E put(final E element) {
public synchronized E put(final E element) {
E existing = super.put(element);
expandIfNecessary();
return existing;
}

@Override
public synchronized E get(K key) {
return super.get(key);
}

@Override
public synchronized E remove(K key) {
return super.remove(key);
}

@Override
public synchronized int size() {
return super.size();
}

public synchronized void getIterator(Consumer<Iterator<E>> consumer) {
consumer.accept(super.values().iterator());
}

/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
protected void resize(int cap) {
protected synchronized void resize(int cap) {
int newCapacity = actualArrayLength(cap);
if (newCapacity == this.capacity) {
return;
Expand All @@ -121,7 +143,7 @@ protected void resize(int cap) {
/**
* Checks if we need to expand, and expands if necessary.
*/
protected void expandIfNecessary() {
protected synchronized void expandIfNecessary() {
if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
resize(capacity * 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_NAMESERVICES_RESOLVER_IMPL =
"dfs.datanode.nameservices.resolver.impl";

public static final String
DFS_DATANODE_LOCKMANAGER_TRACE =
"dfs.datanode.lockmanager.trace";

public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;

// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;

import java.util.concurrent.locks.Lock;

import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG;

/**
* Extending AutoCloseableLock such that the users can
* use a try-with-resource syntax.
*/
public class AutoCloseDataSetLock extends AutoCloseableLock {
private Lock lock;
private AutoCloseDataSetLock parentLock;
private DataNodeLockManager<AutoCloseDataSetLock> dataNodeLockManager;

public AutoCloseDataSetLock(Lock lock) {
this.lock = lock;
}

@Override
public void close() {
if (lock != null) {
lock.unlock();
if (dataNodeLockManager != null) {
dataNodeLockManager.hook();
}
} else {
LOG.error("Try to unlock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}
if (parentLock != null) {
parentLock.close();
}
}

/**
* Actually acquire the lock.
*/
public void lock() {
if (lock != null) {
lock.lock();
return;
}
LOG.error("Try to lock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}

public void setParentLock(AutoCloseDataSetLock parent) {
if (parentLock == null) {
this.parentLock = parent;
}
}

public void setDataNodeLockManager(DataNodeLockManager<AutoCloseDataSetLock>
dataNodeLockManager) {
this.dataNodeLockManager = dataNodeLockManager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

/**
* Use for manage a set of lock for datanode.
*/
public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {

/**
* Acquire block pool level first if you want to Acquire volume lock.
* Or only acquire block pool level lock.
*/
enum LockLevel {
BLOCK_POOl,
VOLUME
}

/**
* Acquire readLock and then lock.
*/
T readLock(LockLevel level, String... resources);

/**
* Acquire writeLock and then lock.
*/
T writeLock(LockLevel level, String... resources);

/**
* Add a lock to LockManager.
*/
void addLock(LockLevel level, String... resources);

/**
* Remove a lock from LockManager.
*/
void removeLock(LockLevel level, String... resources);

/**
* LockManager may need to back hook.
*/
void hook();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.common;

import java.util.concurrent.locks.Lock;

/**
* Some ut or temp replicaMap not need to lock with DataSetLockManager.
*/
public class NoLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
private final NoDataSetLock lock = new NoDataSetLock(null);

private static final class NoDataSetLock extends AutoCloseDataSetLock {

private NoDataSetLock(Lock lock) {
super(lock);
}

@Override
public void lock() {
}

@Override
public void close() {
}
}

public NoLockManager() {
}

@Override
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
return lock;
}

@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
return lock;
}

@Override
public void addLock(LockLevel level, String... resources) {
}

@Override
public void removeLock(LockLevel level, String... resources) {
}

@Override
public void hook() {
}
}
Loading