-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths" #20119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,13 +18,12 @@ | |
package org.apache.spark.network.buffer; | ||
|
||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.RandomAccessFile; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.Files; | ||
import java.nio.file.StandardOpenOption; | ||
|
||
import com.google.common.base.Objects; | ||
import com.google.common.io.ByteStreams; | ||
|
@@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException { | |
|
||
@Override | ||
public InputStream createInputStream() throws IOException { | ||
InputStream is = null; | ||
FileInputStream is = null; | ||
try { | ||
is = Files.newInputStream(file.toPath()); | ||
is = new FileInputStream(file); | ||
ByteStreams.skipFully(is, offset); | ||
return new LimitedInputStream(is, length); | ||
} catch (IOException e) { | ||
|
@@ -133,7 +132,7 @@ public Object convertToNetty() throws IOException { | |
if (conf.lazyFileDescriptor()) { | ||
return new DefaultFileRegion(file, offset, length); | ||
} else { | ||
FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ); | ||
FileChannel fileChannel = new FileInputStream(file).getChannel(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jerryshao I think this is the only line that may reduce the memory pressure for the external shuffle service. Right? |
||
return new DefaultFileRegion(fileChannel, offset, length); | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,11 @@ | |
package org.apache.spark.network.shuffle; | ||
|
||
import java.io.File; | ||
import java.io.FileOutputStream; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.Channels; | ||
import java.nio.channels.WritableByteChannel; | ||
import java.nio.file.Files; | ||
import java.util.Arrays; | ||
|
||
import org.slf4j.Logger; | ||
|
@@ -165,7 +165,7 @@ private class DownloadCallback implements StreamCallback { | |
|
||
DownloadCallback(int chunkIndex) throws IOException { | ||
this.targetFile = tempFileManager.createTempFile(); | ||
this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath())); | ||
this.channel = Channels.newChannel(new FileOutputStream(targetFile)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Seems […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here I think we can use […] |
||
this.chunkIndex = chunkIndex; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,10 +19,10 @@ | |
|
||
import java.io.DataInputStream; | ||
import java.io.File; | ||
import java.io.FileInputStream; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.LongBuffer; | ||
import java.nio.file.Files; | ||
|
||
/** | ||
* Keeps the index information for a particular map output | ||
|
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException { | |
offsets = buffer.asLongBuffer(); | ||
DataInputStream dis = null; | ||
try { | ||
dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); | ||
dis = new DataInputStream(new FileInputStream(indexFile)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @zsxwing also here I think it will affect the external shuffle service. |
||
dis.readFully(buffer.array()); | ||
} finally { | ||
if (dis != null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
package org.apache.spark.shuffle | ||
|
||
import java.io._ | ||
import java.nio.file.Files | ||
|
||
import com.google.common.io.ByteStreams | ||
|
||
|
@@ -142,8 +141,7 @@ private[spark] class IndexShuffleBlockResolver( | |
val indexFile = getIndexFile(shuffleId, mapId) | ||
val indexTmp = Utils.tempFileWith(indexFile) | ||
try { | ||
val out = new DataOutputStream( | ||
new BufferedOutputStream(Files.newOutputStream(indexTmp.toPath))) | ||
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp))) | ||
Utils.tryWithSafeFinally { | ||
// We take in lengths of each block, need to convert it to offsets. | ||
var offset = 0L | ||
|
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver( | |
// find out the consolidated file, then the offset within that from our index | ||
val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) | ||
|
||
val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) | ||
val in = new DataInputStream(new FileInputStream(indexFile)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @jerryshao this is another place. In addition, I'm not sure if there is any compression codec using […]. I also noticed […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see. |
||
try { | ||
ByteStreams.skipFully(in, blockId.reduceId * 8) | ||
val offset = in.readLong() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
I think these two lines might be the place that suffers from the `skip` issue — can we revert only this place? @zsxwing @cloud-fan @gatorsmile