Skip to content

Commit

Permalink
Merge pull request #28 from Open-EO/update-header
Browse files Browse the repository at this point in the history
Update the Dask job header to support dask_gateway
  • Loading branch information
SerRichard authored Mar 23, 2022
2 parents 3c11583 + eaa7c30 commit eaaf962
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 48 deletions.
7 changes: 5 additions & 2 deletions src/openeo_odc/map_processes_odc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from openeo_odc.string_creation import create_param_string


# Default dask chunk sizes (pixels) used when loading ODC products.
# NOTE(review): the specific values look tuned to one product's tile
# dimensions — confirm they are appropriate for all collections served.
DASK_CHUNK_SIZE_X = 12114
DASK_CHUNK_SIZE_Y = 12160

def map_load_collection(id, process):
""" Map to load_collection process for ODC datacubes.
Expand All @@ -30,7 +33,7 @@ def map_load_collection(id, process):

params = {
'product': process['arguments']['id'],
'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000},
'dask_chunks': {'y': DASK_CHUNK_SIZE_Y,'x':DASK_CHUNK_SIZE_X, 'time':'auto'},
}
if 'spatial_extent' in process['arguments']:
if process['arguments']['spatial_extent'] is not None:
Expand Down Expand Up @@ -82,7 +85,7 @@ def map_load_result(id, process) -> str:
product_name = process['arguments']['id'].replace("-", "_")
params = {
'product': product_name,
'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000},
'dask_chunks': {'y': DASK_CHUNK_SIZE_Y,'x':DASK_CHUNK_SIZE_X, 'time':'auto'},
}
if 'spatial_extent' in process['arguments']:
if process['arguments']['spatial_extent'] is not None:
Expand Down
29 changes: 23 additions & 6 deletions src/openeo_odc/map_to_odc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from openeo_odc.utils import ExtraFuncUtils, PROCS_WITH_VARS


def map_to_odc(graph, odc_env, odc_url):
def map_to_odc(graph, odc_env, odc_url, job_id: str = None, user_id: str = None):
"""Map openEO process graph to xarray/opendatacube functions."""
if (not job_id) or (not user_id):
raise TypeError("Both the job_id and user_id must be provided.")

extra_func_utils = ExtraFuncUtils()

nodes = {}
Expand Down Expand Up @@ -68,9 +71,10 @@ def map_to_odc(graph, odc_env, odc_url):
for fc_proc in extra_func.values():
final_fc.update(**fc_proc)
return {
'header': create_job_header(odc_env_collection=odc_env, dask_url=odc_url),
'header': create_job_header(odc_env_collection=odc_env, dask_url=odc_url, job_id=job_id, user_id=user_id),
**final_fc,
**nodes,
'tail': create_job_tail(),
}


Expand Down Expand Up @@ -110,15 +114,28 @@ def resolve_from_parameter(node):
return in_nodes


def create_job_header(dask_url: str, job_id: str, user_id: str, odc_env_collection: str = "default", odc_env_user_gen: str = "user_generated"):
    """Create the source-code header of a generated openEO job script.

    The emitted header imports the modules the job needs, initializes two
    ODC instances (one for collections, one for user-generated products)
    and connects to a Dask Gateway, spawning an adaptive cluster tagged
    with the given job and user identifiers.

    :param dask_url: address of the Dask Gateway to connect to.
    :param job_id: job identifier, attached to the cluster options.
    :param user_id: user identifier, attached to the cluster options.
    :param odc_env_collection: ODC environment holding the collections.
    :param odc_env_user_gen: ODC environment holding user-generated products.
    :return: the header source code as a single string.
    """
    # NOTE(review): the generated script waits a fixed 60 s for workers to
    # come up; client.wait_for_workers(1) would be more robust than sleep().
    # Left unchanged here because the reference jobs pin this exact output.
    return f"""from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='{odc_env_collection}')
cube_user_gen = datacube.Datacube(app='user_gen', env='{odc_env_user_gen}')
# Connect to the gateway
gateway = Gateway('{dask_url}')
options = gateway.cluster_options()
options.user_id = '{user_id}'
options.job_id = '{job_id}'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()
"""

def create_job_tail():
    """Create the source-code tail of a generated openEO job script.

    The emitted tail shuts down the Dask cluster and closes the gateway
    connection once the job has finished.

    :return: the tail source code as a single string.
    """
    # Plain string literal: there are no placeholders, so the original
    # f-string prefix was unnecessary.
    return """cluster.shutdown()
