Closed
Description
Not sure if this is an fsspec/async issue or an xarray/intake-xarray issue. Since fsspec switched to async operation with aiohttp, it is no longer possible to open an OPeNDAP server endpoint with xarray when a file-like object is passed. Minimal reproducible example:
import intake
import xarray as xr
from intake.catalog import Catalog
from intake.catalog.local import LocalCatalogEntry
mycat = Catalog.from_dict({
'eta': LocalCatalogEntry('test', 'test single file', 'netcdf', args={'urlpath':'https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc'}),
})
print(mycat.eta.yaml())
mycat.eta.to_dask()
Results in:
sources:
test:
args:
urlpath: https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc
description: test single file
driver: intake_xarray.netcdf.NetCDFSource
metadata:
catalog_dir: ''
---------------------------------------------------------------------------
ClientResponseError Traceback (most recent call last)
<ipython-input-28-667a22c3f2d5> in <module>
5 })
6 print(mycat.spectra.yaml())
----> 7 mycat.spectra.to_dask()
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
67 def to_dask(self):
68 """Return xarray object where variables are dask arrays"""
---> 69 return self.read_chunked()
70
71 def close(self):
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
42 def read_chunked(self):
43 """Return xarray object (which will have chunks)"""
---> 44 self._load_metadata()
45 return self._ds
46
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
124 """load metadata only if needed"""
125 if self._schema is None:
--> 126 self._schema = self._get_schema()
127 self.datashape = self._schema.datashape
128 self.dtype = self._schema.dtype
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
16
17 if self._ds is None:
---> 18 self._open_dataset()
19
20 metadata = {
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self)
90 url = fsspec.open(self.urlpath, **self.storage_options).open()
91
---> 92 self._ds = _open_dataset(url, chunks=self.chunks, **kwargs)
93
94 def _add_path_to_ds(self, ds):
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
555 else:
556 if engine is None:
--> 557 engine = _autodetect_engine(filename_or_obj)
558
559 extra_kwargs = {}
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in _autodetect_engine(filename_or_obj)
161 engine = _get_default_engine(filename_or_obj, allow_remote=True)
162 else:
--> 163 engine = _get_engine_from_magic_number(filename_or_obj)
164 return engine
165
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
130 "please close and reopen, or use a context manager"
131 )
--> 132 magic_number = filename_or_obj.read(8)
133 filename_or_obj.seek(0)
134
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in read(self, length)
434 ) # all fits in one block anyway
435 ):
--> 436 self._fetch_all()
437 if self.size is None:
438 if length < 0:
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
119 def wrapper(*args, **kwargs):
120 self = obj or args[0]
--> 121 return maybe_sync(func, self, *args, **kwargs)
122
123 return wrapper
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
98 if inspect.iscoroutinefunction(func):
99 # run the awaitable on the loop
--> 100 return sync(loop, func, *args, **kwargs)
101 else:
102 # just call the blocking function
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
69 if error[0]:
70 typ, exc, tb = error[0]
---> 71 raise exc.with_traceback(tb)
72 else:
73 return result[0]
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in f()
53 if callback_timeout is not None:
54 future = asyncio.wait_for(future, callback_timeout)
---> 55 result[0] = await future
56 except Exception:
57 error[0] = sys.exc_info()
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in async_fetch_all(self)
451 r = await self.session.get(self.url, **self.kwargs)
452 async with r:
--> 453 r.raise_for_status()
454 out = await r.read()
455 self.cache = AllBytes(
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/aiohttp/client_reqrep.py in raise_for_status(self)
1003 status=self.status,
1004 message=self.reason,
-> 1005 headers=self.headers,
1006 )
1007
ClientResponseError: 400, message='Bad Request', url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc')
Similar to intake/intake-xarray#98, this previously worked.
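To confirm that the URL itself is correct: opening it directly by URL string works (note: `mycat.spectra` below should read `mycat.eta`, the only entry in the catalog defined above):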
ds = xr.open_dataset(mycat.spectra.urlpath)
ds
However, using the h5netcdf engine with a file-like object causes the same error:
import fsspec
with fsspec.open(mycat.eta.urlpath) as f:
ds = xr.open_dataset(f,engine='h5netcdf')
---------------------------------------------------------------------------
ClientResponseError Traceback (most recent call last)
<ipython-input-41-ad0d50259212> in <module>
2
3 with fsspec.open(mycat.eta.urlpath) as f:
----> 4 ds = xr.open_dataset(f,engine='h5netcdf')
5 ds
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
570
571 opener = _get_backend_cls(engine)
--> 572 store = opener(filename_or_obj, **extra_kwargs, **backend_kwargs)
573
574 with close_on_error(store):
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/xarray/backends/h5netcdf_.py in open(cls, filename, mode, format, group, lock, autoclose, invalid_netcdf, phony_dims)
136 )
137 else:
--> 138 magic_number = filename.read(8)
139 filename.seek(0)
140 if not magic_number.startswith(b"\211HDF\r\n\032\n"):
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in read(self, length)
434 ) # all fits in one block anyway
435 ):
--> 436 self._fetch_all()
437 if self.size is None:
438 if length < 0:
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
119 def wrapper(*args, **kwargs):
120 self = obj or args[0]
--> 121 return maybe_sync(func, self, *args, **kwargs)
122
123 return wrapper
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
98 if inspect.iscoroutinefunction(func):
99 # run the awaitable on the loop
--> 100 return sync(loop, func, *args, **kwargs)
101 else:
102 # just call the blocking function
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
69 if error[0]:
70 typ, exc, tb = error[0]
---> 71 raise exc.with_traceback(tb)
72 else:
73 return result[0]
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/asyn.py in f()
53 if callback_timeout is not None:
54 future = asyncio.wait_for(future, callback_timeout)
---> 55 result[0] = await future
56 except Exception:
57 error[0] = sys.exc_info()
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/fsspec/implementations/http.py in async_fetch_all(self)
451 r = await self.session.get(self.url, **self.kwargs)
452 async with r:
--> 453 r.raise_for_status()
454 out = await r.read()
455 self.cache = AllBytes(
~/miniconda3/envs/roampy-dev/lib/python3.7/site-packages/aiohttp/client_reqrep.py in raise_for_status(self)
1003 status=self.status,
1004 message=self.reason,
-> 1005 headers=self.headers,
1006 )
1007
ClientResponseError: 400, message='Bad Request', url=URL('https://dapds00.nci.org.au/thredds/dodsC/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc')
This seems to be specific to the OPeNDAP endpoint, because the same file served by the standard HTTP file server works fine, e.g.:
import fsspec
with fsspec.open('https://dapds00.nci.org.au/thredds/fileServer/rr6/oceanmaps_datasets/version_3.3/forecast/latest/ocean_fc_2021011512_000_eta.nc') as f:
ds = xr.open_dataset(f,engine='h5netcdf')
ds
I have confirmed this with several different OPeNDAP servers.
Metadata
Metadata
Assignees
Labels
No labels