
Unable to use RemoteFileSystem to interact with WebHDFS #6957

hateyouinfinity opened this issue Sep 24, 2022 · 1 comment
Labels
bug Something isn't working great writeup This is a wonderful example of our standards

hateyouinfinity commented Sep 24, 2022

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Currently it does not seem possible to use RemoteFileSystem with WebHDFS as the underlying implementation.
There are two problems, as far as I can tell.
Assume you define your filesystem block as follows:
myfs = RemoteFileSystem(basepath="webhdfs://home/user/project", settings={"host": "example.com"})


Calling write_path fails due to an improperly formatted url.

myfs.write_path("filename", b"content") calls myfs.filesystem.makedirs("webhdfs://home/user/project"), but the underlying implementation doesn't do any preprocessing and simply appends the path to the base URL, producing something like https://example.com/webhdfs/v1webhdfs%3A//home/user/project?op=MKDIRS.

Calling the above URL fails.
Had this URL been generated properly, it would look like this: https://example.com/webhdfs/v1/home/user/project?op=MKDIRS.
That is, the path param passed to WebHDFS._call should begin with a slash and have no scheme.
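The mangled URL can be reproduced with the standard library alone. A minimal sketch (the base URL and the quoting step are assumptions reverse-engineered from the URL shown above, not the actual WebHDFS code):

```python
from urllib.parse import quote

# Assumed base URL, matching the example above; the real one is built by
# fsspec's WebHDFS implementation from the host/port settings.
base = "https://example.com/webhdfs/v1"
basepath = "webhdfs://home/user/project"

# Appending the unstripped basepath (percent-encoded) to the base URL yields
# the malformed URL from the report: the scheme's ":" becomes "%3A".
url = f"{base}{quote(basepath)}?op=MKDIRS"
print(url)
# https://example.com/webhdfs/v1webhdfs%3A//home/user/project?op=MKDIRS
```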

This doesn't seem to be a problem for other RemoteFileSystem methods since all of them call (be it directly or implicitly) fs.filesystem.open, which (in the case of WebHDFS) calls fsspec.utils.infer_storage_options, stripping the scheme. However, infer_storage_options causes another problem.


The first segment of fs.basepath gets stripped, so incorrect remote paths are accessed.

fs.filesystem.open calls fs.filesystem._strip_protocol (link).
Filesystem implementations commonly override _strip_protocol.
The WebHDFS implementation of _strip_protocol calls fsspec.utils.infer_storage_options.
As far as I can infer, infer_storage_options expects its input either to have no scheme (in which case the whole path is returned) or to have a netloc following the scheme (in which case the netloc is stripped away along with the scheme).
As a result, /user/project gets accessed instead of /home/user/project.
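This netloc-stripping behaviour mirrors how the standard library's urlsplit parses URLs: the first segment after the scheme is taken to be a host, not part of the path. A minimal demonstration:

```python
from urllib.parse import urlsplit

# "home" is parsed as the netloc (host), so only "/user/project" survives
# as the path -- the same behaviour infer_storage_options exhibits here.
parts = urlsplit("webhdfs://home/user/project")
print(parts.netloc)  # home
print(parts.path)    # /user/project
```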

One can work around this by prepending an extra segment to basepath (e.g. basepath="webhdfs://fakehost/home/user/project"), but that requires knowing how a particular implementation behaves (and is ugly to boot).
Of note here is that infer_storage_options treats s3/gcs schemes as special cases (it doesn't strip the first segment), so the above workaround can't be applied blindly.
I'd like to mention that current docs have usage examples only for cloud storage providers, which are seemingly immune to this issue.

As an aside, it's not clear why an implementation that takes hostname/port as parameters expects the path to contain a netloc at all.


The previous section got me wondering whether WebHDFS is unique, or whether other implementations share the same problem.
So I picked some implementations and wrote a script to compare what _strip_protocol outputs for the same input path.

Script
#!/usr/bin/env python3

# pip install adlfs gcsfs ocifs paramiko pyarrow s3fs smbprotocol webdav4

from typing import Dict, List

from fsspec import get_filesystem_class

DEFAULT_SCHEMES = [
    "arrow_hdfs",
    "az",
    "dbfs",
    "file",
    "ftp",
    "github",
    "gs",
    "hdfs",
    "http",
    "https",
    "oci",
    "s3",
    "sftp",
    "smb",
    "webdav",
    "webhdfs",
]


def get_resolved_path(scheme: str, path: str) -> str:
    return get_filesystem_class(scheme)._strip_protocol(path)


def main(
    schemes_to_check: List[str] = DEFAULT_SCHEMES, no_scheme_path="/home/user/file"
) -> Dict[str, str]:
    res = {}
    for scheme in schemes_to_check:
        try:
            res[scheme] = get_resolved_path(
                scheme=scheme, path=f"{scheme}://{no_scheme_path.lstrip('/')}"
            )
        except Exception as e:
            print(e)
    return res


if __name__ == "__main__":
    for scheme, resolved_path in main().items():
        print(f"{scheme: <20}{resolved_path}")

Here's what I get if I run it:

arrow_hdfs          home/user/file
az                  home/user/file
dbfs                home/user/file
file                /workspaces/prefect/home/user/file
ftp                 /user/file
github              home/user/file
gs                  home/user/file
hdfs                /user/file
http                http://home/user/file
https               https://home/user/file
oci                 home/user/file
s3                  home/user/file
sftp                /user/file
smb                 /user/file
webdav              home/user/file
webhdfs             /user/file

Going by this, a few other filesystems might have something similar going on.
SFTP seems reasonably easy to test.

docker run --name sftp -p 2222:22 -d atmoz/sftp foo:pass:::upload
from prefect.filesystems import RemoteFileSystem

sftp_settings = {"host": "localhost", "port": 2222, "username": "foo", "password": "pass"}

unprefixed_sftp = RemoteFileSystem(basepath="sftp://upload/", settings=sftp_settings)
prefixed_sftp = RemoteFileSystem(basepath="sftp://fakehost/upload/", settings=sftp_settings)

# We can't use write_path since SFTPFileSystem.makedirs errors out for the same reason WebHDFS.makedirs does
# So let's write a file manually and read it back

# It doesn't matter whether we use prefixed_sftp or unprefixed_sftp for this
with prefixed_sftp.filesystem.open("upload/filename", "wb") as file:
    file.write(b'Hi!')

for fs in [unprefixed_sftp, prefixed_sftp]:
    try:
        print(fs.basepath, end="\n\t")
        print(fs.read_path("filename"))
    except Exception as e:
        print(e)
# sftp://upload/
#         [Errno 2] No such file
# sftp://fakehost/upload/
#         b'Hi!'

print(unprefixed_sftp.filesystem.ls('.'))
# ['./upload']
print(unprefixed_sftp.filesystem.ls('./upload'))
# ['./upload/filename']
docker exec -it sftp ls /home/foo/upload
# filename

WebHDFS doesn't seem to be the only problematic implementation.

Reproduction

See above

Error

No response

Versions

Version: 2.4.2
API version: 0.8.0
Python version: 3.10.4
Git commit: 65807e8
Built: Fri, Sep 23, 2022 10:43 AM
OS/Arch: win32/AMD64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.37.2

Additional context

No response

@hateyouinfinity hateyouinfinity added bug Something isn't working status:triage labels Sep 24, 2022
hateyouinfinity commented Sep 24, 2022

Looking at the docstring of fsspec.implementations.smb.SMBFileSystem (link), I noticed that it describes using the class via fsspec.core.open(URI), in which case the URI must contain a netloc.
fsspec.core.open calls fsspec.core.open_files, which calls fsspec.core.get_fs_token_paths.

get_fs_token_paths does roughly the following (link):

  • Takes a full url as input (e.g. sftp://foo:pass@localhost:2222/upload/filename)
  • Extracts the scheme and uses it to look up a filesystem implementation (the class, not an instance) (e.g. cls = fsspec.implementations.sftp.SFTPFileSystem)
  • Calls cls._get_kwargs_from_urls to extract, from the URL, the params used to instantiate the class (params not contained in the URL are passed to get_fs_token_paths as storage_options)
  • Uses extracted params and storage_options to instantiate a filesystem
  • Calls cls._strip_protocol on its input, producing a valid filepath
  • Returns instantiated filesystem, filepath and some cache-related token

Contrast the example below with the one in the OP.

docker run --name sftp -p 2222:22 -d atmoz/sftp foo:pass:::upload
from fsspec.core import get_fs_token_paths
from fsspec.core import open as fsopen

fs, token, filepath = get_fs_token_paths("sftp://foo:pass@localhost:2222/upload/filename")
print(fs.host, fs.ssh_kwargs, filepath)
# localhost {'port': 2222, 'username': 'foo', 'password': 'pass'} ['/upload/filename']

with fs.open(filepath[0], "wb") as f:
    f.write(b"Bye!")

print(fs.ls('.'))
# ['./upload']

print(fs.ls('./upload'))
# ['./upload/filename']

print(fs.cat("./upload/filename"))
# b'Bye!'

I reckon fsspec implementations fall into two groups:

  1. Implementations in the first group do their own (seemingly idempotent) preprocessing in makedirs and treat the first non-scheme segment of _strip_protocol's input as part of the filepath.
  2. Implementations in the second group expect external preprocessing to be done before makedirs is called and treat the first non-scheme segment of _strip_protocol's input as a netloc.

prefect.filesystems.RemoteFileSystem assumes that all implementations fall into the first group, leading to the above described problems.
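One possible direction for a fix (purely a sketch; the helper name and its placement are hypothetical, not Prefect's actual API) would be to normalize the basepath before handing it to makedirs, so that group-2 implementations receive a scheme-less absolute path:

```python
def normalize_basepath(basepath: str) -> str:
    """Hypothetical helper: strip the scheme from a basepath so that
    group-2 implementations (WebHDFS, SFTP, ...) receive an absolute
    path instead of a URL whose first segment they'd treat as a netloc."""
    scheme, sep, rest = basepath.partition("://")
    if not sep:
        # No scheme present; already a plain path.
        return basepath
    return "/" + rest.lstrip("/")

print(normalize_basepath("webhdfs://home/user/project"))
# /home/user/project
```

Note that this hard-codes group-2 semantics; a real fix would presumably have to dispatch per implementation (e.g. by delegating to each implementation's own _strip_protocol), since group-1 implementations like s3 treat that first segment as part of the filepath (the bucket).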
