Skip to content

Commit c48c969

Browse files
sahilTakiardeepakdamri
authored andcommitted
HDFS-3246: pRead equivalent for direct read path (apache#597)
HDFS-3246: pRead equivalent for direct read path Contributed by Sahil Takiar
1 parent 8196cea commit c48c969

File tree

14 files changed

+1164
-269
lines changed

14 files changed

+1164
-269
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

Lines changed: 201 additions & 96 deletions
Large diffs are not rendered by default.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.fs;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.classification.InterfaceStability;
25+
26+
/**
27+
* Implementers of this interface provide a positioned read API that writes to a
28+
* {@link ByteBuffer} rather than a {@code byte[]}.
29+
*
30+
* @see PositionedReadable
31+
* @see ByteBufferReadable
32+
*/
33+
@InterfaceAudience.Public
34+
@InterfaceStability.Evolving
35+
public interface ByteBufferPositionedReadable {
36+
/**
37+
* Reads up to {@code buf.remaining()} bytes into buf from a given position
38+
* in the file and returns the number of bytes read. Callers should use
39+
* {@code buf.limit(...)} to control the size of the desired read and
40+
* {@code buf.position(...)} to control the offset into the buffer the data
41+
* should be written to.
42+
* <p>
43+
* After a successful call, {@code buf.position()} will be advanced by the
44+
* number of bytes read and {@code buf.limit()} will be unchanged.
45+
* <p>
46+
* In the case of an exception, the state of the buffer (the contents of the
47+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
48+
* undefined, and callers should be prepared to recover from this
49+
* eventuality.
50+
* <p>
51+
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
52+
* {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
53+
* stream supports this interface, otherwise they might get a
54+
* {@link UnsupportedOperationException}.
55+
* <p>
56+
* Implementations should treat 0-length requests as legitimate, and must not
57+
* signal an error upon their receipt.
58+
*
59+
* @param position position within file
60+
* @param buf the ByteBuffer to receive the results of the read operation.
61+
* @return the number of bytes read, possibly zero, or -1 if reached
62+
* end-of-stream
63+
* @throws IOException if there is some error performing the read
64+
*/
65+
int read(long position, ByteBuffer buf) throws IOException;
66+
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22+
2223
import org.apache.hadoop.classification.InterfaceAudience;
2324
import org.apache.hadoop.classification.InterfaceStability;
2425

@@ -32,18 +33,20 @@ public interface ByteBufferReadable {
3233
/**
3334
* Reads up to buf.remaining() bytes into buf. Callers should use
3435
* buf.limit(..) to control the size of the desired read.
35-
* <p/>
36-
* After a successful call, buf.position() will be advanced by the number
37-
* of bytes read and buf.limit() should be unchanged.
38-
* <p/>
39-
* In the case of an exception, the values of buf.position() and buf.limit()
40-
* are undefined, and callers should be prepared to recover from this
36+
* <p>
37+
* After a successful call, {@code buf.position()} will be advanced by the
38+
* number of bytes read and {@code buf.limit()} will be unchanged.
39+
* <p>
40+
* In the case of an exception, the state of the buffer (the contents of the
41+
* buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
42+
* undefined, and callers should be prepared to recover from this
4143
* eventuality.
42-
* <p/>
43-
* Many implementations will throw {@link UnsupportedOperationException}, so
44-
* callers that are not confident in support for this method from the
45-
* underlying filesystem should be prepared to handle that exception.
46-
* <p/>
44+
* <p>
45+
* Callers should use {@link StreamCapabilities#hasCapability(String)} with
46+
* {@link StreamCapabilities#READBYTEBUFFER} to check if the underlying
47+
* stream supports this interface, otherwise they might get a
48+
* {@link UnsupportedOperationException}.
49+
* <p>
4750
* Implementations should treat 0-length requests as legitimate, and must not
4851
* signal an error upon their receipt.
4952
*

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
public class FSDataInputStream extends DataInputStream
3939
implements Seekable, PositionedReadable,
4040
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
41-
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
41+
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
42+
ByteBufferPositionedReadable {
4243
/**
4344
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
4445
* objects
@@ -147,7 +148,8 @@ public int read(ByteBuffer buf) throws IOException {
147148
return ((ByteBufferReadable)in).read(buf);
148149
}
149150

150-
throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
151+
throw new UnsupportedOperationException("Byte-buffer read unsupported " +
152+
"by input stream");
151153
}
152154

153155
@Override
@@ -246,4 +248,13 @@ public boolean hasCapability(String capability) {
246248
public String toString() {
247249
return super.toString() + ": " + in;
248250
}
251+
252+
@Override
253+
public int read(long position, ByteBuffer buf) throws IOException {
254+
if (in instanceof ByteBufferPositionedReadable) {
255+
return ((ByteBufferPositionedReadable) in).read(position, buf);
256+
}
257+
throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
258+
"by input stream");
259+
}
249260
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public interface StreamCapabilities {
5959
*/
6060
String UNBUFFER = "in:unbuffer";
6161

62+
/**
63+
* Stream read(ByteBuffer) capability implemented by
64+
* {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
65+
*/
66+
String READBYTEBUFFER = "in:readbytebuffer";
67+
68+
/**
69+
* Stream read(long, ByteBuffer) capability implemented by
70+
* {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
71+
*/
72+
String PREADBYTEBUFFER = "in:preadbytebuffer";
73+
6274
/**
6375
* Capabilities that a stream can support and be queried for.
6476
*/

0 commit comments

Comments
 (0)