Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-13660 Copy file till the source file length during distcp #1404

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,10 @@ private DistCpConstants() {

/** Filename of sorted target listing. */
public static final String TARGET_SORTED_FILE = "target_sorted.seq";

public static final String LENGTH_MISMATCH_ERROR_MSG =
"Mismatch in length of source:";

public static final String CHECKSUM_MISMATCH_ERROR_MSG =
"Checksum mismatch between ";
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void concatFileChunks(Configuration conf) throws IOException {
// This is the last chunk of the splits, consolidate allChunkPaths
try {
concatFileChunks(conf, srcFileStatus.getPath(), targetFile,
allChunkPaths);
allChunkPaths, srcFileStatus);
} catch (IOException e) {
// If the concat failed because a chunk file doesn't exist,
// then we assume that the CopyMapper has skipped copying this
Expand Down Expand Up @@ -609,7 +609,8 @@ private void commitData(Configuration conf) throws IOException {
* Concat the passed chunk files into one and rename it the targetFile.
*/
private void concatFileChunks(Configuration conf, Path sourceFile,
Path targetFile, LinkedList<Path> allChunkPaths)
Path targetFile, LinkedList<Path> allChunkPaths,
CopyListingFileStatus srcFileStatus)
throws IOException {
if (allChunkPaths.size() == 1) {
return;
Expand Down Expand Up @@ -637,8 +638,9 @@ private void concatFileChunks(Configuration conf, Path sourceFile,
LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile));
}
rename(dstfs, firstChunkFile, targetFile);
DistCpUtils.compareFileLengthsAndChecksums(
srcfs, sourceFile, null, dstfs, targetFile, skipCrc);
DistCpUtils.compareFileLengthsAndChecksums(srcFileStatus.getLen(),
srcfs, sourceFile, null, dstfs,
targetFile, skipCrc, srcFileStatus.getLen());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public void setup(Context context) throws IOException, InterruptedException {
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
Path sourcePath = sourceFileStatus.getPath();

if (LOG.isDebugEnabled())
LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);

Expand Down Expand Up @@ -354,7 +353,7 @@ private boolean canSkip(FileSystem sourceFS, CopyListingFileStatus source,
if (sameLength && sameBlockSize) {
return skipCrc ||
DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null,
targetFS, target.getPath());
targetFS, target.getPath(), source.getLen());
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ private long doCopy(CopyListingFileStatus source, Path target,
offset, context, fileAttributes, sourceChecksum);

if (!source.isSplit()) {
DistCpUtils.compareFileLengthsAndChecksums(sourceFS, sourcePath,
sourceChecksum, targetFS, targetPath, skipCrc);
DistCpUtils.compareFileLengthsAndChecksums(source.getLen(), sourceFS,
sourcePath, sourceChecksum, targetFS,
targetPath, skipCrc, source.getLen());
}
// it's not append or direct write (preferred for s3a) case, thus we first
// write to a temporary file, then rename it to the target path.
Expand Down Expand Up @@ -247,24 +248,27 @@ long copyBytes(CopyListingFileStatus source2, long sourceOffset,
boolean finished = false;
try {
inStream = getInputStream(source, context.getConfiguration());
long fileLength = source2.getLen();
int numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
bufferSize);
seekIfRequired(inStream, sourceOffset);
int bytesRead = readBytes(inStream, buf);
while (bytesRead >= 0) {
int bytesRead = readBytes(inStream, buf, numBytesToRead);
while (bytesRead > 0) {
if (chunkLength > 0 &&
(totalBytesRead + bytesRead) >= chunkLength) {
bytesRead = (int)(chunkLength - totalBytesRead);
finished = true;
}
totalBytesRead += bytesRead;
if (action == FileAction.APPEND) {
sourceOffset += bytesRead;
}
sourceOffset += bytesRead;
outStream.write(buf, 0, bytesRead);
updateContextStatus(totalBytesRead, context, source2);
if (finished) {
break;
}
bytesRead = readBytes(inStream, buf);
numBytesToRead = (int) getNumBytesToRead(fileLength, sourceOffset,
bufferSize);
bytesRead = readBytes(inStream, buf, numBytesToRead);
}
outStream.close();
outStream = null;
Expand All @@ -274,6 +278,15 @@ long copyBytes(CopyListingFileStatus source2, long sourceOffset,
return totalBytesRead;
}

@VisibleForTesting
long getNumBytesToRead(long fileLength, long position, long bufLength) {
if(position + bufLength < fileLength) {
return bufLength;
} else {
return fileLength - position;
}
}

private void updateContextStatus(long totalBytesRead, Mapper.Context context,
CopyListingFileStatus source2) {
StringBuilder message = new StringBuilder(DistCpUtils.getFormatter()
Expand All @@ -287,10 +300,11 @@ private void updateContextStatus(long totalBytesRead, Mapper.Context context,
context.setStatus(message.toString());
}

private static int readBytes(ThrottledInputStream inStream, byte buf[])
private static int readBytes(ThrottledInputStream inStream, byte[] buf,
int numBytes)
throws IOException {
try {
return inStream.read(buf);
return inStream.read(buf, 0, numBytes);
} catch (IOException e) {
throw new CopyReadException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.CopyListing.AclsNotSupportedException;
import org.apache.hadoop.tools.CopyListing.XAttrsNotSupportedException;
import org.apache.hadoop.tools.CopyListingFileStatus;
Expand Down Expand Up @@ -565,13 +566,15 @@ public static String getStringDescriptionFor(long nBytes) {
* @throws IOException if there's an exception while retrieving checksums.
*/
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
FileChecksum sourceChecksum, FileSystem targetFS, Path target)
FileChecksum sourceChecksum,
FileSystem targetFS,
Path target, long sourceLen)
throws IOException {
FileChecksum targetChecksum = null;
try {
sourceChecksum = sourceChecksum != null
? sourceChecksum
: sourceFS.getFileChecksum(source);
: sourceFS.getFileChecksum(source, sourceLen);
if (sourceChecksum != null) {
// iff there's a source checksum, look for one at the destination.
targetChecksum = targetFS.getFileChecksum(target);
Expand All @@ -595,23 +598,22 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
* @param skipCrc The flag to indicate whether to skip checksums.
* @throws IOException if there's a mismatch in file lengths or checksums.
*/
public static void compareFileLengthsAndChecksums(
FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
FileSystem targetFS, Path target, boolean skipCrc) throws IOException {
long srcLen = sourceFS.getFileStatus(source).getLen();
long tgtLen = targetFS.getFileStatus(target).getLen();
if (srcLen != tgtLen) {
public static void compareFileLengthsAndChecksums(long srcLen,
FileSystem sourceFS, Path source, FileChecksum sourceChecksum,
FileSystem targetFS, Path target, boolean skipCrc,
long targetLen) throws IOException {
if (srcLen != targetLen) {
throw new IOException(
"Mismatch in length of source:" + source + " (" + srcLen
+ ") and target:" + target + " (" + tgtLen + ")");
DistCpConstants.LENGTH_MISMATCH_ERROR_MSG + source + " (" + srcLen
+ ") and target:" + target + " (" + targetLen + ")");
}

//At this point, src & dest lengths are same. if length==0, we skip checksum
if ((srcLen != 0) && (!skipCrc)) {
if (!checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
targetFS, target, srcLen)) {
StringBuilder errorMessage =
new StringBuilder("Checksum mismatch between ")
new StringBuilder(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)
.append(source).append(" and ").append(target).append(".");
boolean addSkipHint = false;
String srcScheme = sourceFS.getScheme();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,12 @@ private void testCommitWithChecksumMismatch(boolean skipCrc)
if (!skipCrc) {
Assert.fail("Expected commit to fail");
}
Path sourcePath = new Path(sourceBase + srcFilename);
CopyListingFileStatus sourceCurrStatus =
new CopyListingFileStatus(fs.getFileStatus(sourcePath));
Assert.assertFalse(DistCpUtils.checksumsAreEqual(
fs, new Path(sourceBase + srcFilename), null,
fs, new Path(targetBase + srcFilename)));
fs, new Path(targetBase + srcFilename), sourceCurrStatus.getLen()));
} catch(IOException exception) {
if (skipCrc) {
LOG.error("Unexpected exception is found", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -444,6 +449,57 @@ private void testCopyingExistingFiles(FileSystem fs, CopyMapper copyMapper,
}
}

@Test(timeout = 40000)
public void testCopyWhileAppend() throws Exception {
mukund-thakur marked this conversation as resolved.
Show resolved Hide resolved
deleteState();
mkdirs(SOURCE_PATH + "/1");
touchFile(SOURCE_PATH + "/1/3");
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
stubContext.getContext();
copyMapper.setup(context);
final Path path = new Path(SOURCE_PATH + "/1/3");
int manyBytes = 100000000;
appendFile(path, manyBytes);
ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
Runnable task = new Runnable() {
public void run() {
try {
int maxAppendAttempts = 20;
int appendCount = 0;
while (appendCount < maxAppendAttempts) {
appendFile(path, 1000);
Thread.sleep(200);
appendCount++;
}
} catch (IOException | InterruptedException e) {
LOG.error("Exception encountered ", e);
Assert.fail("Test failed: " + e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new AssertionError(Test failed: " + e,e)
Stack traces are too import to throw away

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete stack trace is already printed using logger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @steveloughran is right, we should log in assert instead of logger.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I will make the change then.
@steveloughran Do you want me to fix the current 3 checkstyle issues? You said we can ignore some for better readability of code while doing parallel reviews.
Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not that worried about things where lines become 82, 84, 86 chars wide, because often chopping things down can make things more verbose. the current ones are examples of this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is still open: we need that stack trace. Or the caught exception is saved to some variable outside the runnable; after the run() we throw that exception if non null

}
}
};
scheduledExecutorService.schedule(task, 10, TimeUnit.MILLISECONDS);
try {
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
copyMapper.map(new Text(DistCpUtils.getRelativePath(
new Path(SOURCE_PATH), path)),
new CopyListingFileStatus(cluster.getFileSystem().getFileStatus(
path)), context);
} catch (Exception ex) {
StringWriter sw = new StringWriter();
ex.printStackTrace(new PrintWriter(sw));
mukund-thakur marked this conversation as resolved.
Show resolved Hide resolved
String exceptionAsString = sw.toString();
LOG.error("Exception encountered ", ex);
if (exceptionAsString.contains(DistCpConstants.LENGTH_MISMATCH_ERROR_MSG) ||
exceptionAsString.contains(DistCpConstants.CHECKSUM_MISMATCH_ERROR_MSG)) {
Assert.fail("Test failed: " + exceptionAsString);
}
} finally {
scheduledExecutorService.shutdown();
}
}

@Test(timeout=40000)
public void testMakeDirFailure() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.mapred.CopyMapper.FileAction;

import org.junit.Assert;
mukund-thakur marked this conversation as resolved.
Show resolved Hide resolved
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -57,5 +59,26 @@ public void testFailOnCloseError() throws Exception {
}
assertNotNull("close didn't fail", actualEx);
assertEquals(expectedEx, actualEx);
}
}

@Test(timeout = 40000)
public void testGetNumBytesToRead() {
long pos = 100;
long buffLength = 1024;
long fileLength = 2058;
RetriableFileCopyCommand retriableFileCopyCommand =
new RetriableFileCopyCommand("Testing NumBytesToRead ",
FileAction.OVERWRITE);
long numBytes = retriableFileCopyCommand
.getNumBytesToRead(fileLength, pos, buffLength);
Assert.assertEquals(1024, numBytes);
pos += numBytes;
numBytes = retriableFileCopyCommand
.getNumBytesToRead(fileLength, pos, buffLength);
Assert.assertEquals(934, numBytes);
pos += numBytes;
numBytes = retriableFileCopyCommand
.getNumBytesToRead(fileLength, pos, buffLength);
Assert.assertEquals(0, numBytes);
}
}
Loading