Merge branch 'trunk' into s3/HADOOP-16202-enhance-openfile
Change-Id: If30684e9b4d39e9d1ba9cfdf50963b655c20144f
steveloughran committed Apr 19, 2022
2 parents 98ebf76 + ec0ff1d commit bf8e1d4
Showing 116 changed files with 4,263 additions and 757 deletions.
12 changes: 6 additions & 6 deletions LICENSE-binary
@@ -218,12 +218,12 @@ com.aliyun.oss:aliyun-sdk-oss:3.13.2
com.amazonaws:aws-java-sdk-bundle:1.11.901
com.cedarsoftware:java-util:1.9.0
com.cedarsoftware:json-io:2.5.1
com.fasterxml.jackson.core:jackson-annotations:2.13.0
com.fasterxml.jackson.core:jackson-core:2.13.0
com.fasterxml.jackson.core:jackson-databind:2.13.0
com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:2.13.0
com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.13.0
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.13.0
com.fasterxml.jackson.core:jackson-annotations:2.13.2
com.fasterxml.jackson.core:jackson-core:2.13.2
com.fasterxml.jackson.core:jackson-databind:2.13.2.2
com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:2.13.2
com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.13.2
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.13.2
com.fasterxml.uuid:java-uuid-generator:3.1.4
com.fasterxml.woodstox:woodstox-core:5.3.0
com.github.davidmoten:rxjava-extras:0.8.0.17
4 changes: 4 additions & 0 deletions dev-support/bin/create-release
@@ -535,6 +535,10 @@ function makearelease

big_console_header "Cleaning the Source Tree"

# Since CVE-2022-24765 in April 2022, git refuses to work in directories
# whose owner != the current user, unless explicitly told to trust it.
git config --global --add safe.directory /build/source

# git clean to clear any remnants from previous build
run "${GIT}" clean -xdf -e /patchprocess

@@ -147,9 +147,6 @@ protected void processOptions(LinkedList<String> args) {

@Override
protected void processPath(PathData item) throws IOException {
if (item.stat.isDirectory()) {
throw new PathIsDirectoryException(item.toString());
}
touch(item);
}
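
This hunk drops the PathIsDirectoryException guard, so `hadoop fs -touch` now updates a directory's timestamps instead of failing. A minimal sketch (not part of the commit) of driving the command programmatically; the path `/tmp/dir3` and the use of the default filesystem are assumptions for illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class TouchDirSketch {
  public static void main(String[] args) throws Exception {
    // Touch an existing directory; before this change the shell command
    // failed with PathIsDirectoryException, now it updates the timestamps.
    int rc = ToolRunner.run(new FsShell(new Configuration()),
        new String[] {"-touch", "/tmp/dir3"});
    System.exit(rc);
  }
}
```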

@@ -387,12 +387,12 @@ private RetryInfo handleException(final Method method, final int callId,
throw retryInfo.getFailException();
}

log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
log(method, retryInfo.isFailover(), counters.failovers, counters.retries, retryInfo.delay, e);
return retryInfo;
}

private void log(final Method method, final boolean isFailover,
final int failovers, final long delay, final Exception ex) {
private void log(final Method method, final boolean isFailover, final int failovers,
final int retries, final long delay, final Exception ex) {
boolean info = true;
// If this is the first failover to this proxy, skip logging at INFO level
if (!failedAtLeastOnce.contains(proxyDescriptor.getProxyInfo().toString()))
@@ -408,13 +408,15 @@ private void log(final Method method, final boolean isFailover,
}

final StringBuilder b = new StringBuilder()
.append(ex + ", while invoking ")
.append(ex)
.append(", while invoking ")
.append(proxyDescriptor.getProxyInfo().getString(method.getName()));
if (failovers > 0) {
b.append(" after ").append(failovers).append(" failover attempts");
}
b.append(isFailover? ". Trying to failover ": ". Retrying ");
b.append(delay > 0? "after sleeping for " + delay + "ms.": "immediately.");
b.append(" Current retry count: ").append(retries).append(".");

if (info) {
LOG.info(b.toString());
Expand Down
@@ -101,6 +101,13 @@ protected NetworkTopology init(InnerNode.Factory factory) {
private int depthOfAllLeaves = -1;
/** rack counter */
protected int numOfRacks = 0;
/** Rack map: rack name -> names of the nodes on that rack. */
private HashMap<String, Set<String>> rackMap =
new HashMap<String, Set<String>>();
/** Names of decommissioned (stopped) nodes. */
private HashSet<String> decommissionNodes = new HashSet<>();
/** empty rack counter. */
private int numOfEmptyRacks = 0;

/**
* Whether or not this cluster has ever consisted of more than 1 rack,
@@ -150,6 +157,7 @@ public void add(Node node) {
if (rack == null) {
incrementRacks();
}
interAddNodeWithEmptyRack(node);
if (depthOfAllLeaves == -1) {
depthOfAllLeaves = node.getLevel();
}
@@ -224,6 +232,7 @@ public void remove(Node node) {
if (rack == null) {
numOfRacks--;
}
interRemoveNodeWithEmptyRack(node);
}
LOG.debug("NetworkTopology became:\n{}", this);
} finally {
@@ -1015,4 +1024,108 @@ protected static boolean isNodeInScope(Node node, String scope) {
String nodeLocation = NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR;
return nodeLocation.startsWith(scope);
}
}

/** @return the number of nonempty racks */
public int getNumOfNonEmptyRacks() {
return numOfRacks - numOfEmptyRacks;
}

/**
* Update the empty-rack count when a node is added back, e.g. on recommission.
* @param node node to be recommissioned; can be null
*/
public void recommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.remove(node.getName());
interAddNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}

/**
* Update the empty-rack count when a node is removed, e.g. on decommission.
* @param node node to be decommissioned; can be null
*/
public void decommissionNode(Node node) {
if (node == null) {
return;
}
if (node instanceof InnerNode) {
throw new IllegalArgumentException(
"Not allow to remove an inner node: " + NodeBase.getPath(node));
}
netlock.writeLock().lock();
try {
decommissionNodes.add(node.getName());
interRemoveNodeWithEmptyRack(node);
} finally {
netlock.writeLock().unlock();
}
}

/**
* Internal helper that updates the empty-rack bookkeeping
* when a node is added or recommissioned.
* @param node node to be added; can be null
*/
private void interAddNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes == null) {
nodes = new HashSet<String>();
}
if (!decommissionNodes.contains(node.getName())) {
nodes.add(node.getName());
}
rackMap.put(rackname, nodes);
countEmptyRacks();
}

/**
* Internal helper that updates the empty-rack bookkeeping
* when a node is removed or decommissioned.
* @param node node to be removed; can be null
*/
private void interRemoveNodeWithEmptyRack(Node node) {
if (node == null) {
return;
}
String rackname = node.getNetworkLocation();
Set<String> nodes = rackMap.get(rackname);
if (nodes != null) {
InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
if (rack == null) {
// this node and its rack are both removed.
rackMap.remove(rackname);
} else if (nodes.contains(node.getName())) {
// this node is decommissioned or removed.
nodes.remove(node.getName());
rackMap.put(rackname, nodes);
}
countEmptyRacks();
}
}

private void countEmptyRacks() {
int count = 0;
for (Set<String> nodes : rackMap.values()) {
if (nodes != null && nodes.isEmpty()) {
count++;
}
}
numOfEmptyRacks = count;
LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
}
}
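
A minimal sketch (not part of the commit) of how the new empty-rack accounting might be exercised, assuming the standard `org.apache.hadoop.net.NetworkTopology` and `NodeBase` APIs; the host and rack names are made up for illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;

public class EmptyRackSketch {
  public static void main(String[] args) {
    NetworkTopology topology = NetworkTopology.getInstance(new Configuration());
    NodeBase host1 = new NodeBase("host1", "/rack1");
    NodeBase host2 = new NodeBase("host2", "/rack2");
    topology.add(host1);
    topology.add(host2);

    // Decommissioning the only node on /rack2 leaves that rack empty,
    // so only /rack1 still counts as non-empty.
    topology.decommissionNode(host2);
    System.out.println(topology.getNumOfNonEmptyRacks()); // expected: 1

    // Recommissioning the node makes /rack2 non-empty again.
    topology.recommissionNode(host2);
    System.out.println(topology.getNumOfNonEmptyRacks()); // expected: 2
  }
}
```
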
@@ -328,16 +328,16 @@ Returns 0 on success and -1 on error.
get
---

Usage: `hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] [-t <thread count>] [-q <thread pool queue size>] <src> ... <localdst> `
Usage: `hadoop fs -get [-ignoreCrc] [-crc] [-p] [-f] [-t <thread count>] [-q <thread pool queue size>] <src> ... <localdst> `

Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.
Copy files to the local file system. Files that fail the CRC check may be copied with the -ignoreCrc option. Files and CRCs may be copied using the -crc option.

Options:

* `-p` : Preserves access and modification times, ownership and the permissions.
(assuming the permissions can be propagated across filesystems)
* `-f` : Overwrites the destination if it already exists.
* `-ignorecrc` : Skip CRC checks on the file(s) downloaded.
* `-ignoreCrc` : Skip CRC checks on the file(s) downloaded.
* `-crc`: write CRC checksums for the files downloaded.
* `-t <thread count>` : Number of threads to be used, default is 1.
Useful when downloading directories containing more than 1 file.
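
A minimal sketch (not part of the commit) of a `get` call using the corrected `-ignoreCrc` casing together with a thread count, driven through `FsShell`; the source and destination paths and the thread count are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class GetSketch {
  public static void main(String[] args) throws Exception {
    // Equivalent to: hadoop fs -get -ignoreCrc -t 4 /data/logs /tmp/local
    int rc = ToolRunner.run(new FsShell(new Configuration()), new String[] {
        "-get", "-ignoreCrc", "-t", "4", "/data/logs", "/tmp/local"});
    System.exit(rc);
  }
}
```
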
@@ -299,6 +299,7 @@ Each metrics record contains tags such as HAState and Hostname as additional inf
| `FSN(Read/Write)Lock`*OperationName*`NanosAvgTime` | Average time of holding the lock by operations in nanoseconds |
| `FSN(Read/Write)LockOverallNanosNumOps` | Total number of acquiring lock by all operations |
| `FSN(Read/Write)LockOverallNanosAvgTime` | Average time of holding the lock by all operations in nanoseconds |
| `PendingSPSPaths` | The number of paths to be processed by storage policy satisfier |

JournalNode
-----------
@@ -480,7 +481,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `PacketsSlowWriteToMirror` | Total number of packets whose write to other Datanodes in the pipeline takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToOsCache` | Total number of packets whose write to os cache takes more than a certain time (300ms by default) |
| `slowFlushOrSyncCount` | Total number of packets whose sync/flush takes more than a certain time (300ms by default) |
| `slowAckToUpstreamCount` | Total number of packets whose upstream ack takes more than a certain time (300ms by default) |

FsVolume
--------

@@ -200,6 +200,78 @@ public void testTouch() throws Exception {
FileStatus fileStatus = lfs.getFileStatus(newFile);
assertThat(fileStatus.getAccessTime()).isEqualTo(dateObj.getTime());
assertThat(fileStatus.getModificationTime()).isEqualTo(dateObj.getTime());

lfs.delete(newFile, true);
assertThat(lfs.exists(newFile)).isFalse();

}

@Test
public void testTouchDir() throws Exception {
String strTime;
final String newFileName = "dir3/newFile3";
Date dateObj;
final Path newFile = new Path(newFileName);
FileStatus fstatus;
Path dirPath = new Path("dir3");
lfs.delete(dirPath, true);
lfs.mkdirs(dirPath);
lfs.delete(newFile, true);
assertThat(lfs.exists(newFile)).isFalse();

strTime = formatTimestamp(System.currentTimeMillis());
dateObj = parseTimestamp(strTime);

assertThat(shellRun("-touch", "-t", strTime, newFileName)).as(
"Expected successful touch on a new file with a specified timestamp").isEqualTo(0);
FileStatus newStatus = lfs.getFileStatus(newFile);
assertThat(newStatus.getAccessTime()).isEqualTo(dateObj.getTime());
assertThat(newStatus.getModificationTime()).isEqualTo(dateObj.getTime());

Thread.sleep(500);
strTime = formatTimestamp(System.currentTimeMillis());
dateObj = parseTimestamp(strTime);

assertThat(shellRun("-touch", "-m", "-a", "-t", strTime, "dir3")).as(
"Expected successful touch with a specified modification time").isEqualTo(0);

newStatus = lfs.getFileStatus(dirPath);
// Verify if both modification and access times are recorded correctly
assertThat(newStatus.getAccessTime()).isEqualTo(dateObj.getTime());
assertThat(newStatus.getModificationTime()).isEqualTo(dateObj.getTime());

fstatus = lfs.getFileStatus(dirPath);
Thread.sleep(500);
strTime = formatTimestamp(System.currentTimeMillis());
dateObj = parseTimestamp(strTime);

assertThat(shellRun("-touch", "-m", "-t", strTime, "dir3")).as(
"Expected successful touch with a specified modification time").isEqualTo(0);

newStatus = lfs.getFileStatus(dirPath);
// Verify if modification time is recorded correctly (and access time
// remains unchanged).
assertThat(newStatus.getAccessTime()).isEqualTo(fstatus.getAccessTime());
assertThat(newStatus.getModificationTime()).isEqualTo(dateObj.getTime());

fstatus = lfs.getFileStatus(dirPath);
Thread.sleep(500);
strTime = formatTimestamp(System.currentTimeMillis());
dateObj = parseTimestamp(strTime);

assertThat(shellRun("-touch", "-a", "-t", strTime, "dir3")).as(
"Expected successful touch with a specified modification time").isEqualTo(0);

newStatus = lfs.getFileStatus(dirPath);
// Verify if access time is recorded correctly (and modification time
// remains unchanged).
assertThat(newStatus.getAccessTime()).isEqualTo(dateObj.getTime());
assertThat(newStatus.getModificationTime()).isEqualTo(fstatus.getModificationTime());

lfs.delete(newFile, true);
lfs.delete(dirPath, true);
assertThat(lfs.exists(newFile)).isFalse();
assertThat(lfs.exists(dirPath)).isFalse();
}

private String formatTimestamp(long timeInMillis) {
@@ -52,7 +52,9 @@ void prepareDecodeInputs() {
cur = dfsStripedInputStream.getCurStripeBuf().duplicate();
}

this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
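// Allocate the decode inputs only if they have not been created yet;
// otherwise reuse the existing chunk array.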
if (this.decodeInputs == null) {
this.decodeInputs = new ECChunk[dataBlkNum + parityBlkNum];
}
int bufLen = (int) alignedStripe.getSpanInBlock();
int bufOff = (int) alignedStripe.getOffsetInBlock();
for (int i = 0; i < dataBlkNum; i++) {