Skip to content

http: general enhancements #731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
Changelog
=========

2021.08.1
---------

Enhancements

HTTP

- get_file/put_file APIs now support callbacks
- New put_file API for transferring data to the remote server (chunked)
- Customizable client initializers (through passing ``get_client`` argument)
- Support for various checksum / fingerprint headers in ``info()`` (``ETag``, ``MD5-Checksum``, ``Digest``)

2021.07.0
---------

Expand Down
131 changes: 99 additions & 32 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests

from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.callbacks import _DEFAULT_CALLBACK
from fsspec.exceptions import FSTimeoutError
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import DEFAULT_BLOCK_SIZE, tokenize
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
asynchronous=False,
loop=None,
client_kwargs=None,
get_client=get_client,
**storage_options,
):
"""
Expand All @@ -79,6 +81,10 @@ def __init__(
Passed to aiohttp.ClientSession, see
https://docs.aiohttp.org/en/stable/client_reference.html
For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
get_client: Callable[..., aiohttp.ClientSession]
A callable which takes keyword arguments and constructs
an aiohttp.ClientSession. Its state will be managed by
the HTTPFileSystem class.
storage_options: key-value
Any other parameters passed on to requests
cache_type, cache_options: defaults used in open
Expand All @@ -90,6 +96,7 @@ def __init__(
self.cache_type = cache_type
self.cache_options = cache_options
self.client_kwargs = client_kwargs or {}
self.get_client = get_client
self.kwargs = storage_options
self._session = None

Expand Down Expand Up @@ -121,7 +128,7 @@ def close_session(loop, session):

async def set_session(self):
    """Return the cached aiohttp session, creating it lazily on first use."""
    if self._session is None:
        # get_client is user-customizable (passed to the constructor), so the
        # session's type/configuration is whatever that callable produced.
        self._session = await self.get_client(loop=self.loop, **self.client_kwargs)
        if not self.asynchronous:
            # In synchronous mode the caller never closes the session
            # explicitly, so tie its cleanup to this object's lifetime.
            weakref.finalize(self, self.close_session, self.loop, self._session)
    return self._session
Expand Down Expand Up @@ -223,18 +230,61 @@ async def _cat_file(self, url, start=None, end=None, **kwargs):
out = await r.read()
return out

async def _get_file(
    self, rpath, lpath, chunk_size=5 * 2 ** 20, callback=_DEFAULT_CALLBACK, **kwargs
):
    """Stream the remote URL ``rpath`` into the local file ``lpath``.

    Parameters
    ----------
    rpath: str
        Remote URL to download.
    lpath: str
        Local path to write to (opened with "wb", so it is truncated).
    chunk_size: int
        Number of bytes to read from the response stream per iteration.
    callback: fsspec.callbacks.Callback
        Receives the expected total size (``set_size``) and per-chunk
        progress (``relative_update``).
    kwargs: key-value
        Per-call options merged over the filesystem-level ``self.kwargs``
        and passed on to ``session.get``.
    """
    kw = self.kwargs.copy()
    kw.update(kwargs)
    logger.debug(rpath)
    session = await self.set_session()
    # BUG FIX: previously this passed **self.kwargs, silently dropping the
    # per-call kwargs that were merged into ``kw`` above.
    async with session.get(rpath, **kw) as r:
        try:
            # NOTE(review): Content-Length may not match bytes written to
            # disk if the server applies transfer compression; it is used
            # only for progress reporting here.
            size = int(r.headers["content-length"])
        except (ValueError, KeyError):
            size = None

        callback.set_size(size)
        self._raise_not_found_for_status(r, rpath)
        with open(lpath, "wb") as fd:
            chunk = True
            while chunk:
                # read() returns b"" at EOF, which ends the loop; the final
                # empty write/update is harmless.
                chunk = await r.content.read(chunk_size)
                fd.write(chunk)
                callback.relative_update(len(chunk))

async def _put_file(
    self,
    rpath,
    lpath,
    chunk_size=5 * 2 ** 20,
    callback=_DEFAULT_CALLBACK,
    method="post",
    **kwargs,
):
    """Upload a local file to a remote URL via a chunked POST/PUT.

    NOTE(review): the argument naming is swapped relative to the usual
    fsspec convention -- here ``rpath`` is opened from the *local*
    filesystem and ``lpath`` is the target URL. Verify against callers
    before renaming.

    Parameters
    ----------
    rpath: str
        Local file to read and upload.
    lpath: str
        Remote URL to send the data to.
    chunk_size: int
        Number of bytes read from the local file per yielded chunk.
    callback: fsspec.callbacks.Callback
        Receives the file size (``set_size``) and per-chunk progress.
    method: str
        HTTP method to use; must be "post" or "put" (case-insensitive).
    kwargs: key-value
        Per-call options merged over ``self.kwargs`` and passed to aiohttp.

    Raises
    ------
    ValueError
        If ``method`` is neither "post" nor "put".
    """

    async def gen_chunks():
        # Lazy generator so arbitrarily large files are streamed without
        # being loaded fully into memory.
        with open(rpath, "rb") as f:
            # seek(0, 2) returns the file length in bytes
            callback.set_size(f.seek(0, 2))
            f.seek(0)

            # BUG FIX: the chunk_size parameter was previously accepted but
            # ignored (a hard-coded 64 KiB was used instead).
            chunk = f.read(chunk_size)
            while chunk:
                yield chunk
                callback.relative_update(len(chunk))
                chunk = f.read(chunk_size)

    kw = self.kwargs.copy()
    kw.update(kwargs)
    session = await self.set_session()

    method = method.lower()
    if method not in ("post", "put"):
        raise ValueError(
            f"method has to be either 'post' or 'put', not: {method!r}"
        )

    meth = getattr(session, method)
    async with meth(lpath, data=gen_chunks(), **kw) as resp:
        self._raise_not_found_for_status(resp, lpath)

async def _exists(self, path, **kwargs):
kw = self.kwargs.copy()
Expand Down Expand Up @@ -316,22 +366,29 @@ async def _info(self, url, **kwargs):
which case size will be given as None (and certain operations on the
corresponding file will not work).
"""
size = False
kw = self.kwargs.copy()
kw.update(kwargs)
info = {}
session = await self.set_session()