gateway.close()"""
4 changes: 3 additions & 1 deletion src/openeo_odc/string_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ def create_param_string(dict_input: dict, process_name: str):
if dict_input:
replace_str += ', '

return f"**{dict_input}".replace('{', replace_str, 1)
return_string = f"**{dict_input}".replace('{', replace_str, 1)

return return_string
18 changes: 14 additions & 4 deletions tests/ref_jobs/apply_job_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()

_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'S2_L2A_T32TPS', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (10.960229020571205, 10.975120481571418), 'y': (45.91379959511596, 45.920009625521885), 'time': ['2017-07-01T00:00:00Z', '2017-07-07T23:59:59Z'], 'measurements': ['B04_10m', 'B03_10m', 'B02_10m']})
_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'S2_L2A_T32TPS', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (10.960229020571205, 10.975120481571418), 'y': (45.91379959511596, 45.920009625521885), 'time': ['2017-07-01T00:00:00Z', '2017-07-07T23:59:59Z'], 'measurements': ['B04_10m', 'B03_10m', 'B02_10m']})
_min1_2 = oeop.min(**{'data': _loadcollection1_0, 'dimension': 'time'})
_reducedimension2_1 = oeop.reduce_dimension(**{'data': _min1_2, 'dimension': 't', 'reducer': {}})
_sqrt1_4 = oeop.sqrt(**{'x': _reducedimension2_1})
_apply1_3 = oeop.apply(**{'data': _sqrt1_4, 'process': _sqrt1_4, 'context': ''})
_saveresult1_5 = oeop.save_result(**{'data': _apply1_3, 'format': 'GTiff', 'options': {}})
cluster.shutdown()
gateway.close()
18 changes: 14 additions & 4 deletions tests/ref_jobs/evi_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()

_dc_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (9.9, 10.0), 'y': (46.5, 46.6), 'time': ['2018-06-15T00:00:00Z', '2018-06-16T00:00:00Z'], 'measurements': ['B08', 'B04', 'B02']})
_dc_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (9.9, 10.0), 'y': (46.5, 46.6), 'time': ['2018-06-15T00:00:00Z', '2018-06-16T00:00:00Z'], 'measurements': ['B08', 'B04', 'B02']})
_nir_2 = oeop.array_element(**{'data': _dc_0, 'index': 0, 'dimension': 'bands'})
_red_3 = oeop.array_element(**{'data': _dc_0, 'index': 1, 'dimension': 'bands'})
_blue_4 = oeop.array_element(**{'data': _dc_0, 'index': 2, 'dimension': 'bands'})
Expand All @@ -22,3 +30,5 @@
_min_12 = oeop.min(**{'data': _evi_1, 'dimension': 'time'})
_mintime_11 = oeop.reduce_dimension(**{'data': _min_12, 'dimension': 'temporal', 'reducer': {}})
_save_13 = oeop.save_result(**{'data': _mintime_11, 'format': 'netCDF'})
cluster.shutdown()
gateway.close()
18 changes: 14 additions & 4 deletions tests/ref_jobs/fit_curve_cm_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()


def extra_func_fitcurve1_3(x, *parameters):
Expand All @@ -24,8 +32,10 @@ def extra_func_fitcurve1_3(x, *parameters):
return _add2_13


_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_clip1_2 = oeop.clip(**{'x': _loadcollection1_0, 'min': 0, 'max': 4000})
_apply2_1 = oeop.apply(**{'process': _clip1_2, 'data': _clip1_2})
_fitcurve1_3 = oeop.fit_curve(**{'data': _apply2_1, 'function': extra_func_fitcurve1_3, 'parameters': [1, 1, 1], 'dimension': 't'})
_saveresult1_15 = oeop.save_result(**{'data': _fitcurve1_3, 'format': 'NetCDF', 'options': {}})
cluster.shutdown()
gateway.close()
18 changes: 14 additions & 4 deletions tests/ref_jobs/fit_curve_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()


def extra_func_18_0(x, *parameters):
Expand All @@ -30,8 +38,10 @@ def extra_func_18_0(x, *parameters):
return _b4mf181yp_3


_23_20 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (11.5381, 11.5381), 'y': (46.4868, 46.4868), 'time': ['2016-01-01T00:00:00Z', '2016-05-31T00:00:00Z'], 'measurements': []})
_23_20 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (11.5381, 11.5381), 'y': (46.4868, 46.4868), 'time': ['2016-01-01T00:00:00Z', '2016-05-31T00:00:00Z'], 'measurements': []})
_1_19 = oeop.clip(**{'x': _23_20, 'min': 0, 'max': 4000})
_22_18 = oeop.apply(**{'process': _1_19, 'data': _1_19, 'context': ''})
_18_0 = oeop.fit_curve(**{'data': _22_18, 'function': extra_func_18_0, 'parameters': [1, 1, 1], 'dimension': 't'})
_saveresult1_21 = oeop.save_result(**{'data': _18_0, 'format': 'NETCDF'})
cluster.shutdown()
gateway.close()
18 changes: 14 additions & 4 deletions tests/ref_jobs/rename_labels_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()


def extra_func_fitcurve1_4(x, *parameters):
Expand Down Expand Up @@ -38,7 +46,7 @@ def extra_func_predictcurve1_15(x, *parameters):
return _add2_25


_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (11.411777, 11.411977), 'y': (46.342355, 46.342555000000004), 'time': ['2016-09-01', '2019-08-31'], 'measurements': ['B08']})
_loadcollection1_0 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (11.411777, 11.411977), 'y': (46.342355, 46.342555000000004), 'time': ['2016-09-01', '2019-08-31'], 'measurements': ['B08']})
_clip1_2 = oeop.clip(**{'x': _loadcollection1_0, 'max': 5000, 'min': 0})
_apply1_1 = oeop.apply(**{'data': _clip1_2, 'process': _clip1_2})
_dimensionlabels1_3 = oeop.dimension_labels(**{'data': _apply1_1, 'dimension': 't'})
Expand All @@ -47,3 +55,5 @@ def extra_func_predictcurve1_15(x, *parameters):
_renamelabels1_26 = oeop.rename_labels(**{'data': _predictcurve1_15, 'dimension': 'bands', 'target': ['B08_predicted']})
_mergecubes1_27 = oeop.merge_cubes(**{'cube1': _apply1_1, 'cube2': _renamelabels1_26})
_saveresult1_28 = oeop.save_result(**{'data': _mergecubes1_27, 'format': 'NetCDF', 'options': {}})
cluster.shutdown()
gateway.close()
18 changes: 14 additions & 4 deletions tests/ref_jobs/uc6_1_2_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()


def extra_func_4_4(x, *parameters):
Expand Down Expand Up @@ -40,7 +48,7 @@ def extra_func_fitcurve1_22(x, *parameters):
return _add2_24


_loadcollection1_16 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_loadcollection1_16 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_1_1 = oeop.eq(**{'x': _loadcollection1_16, 'y': 0})
_clip1_18 = oeop.clip(**{'x': _loadcollection1_16, 'max': 4000, 'min': 0})
_clip1_20 = oeop.clip(**{'x': _loadcollection1_16, 'min': 0, 'max': 4000})
Expand All @@ -53,3 +61,5 @@ def extra_func_fitcurve1_22(x, *parameters):
_1_3 = oeop.multiply(**{'x': _fitcurve1_22, 'y': _2_0})
_3_2 = oeop.merge_cubes(**{'cube1': _fitcurve1_22, 'cube2': _2_0, 'overlap_resolver': _1_3})
_saveresult1_34 = oeop.save_result(**{'data': _3_2, 'format': 'NetCDF', 'options': {}})
cluster.shutdown()
gateway.close()
20 changes: 15 additions & 5 deletions tests/ref_jobs/uc6_2_odc_ref.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from dask.distributed import Client
from dask_gateway import Gateway
import datacube
import openeo_processes as oeop
import time

# Initialize ODC instance
cube = datacube.Datacube(app='collection', env='default')
cube_user_gen = datacube.Datacube(app='user_gen', env='user_generated')
# Connect to Dask Scheduler
client = Client('tcp://xx.yyy.zz.kk:8786')
# Connect to the gateway
gateway = Gateway('tcp://xx.yyy.zz.kk:8786')
options = gateway.cluster_options()
options.user_id = 'test-user'
options.job_id = 'test-job'
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=1, maximum=3)
time.sleep(60)
client = cluster.get_client()


def extra_func_fitcurve1_9(x, *parameters):
Expand All @@ -24,9 +32,9 @@ def extra_func_fitcurve1_9(x, *parameters):
return _add2_11


_1_0 = oeop.load_result(odc_cube=cube_user_gen, **{'product': 'jb_fa7a7188_c3ce_40fb_85d8_a5443ac112c3', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}})
_1_0 = oeop.load_result(odc_cube=cube_user_gen, **{'product': 'jb_fa7a7188_c3ce_40fb_85d8_a5443ac112c3', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}})

_loadcollection1_5 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'time': 'auto', 'x': 1000, 'y': 1000}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_loadcollection1_5 = oeop.load_collection(odc_cube=cube, **{'product': 'boa_sentinel_2', 'dask_chunks': {'y': 12160, 'x': 12114, 'time': 'auto'}, 'x': (11.410299, 11.413905), 'y': (46.341515, 46.343144), 'time': ['2016-09-01', '2018-08-31'], 'measurements': ['B02', 'B03', 'B04', 'B05', 'B08']})
_1_2 = oeop.eq(**{'x': _loadcollection1_5, 'y': 0})
_clip1_7 = oeop.clip(**{'x': _loadcollection1_5, 'max': 4000, 'min': 0})
_2_1 = oeop.apply(**{'data': _1_2, 'process': _1_2})
Expand All @@ -36,3 +44,5 @@ def extra_func_fitcurve1_9(x, *parameters):
_1_4 = oeop.multiply(**{'x': _fitcurve1_9, 'y': _2_1})
_3_3 = oeop.merge_cubes(**{'cube1': _fitcurve1_9, 'cube2': _2_1, 'overlap_resolver': _1_4})
_saveresult1_21 = oeop.save_result(**{'data': _3_3, 'format': 'NetCDF', 'options': {}})
cluster.shutdown()
gateway.close()
Loading

0 comments on commit eaaf962

Please sign in to comment.