Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-10930.07.patch #160

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
Expand All @@ -53,7 +50,6 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -88,8 +84,6 @@ class BlockReceiver implements Closeable {
* the DataNode needs to recalculate checksums before writing.
*/
private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk
private final int bytesPerChecksum;
private final int checksumSize;
Expand Down Expand Up @@ -250,7 +244,8 @@ class BlockReceiver implements Closeable {

final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
streams = replicaInfo.createStreams(isCreate, requestedChecksum,
datanodeSlowLogThresholdMs);
assert streams != null : "null streams!";

// read checksum meta information
Expand All @@ -260,13 +255,6 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize();

this.out = streams.getDataOut();
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
datanode.getConf())));
Expand Down Expand Up @@ -319,7 +307,7 @@ public void close() throws IOException {
packetReceiver.close();

IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) {
if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
long flushTotalNanos = 0;
Expand Down Expand Up @@ -348,9 +336,9 @@ public void close() throws IOException {
}
// close block file
try {
if (out != null) {
if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime();
out.flush();
streams.flushDataOut();
long flushEndNanos = System.nanoTime();
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
Expand All @@ -359,14 +347,13 @@ public void close() throws IOException {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
out.close();
out = null;
streams.closeDataStream();
}
} catch (IOException e) {
ioe = e;
}
finally{
IOUtils.closeStream(out);
streams.close();
}
if (replicaHandler != null) {
IOUtils.cleanup(null, replicaHandler);
Expand Down Expand Up @@ -419,9 +406,9 @@ void flushOrSync(boolean isSync) throws IOException {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (out != null) {
if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime();
out.flush();
streams.flushDataOut();
long flushEndNanos = System.nanoTime();
if (isSync) {
long fsyncStartNanos = flushEndNanos;
Expand All @@ -430,10 +417,10 @@ void flushOrSync(boolean isSync) throws IOException {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (checksumOut != null || out != null) {
if (checksumOut != null || streams.getDataOut() != null) {
datanode.metrics.addFlushNanos(flushTotalNanos);
if (isSync) {
datanode.metrics.incrFsyncCount();
datanode.metrics.incrFsyncCount();
}
}
long duration = Time.monotonicNow() - begin;
Expand Down Expand Up @@ -716,16 +703,12 @@ private int receivePacket() throws IOException {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);

// Write data to disk.
long begin = Time.monotonicNow();
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
long duration = streams.writeToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);

if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
}
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}

final byte[] lastCrc;
if (shouldNotWriteChecksum) {
Expand Down Expand Up @@ -842,7 +825,7 @@ private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {

private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
if (streams.getOutFd() != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
long begin = Time.monotonicNow();
//
Expand All @@ -857,12 +840,11 @@ private void manageWriterOsCache(long offsetInBlock) {
if (syncBehindWrites) {
if (syncBehindWritesInBackground) {
this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
block, outFd, lastCacheManagementOffset,
block, streams, lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
} else {
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
lastCacheManagementOffset,
streams.syncFileRangeIfPossible(lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
}
Expand All @@ -879,8 +861,8 @@ private void manageWriterOsCache(long offsetInBlock) {
//
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED);
streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos,
POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin;
Expand Down Expand Up @@ -989,7 +971,7 @@ void receiveBlock(
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(streams.getDataOut());
}
try {
// Even if the connection is closed after the ack packet is
Expand Down Expand Up @@ -1047,8 +1029,8 @@ private void cleanupBlock() throws IOException {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
if (out != null) {
out.flush();
if (streams.getDataOut() != null) {
streams.flushDataOut();
}
if (checksumOut != null) {
checksumOut.flush();
Expand Down Expand Up @@ -1094,10 +1076,10 @@ private Checksum computePartialChunkCrc(long blkoff, long ckoff)
byte[] crcbuf = new byte[checksumSize];
try (ReplicaInputStreams instr =
datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
instr.readDataFully(buf, 0, sizePartialChunk);

// open meta file and read in crc value computer earlier
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
instr.readChecksumFully(crcbuf, 0, crcbuf.length);
}

// compute crc of partial chunk from data read in the block file.
Expand Down
Loading