Skip to content

Commit 111c7bb

Browse files
committed
HADOOP-13327 Output Stream Specification.
This PR removes the changes related to S3A output stream lifecycle, so only covers the specification of Syncable and ensures that StreamCapabilities passes all the way through to the final implementation classes. All streams which implement Syncable hsync/hflush declare this in their stream capabilities. Change-Id: I82b16a8e0965f34eb0c42504da43e8fbeabcb68c
1 parent 83c7c2b commit 111c7bb

File tree

18 files changed

+1157
-34
lines changed

18 files changed

+1157
-34
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.fs.CanSetDropBehind;
2929
import org.apache.hadoop.fs.StreamCapabilities;
3030
import org.apache.hadoop.fs.Syncable;
31+
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
3132

3233
import com.google.common.base.Preconditions;
3334

@@ -308,9 +309,6 @@ private void freeBuffers() {
308309

309310
@Override
310311
public boolean hasCapability(String capability) {
311-
if (out instanceof StreamCapabilities) {
312-
return ((StreamCapabilities) out).hasCapability(capability);
313-
}
314-
return false;
312+
return StoreImplementationUtils.hasCapability(out, capability);
315313
}
316314
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CanSetDropBehind.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ public interface CanSetDropBehind {
3636
* UnsupportedOperationException If this stream doesn't support
3737
* setting the drop-behind.
3838
*/
39-
public void setDropBehind(Boolean dropCache)
39+
void setDropBehind(Boolean dropCache)
4040
throws IOException, UnsupportedOperationException;
4141
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.apache.hadoop.classification.InterfaceAudience;
3131
import org.apache.hadoop.classification.InterfaceStability;
32+
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
3233
import org.apache.hadoop.io.ByteBufferPool;
3334
import org.apache.hadoop.util.IdentityHashStore;
3435

@@ -234,10 +235,7 @@ public void unbuffer() {
234235

235236
@Override
236237
public boolean hasCapability(String capability) {
237-
if (in instanceof StreamCapabilities) {
238-
return ((StreamCapabilities) in).hasCapability(capability);
239-
}
240-
return false;
238+
return StoreImplementationUtils.hasCapability(in, capability);
241239
}
242240

243241
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
2728

2829
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
2930
*/
@@ -122,10 +123,7 @@ public OutputStream getWrappedStream() {
122123

123124
@Override
124125
public boolean hasCapability(String capability) {
125-
if (wrappedStream instanceof StreamCapabilities) {
126-
return ((StreamCapabilities) wrappedStream).hasCapability(capability);
127-
}
128-
return false;
126+
return StoreImplementationUtils.hasCapability(wrappedStream, capability);
129127
}
130128

131129
@Override // Syncable

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
*/
3434
@InterfaceAudience.LimitedPrivate({"HDFS"})
3535
@InterfaceStability.Unstable
36-
abstract public class FSOutputSummer extends OutputStream {
36+
abstract public class FSOutputSummer extends OutputStream implements
37+
StreamCapabilities {
3738
// data checksum
3839
private final DataChecksum sum;
3940
// internal buffer for storing data before it is checksumed
@@ -254,4 +255,9 @@ protected synchronized void setChecksumBufSize(int size) {
254255
protected synchronized void resetChecksumBufSize() {
255256
setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
256257
}
258+
259+
@Override
260+
public boolean hasCapability(String capability) {
261+
return false;
262+
}
257263
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,24 @@
2323
import org.apache.hadoop.classification.InterfaceAudience;
2424
import org.apache.hadoop.classification.InterfaceStability;
2525

26-
/** This interface for flush/sync operation. */
26+
/**
27+
* This is the interface for flush/sync operation.
28+
* Consult the Hadoop filesystem specification for the definition of the
29+
* semantics of this operation.
30+
*/
2731
@InterfaceAudience.Public
28-
@InterfaceStability.Evolving
32+
@InterfaceStability.Stable
2933
public interface Syncable {
30-
34+
3135
/** Flush out the data in client's user buffer. After the return of
3236
* this call, new readers will see the data.
3337
* @throws IOException if any error occurs
3438
*/
35-
public void hflush() throws IOException;
36-
39+
void hflush() throws IOException;
40+
3741
/** Similar to posix fsync, flush out the data in client's user buffer
3842
* all the way to the disk device (but the disk may have it in its cache).
3943
* @throws IOException if error occurs
4044
*/
41-
public void hsync() throws IOException;
45+
void hsync() throws IOException;
4246
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.impl;
20+
21+
import java.io.InputStream;
22+
import java.io.OutputStream;
23+
24+
import org.apache.hadoop.classification.InterfaceAudience;
25+
import org.apache.hadoop.classification.InterfaceStability;
26+
import org.apache.hadoop.fs.StreamCapabilities;
27+
28+
import static org.apache.hadoop.fs.StreamCapabilities.HFLUSH;
29+
import static org.apache.hadoop.fs.StreamCapabilities.HSYNC;
30+
31+
/**
32+
* Utility class to help implement filesystems and streams.
33+
*/
34+
@InterfaceAudience.Private
35+
@InterfaceStability.Unstable
36+
public final class StoreImplementationUtils {
37+
38+
private StoreImplementationUtils() {
39+
}
40+
41+
/**
42+
* Check the supplied capabilities for being those required for full
43+
* {@code Syncable.hsync()} and {@code Syncable.hflush()} functionality.
44+
* @param capability capability string.
45+
* @return true if the capability refers to one of the Syncable operations.
46+
*/
47+
public static boolean supportsSyncable(String capability) {
48+
return capability.equalsIgnoreCase(HSYNC) ||
49+
capability.equalsIgnoreCase(HFLUSH);
50+
}
51+
52+
/**
53+
* Probe for an object having a capability; returns true
54+
* iff the stream implements {@link StreamCapabilities} and its
55+
* {@code hasCapability()} method returns true for the capability.
56+
* This is a package private method intended to provide a common
57+
* implementation for input and output streams.
58+
* {@link StreamCapabilities#hasCapability(String)} call is for public use.
59+
* @param object object to probe.
60+
* @param capability capability to probe for
61+
* @return true iff the object implements stream capabilities and
62+
* declares that it supports the capability.
63+
*/
64+
static boolean objectHasCapability(Object object, String capability) {
65+
if (object instanceof StreamCapabilities) {
66+
return ((StreamCapabilities) object).hasCapability(capability);
67+
}
68+
return false;
69+
}
70+
71+
/**
72+
* Probe for an output stream having a capability; returns true
73+
* iff the stream implements {@link StreamCapabilities} and its
74+
* {@code hasCapability()} method returns true for the capability.
75+
* @param out output stream
76+
* @param capability capability to probe for
77+
* @return true iff the stream declares that it supports the capability.
78+
*/
79+
public static boolean hasCapability(OutputStream out, String capability) {
80+
return objectHasCapability(out, capability);
81+
}
82+
83+
/**
84+
* Probe for an input stream having a capability; returns true
85+
* iff the stream implements {@link StreamCapabilities} and its
86+
* {@code hasCapability()} method returns true for the capability.
87+
* @param out input stream
88+
* @param capability capability to probe for
89+
* @return true iff the stream declares that it supports the capability.
90+
*/
91+
public static boolean hasCapability(InputStream out, String capability) {
92+
return objectHasCapability(out, capability);
93+
}
94+
95+
}

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,8 @@ For instance, HDFS may raise an `InvalidPathException`.
616616

617617
result = FSDataOutputStream
618618

619+
A zero byte file must exist at the end of the specified path, visible to all.
620+
619621
The updated (valid) FileSystem must contain all the parent directories of the path, as created by `mkdirs(parent(p))`.
620622

621623
The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
@@ -629,10 +631,18 @@ The result is `FSDataOutputStream`, which through its operations may generate ne
629631
clients creating files with `overwrite==true` to fail if the file is created
630632
by another client between the two tests.
631633

632-
* S3A, Swift and potentially other Object Stores do not currently change the FS state
634+
* S3A, Swift and potentially other Object Stores do not currently change the `FS` state
633635
until the output stream `close()` operation is completed.
634-
This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`,
635-
and potentially confuse file/directory logic
636+
This is a significant difference between the behavior of object stores
637+
and that of filesystems, as it allows >1 client to create a file with `overwrite==false`,
638+
and potentially confuse file/directory logic. In particular, using `create()` to acquire
639+
an exclusive lock on a file (whoever creates the file without an error is considered
640+
the holder of the lock) may not be a safe algorithm to use when working with object stores.
641+
642+
* Object stores may create an empty file as a marker when a file is created.
643+
However, object stores with `overwrite=true` semantics may not implement this atomically,
644+
so creating files with `overwrite=false` cannot be used as an implicit exclusion
645+
mechanism between processes.
636646

637647
* The Local FileSystem raises a `FileNotFoundException` when trying to create a file over
638648
a directory, hence it is listed as an exception that MAY be raised when
@@ -669,7 +679,7 @@ Implementations without a compliant call SHOULD throw `UnsupportedOperationExcep
669679

670680
#### Postconditions
671681

672-
FS
682+
FS' = FS
673683
result = FSDataOutputStream
674684

675685
Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ HDFS as these are commonly expected by Hadoop client applications.
3232
1. [Notation](notation.html)
3333
1. [Model](model.html)
3434
1. [FileSystem class](filesystem.html)
35+
1. [OutputStream, Syncable and `StreamCapabilities`](outputstream.html)
3536
1. [FSDataInputStream class](fsdatainputstream.html)
3637
1. [PathCapabilities interface](pathcapabilities.html)
3738
1. [FSDataOutputStreamBuilder class](fsdataoutputstreambuilder.html)

0 commit comments

Comments
 (0)