
[SPARK-21475][Core] Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths" #20119

Closed · wants to merge 1 commit
@@ -18,13 +18,12 @@
package org.apache.spark.network.buffer;

import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;

import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
@@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException {

@Override
public InputStream createInputStream() throws IOException {
-InputStream is = null;
+FileInputStream is = null;
try {
-is = Files.newInputStream(file.toPath());
+is = new FileInputStream(file);
ByteStreams.skipFully(is, offset);
return new LimitedInputStream(is, length);
Contributor: I think these two lines might be the place that suffers from the skip issue. Can we revert just this place? @zsxwing @cloud-fan @gatorsmile
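For context, here is a minimal, self-contained sketch (not code from this PR; the file path and offset are made up) of the two ways of landing at `offset` before wrapping the stream in `LimitedInputStream`: skipping on the `InputStream` returned by `Files.newInputStream`, where `skip` may have to read and discard bytes, versus seeking explicitly on a `FileChannel`, where `position` only moves the file pointer. The skip issue raised above is about which of these behaviors the stream actually gets.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SkipSketch {

  // Variant 1: rely on InputStream.skip(). Depending on the concrete stream,
  // this may seek (FileInputStream does) or read and discard the skipped bytes.
  static InputStream openWithSkip(Path file, long offset) throws IOException {
    InputStream is = Files.newInputStream(file);
    long remaining = offset;
    while (remaining > 0) {
      long skipped = is.skip(remaining);
      if (skipped <= 0) {
        throw new IOException("Unable to skip to offset " + offset);
      }
      remaining -= skipped;
    }
    return is;
  }

  // Variant 2: seek explicitly on a FileChannel, then expose it as a stream.
  // position() only moves the file pointer; no bytes are read.
  static InputStream openWithSeek(Path file, long offset) throws IOException {
    FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
    channel.position(offset);
    return Channels.newInputStream(channel);
  }

  public static void main(String[] args) throws IOException {
    Path file = Paths.get(args[0]); // hypothetical test file
    long offset = 1024;             // hypothetical offset into the segment
    try (InputStream a = openWithSkip(file, offset);
         InputStream b = openWithSeek(file, offset)) {
      System.out.println(a.read() + " == " + b.read());
    }
  }
}
```

If the stream can truly seek, the two variants cost about the same; if it cannot, skipping a large offset turns into a full read of the skipped bytes.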

} catch (IOException e) {
@@ -133,7 +132,7 @@ public Object convertToNetty() throws IOException {
if (conf.lazyFileDescriptor()) {
return new DefaultFileRegion(file, offset, length);
} else {
-FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+FileChannel fileChannel = new FileInputStream(file).getChannel();
Member Author: @jerryshao I think this is the only line that may reduce the memory pressure for the external shuffle service. Right?
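A minimal sketch of the trade-off being discussed, assuming (as the original SPARK-21475 change argued) that the memory pressure comes from the `finalize()` method that `FileInputStream`/`FileOutputStream` carry on JDK 8 era runtimes, which keeps every instance around until its finalizer runs; a channel from `FileChannel.open` has no finalizer. Class and method names below are made up for illustration.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class ChannelOpenSketch {

  // Reverted form: the channel is backed by a FileInputStream. FileInputStream
  // defines finalize(), so each instance lingers until the finalizer runs,
  // which can add GC pressure when many blocks are served.
  static FileChannel viaFileInputStream(File file) throws IOException {
    return new FileInputStream(file).getChannel();
  }

  // Form removed by this revert: no finalizer is involved; the channel is
  // released as soon as close() is called.
  static FileChannel viaNio(File file) throws IOException {
    return FileChannel.open(file.toPath(), StandardOpenOption.READ);
  }

  public static void main(String[] args) throws IOException {
    File file = new File(args[0]); // hypothetical shuffle data file
    try (FileChannel a = viaFileInputStream(file);
         FileChannel b = viaNio(file)) {
      System.out.println(a.size() + " == " + b.size());
    }
  }
}
```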

return new DefaultFileRegion(fileChannel, offset, length);
}
}
@@ -18,11 +18,11 @@
package org.apache.spark.network.shuffle;

import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
import java.util.Arrays;

import org.slf4j.Logger;
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback {

DownloadCallback(int chunkIndex) throws IOException {
this.targetFile = tempFileManager.createTempFile();
-this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+this.channel = Channels.newChannel(new FileOutputStream(targetFile));
Contributor: Here I think we can use FileChannel.open instead.
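A sketch of what that suggestion might look like for `DownloadCallback` (hypothetical, not part of this revert): open the temp file directly as a channel via `FileChannel.open`, so no `FileOutputStream` is created. The temp file and payload below are stand-ins for `tempFileManager.createTempFile()` and the fetched chunk bytes.

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;

public class DownloadChannelSketch {
  public static void main(String[] args) throws IOException {
    // Stand-in for tempFileManager.createTempFile().
    File targetFile = File.createTempFile("shuffle-fetch", ".tmp");

    // FileChannel.open yields a WritableByteChannel without going through
    // FileOutputStream (and without its finalizer).
    try (WritableByteChannel channel = FileChannel.open(
        targetFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
      // Stand-in for the fetched chunk bytes.
      channel.write(ByteBuffer.wrap("example chunk".getBytes(StandardCharsets.UTF_8)));
    }
  }
}
```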

this.chunkIndex = chunkIndex;
}

@@ -19,10 +19,10 @@

import java.io.DataInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
-import java.nio.file.Files;

/**
* Keeps the index information for a particular map output
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
offsets = buffer.asLongBuffer();
DataInputStream dis = null;
try {
-dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+dis = new DataInputStream(new FileInputStream(indexFile));
Contributor: @zsxwing also here, I think it will affect the external shuffle service.

dis.readFully(buffer.array());
} finally {
if (dis != null) {
@@ -18,9 +18,9 @@
package org.apache.spark.shuffle.sort;

import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
import javax.annotation.Nullable;

import scala.None$;
@@ -75,6 +75,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

private final int fileBufferSize;
+private final boolean transferToEnabled;
private final int numPartitions;
private final BlockManager blockManager;
private final Partitioner partitioner;
@@ -106,6 +107,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
SparkConf conf) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
@@ -186,21 +188,17 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
return lengths;
}

-// This file needs to opened in append mode in order to work around a Linux kernel bug that
-// affects transferTo; see SPARK-3948 for more details.
-final FileChannel out = FileChannel.open(outputFile.toPath(), WRITE, APPEND, CREATE);
+final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
if (file.exists()) {
-final FileChannel in = FileChannel.open(file.toPath(), READ);
+final FileInputStream in = new FileInputStream(file);
boolean copyThrewException = true;
try {
-long size = in.size();
-Utils.copyFileStreamNIO(in, out, 0, size);
-lengths[i] = size;
+lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
@@ -20,7 +20,6 @@
import javax.annotation.Nullable;
import java.io.*;
import java.nio.channels.FileChannel;
-import static java.nio.file.StandardOpenOption.*;
import java.util.Iterator;

import scala.Option;
@@ -291,7 +290,7 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOExcepti
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
try {
if (spills.length == 0) {
-java.nio.file.Files.newOutputStream(outputFile.toPath()).close(); // Create an empty file
+new FileOutputStream(outputFile).close(); // Create an empty file
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
// Here, we don't need to perform any metrics updates because the bytes written to this
@@ -368,7 +367,7 @@ private long[] mergeSpillsWithFileStream(
final InputStream[] spillInputStreams = new InputStream[spills.length];

final OutputStream bos = new BufferedOutputStream(
-java.nio.file.Files.newOutputStream(outputFile.toPath()),
+new FileOutputStream(outputFile),
outputBufferSizeInBytes);
// Use a counting output stream to avoid having to close the underlying file and ask
// the file system for its size after each partition is written.
@@ -443,11 +442,11 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
boolean threwException = true;
try {
for (int i = 0; i < spills.length; i++) {
-spillInputChannels[i] = FileChannel.open(spills[i].file.toPath(), READ);
+spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
}
// This file needs to opened in append mode in order to work around a Linux kernel bug that
// affects transferTo; see SPARK-3948 for more details.
-mergedFileOutputChannel = FileChannel.open(outputFile.toPath(), WRITE, CREATE, APPEND);
+mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();

long bytesWrittenToMergedFile = 0;
for (int partition = 0; partition < numPartitions; partition++) {
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle

import java.io._
-import java.nio.file.Files

import com.google.common.io.ByteStreams

@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver(
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
-val out = new DataOutputStream(
-new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath)))
+val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
// find out the consolidated file, then the offset within that from our index
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

-val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+val in = new DataInputStream(new FileInputStream(indexFile))
Member Author: @jerryshao this is another place. In addition, I'm not sure whether any compression codec uses skip or not.

I also noticed that sun.nio.ch.ChannelInputStream has extra synchronized blocks, since Files.newInputStream needs to be thread-safe. I'm not sure whether that may cause a performance regression.

Contributor: I see.
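To make the skip and synchronization discussion concrete, here is a hypothetical Java sketch of the index lookup (the Scala code above skips `blockId.reduceId * 8` bytes on a stream and then reads the offset): positioning a `FileChannel` directly avoids both `skip` and the stream wrapper entirely. Reading two longs (this block's offset and the next one's) is an assumption about the index layout made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class IndexLookupSketch {

  // Assumed layout: the index file is a flat sequence of big-endian longs,
  // where entry i is the byte offset of reduce partition i in the data file.
  static long[] readOffsets(Path indexFile, int reduceId) throws IOException {
    try (FileChannel channel = FileChannel.open(indexFile, StandardOpenOption.READ)) {
      // Seek straight to the entry instead of skipping reduceId * 8 bytes on a stream.
      channel.position(reduceId * 8L);
      ByteBuffer buf = ByteBuffer.allocate(16);
      while (buf.hasRemaining() && channel.read(buf) >= 0) {
        // loop until both longs are buffered (or EOF)
      }
      buf.flip();
      return new long[] { buf.getLong(), buf.getLong() };
    }
  }

  public static void main(String[] args) throws IOException {
    long[] o = readOffsets(Paths.get(args[0]), Integer.parseInt(args[1]));
    System.out.println("offset=" + o[0] + ", length=" + (o[1] - o[0]));
  }
}
```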

try {
ByteStreams.skipFully(in, blockId.reduceId * 8)
val offset = in.readLong()
@@ -18,8 +18,6 @@
package org.apache.spark.util.collection

import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
import java.util.Comparator

import scala.collection.BufferedIterator
@@ -461,7 +459,7 @@ class ExternalAppendOnlyMap[K, V, C](
)

private var batchIndex = 0 // Which batch we're in
-private var fileChannel: FileChannel = null
+private var fileStream: FileInputStream = null

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
@@ -478,23 +476,22 @@
if (batchIndex < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStream.close()
-fileChannel.close()
+fileStream.close()
deserializeStream = null
-fileChannel = null
+fileStream = null
}

val start = batchOffsets(batchIndex)
-fileChannel = FileChannel.open(file.toPath, StandardOpenOption.READ)
-fileChannel.position(start)
+fileStream = new FileInputStream(file)
+fileStream.getChannel.position(start)
batchIndex += 1

val end = batchOffsets(batchIndex)

assert(end >= start, "start = " + start + ", end = " + end +
", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

-val bufferedStream = new BufferedInputStream(
-ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
ser.deserializeStream(wrappedStream)
} else {
@@ -554,9 +551,9 @@ class ExternalAppendOnlyMap[K, V, C](
ds.close()
deserializeStream = null
}
-if (fileChannel != null) {
-fileChannel.close()
-fileChannel = null
+if (fileStream != null) {
+fileStream.close()
+fileStream = null
}
if (file.exists()) {
if (!file.delete()) {
@@ -18,8 +18,6 @@
package org.apache.spark.util.collection

import java.io._
-import java.nio.channels.{Channels, FileChannel}
-import java.nio.file.StandardOpenOption
import java.util.Comparator

import scala.collection.mutable
@@ -494,7 +492,7 @@ private[spark] class ExternalSorter[K, V, C](

// Intermediate file and deserializer streams that read from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
-var fileChannel: FileChannel = null
+var fileStream: FileInputStream = null
var deserializeStream = nextBatchStream() // Also sets fileStream

var nextItem: (K, C) = null
@@ -507,23 +505,22 @@
if (batchId < batchOffsets.length - 1) {
if (deserializeStream != null) {
deserializeStream.close()
-fileChannel.close()
+fileStream.close()
deserializeStream = null
-fileChannel = null
+fileStream = null
}

val start = batchOffsets(batchId)
-fileChannel = FileChannel.open(spill.file.toPath, StandardOpenOption.READ)
-fileChannel.position(start)
+fileStream = new FileInputStream(spill.file)
+fileStream.getChannel.position(start)
batchId += 1

val end = batchOffsets(batchId)

assert(end >= start, "start = " + start + ", end = " + end +
", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

-val bufferedStream = new BufferedInputStream(
-ByteStreams.limit(Channels.newInputStream(fileChannel), end - start))
+val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))

val wrappedStream = serializerManager.wrapStream(spill.blockId, bufferedStream)
serInstance.deserializeStream(wrappedStream)
@@ -613,7 +610,7 @@ private[spark] class ExternalSorter[K, V, C](
batchId = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
deserializeStream = null
-fileChannel = null
+fileStream = null
if (ds != null) {
ds.close()
}