for policy in ["head", "get"]:
try:
session = await self.set_session()
size = await _file_size(url, size_policy=policy, session=session, **kw)
if size:
info.update(
await _file_info(
url,
size_policy=policy,
session=session,
**self.kwargs,
**kwargs,
)
)
if info.get("size") is not None:
break
except Exception as e:
logger.debug((str(e)))
else:
# get failed, so conclude URL does not exist
if size is False:
raise FileNotFoundError(url)
return {"name": url, "size": size or None, "type": "file"}
except Exception as exc:
if policy == "get":
# If get failed, then raise a FileNotFoundError
raise FileNotFoundError(url) from exc
logger.debug(str(exc))

return {"name": url, "size": None, **info, "type": "file"}

async def _glob(self, path, **kwargs):
"""
Expand Down Expand Up @@ -613,6 +670,7 @@ def __init__(self, fs, url, mode="rb", loop=None, session=None, **kwargs):

async def cor():
r = await self.session.get(url, **kwargs).__aenter__()
self.fs._raise_not_found_for_status(r, url)
return r

self.r = sync(self.loop, cor)
Expand Down Expand Up @@ -654,8 +712,8 @@ async def get_range(session, url, start, end, file=None, **kwargs):
return out


async def _file_size(url, session=None, size_policy="head", **kwargs):
"""Call HEAD on the server to get file size
async def _file_info(url, session, size_policy="head", **kwargs):
"""Call HEAD on the server to get details about the file (size/checksum etc.)

Default operation is to explicitly allow redirects and use encoding
'identity' (no compression) to get the true size of the target.
Expand All @@ -666,29 +724,38 @@ async def _file_size(url, session=None, size_policy="head", **kwargs):
head = kwargs.get("headers", {}).copy()
head["Accept-Encoding"] = "identity"
kwargs["headers"] = head
session = session or await get_client()

info = {}
if size_policy == "head":
r = await session.head(url, allow_redirects=ar, **kwargs)
elif size_policy == "get":
r = await session.get(url, allow_redirects=ar, **kwargs)
else:
raise TypeError('size_policy must be "head" or "get", got %s' "" % size_policy)
async with r:
try:
r.raise_for_status()
r.raise_for_status()

# TODO:
# recognise lack of 'Accept-Ranges',
# or 'Accept-Ranges': 'none' (not 'bytes')
# to mean streaming only, no random access => return None
if "Content-Length" in r.headers:
info["size"] = int(r.headers["Content-Length"])
elif "Content-Range" in r.headers:
info["size"] = int(r.headers["Content-Range"].split("/")[1])

for checksum_field in ["ETag", "Content-MD5", "Digest"]:
if r.headers.get(checksum_field):
info[checksum_field] = r.headers[checksum_field]

return info


# TODO:
# recognise lack of 'Accept-Ranges',
# or 'Accept-Ranges': 'none' (not 'bytes')
# to mean streaming only, no random access => return None
if "Content-Length" in r.headers:
return int(r.headers["Content-Length"])
elif "Content-Range" in r.headers:
return int(r.headers["Content-Range"].split("/")[1])
except aiohttp.ClientResponseError:
logger.debug("Error retrieving file size")
return None
r.close()
async def _file_size(url, session=None, *args, **kwargs):
    """Backwards-compatible helper returning only the remote file's size.

    Delegates to ``_file_info`` and extracts the "size" entry, which is
    absent (None) when the server reports no usable length. A fresh client
    is created when no session is supplied.
    """
    if session is None:
        session = await get_client()
    details = await _file_info(url, *args, session=session, **kwargs)
    return details.get("size")


file_size = sync_wrapper(_file_size)
Loading