Commit 2740d9b

Author: zhangminglei (committed)
Merge pull request apache#1 from hash-X/NewFileInputStream
New file of AltFileInputStream.java to replace FileInputStream.java in apache/hadoop/HDFS
2 parents: 176131f + 6304777

17 files changed: +367 additions, -91 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/AltFileInputStream.java (new file)

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import org.apache.hadoop.util.Shell;

import java.io.InputStream;
import java.io.FileInputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;

import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

/**
 * This class is a substitute for FileInputStream. On Windows we still use
 * FileInputStream; on non-Windows platforms we use a FileChannel and
 * FileDescriptor to construct the stream.
 */
public class AltFileInputStream extends InputStream {
  // For non-Windows
  private final InputStream inputStream;
  private final FileDescriptor fd;
  private final FileChannel fileChannel;

  // For Windows
  private FileInputStream fileInputStream;

  public AltFileInputStream(File file) throws IOException {
    if (!Shell.WINDOWS) {
      RandomAccessFile rf = new RandomAccessFile(file, "r");
      this.fd = rf.getFD();
      this.fileChannel = rf.getChannel();
      this.inputStream = Channels.newInputStream(fileChannel);
    } else {
      FileInputStream fis = new FileInputStream(file);
      this.fileInputStream = fis;
      this.inputStream = fileInputStream;
      this.fd = fis.getFD();
      this.fileChannel = fis.getChannel();
    }
  }

  /**
   * Create a stream from a FileDescriptor and a FileChannel.
   * @param fd FileDescriptor
   * @param fileChannel FileChannel
   */
  public AltFileInputStream(FileDescriptor fd, FileChannel fileChannel) {
    this.fd = fd;
    this.fileChannel = fileChannel;
    this.inputStream = Channels.newInputStream(fileChannel);
  }

  /**
   * Create a stream from a FileInputStream.
   * @param fis FileInputStream
   * @throws IOException if an I/O error occurs.
   */
  public AltFileInputStream(FileInputStream fis) throws IOException {
    this.fileInputStream = fis;
    this.inputStream = fileInputStream;
    this.fd = fis.getFD();
    this.fileChannel = fis.getChannel();
  }

  /**
   * Returns the <code>FileDescriptor</code> object that represents the
   * connection to the actual file in the file system being used by this
   * stream.
   *
   * @return the file descriptor object associated with this stream.
   * @exception IOException if an I/O error occurs.
   * @see java.io.FileDescriptor
   */
  public final FileDescriptor getFD() throws IOException {
    if (fd != null) {
      return fd;
    }
    throw new IOException();
  }

  // Return the FileChannel associated with this stream.
  public FileChannel getChannel() {
    return fileChannel;
  }

  /**
   * For Windows, use fileInputStream to read data.
   * For non-Windows, use inputStream to read data.
   * @return the next byte of data, or <code>-1</code> if the end of the
   *         file is reached.
   * @throws IOException if an I/O error occurs.
   */
  public int read() throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read();
    } else {
      return inputStream.read();
    }
  }

  /**
   * Reads up to <code>len</code> bytes of data from this input stream
   * into an array of bytes. If <code>len</code> is not zero, the method
   * blocks until some input is available; otherwise, no
   * bytes are read and <code>0</code> is returned.
   *
   * @param b the buffer into which the data is read.
   * @param off the start offset in the destination array <code>b</code>
   * @param len the maximum number of bytes read.
   * @return the total number of bytes read into the buffer, or
   *         <code>-1</code> if there is no more data because the end of
   *         the file has been reached.
   * @exception NullPointerException If <code>b</code> is <code>null</code>.
   * @exception IndexOutOfBoundsException If <code>off</code> is negative,
   *            <code>len</code> is negative, or <code>len</code> is greater
   *            than <code>b.length - off</code>
   * @exception IOException if an I/O error occurs.
   */
  public int read(byte[] b, int off, int len) throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read(b, off, len);
    } else {
      return inputStream.read(b, off, len);
    }
  }

  /**
   * Reads up to <code>b.length</code> bytes of data from this input
   * stream into an array of bytes. This method blocks until some input
   * is available.
   *
   * @param b the buffer into which the data is read.
   * @return the total number of bytes read into the buffer, or
   *         <code>-1</code> if there is no more data because the end of
   *         the file has been reached.
   * @exception IOException if an I/O error occurs.
   */
  public int read(byte[] b) throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read(b);
    } else {
      return inputStream.read(b);
    }
  }

  /**
   * Closes this file input stream and releases any system resources
   * associated with the stream.
   *
   * <p> If this stream has an associated channel then the channel is closed
   * as well.
   *
   * @exception IOException if an I/O error occurs.
   */
  public void close() throws IOException {
    if (fileInputStream != null) {
      fileInputStream.close();
    }
    fileChannel.close();
    inputStream.close();
  }
}
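
For illustration only (not part of this commit): a minimal usage sketch of the new class. It assumes AltFileInputStream is on the classpath as org.apache.hadoop.io.AltFileInputStream; the demo class name and the temporary file are hypothetical.

import org.apache.hadoop.io.AltFileInputStream;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class AltFileInputStreamDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical temporary file, used only for this sketch.
    File f = Files.createTempFile("alt-fis-demo", ".txt").toFile();
    Files.write(f.toPath(), "hello".getBytes(StandardCharsets.UTF_8));
    try (AltFileInputStream in = new AltFileInputStream(f)) {
      // On non-Windows this reads through a RandomAccessFile's channel;
      // on Windows it delegates to a plain FileInputStream.
      byte[] buf = new byte[16];
      int n = in.read(buf, 0, buf.length);
      System.out.println("read " + n + " bytes: "
          + new String(buf, 0, n, StandardCharsets.UTF_8));
      // The descriptor and channel are exposed just as FileInputStream's are,
      // which is what the BlockSender changes below rely on.
      System.out.println("fd valid: " + in.getFD().valid());
      System.out.println("channel open: " + in.getChannel().isOpen());
    } finally {
      f.delete();
    }
  }
}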

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

Lines changed: 5 additions & 4 deletions
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -773,19 +774,19 @@ public static FileInputStream getShareDeleteFileInputStream(File f)
   }
 
   /**
-   * Create a FileInputStream that shares delete permission on the
+   * Create an AltFileInputStream that shares delete permission on the
    * file opened at a given offset, i.e. other process can delete
    * the file the FileInputStream is reading. Only Windows implementation
    * uses the native interface.
    */
-  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+  public static AltFileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
       throws IOException {
     if (!Shell.WINDOWS) {
       RandomAccessFile rf = new RandomAccessFile(f, "r");
       if (seekOffset > 0) {
         rf.seek(seekOffset);
       }
-      return new FileInputStream(rf.getFD());
+      return new AltFileInputStream(rf.getFD(), rf.getChannel());
     } else {
       // Use Windows native interface to create a FileInputStream that
       // shares delete permission on the file opened, and set it to the
@@ -800,7 +801,7 @@ public static FileInputStream getShareDeleteFileInputStream(File f, long seekOff
           NativeIO.Windows.OPEN_EXISTING);
       if (seekOffset > 0)
         NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
-      return new FileInputStream(fd);
+      return new AltFileInputStream(new FileInputStream(fd));
     }
   }
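
For context, a standalone sketch of what the non-Windows branch above now does: the RandomAccessFile's descriptor and channel are handed to the new AltFileInputStream constructor instead of FileInputStream. The helper class and method names here are illustrative, not part of the commit.

import org.apache.hadoop.io.AltFileInputStream;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

final class ShareDeleteSketch {
  private ShareDeleteSketch() {}

  // Mirrors the non-Windows branch of getShareDeleteFileInputStream shown
  // above: open read-only, optionally seek, then wrap the descriptor and
  // channel in an AltFileInputStream.
  static AltFileInputStream openAtOffset(File f, long seekOffset)
      throws IOException {
    RandomAccessFile rf = new RandomAccessFile(f, "r");
    if (seekOffset > 0) {
      rf.seek(seekOffset);
    }
    return new AltFileInputStream(rf.getFD(), rf.getChannel());
  }
}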

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java

Lines changed: 8 additions & 8 deletions
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
+import java.io.DataInputStream;
+import java.io.BufferedInputStream;
 import java.io.IOException;
+import java.io.EOFException;
 import java.io.RandomAccessFile;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
@@ -35,6 +34,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -88,7 +88,7 @@ public static DataChecksum readDataChecksum(File metaFile) throws IOException {
     DataInputStream in = null;
     try {
       in = new DataInputStream(new BufferedInputStream(
-        new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
+        new AltFileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
       return readDataChecksum(in, metaFile);
     } finally {
       IOUtils.closeStream(in);
@@ -153,7 +153,7 @@ public static BlockMetadataHeader readHeader(File file) throws IOException {
     DataInputStream in = null;
     try {
       in = new DataInputStream(new BufferedInputStream(
-        new FileInputStream(file)));
+        new AltFileInputStream(file)));
       return readHeader(in);
     } finally {
       IOUtils.closeStream(in);
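
The swap is possible because AltFileInputStream extends InputStream, so it layers under BufferedInputStream and DataInputStream exactly as FileInputStream did. A small sketch under that assumption (the helper class, the 4096 buffer size standing in for DFSUtil.getIoFileBufferSize(conf), and the version-only read are illustrative):

import org.apache.hadoop.io.AltFileInputStream;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;

final class MetaHeaderSketch {
  // Wraps AltFileInputStream in the same decorators BlockMetadataHeader uses
  // and reads the leading 2-byte version field of the block metadata file.
  static short readVersion(File metaFile) throws IOException {
    try (DataInputStream in = new DataInputStream(
        new BufferedInputStream(new AltFileInputStream(metaFile), 4096))) {
      return in.readShort();
    }
  }
}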

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

Lines changed: 6 additions & 6 deletions
@@ -21,7 +21,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileDescriptor;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
@@ -388,8 +388,8 @@ class BlockSender implements java.io.Closeable {
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
-      if (blockIn instanceof FileInputStream) {
-        blockInFd = ((FileInputStream)blockIn).getFD();
+      if (blockIn instanceof AltFileInputStream) {
+        blockInFd = ((AltFileInputStream)blockIn).getFD();
       } else {
         blockInFd = null;
       }
@@ -579,7 +579,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
       sockOut.write(buf, headerOff, dataOff - headerOff);
 
       // no need to flush since we know out is not a buffered stream
-      FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+      FileChannel fileCh = ((AltFileInputStream)blockIn).getChannel();
       LongWritable waitTime = new LongWritable();
       LongWritable transferTime = new LongWritable();
       sockOut.transferToFully(fileCh, blockInPosition, dataLen,
@@ -742,9 +742,9 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
     int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
     boolean transferTo = transferToAllowed && !verifyChecksum
         && baseStream instanceof SocketOutputStream
-        && blockIn instanceof FileInputStream;
+        && blockIn instanceof AltFileInputStream;
     if (transferTo) {
-      FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+      FileChannel fileChannel = ((AltFileInputStream)blockIn).getChannel();
       blockInPosition = fileChannel.position();
       streamForSendChunks = baseStream;
       maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
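
These instanceof checks matter because the zero-copy transferTo path needs a FileChannel, which AltFileInputStream exposes via getChannel() just as FileInputStream did. A rough sketch of the pattern (the helper class, the WritableByteChannel target, and the fallback return are illustrative, not BlockSender's actual code):

import org.apache.hadoop.io.AltFileInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class TransferToSketch {
  // Only a stream that exposes a FileChannel can take the zero-copy path;
  // anything else would fall back to ordinary buffered reads.
  static long sendRange(InputStream blockIn, WritableByteChannel target,
      long offset, long length) throws IOException {
    if (blockIn instanceof AltFileInputStream) {
      FileChannel ch = ((AltFileInputStream) blockIn).getChannel();
      return ch.transferTo(offset, length, target);
    }
    return 0;  // caller would use the copy-through-buffer path instead
  }
}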
