Description
MCVE Code Sample
import multiprocessing
import xarray as xr
import numpy as np
from s3fs import S3FileSystem, S3Map
from time import sleep
from main import set_aws_credentials

set_aws_credentials()


def download_file(lead_time):
    # Placeholder: download the source file for this lead time
    return 'path_to_your_file'


def make_xarray_dataset(file_path, lead_time):
    # Dummy dataset with the same shape as the real data
    var1 = np.random.rand(1, 721, 1440, 22)
    var2 = np.random.rand(1, 721, 1440, 22)
    lat = np.linspace(-90, 90, 721)
    lon = np.linspace(0, 360, 1440)
    height = range(22)
    ds = xr.Dataset({'var1': (['lead_time', 'lat', 'lon', 'height'], var1),
                     'var2': (['lead_time', 'lat', 'lon', 'height'], var2)},
                    coords={'lat': lat,
                            'lon': lon,
                            'height': height,
                            'lead_time': [lead_time]})
    return ds


def upload_to_s3(dataset, append):
    s3 = S3FileSystem()
    s3map = S3Map('S3_path_to_your_zarr', s3=s3)
    if append:
        # Append to an already existing store along the lead_time dimension
        dataset.to_zarr(store=s3map, mode='a', append_dim='lead_time')
    else:
        # First write: create the store
        dataset.to_zarr(store=s3map, mode='w')


def lead_time_worker(lead_time, append=True):
    file_path = download_file(lead_time)
    dataset = make_xarray_dataset(file_path, lead_time)
    upload_to_s3(dataset, append=append)
    return 0


if __name__ == '__main__':
    lead_times = range(10)
    first_lead_time = True
    processes = []
    for lead_time in lead_times:
        if first_lead_time:
            # Create the store with the first lead time and wait for it to finish
            process = multiprocessing.Process(target=lead_time_worker,
                                              args=(lead_time, False))
            process.start()
            process.join()
            first_lead_time = False
        else:
            # Append the remaining lead times in parallel
            process = multiprocessing.Process(target=lead_time_worker,
                                              args=(lead_time,))
            process.start()
            processes.append(process)
            sleep(5)  # Shift the processes so that they don't all begin at the same time
    for p in processes:
        p.join()
Running this will raise:
ValueError: conflicting sizes for dimension 'lead_time': length X on 'var1' and length Y on 'var2'
Traceback (most recent call last):
  File "main.py", line 200, in lead_time_worker
    upload_to_s3(dataset, cloud_zarr_path, append=True)
  File "main.py", line 167, in upload_to_gcloud
    ds.to_zarr(store=s3map, mode='a', append_dim='lead_time')
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 1414, in to_zarr
    consolidated=consolidated, append_dim=append_dim)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/api.py", line 1101, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/api.py", line 929, in dump_to_store
    unlimited_dims=unlimited_dims)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 354, in store
    ds = open_zarr(self.ds.store, chunks=None)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 557, in open_zarr
    ds = maybe_decode_store(zarr_store)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/backends/zarr.py", line 545, in maybe_decode_store
    drop_variables=drop_variables)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/conventions.py", line 527, in decode_cf
    ds = Dataset(vars, attrs=attrs)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 423, in __init__
    self._set_init_vars_and_dims(data_vars, coords, compat)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 445, in _set_init_vars_and_dims
    data_vars, coords, compat=compat)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/merge.py", line 379, in merge_data_and_coords
    indexes=indexes)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/merge.py", line 460, in merge_core
    dims = calculate_dimensions(variables)
  File "/home/ubuntu/anaconda3/envs/GFS-data-retrieval/lib/python3.7/site-packages/xarray/core/dataset.py", line 125, in calculate_dimensions
    (dim, size, k, dims[dim], last_used[dim]))
ValueError: conflicting sizes for dimension 'lead_time': length X on 'var1' and length Y on 'var2'
Problem Description
First of all, thanks a lot to the community for PR #2706, I was really looking forward to it. I have already tried out the new append functionality and ran into some problems when using it in parallel.
I want to upload a very big zarr (global numerical weather prediction, the output of the GFS model that you can check out here) to an S3 bucket. Each source file contains the data for a single lead time (the length of time between the issuance of a forecast and the occurrence of the phenomena that were predicted), and I want to concatenate them all. To speed this up, I tried to have one process per lead time, with all of them appending to the same data store using Dataset.to_zarr() with mode='a' and append_dim='lead_time'.
However, when doing that, I get the error described above. Because the processes append simultaneously, the store is not necessarily in a consistent state when a new process tries to append: some variables already contain the values for a given lead time while others do not yet, because the process writing them has not finished. calculate_dimensions() then sees different lengths for the append dimension and raises this error.
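To illustrate, the error should go away if the appends themselves are serialized, for example with a multiprocessing.Lock shared by the workers. Below is a rough sketch of that idea, reusing the helper functions from the MCVE above; of course it means only the downloads and dataset preparation still run in parallel, which defeats much of the purpose:

# Sketch: one process per lead time, but the append itself is guarded by a
# lock so that only one process writes to the zarr store at a time.
import multiprocessing


def locked_lead_time_worker(lead_time, lock, append=True):
    file_path = download_file(lead_time)
    dataset = make_xarray_dataset(file_path, lead_time)
    with lock:  # serialize the writes; downloads still run concurrently
        upload_to_s3(dataset, append=append)
    return 0


if __name__ == '__main__':
    lock = multiprocessing.Lock()
    # The first lead time must create the store before anything can be appended
    first = multiprocessing.Process(target=locked_lead_time_worker,
                                    args=(0, lock, False))
    first.start()
    first.join()
    processes = []
    for lead_time in range(1, 10):
        p = multiprocessing.Process(target=locked_lead_time_worker,
                                    args=(lead_time, lock))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()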
I wonder if there is a way I haven't found to work around this simply by using a synchronizer? If not, do you think it would be possible (and reasonable) to implement a parameter that allows bypassing this check on the append dimension, in an 'eventually consistent' approach?
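For clarity, this is the kind of thing I mean by 'using a synchronizer' (a sketch only; as far as I understand, zarr's ProcessSynchronizer takes file locks around individual array writes, so I am not sure it can protect the dataset-level dimension check that raises here):

# Sketch: pass a zarr ProcessSynchronizer through to_zarr(). The lock
# directory must live on a filesystem visible to all worker processes.
import zarr
from s3fs import S3FileSystem, S3Map

synchronizer = zarr.ProcessSynchronizer('path_to_a_shared_dir.sync')


def upload_to_s3_synced(dataset, append):
    s3 = S3FileSystem()
    s3map = S3Map('S3_path_to_your_zarr', s3=s3)
    if append:
        dataset.to_zarr(store=s3map, mode='a', append_dim='lead_time',
                        synchronizer=synchronizer)
    else:
        dataset.to_zarr(store=s3map, mode='w', synchronizer=synchronizer)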
Output of xr.show_versions()
INSTALLED VERSIONS
commit: None
python: 3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.15.0-1032-aws
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: None
libnetcdf: None
xarray: 0.12.2
pandas: 0.24.2
numpy: 1.16.4
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: 1.5.5
zarr: 2.3.2
cftime: None
nc_time_axis: None
PseudonetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.0.0
distributed: 2.0.1
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
setuptools: 41.0.1
pip: 19.1.1
conda: None
pytest: None
IPython: None
sphinx: None