Commit

Merge branch 'features/#112-option-testmode' into features/#9-migrate-substation-section
IlkaCu committed Mar 5, 2021
2 parents d66a909 + cc3009b commit 17ede54
Showing 7 changed files with 290 additions and 28 deletions.
26 changes: 20 additions & 6 deletions src/egon/data/airflow/dags/pipeline.py
@@ -21,6 +21,9 @@
# Prepare connection to db for operators
airflow_db_connection()

# Temporarily set dataset variable here
dataset = 'Schleswig-Holstein'

with airflow.DAG(
"egon-data-processing-pipeline",
description="The eGo^N data processing DAG.",
@@ -36,17 +39,23 @@

# Openstreetmap data import
osm_download = PythonOperator(
task_id="download-osm", python_callable=import_osm.download_pbf_file
task_id="download-osm",
python_callable=import_osm.download_pbf_file,
op_args={dataset},
)
osm_import = PythonOperator(
task_id="import-osm", python_callable=import_osm.to_postgres
task_id="import-osm",
python_callable=import_osm.to_postgres,
op_args={dataset},
)
osm_migrate = PythonOperator(
task_id="migrate-osm",
python_callable=process_osm.modify_tables,
)
osm_add_metadata = PythonOperator(
task_id="add-osm-metadata", python_callable=import_osm.add_metadata
task_id="add-osm-metadata",
python_callable=import_osm.add_metadata,
op_args={dataset},
)
setup >> osm_download >> osm_import >> osm_migrate >> osm_add_metadata

@@ -56,8 +65,10 @@
python_callable=import_vg250.download_vg250_files,
)
vg250_import = PythonOperator(
task_id="import-vg250", python_callable=import_vg250.to_postgres
task_id="import-vg250", python_callable=import_vg250.to_postgres,
op_args={dataset}
)

vg250_nuts_mview = PostgresOperator(
task_id="vg250_nuts_mview",
sql="vg250_lan_nuts_id_mview.sql",
@@ -95,15 +106,18 @@

population_import = PythonOperator(
task_id="import-zensus-population",
python_callable=import_zs.population_to_postgres
python_callable=import_zs.population_to_postgres,
op_args={dataset}
)

zensus_misc_import = PythonOperator(
task_id="import-zensus-misc",
python_callable=import_zs.zensus_misc_to_postgres
python_callable=import_zs.zensus_misc_to_postgres,
op_args={dataset}
)
setup >> zensus_download_population >> zensus_download_misc
zensus_download_misc >> zensus_tables >> population_import
vg250_clean_and_prepare >> population_import
population_import >> zensus_misc_import

# DemandRegio data import
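For orientation, the sketch below (not part of the commit) shows how the module-level `dataset` string reaches the callables: Airflow's PythonOperator invokes `python_callable(*op_args)`, so each task receives `dataset` as its single positional argument. A plain list is the conventional form for `op_args`; the set literal `{dataset}` used above also works here since it is simply unpacked.

```python
# Sketch only: emulates what PythonOperator does with op_args.
def download_pbf_file(dataset='main'):
    return f"downloading .pbf for {dataset}"

dataset = 'Schleswig-Holstein'   # the temporary module-level toggle from pipeline.py
op_args = [dataset]              # equivalent to op_args={dataset} for a single argument

# Airflow effectively calls python_callable(*op_args, **op_kwargs):
print(download_pbf_file(*op_args))   # -> downloading .pbf for Schleswig-Holstein
```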
6 changes: 4 additions & 2 deletions src/egon/data/datasets.yml
@@ -1,11 +1,13 @@
openstreetmap:
original_data:
source:
url: "https://download.geofabrik.de/europe/germany/bremen-200101.osm.pbf"
url: "http://download.geofabrik.de/europe/germany-200101.osm.pbf"
url_testmode: "https://download.geofabrik.de/europe/germany/schleswig-holstein-200101.osm.pbf"
stylefile: "oedb.style"
target:
table_prefix: "osm"
path: "bremen-200101.osm.pbf"
path: "germany-200101.osm.pbf"
path_testmode: "schleswig-holstein-200101.osm.pbf"
processed:
schema: "openstreetmap"
tables:
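A minimal sketch of how the new `*_testmode` keys can be resolved at run time. It reads the YAML directly with PyYAML instead of going through `egon.data.config.datasets()`, and the file path is an assumption for illustration:

```python
# Sketch, assuming datasets.yml sits in the working directory.
import yaml

with open("datasets.yml") as f:
    osm_cfg = yaml.safe_load(f)["openstreetmap"]["original_data"]

dataset = "Schleswig-Holstein"   # 'main' for a production run

if dataset == "main":
    url = osm_cfg["source"]["url"]
    path = osm_cfg["target"]["path"]
else:
    url = osm_cfg["source"]["url_testmode"]
    path = osm_cfg["target"]["path_testmode"]

print(url, path)
```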
30 changes: 29 additions & 1 deletion src/egon/data/importing/demandregio/__init__.py
@@ -76,6 +76,25 @@ def create_tables():
EgonDemandRegioHouseholds.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioWz.__table__.create(bind=engine, checkfirst=True)

def data_in_boundaries(df):
""" Select rows with nuts3 code within boundaries, used for testmode
Parameters
----------
df : pandas.DataFrame
Data for all nuts3 regions
Returns
-------
pandas.DataFrame
Data for nuts3 regions within boundaries
"""
engine = db.engine()

return df[df.index.isin(pd.read_sql(
"SELECT DISTINCT ON (nuts) nuts FROM boundaries.vg250_krs",
engine).nuts)]

def insert_cts_ind_wz_definitions():
""" Insert demandregio's definitions of CTS and industrial branches
Expand Down Expand Up @@ -250,6 +269,9 @@ def insert_hh_demand(scenario, year, engine, cfg):
# get demands of private households per nuts and size from demandregio
ec_hh = disagg_households_power(scenario, year)

# Select demands for nuts3-regions in boundaries (needed for testmode)
ec_hh = data_in_boundaries(ec_hh)

# insert into database
for hh_size in ec_hh.columns:
df = pd.DataFrame(ec_hh[hh_size])
@@ -303,6 +325,9 @@ def insert_cts_ind_demand(scenario, year, engine, target_values, cfg):
if scenario == 'eGon2035':
ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector, cfg)

# Select demands for nuts3-regions in boundaries (needed for testmode)
ec_cts_ind = data_in_boundaries(ec_cts_ind)

# insert into database
for wz in ec_cts_ind.columns:
df = pd.DataFrame(ec_cts_ind[wz])
@@ -382,12 +407,15 @@ def insert_society_data():
df_pop = pd.DataFrame(data.population(year=year))
df_pop['year'] = year
df_pop = df_pop.rename({'value': 'population'}, axis='columns')
# Select data for nuts3-regions in boundaries (needed for testmode)
df_pop = data_in_boundaries(df_pop)
df_pop.to_sql(cfg['table_names']['population'],
engine,
schema=cfg['schema'],
if_exists='append')
df_hh = pd.DataFrame(data.households_per_size(year=year))

# Select data for nuts3-regions in boundaries (needed for testmode)
df_hh = data_in_boundaries(df_hh)
for hh_size in df_hh.columns:
df = pd.DataFrame(df_hh[hh_size])
df['year'] = year
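To illustrate what `data_in_boundaries()` does, here is a self-contained sketch in which the database query against `boundaries.vg250_krs` is replaced by a hard-coded list of nuts3 codes; the codes and demand values are made up for the example:

```python
import pandas as pd

def data_in_boundaries(df, nuts_in_boundaries):
    # keep only rows whose index (a nuts3 code) lies inside the boundaries
    return df[df.index.isin(nuts_in_boundaries)]

demand = pd.DataFrame(
    {"demand": [42.0, 17.5, 8.3]},
    index=pd.Index(["DEF01", "DEF02", "DE300"], name="nuts3"),
)

# in test mode only Schleswig-Holstein districts (DEF*) lie inside the boundaries
print(data_in_boundaries(demand, ["DEF01", "DEF02"]))
```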
67 changes: 57 additions & 10 deletions src/egon/data/importing/openstreetmap/__init__.py
@@ -19,24 +19,50 @@
import egon.data.subprocess as subprocess


def download_pbf_file():
"""Download OpenStreetMap `.pbf` file."""
def download_pbf_file(dataset='main'):
"""Download OpenStreetMap `.pbf` file.
Parameters
----------
dataset: str, optional
Toggles between production (`dataset='main'`) and test mode
(e.g. `dataset='Schleswig-Holstein'`). In production mode, data covering
all of Germany is used; in test mode, data for a single federal state is
downloaded instead and the workflow runs on this subset.
"""
data_config = egon.data.config.datasets()
osm_config = data_config["openstreetmap"]["original_data"]


if dataset == 'main':
source_url = osm_config["source"]["url"]
target_path = osm_config["target"]["path"]
else:
source_url = osm_config["source"]["url_testmode"]
target_path = osm_config["target"]["path_testmode"]

target_file = os.path.join(
os.path.dirname(__file__), osm_config["target"]["path"]
os.path.dirname(__file__), target_path
)

if not os.path.isfile(target_file):
urlretrieve(osm_config["source"]["url"], target_file)
urlretrieve(source_url, target_file)


def to_postgres(num_processes=4, cache_size=4096):
def to_postgres(dataset='main', num_processes=4, cache_size=4096):
"""Import OSM data from a Geofabrik `.pbf` file into a PostgreSQL database.
Parameters
----------
dataset: str, optional
Toggles between production (`dataset='main'`) and test mode
(e.g. `dataset='Schleswig-Holstein'`). In production mode, data covering
all of Germany is used; in test mode, the workflow runs on a subset of
this data for testing.
num_processes : int, optional
Number of parallel processes used for processing during data import
cache_size: int, optional
@@ -49,8 +75,14 @@ def to_postgres(num_processes=4, cache_size=4096):
# Get dataset config
data_config = egon.data.config.datasets()
osm_config = data_config["openstreetmap"]["original_data"]

if dataset == 'main':
target_path = osm_config["target"]["path"]
else:
target_path = osm_config["target"]["path_testmode"]

input_file = os.path.join(
os.path.dirname(__file__), osm_config["target"]["path"]
os.path.dirname(__file__), target_path
)

# Prepare osm2pgsql command
@@ -78,12 +110,28 @@ def to_postgres(num_processes=4, cache_size=4096):
)


def add_metadata():
"""Writes metadata JSON string into table comment."""
def add_metadata(dataset='main'):
"""Writes metadata JSON string into table comment.
Parameters
----------
dataset: str, optional
Toggles between production (`dataset='main'`) and test mode
(e.g. `dataset='Schleswig-Holstein'`). In production mode, data covering
all of Germany is used; in test mode, the workflow runs on a subset of
this data for testing.
"""
# Prepare variables
osm_config = egon.data.config.datasets()["openstreetmap"]

if dataset == 'main':
osm_url = osm_config["original_data"]["source"]["url"]
target_path = osm_config["original_data"]["target"]["path"]
else:
osm_url = osm_config["original_data"]["source"]["url_testmode"]
target_path = osm_config["original_data"]["target"]["path_testmode"]
spatial_and_date = os.path.basename(
osm_config["original_data"]["target"]["path"]
target_path
).split("-")
spatial_extend = spatial_and_date[0]
osm_data_date = (
@@ -94,7 +142,6 @@ def add_metadata():
+ "-"
+ spatial_and_date[1][4:6]
)
osm_url = osm_config["original_data"]["source"]["url"]

# Insert metadata for each table
licenses = [
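A usage sketch (an assumption, not part of the commit): the three functions can also be called outside the DAG with the same `dataset` toggle, e.g. for a manual test run once the database and config are in place:

```python
from egon.data.importing.openstreetmap import (
    add_metadata,
    download_pbf_file,
    to_postgres,
)

dataset = "Schleswig-Holstein"           # or 'main' for all of Germany

download_pbf_file(dataset)               # fetches schleswig-holstein-200101.osm.pbf
to_postgres(dataset, num_processes=4, cache_size=4096)
add_metadata(dataset)
```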
20 changes: 19 additions & 1 deletion src/egon/data/importing/vg250.py
@@ -33,7 +33,7 @@ def download_vg250_files():
urlretrieve(vg250_config["source"]["url"], target_file)


def to_postgres():
def to_postgres(dataset='main'):

# Get information from data configuration file
data_config = egon.data.config.datasets()
@@ -56,6 +56,23 @@ def to_postgres():
f"vg250_ebenen_0101/{filename}"
)

if dataset != 'main':
# read in the borders of the selected federal state (e.g. Schleswig-Holstein)
data_sta = gpd.read_file(
f"zip://{zip_file}!vg250_01-01.geo84.shape.ebenen/"
f"vg250_ebenen_0101/VG250_LAN.shp"
).query(f"GEN == '{dataset}'")
data_sta.BEZ = 'Bundesrepublik'
data_sta.NUTS = 'DE'
# use the federal state's borders as the national (vg250_sta) borders
if table == 'vg250_sta':
data = data_sta
# keep only areas lying within the selected federal state
else:
data = data[data.within(
data_sta.dissolve(by='GEN').geometry.values[0])]


# Set index column and format column headings
data.index.set_names("gid", inplace=True)
data.columns = [x.lower() for x in data.columns]
@@ -88,6 +105,7 @@
)



def add_metadata():
"""Writes metadata JSON string into table comment."""
# Prepare variables
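The spatial filter added to `to_postgres()` above keeps only geometries lying within the dissolved border of the selected federal state. Below is a toy sketch of that mechanism, using hand-made polygons instead of the VG250 shapefiles:

```python
import geopandas as gpd
from shapely.geometry import Polygon

# stand-in for the VG250_LAN layer filtered to one federal state
state = gpd.GeoDataFrame(
    {"GEN": ["Schleswig-Holstein"]},
    geometry=[Polygon([(0, 0), (4, 0), (4, 4), (0, 4)])],
)

# stand-in for any other VG250 layer covering all of Germany
data = gpd.GeoDataFrame(
    {"name": ["inside", "outside"]},
    geometry=[
        Polygon([(1, 1), (2, 1), (2, 2), (1, 2)]),
        Polygon([(5, 5), (6, 5), (6, 6), (5, 6)]),
    ],
)

boundary = state.dissolve(by="GEN").geometry.values[0]
print(data[data.within(boundary)])   # keeps only the "inside" polygon
```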