Cli/pvlive #54

Draft · wants to merge 4 commits into base: main
1 change: 1 addition & 0 deletions src/open_data_pvnet/configs/met_office_uk_data_config.yaml
@@ -1,5 +1,6 @@
general:
  description: Config for accessing Met Office UK Deterministic NWP data
  provider: "met_office"
  name: PVNet current (Met Office UK)
  destination_platform: "huggingface"
  destination_dataset_id: "openclimatefix/met-office-uk-deterministic-solar"
10 changes: 10 additions & 0 deletions src/open_data_pvnet/configs/pvlive_data_config.yaml
@@ -0,0 +1,10 @@
general:
  description: Config for accessing PV-Live data
  provider: "pv_live"
  name: pvlive
  destination_platform: "huggingface"
  destination_dataset_id: "Ali-ws/ocf-pvlive"


input_data:
  local_data: "tmp/data/pvlive"
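For reference, a minimal sketch of how these keys are consumed downstream (assuming PROJECT_BASE resolves to the repository root, as in collect_pvlive_data.py):

from open_data_pvnet.utils.config_loader import load_config
from open_data_pvnet.utils.env_loader import PROJECT_BASE

# Load the PVLive config and rebuild the monthly NetCDF target path the collector uses.
config = load_config(PROJECT_BASE / "src/open_data_pvnet/configs/pvlive_data_config.yaml")
repo_id = config["general"]["destination_dataset_id"]   # "Ali-ws/ocf-pvlive"
local_dir = config["input_data"]["local_data"]           # "tmp/data/pvlive"
local_path = PROJECT_BASE / local_dir / "target_data_2023_01.nc"
print(repo_id, local_path)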
7 changes: 7 additions & 0 deletions src/open_data_pvnet/scripts/archive.py
@@ -3,6 +3,7 @@
from open_data_pvnet.nwp.met_office import process_met_office_data
from open_data_pvnet.nwp.gfs import process_gfs_data
from open_data_pvnet.nwp.dwd import process_dwd_data
from open_data_pvnet.scripts.collect_pvlive_data import process_pvlive_data

logger = logging.getLogger(__name__)

@@ -61,5 +62,11 @@ def handle_archive(
f"Processing DWD data for {year}-{month:02d}-{day:02d} at hour {hour:02d} with overwrite={overwrite}"
)
process_dwd_data(year, month, day, hour, overwrite=overwrite)
elif provider == "pvlive":
logger.info(
f"Processing PVLive data for {year}-{month:02d}-{day:02d} with overwrite={overwrite}"
)
process_pvlive_data(year, month, day, hour, region, overwrite=overwrite, archive_type=archive_type)

else:
raise NotImplementedError(f"Provider {provider} not yet implemented")
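A usage sketch of the new branch (hypothetical values; the argument names follow the parameters referenced inside handle_archive, whose full signature is not shown in this diff):

from open_data_pvnet.scripts.archive import handle_archive

# Archive one month of PVLive GSP data for the UK; PVLive is collected per month,
# so day and hour are only passed through for CLI compatibility.
handle_archive(
    provider="pvlive",
    year=2023,
    month=1,
    day=1,
    hour=0,
    region="uk",
    overwrite=False,
    archive_type="zarr.zip",
)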
88 changes: 71 additions & 17 deletions src/open_data_pvnet/scripts/collect_pvlive_data.py
@@ -1,32 +1,86 @@
import pandas as pd
import logging
from datetime import datetime, timedelta
from pathlib import Path
from open_data_pvnet.scripts.fetch_pvlive_data import PVLiveData
from open_data_pvnet.utils.env_loader import PROJECT_BASE
from open_data_pvnet.utils.config_loader import load_config
from open_data_pvnet.utils.data_uploader import upload_to_huggingface
from open_data_pvnet.utils.data_converters import convert_nc_to_zarr
import pytz
import xarray as xr
import numpy as np
import os
import calendar
import shutil

logger = logging.getLogger(__name__)

# Define the configuration paths for UK and Global
CONFIG_PATHS = {
    "uk": PROJECT_BASE / "src/open_data_pvnet/configs/pvlive_data_config.yaml",
}


def collect_pvlive_data(
    year: int,
    month: int,
    overwrite: bool = False,
):
    config_path = CONFIG_PATHS["uk"]
    config = load_config(config_path)
    logger.info(f"Loaded configuration from {config_path}")

    local_path = (
        PROJECT_BASE / config["input_data"]["local_data"] / f"target_data_{year}_{month:02d}.nc"
    )
    logger.info(f"Downloading PVLive data to {local_path}")

    pv = PVLiveData()

    start = datetime(year, month, 1, 0, 0, 0, tzinfo=pytz.utc)
    end = datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59, tzinfo=pytz.utc)

    # Fetch half-hourly data for every GSP in the month, then drop the timezone so the
    # datetime coordinate serializes cleanly to NetCDF.
    df = pv.get_data_between(
        start=start, end=end, period=30, extra_fields="capacity_mwp,installedcapacity_mwp"
    )
    df = pd.DataFrame(df).reset_index()

    df["datetime_gmt"] = pd.to_datetime(df["datetime_gmt"], utc=True)
    df["datetime_gmt"] = df["datetime_gmt"].dt.tz_convert(None)

    ds = df.set_index(["gsp_id", "datetime_gmt"]).to_xarray()
    ds = ds.transpose("gsp_id", "datetime_gmt")

    if not overwrite and os.path.exists(local_path):
        logger.info(f"File {local_path} already exists and overwrite is set to False.")
    else:
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        ds.to_netcdf(local_path)
        logger.info(f"PVLive data stored successfully in {local_path}")

    return local_path


def process_pvlive_data(
    year: int,
    month: int,
    day: int = None,
    hour: int = None,
    region: str = "uk",
    overwrite: bool = False,
    archive_type: str = "zarr.zip",
):
    # day, hour and region are accepted so the signature matches the call in archive.py;
    # PVLive data is collected per month and only the UK config is currently defined.
    local_path = collect_pvlive_data(year, month, overwrite)
    if not local_path:
        logger.error(f"Failed to collect PVLive data for {year}-{month:02d}.")
        return

    local_dir = Path(local_path).parent
    output_dir = local_dir / "zarr" / f"target_data_{year}_{month:02d}"
    convert_nc_to_zarr(local_dir, output_dir, overwrite)

    upload_to_huggingface(
        config_path=CONFIG_PATHS["uk"],
        folder_name=output_dir.name,
        year=year,
        month=month,
        overwrite=overwrite,
        archive_type=archive_type,
    )

    # Remove the local NetCDF file and the rest of the temporary folder.
    os.remove(local_dir / f"target_data_{year}_{month:02d}.nc")
    shutil.rmtree(local_dir)
    logger.info(f"PVLive data for {year}-{month:02d} uploaded successfully.")


if __name__ == "__main__":
    process_pvlive_data(2023, 1, overwrite=True)
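To sanity-check the intermediate NetCDF before it is converted to Zarr and uploaded, something like the following works (a sketch; the path assumes the default tmp/data/pvlive location and a January 2023 run):

import xarray as xr
from open_data_pvnet.utils.env_loader import PROJECT_BASE

# Open the monthly file written by collect_pvlive_data(2023, 1) and inspect its layout.
ds = xr.open_dataset(PROJECT_BASE / "tmp/data/pvlive/target_data_2023_01.nc")
print(ds.dims)             # expect gsp_id and datetime_gmt dimensions
print(list(ds.data_vars))  # generation plus the requested capacity fields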
47 changes: 44 additions & 3 deletions src/open_data_pvnet/scripts/fetch_pvlive_data.py
@@ -1,10 +1,11 @@
from pvlive_api import PVLive
import logging

from datetime import datetime
import pytz
import pandas as pd

logger = logging.getLogger(__name__)


class PVLiveData:
    def __init__(self):
        self.pvl = PVLive()
@@ -14,6 +15,8 @@
        Get the latest data from PVlive
        """
        try:
            if entity_id == 0 and entity_type == "gsp":
                return self._get_latest_data_for_all_gsps(period, extra_fields)
            df = self.pvl.latest(
                entity_type=entity_type,
                entity_id=entity_id,
@@ -26,14 +29,17 @@
            logger.error(e)
            return None

    def get_data_between(self, start, end, period, entity_type="gsp", entity_id=0, extra_fields=""):
        """
        Get the data between two dates
        """
        try:
            if entity_id == 0 and entity_type == "gsp":
                return self._get_data_between_for_all_gsps(start, end, period, extra_fields)
            df = self.pvl.between(
                start=start,
                end=end,
                period=period,
                entity_type=entity_type,
                entity_id=entity_id,
                extra_fields=extra_fields,
@@ -56,3 +62,38 @@ def get_data_at_time(self, dt):
        except Exception as e:
            logger.error(e)
            return None

    def _get_latest_data_for_all_gsps(self, period, extra_fields):
        """Fetch the latest reading for every GSP and concatenate into one DataFrame."""
        data = None
        for gsp_id in self.pvl.gsp_ids:
            data_ = self.pvl.latest(
                entity_type="gsp",
                entity_id=gsp_id,
                extra_fields=extra_fields,
                period=period,
                dataframe=True,
            )
            if data is None:
                data = data_
            else:
                data = pd.concat((data, data_), ignore_index=True)
        return data

    def _get_data_between_for_all_gsps(self, start, end, period, extra_fields):
        """Fetch readings between start and end for every GSP and concatenate into one DataFrame."""
        data = None
        for gsp_id in self.pvl.gsp_ids:
            data_ = self.pvl.between(
                period=period,
                start=start,
                end=end,
                entity_type="gsp",
                entity_id=gsp_id,
                extra_fields=extra_fields,
                dataframe=True,
            )
            if data is None:
                data = data_
            else:
                data = pd.concat((data, data_), ignore_index=True)
        return data
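A minimal usage sketch of this wrapper (assumes the pvlive-api package is installed and the Sheffield Solar API is reachable; with the default entity_id=0 the new helpers fan out over every GSP):

from datetime import datetime
import pytz
from open_data_pvnet.scripts.fetch_pvlive_data import PVLiveData

pv = PVLiveData()

# Half-hourly outturn plus capacity for all GSPs over one day.
start = datetime(2023, 1, 1, 0, 0, tzinfo=pytz.utc)
end = datetime(2023, 1, 2, 0, 0, tzinfo=pytz.utc)
df = pv.get_data_between(start=start, end=end, period=30, extra_fields="capacity_mwp")
print(df.head())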

27 changes: 18 additions & 9 deletions src/open_data_pvnet/utils/data_uploader.py
@@ -13,14 +13,16 @@
def _validate_config(config):
    """Validate configuration and return required values."""
    repo_id = config.get("general", {}).get("destination_dataset_id")
    provider = config.get("general", {}).get("provider")
    if not repo_id:
        raise ValueError("No destination_dataset_id found in the configuration file.")

    if provider == "met_office":
        local_output_dir = config["input_data"]["nwp"]["met_office"]["local_output_dir"]
    elif provider == "pv_live":
        local_output_dir = config["input_data"]["local_data"]
    else:
        raise ValueError(f"Unsupported provider '{provider}' in configuration.")
    zarr_base_path = Path(local_output_dir) / "zarr"
    return repo_id, zarr_base_path


def _validate_token():
    """Validate Hugging Face token and return API instance."""
    hf_token = os.getenv("HUGGINGFACE_TOKEN")
@@ -90,7 +92,7 @@ def _upload_archive(
    overwrite: bool,
    year: int,
    month: int,
    day: int = None,
):
    """
    Upload an archive file to the Hugging Face repository in the data/year/month[/day] structure.
@@ -103,10 +105,14 @@
        overwrite (bool): Whether to overwrite existing files.
        year (int): Year for folder structure.
        month (int): Month for folder structure.
        day (int, optional): Day for folder structure. If not provided, only year/month will be used.
    """
    # Create the path structure: data/year/month[/day]/archive_name
    if day is not None:
        target_path = f"data/{year:04d}/{month:02d}/{day:02d}/{archive_path.name}"
    else:
        target_path = f"data/{year:04d}/{month:02d}/{archive_path.name}"

    logger.info(f"Uploading archive {archive_path} to {repo_id}:{target_path}")

    try:
@@ -196,7 +202,7 @@ def upload_to_huggingface(
    folder_name: str,
    year: int,
    month: int,
    day: int = None,
    overwrite: bool = False,
    archive_type: str = "zarr.zip",
):
@@ -208,7 +214,7 @@
        folder_name (str): Name of the folder to upload (e.g., '2022-12-01-00').
        year (int): Year for folder structure.
        month (int): Month for folder structure.
        day (int, optional): Day for folder structure. If not provided, only year/month will be used.
        overwrite (bool): Whether to overwrite existing files in the repository.
        archive_type (str): Type of archive to create ("zarr.zip" or "tar").

@@ -238,7 +244,10 @@
    archive_path = create_tar_archive(folder_path, archive_name, overwrite=overwrite)

    # Upload archive with year/month structure (plus day when provided)
    if day is not None:
        _upload_archive(hf_api, archive_path, repo_id, hf_token, overwrite, year, month, day)
    else:
        _upload_archive(hf_api, archive_path, repo_id, hf_token, overwrite, year, month)

    logger.info(f"Upload to Hugging Face completed: {repo_id}")
