HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters #2975
Conversation
TEST RESULTS, HNS Account Location: East US 2
Existing JIRA to track timeout errors: AbstractContractDistCp

TEST RESULTS, HNS Account Location: East US 2
Note about the Yetus error: the asflicense warning is for a file from an older commit that has since been deleted.
```java
@@ -204,38 +204,51 @@ public void registerListener(Listener listener1) {
   public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
     LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
     // bufferSize is unused.
-    return open(path, Optional.empty());
+    return open(path, new OpenFileParameters());
```
Concern with doing it this way and passing an OpenFileParameters param.
When the FS#openFile() API is used, you can see how the system creates the OpenFileParameters(). There may be some defaults coming in there, e.g.:

```java
bufferSize = fileSystem.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
    IO_FILE_BUFFER_SIZE_DEFAULT);
```

So the correct buffer size, as per the conf, will be present in OpenFileParameters too. We might not be using the buffer size from the param now, but later it's possible to use some new stuff like this. So it's better not to create an OpenFileParameters() here and pass it. Instead we can have the private method accept an Optional; in open(Path), we can pass Optional.empty() anyway.
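A minimal sketch of that shape, under assumed, simplified names (the real methods live in AzureBlobFileSystem and carry more context):

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.OpenFileParameters;

abstract class OpenSketch {

  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    // bufferSize is unused; do not fabricate an OpenFileParameters here.
    return open(path, Optional.empty());
  }

  protected FSDataInputStream openFileWithOptions(Path path,
      OpenFileParameters parameters) throws IOException {
    // Built by FileSystem#openFile(), so conf-derived defaults are present.
    return open(path, Optional.of(parameters));
  }

  // The private worker inspects the parameters only when they were
  // genuinely supplied by the openFile() builder.
  abstract FSDataInputStream open(Path path,
      Optional<OpenFileParameters> parameters) throws IOException;
}
```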
Hi,
only just seen this... feel free to let me know when you are doing big bits of work, especially with unstable bits of the API that I've worked on.
FWIW, I'll be able to use this in the manifest committer #2971, as it will let me go straight from the listing of the directory of manifests to opening each manifest without the HEAD requests. I also have an internal PoC parquet lib which does the same, and it showed up the bugs in the s3a FS there.
I have some concerns with the patch as is, which I'd like addressed in a "stabilize abfs openFile.withStatus" JIRA, which, once it's in, can go into -3.3 back to back with this.
In particular, the openFile code on L705 warns if the FileStatus is of the wrong type.
This turns out to cause problems in Hive or any other FS which wraps the inner FS and creates its own FileStatus objects. Why does Hive do this? Because FileStatus.getPath() returns the full path of the reference; Hive has to replace this with a new FileStatus carrying its mapped path.
Printing a WARN when the type is wrong is going to fill up Hive logs and remove all the speedup opportunities.
In #3289 I soften the requirement for the path to == the object; I'm only checking the final elements are equal. This allows parquet through Hive to use the API.
In #2584, which is my real "stabilize openfile" patch, I do it properly:
- only check name
- reject directories
- handle LocatedFileStatus
- and any other FS subclass, only picking up the length value
- and as this has got complex, factored out into its own class
Really, it's only the length that is important; other than that, it is just a declaration of "this is a file, skip the HEAD".
So for ABFS to work, I'm going to require the equivalent of that PrepareToOpenFile: support for more FileStatus types. Doesn't need to be factored out, though that turned out to work well for testing.
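For illustration, a hedged sketch of those checks under assumed names (validate() and its return convention are hypothetical, not the actual PrepareToOpenFile code):

```java
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

final class FileStatusValidation {

  private FileStatusValidation() {
  }

  /** Returns the usable length, or -1 if a HEAD request is still needed. */
  static long validate(Path path, FileStatus status) throws IOException {
    if (status == null) {
      return -1; // nothing supplied: fall back to GetFileStatus.
    }
    if (status.isDirectory()) {
      // Opening a directory must fail, status or no status.
      throw new FileNotFoundException("Is a directory: " + path);
    }
    if (!path.getName().equals(status.getPath().getName())) {
      // Wrapped filesystems (e.g. Hive) rewrite the full path, so only
      // the final path element is required to match.
      return -1;
    }
    // Length is the only field trusted from arbitrary subclasses.
    return status.getLen();
  }
}
```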
This makes clear that I need to get #2584 in; it's been neglected.
That patch allows a seek policy to be specified; all the whole-file reads ask for sequential IO, so when Hive clusters are deployed with the default seek policy set to random, things like distcp and hadoop fs -put don't fail. Critically, it defines a standard set of seek policies, which Azure can implement too.
I've realized we need one more "whole-file", which says "no need for lazy seek, just do the GET".
Does anyone want to take that PR up? I keep getting distracted by other high-importance tasks.
Meanwhile the manifest committer can be the first user of this; I will verify that it delivers some speed up. It should: job commit with 1000 tasks will go from one list, 1000 HEAD, 1000 GET to 1 list and the 1000 GETs.
FWIW, this is the code in the magic committer patch which becomes safe to use with the filename fix there:
https://github.com/apache/hadoop/pull/3289/files#diff-3da795bf4618a7e504e21d3c53f9f7a42c52a75fe232906feb4e66f98794d41eR94
...I'll do the same with the manifest committer; if I do it this week its new tests will help stress the feature.
```java
FutureDataInputStreamBuilder builder = fs.openFile(path);
builder.withFileStatus(fileStatus);
FSDataInputStream in = builder.build().get();
assertEquals(String.format(
```
use AssertJ
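For example, the truncated assertEquals above could become something like this AssertJ form (the names here are illustrative placeholders for the truncated arguments):

```java
import static org.assertj.core.api.Assertions.assertThat;

class AssertJSketch {
  // 'actual' and 'expected' stand in for the truncated assertEquals arguments.
  void assertContentLength(long actual, long expected) {
    assertThat(actual)
        .describedAs("content length of the opened file")
        .isEqualTo(expected);
  }
}
```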
```java
  eTag = ((VersionedFileStatus) fileStatus).getVersion();
} else {
  if (fileStatus != null) {
    LOG.warn(
```
no, just use length & type, don't worry about etag. Or at least just log @ debug
will modify in new JIRA
```java
long contentLength;
if (fileStatus instanceof VersionedFileStatus) {
  path = path.makeQualified(this.uri, path);
  Preconditions.checkArgument(fileStatus.getPath().equals(path),
```
only check getPath().getName() for equivalence.
Will make changes in HADOOP-17896
I believe getName() will verify the file name only. Would it allow paths with parent directories that don't match?
```java
contentLength = Long.parseLong(
    op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
```
```java
if (parseIsDirectory(resourceType)) {
  throw new AbfsRestOperationException(
```
is there a test for this?
Yes, there is a contract test to verify that a FileNotFoundException is thrown on an attempt to open a directory: org.apache.hadoop.fs.contract.AbstractContractOpenTest#testOpenReadDir
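A minimal sketch of what such a contract check does, assuming an already configured FileSystem (the real coverage is the testOpenReadDir case named above):

```java
import java.io.FileNotFoundException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.LambdaTestUtils;

class OpenDirSketch {
  static void checkOpenReadDir(FileSystem fs) throws Exception {
    Path dir = new Path("/test/dir");
    fs.mkdirs(dir);
    // Opening a directory must raise FileNotFoundException.
    LambdaTestUtils.intercept(FileNotFoundException.class,
        () -> fs.open(dir));
  }
}
```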
Hi @steveloughran, thanks a lot for taking a look at this change. We will make updates to address the comments provided. There is one comment on the eTag that I would like to check with you.

The eTag is a parameter that is sent with every read request to the server today. The server ensures that the file is indeed of the expected eTag before proceeding with the read operation. If the file was recreated in the timespan between the inputStream being created and the read being issued, the eTag check will fail and so will the read; otherwise the read would end up wrongly getting data from the new file. This is the reason we are mandating that the eTag be present, and in the else case falling back to triggering a GetFileStatus. We will be changing the log type to debug in the else case, as per your comment.

I did have a look at #2584, which provides the means to pass mandatory and optional keys through OpenFileParameters. I noticed an example where the S3 eTag is passed as an optional key (referring to link). For the Hive case where the FileStatus is wrapped, is this how the eTag gets passed? I was also wondering how we can get various Hadoop workloads to send it without being aware of which is the current active FileSystem, considering the key name is specific to it. I also wanted to check whether it would be right to add eTag as a field in the base FileStatus class, probably as a sub-structure within FileStatus so that it can be expanded in future to hold properties that cloud storage systems commonly depend on.

The openFile() change is one of the key performance improvements that we are looking at adopting. The FileStatus being passed down in itself reduces the HEAD request count by half, and we look forward to adopting the new set of read policies too. We will work on understanding how to map the various read policies to the current optimizations that the driver has for different read patterns. I think that would translate to a change equivalent to PrepareToOpenFile for the ABFS driver. Would it be OK if we make this change once #2584 checks in?
I do know about etags. The S3A connector picks it up on the HEAD or first GET and caches it, to at least ensure that things are unchanged for the duration of the stream.
not in use at all by hive, even in PoCs. I'm not worrying about changes between the list and the open, just about consistency once a file is opened. If a file was changed between hive query planning and work execution then, as it is invariably of a different length, the workers will notice soon enough. Anyway, HDFS doesn't worry about this, so why should the other stores? When you skip the HEAD, the first GET will return the etag. All you have to do is skip setting the If-Match header on the first request, pick the etag from the response, and use it afterwards.
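A hedged sketch of that flow: send the first GET without an If-Match precondition, capture the ETag from that response, and pin all later ranged reads to it. HttpResponse and readRange are illustrative stand-ins, not the ABFS client API.

```java
abstract class EtagPinningStream {
  private String etag; // unknown until the first response arrives

  byte[] read(long offset, int length) throws java.io.IOException {
    // Pass null on the first call: no If-Match header is set then.
    HttpResponse response = readRange(offset, length, etag);
    if (etag == null) {
      etag = response.header("ETag"); // remember the server's version
    }
    return response.body();
  }

  /** Sends If-Match only when ifMatch is non-null (transport elided). */
  abstract HttpResponse readRange(long offset, int length, String ifMatch)
      throws java.io.IOException;

  interface HttpResponse {
    String header(String name);
    byte[] body();
  }
}
```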
This was already in the openFile() API call from the outset; indeed, abfs already supports one. What the new PR adds is ...
No API change, other than ...
Way too late for that: FileStatus is used everywhere, gets serialized, etc. What you could do is add some interface. Where this could be useful is in distcp applications, where the etag of an uploaded file could be cached and compared when rescanning a directory. If a store implements the getFileChecksum() method you can serve the etag that way, but (a) distcp can't handle it and (b) if you could get it on every file from a list() call, you would save a lot of IO. So please, have someone implement this in hadoop-common, with specification, tests etc.
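A purely hypothetical sketch of such an interface; the EtagSource name and method are assumptions, not an existing hadoop-common API at the time of this thread:

```java
import org.apache.hadoop.fs.FileStatus;

// Hypothetical: stores whose FileStatus subclasses know their etag could
// implement this, letting callers probe via instanceof without adding
// new fields to FileStatus itself.
interface EtagSource {
  String getEtag();
}

class EtagProbe {
  /** Returns the etag if the status carries one, else null. */
  static String etagOf(FileStatus status) {
    return status instanceof EtagSource
        ? ((EtagSource) status).getEtag()
        : null;
  }
}
```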
I've just tried to let apps declare how they want to read things; there's generally a straightforward map to sequential vs random vs adaptive... but I hope that the orc and parquet options would provide even more opportunity to tweak behaviour, e.g. by knowing that there will be an initial footer read sequence, and then stripes will be read.
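For illustration, a sketch of declaring a read policy through the builder, assuming the option key proposed in #2584 ("fs.option.openfile.read.policy"); opt() keeps it non-mandatory, so filesystems that don't know the key simply ignore it:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ReadPolicySketch {
  static FSDataInputStream openWholeFile(FileSystem fs, Path path)
      throws Exception {
    return fs.openFile(path)
        // opt() = optional hint: unknown keys are ignored, not fatal.
        .opt("fs.option.openfile.read.policy", "whole-file")
        .build()
        .get();
  }
}
```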
Yes, but I do want to at least have the azure impl not reimplement the same brittleness around ...
I really want to get a hadoop 3.3.2 out the door with this API in it. It's going to happen before '17853, as that is a big piece of work which is going to need lots of review time by others, myself included.
HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters (#2975); Change-Id: I039a0c3cb1c9b603f7dd1be0df03f795525d92bc; commit: dcddc6a
Rename resume optimizations. Changes include:
1. Resume the rename only if the srcDirectory is present and its etag matches the one that was there at the time of the rename; otherwise, just delete the pendingJson.
2. The pendingJson will now carry the etag information of the srcDir at the time of the rename.
3. A listStatus is done after the renamePendingJson is found. But since there are scenarios where the rename resume shouldn't happen (the srcDir is gone, or the original srcDir has changed, i.e. an etag change), the second listStatus would be redundant; when it isn't required, it is not called.
4. For the resume, the fileStatus is already available from the listStatus or the getFileStatus on the renamePendingJson file. On the ABFS_3.3.2_dev branch, reading the file requires an open, which needs a getFileStatus; this is a redundant call, and this optimization removes it.
For this, backported HADOOP-17682. ABFS: Support FileStatus input to OpenFileWithOptions() via OpenFileParameters apache#2975 at commit: dcddc6a
ABFS `open` methods require certain information (contentLength, eTag, etc.) to create an InputStream for the file at the given path. This information is retrieved via a `GetFileStatus` request to the backend. However, client applications may often have access to the FileStatus prior to invoking the open API. Providing this FileStatus to the driver through the OpenFileParameters argument of `openFileWithOptions()` can help avoid the call to the Store for the FileStatus.

This PR adds handling for the FileStatus instance (if any) provided via the `OpenFileParameters` argument.
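A short usage sketch of the pattern this enables, assuming a live FileSystem and directory: reuse each FileStatus from a listing so open() can skip its GetFileStatus (HEAD) call.

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class OpenWithStatusSketch {
  static void readAll(FileSystem fs, Path dir) throws Exception {
    for (FileStatus status : fs.listStatus(dir)) {
      try (FSDataInputStream in = fs.openFile(status.getPath())
          .withFileStatus(status) // status already known: no extra HEAD
          .build()
          .get()) {
        // ... read the stream ...
      }
    }
  }
}
```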