Integration with dask/distributed (xarray backend design) #798
Comments
The full mailing list discussion is at https://groups.google.com/d/msgid/xarray/CAJ8oX-E7Xx6NT4F6J8B4__Q-kBazoob9_qe_oFLi5hany9-%3DKQ%40mail.gmail.com?utm_medium=email&utm_source=footer |
See also dask/dask#922 |
Copying over a comment from that issue: Yes, so the problem as I see it is that, for serialization and open-file reasons, we want to use a function like the following:
import netCDF4
def get_chunk_of_array(filename, datapath, slice):
    # Open the file, read one chunk, and close the file again
    with netCDF4.Dataset(filename) as f:
        return f.variables[datapath][slice]
However, this opens and closes many files, which, while robust, is slow. We can alleviate this by maintaining an LRU cache in a global variable so that it is created separately per process.
from toolz import memoize
# LRUDict is not defined here: it stands for an LRU mapping that calls on_eviction on each dropped value
cache = LRUDict(size=100, on_eviction=lambda file: file.close())
netCDF4_Dataset = memoize(netCDF4.Dataset, cache=cache)
def get_chunk_of_array(filename, datapath, slice):
    # Reuse an already open handle where possible; eviction closes it
    f = netCDF4_Dataset(filename)
    return f.variables[datapath][slice]
I'm happy to supply the We would then need to use such a function within the dask.array and xarray codebases. Anyway, that's one approach. Thoughts welcome. |
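For readers who want to see the eviction step spelled out, here is a minimal sketch of an LRU cache of open handles built only on the standard library. It is an illustration under assumptions, not the code proposed in this thread: `LRUFileCache` and `FakeFile` are hypothetical names, and `FakeFile` stands in for `netCDF4.Dataset` so the example runs without any data files.

```python
from collections import OrderedDict


class LRUFileCache:
    """Keep at most `maxsize` open handles; close the least recently used."""

    def __init__(self, opener, maxsize=100):
        self.opener = opener          # would be netCDF4.Dataset in the real case
        self.maxsize = maxsize
        self._files = OrderedDict()   # filename -> open handle, oldest first

    def get(self, filename):
        if filename in self._files:
            self._files.move_to_end(filename)   # mark as most recently used
            return self._files[filename]
        handle = self.opener(filename)
        self._files[filename] = handle
        if len(self._files) > self.maxsize:
            _, oldest = self._files.popitem(last=False)
            oldest.close()                      # eviction closes the file
        return handle


class FakeFile:
    """Hypothetical stand-in for a netCDF4.Dataset."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


# Module-level, so each worker process builds its own cache on import.
cache = LRUFileCache(FakeFile, maxsize=2)
a = cache.get("a.nc")
b = cache.get("b.nc")
cache.get("a.nc")        # touch a, so b becomes the least recently used
c = cache.get("c.nc")    # exceeds maxsize: b is evicted and closed
```

Note that this sketch takes no lock, so it is only safe when a single thread per process touches the cache.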
Here is an example of a use case for a |
@shoyer and @mrocklin, I've updated the summary above in the PR description with a to do list. Do either of you see any obvious tasks I missed on the list in the PR description? If so, can you please update the to do list so that I can see what needs to be done to modify the backend for the dask/distributed integration? |
Repeating @mrocklin:
|
I believe that robustly supporting HDF/NetCDF reads with the mechanism mentioned above will resolve most problems from a dask.array perspective. I have no doubt that other things will arise though. Switching from shared to distributed memory always comes with (surmountable) obstacles |
Another note in support of this PR, especially "robustly support HDF/NetCDF reads": I am having problems with |
FWIW I've uploaded a tiny LRU dict implementation to a new http://zict.readthedocs.org/en/latest/
from zict import LRU
d = LRU(100, dict())
There are a number of good alternatives out there though for LRU dictionaries. |
Thanks @mrocklin! This has been really helpful and was what I needed to get going. A prelim design I'm seeing is to modify the
A clean way to do this is just to make sure that each time Unless I'm missing something big, I don't think this change will require a large refactor but it is quite possible I overlooked something important. @shoyer and @mrocklin, do you see any obvious pitfalls in this scope? If not, it shouldn't be too hard to implement. |
Sorry if I am just producing noise here (I am not a specialist), but I have two naive questions: To 1. how will you handle concurrent access to the LRU cache if it's a global variable? To 2. Once the file has been closed by the LRU, won't it also be erased from it? So that a simple |
Correct, the LRU dict should be global. I believe the file restriction is generally per-process, and creating a global dict should ensure that it works properly.
The challenge is that we only call the My bigger concern was how to make use of a method like |
Just to be clear, we are talking about this https://github.com/mrocklin/hdf5lazy/blob/master/hdf5lazy/core.py#L83 for @mrocklin's |
@fmaussion, for
|
@pwolfram I was referring to this comment for @mrocklin's |
Yes, this is correct. In principle, if we have a very large number of files containing many variables each, we might want to do the read in parallel using futures, and then use something like |
It's probably best to avoid futures within |
Has this issue progressed since? Being able to distribute loading of files to a dask cluster and composing an xarray Is @mrocklin's blog post from Feb 2016 still the reference for remote data loading on a cluster? Adapting it to loading xarray Datasets rather than plain arrays is not straightforward since there is no way to combine futures representing Datasets out of the box. |
I haven't worked on this but agree that it is important. |
@kynan, I'm still interested in this but have not had time to advance this further. Are you interested in contributing to this too? I view this as a key component of future climate analysis workflows. This may also be something that is addressed at the upcoming hackathon at Columbia with @rabernat early next month. Also, I suspect that both @mrocklin and @shoyer would be willing to continue to provide key advice because this appears to be aligned with their interests too (please correct me if I'm wrong in this assessment). |
Definitely happy to support from the Dask side. I think that the LRU method described above is feasible. |
If XArray devs want to chat sometime I suspect we could hammer out a plan fairly quickly. My hope is that once a plan exists then a developer will arise to implement that plan. I'm free all of today and tomorrow. |
@mrocklin, I would be happy to chat because I am interested in seeing this happen (e.g., eventually contributing code). The question is whether we need additional expertise from @shoyer, @jhamman, @rabernat etc who likely have a greater in-depth understanding of xarray than me. Perhaps this warrants an email to the wider list? |
I agree that this conversation needs expertise from a core xarray developer. I suspect that this change is more likely to happen in xarray than in dask.array. Happy to continue the conversation wherever. I do have a slight preference to switch to real-time at some point though. I suspect that we can hash this out in a moderate number of minutes. |
We have something very hacky working with #1095 I'm also going to see if I can get something working with the LRU cache, since that seems closer to the solution we want eventually. |
FYI Dask is committed to maintaining this: https://github.com/dask/zict/blob/master/zict/lru.py |
One slight subtlety is writes. We'll need to switch from 'w' to 'a' mode
|
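One way to read the point about write modes: if a cached handle is evicted and the file is later reopened with 'w', the data written earlier is truncated, whereas 'a' preserves it. A minimal sketch of that behaviour, assuming plain text files as a stand-in for netCDF files (`open_for_write` and `created` are hypothetical names, not xarray API):

```python
import os
import tempfile

created = set()  # paths this process has already created


def open_for_write(path):
    """Use 'w' the first time a path is opened and 'a' on every reopen."""
    mode = "a" if path in created else "w"
    created.add(path)
    return open(path, mode)


path = os.path.join(tempfile.mkdtemp(), "out.txt")

with open_for_write(path) as f:   # first open: 'w' creates the file
    f.write("first\n")
with open_for_write(path) as f:   # reopen after an eviction: 'a' keeps the data
    f.write("second\n")

with open(path) as f:
    contents = f.read()
```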
A few other thoughts on thread safety with the LRU approach:
|
Great to see this moving! I take it the workshop was productive? How does #1095 work in the scenario of a distributed scheduler with remote workers? Do I understand correctly that all workers and the client would need to see the same shared filesystem from where NetCDF files are read? |
Yes. |
When using xarray with the There could be a
(Could create a separate issue for this if preferred). |
One solution is to create protocols on the Dask side to enable |
@mrocklin Any thoughts on my thread safety concerns (#798 (comment)) for the LRU cache? I suppose the simplest thing to do is to simply refuse to evict a file until the per-file lock is released, but I can see that strategy failing pretty badly in edge cases. |
A lock on the LRU cache makes sense to me.
If it were me I would just block on the evicted file until it becomes available (the stop-gap measure) until it became a performance problem. |
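The stop-gap described above (one lock on the cache, plus blocking on the evicted file until its users are done) could look roughly like the following. This is a sketch under assumptions rather than a proposed implementation: `LockedLRU` and `FakeFile` are hypothetical names, and `FakeFile` replaces a real file handle.

```python
import threading
from collections import OrderedDict
from contextlib import contextmanager


class FakeFile:
    """Hypothetical stand-in for an open netCDF/HDF5 handle."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class LockedLRU:
    """LRU of open handles; eviction waits for the handle's own lock."""

    def __init__(self, opener, maxsize):
        self.opener = opener
        self.maxsize = maxsize               # assumed to be at least 1
        self.cache_lock = threading.Lock()   # guards the mapping itself
        self.entries = OrderedDict()         # key -> (handle, per-file lock)

    @contextmanager
    def open(self, key):
        with self.cache_lock:
            if key in self.entries:
                self.entries.move_to_end(key)
            else:
                self.entries[key] = (self.opener(key), threading.Lock())
                while len(self.entries) > self.maxsize:
                    _, (old, old_lock) = self.entries.popitem(last=False)
                    with old_lock:           # block until current users finish
                        old.close()
            handle, file_lock = self.entries[key]
            file_lock.acquire()
        try:
            yield handle
        finally:
            file_lock.release()


lru = LockedLRU(FakeFile, maxsize=1)


def open_b():
    with lru.open("b.nc"):   # evicts a.nc, so it must wait for a's lock
        pass


with lru.open("a.nc") as a:
    t = threading.Thread(target=open_b)
    t.start()
    t.join(timeout=0.2)              # the worker cannot finish while a is held
    closed_while_in_use = a.closed
t.join()
```

One of the edge cases is visible here: a thread that holds one file and opens another, where the second open evicts the first, would wait on its own lock forever.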
…ing (#1179)
* Switch to shared Lock (SerializableLock if possible) for reading and writing. Fixes #1172. The serializable lock will be useful for dask.distributed or multi-processing (xref #798, #1173, among others).
* Test serializable lock
* Use conda-forge for builds
* remove broken/fragile .test_lock
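As background on why a serializable lock matters for distributed or multi-process use: a plain `threading.Lock` cannot be pickled, so it cannot travel inside a task graph. The sketch below shows one way to get around that by pickling only a token and looking the real lock up per process. `TokenLock` is a hypothetical illustration of the idea, not the xarray or dask implementation.

```python
import pickle
import threading
import uuid

_locks = {}                       # token -> threading.Lock, one table per process
_locks_guard = threading.Lock()   # protects the table above


class TokenLock:
    """Picklable lock: only the token is pickled, and every process maps
    the token to its own threading.Lock."""

    def __init__(self, token=None):
        self.token = token or str(uuid.uuid4())
        with _locks_guard:
            self.lock = _locks.setdefault(self.token, threading.Lock())

    def __getstate__(self):
        return self.token

    def __setstate__(self, token):
        self.__init__(token)

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc):
        self.lock.release()


try:
    pickle.dumps(threading.Lock())
    plain_lock_pickles = True
except TypeError:
    plain_lock_pickles = False    # a raw threading.Lock cannot be pickled

original = TokenLock()
copy = pickle.loads(pickle.dumps(original))   # same token, same underlying lock
```

Within one process the copy shares the original's lock; in another process the same token would map to a separate lock, so this protects against threads in one process, not against concurrent access across processes.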
Has anyone used XArray on NetCDF data on a cluster without resorting to any tricks? |
@shoyer regarding per-file locking this probably only matters if we are writing as well, yes? Here is a small implementation of a generic file-open cache. I haven't yet decided on an eviction policy but either LRU or random (filtered by closeable files) should work OK.
from collections import defaultdict
from contextlib import contextmanager
import threading
import h5py
class OpenCache(object):
    def __init__(self, maxsize=100):
        self.refcount = defaultdict(lambda: 0)  # key -> number of active users
        self.maxsize = maxsize
        self.cache = {}
        self.i = 0
        self.lock = threading.Lock()
    @contextmanager
    def open(self, myopen, fn, mode='r'):
        assert 'r' in mode
        key = (myopen, fn, mode)
        with self.lock:
            try:
                file = self.cache[key]
            except KeyError:
                file = myopen(fn, mode=mode)
                self.cache[key] = file
            self.refcount[key] += 1
            if len(self.cache) > self.maxsize:
                # Clear old files intelligently
                pass  # eviction policy not decided yet
        try:
            yield file
        finally:
            with self.lock:
                self.refcount[key] -= 1
cache = OpenCache()
with cache.open(h5py.File, 'myfile.hdf5') as f:
    x = f['/data/x']
    y = x[:1000, :1000]
Is this still useful? I'm curious to hear from users like @pwolfram and @rabernat who may be running into the many file problem about what the current pain points are. |
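To make the "filtered by closeable files" idea concrete, here is one possible way to fill in the eviction step: close files whose reference count is zero, oldest insertion first, until the cache fits again. This is a sketch under assumptions, not the policy chosen in this thread; `EvictingOpenCache` is a hypothetical name, and the built-in `open` on temporary text files replaces `h5py.File`.

```python
import os
import tempfile
import threading
from collections import defaultdict
from contextlib import contextmanager


class EvictingOpenCache:
    def __init__(self, maxsize=100):
        self.maxsize = maxsize
        self.cache = {}                    # key -> open file, in insertion order
        self.refcount = defaultdict(int)   # key -> number of active users
        self.lock = threading.Lock()

    @contextmanager
    def open(self, myopen, fn, mode="r"):
        key = (myopen, fn, mode)
        with self.lock:
            if key not in self.cache:
                self.cache[key] = myopen(fn, mode=mode)
            file = self.cache[key]
            self.refcount[key] += 1
            self._evict()
        try:
            yield file
        finally:
            with self.lock:
                self.refcount[key] -= 1

    def _evict(self):
        """Close idle files, oldest first, until the cache fits maxsize."""
        for key in list(self.cache):
            if len(self.cache) <= self.maxsize:
                break
            if self.refcount[key] == 0:    # never close a file that is in use
                self.cache.pop(key).close()
                del self.refcount[key]


tmpdir = tempfile.mkdtemp()
paths = []
for name in ("a.txt", "b.txt"):
    p = os.path.join(tmpdir, name)
    with open(p, "w") as f:
        f.write(name)
    paths.append(p)

cache = EvictingOpenCache(maxsize=1)
with cache.open(open, paths[0]) as fa:
    with cache.open(open, paths[1]) as fb:
        # Over maxsize, but both files are in use, so neither is closed.
        both_open = not fa.closed and not fb.closed
with cache.open(open, paths[1]) as fb_again:
    pass   # a.txt is idle by now and is closed to get back under maxsize
```

A consequence of this choice is that the cache can temporarily exceed `maxsize` when every cached file is in use, which trades a hard limit for never blocking a reader.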
Dask (https://github.com/dask/dask) currently provides on-node parallelism for medium-size data problems. However, large climate data sets constitute a big data problem, and analyzing them will require multiple-node parallelism. A likely solution to this issue is integration of distributed (https://github.com/dask/distributed) with dask. Distributed is now integrated with dask and its benefits are already starting to be realized, e.g., see http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3.
Thus, this issue is designed to identify the steps needed to perform this integration, at a high level. As stated by @shoyer, it will
Thus, we have the chance to make xarray big-data capable as well as provide improvements to the backend.
To this end, I'm starting this issue to help begin the design process following the xarray mailing list discussion some of us have been having (@shoyer, @mrocklin, @rabernat).
Task To Do List:
to_netcdf output is resolved (e.g., dask.async.RuntimeError: NetCDF: HDF error on xarray to_netcdf #793)