15 changes: 3 additions & 12 deletions cwmscli/commands/commands_cwms.py
@@ -45,11 +45,11 @@ def shefcritimport(filename, office, api_root, api_key, api_key_loc):
@api_root_option
@api_key_option
@click.option(
"-l",
"--location",
"--input-keys",
"input_keys",
default="all",
show_default=True,
help='Location ID. Use "-p=all" for all locations.',
help='Input keys. Defaults to all keys/files with --input-keys=all. These are the keys under "input_files" in a given config file. This option lets you run a single file from a config that contains multiple files. Example: --input-keys=file1',
)
@click.option(
"-lb",
@@ -76,15 +76,6 @@ def shefcritimport(filename, office, api_root, api_key, api_key_loc):
help="Override CSV file (else use config)",
)
@click.option("--log", show_default=True, help="Path to the log file.")
@click.option(
"-dp",
"--data-path",
"data_path",
default=".",
show_default=True,
type=click.Path(exists=True, file_okay=False),
help="Directory where csv files are stored",
)
@click.option("--dry-run", is_flag=True, help="Log only (no HTTP calls)")
@click.option("--begin", type=str, help="YYYY-MM-DDTHH:MM (local to --tz)")
@click.option("-tz", "--timezone", "tz", default="GMT", show_default=True)
68 changes: 38 additions & 30 deletions cwmscli/commands/csv2cwms/README.md
@@ -8,36 +8,44 @@ To View the Help: `cwms-cli csv2cwms --help`

Usage: cwms-cli csv2cwms [OPTIONS]

Store CSV TimeSeries data to CWMS using a config file

Options:
  -o, --office TEXT        Office to grab data for  [required]
  -a, --api_root TEXT      API Root for CDA. Can be user defined or placed
                           in an env variable CDA_API_ROOT  [required]
  -k, --api_key TEXT       API key for CDA. Can be user defined or placed in
                           env variable CDA_API_KEY. One of api_key or
                           api_key_loc is required
  --input-keys TEXT        Input keys. Defaults to all keys/files with
                           --input-keys=all. These are the keys under
                           "input_files" in a given config file. This option
                           lets you run a single file from a config that
                           contains multiple files. Example:
                           --input-keys=file1  [default: all]
  -lb, --lookback INTEGER  Lookback period in HOURS  [default: 120]
  -v, --verbose            Verbose logging
  -c, --config PATH        Path to JSON config file  [required]
  -df, --data-file TEXT    Override CSV file (else use config)
  --log TEXT               Path to the log file.
  --dry-run                Log only (no HTTP calls)
  --begin TEXT             YYYY-MM-DDTHH:MM (local to --tz)
  -tz, --timezone TEXT     [default: GMT]
  --ignore-ssl-errors      Ignore TLS errors (testing only)
  --version                Show the version and exit.
  --help                   Show this message and exit.

## Features

- Specify one or more date formats that may appear in each input CSV file
- Apply mathematical operations across multiple columns and store the result to one timeseries
- Store one column of data with a user-specified precision and units to a timeseries identifier
- Dry runs to test what data might look like prior to database storage
- Verbose logging via the -v flag
- Colored terminal output for user readability
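
For example, to dry-run a single input file from a config that defines several (paths and key names here are illustrative): `cwms-cli csv2cwms -o SWT -c path/to/config.json --input-keys BROK --dry-run`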
108 changes: 54 additions & 54 deletions cwmscli/commands/csv2cwms/__main__.py
@@ -1,5 +1,4 @@
# Script Entry File
import json
import os
import sys
import time
@@ -51,47 +50,40 @@
API_KEY = os.getenv("CDA_API_KEY")
OFFICE = os.getenv("CDA_OFFICE", "SWT")
HOST = os.getenv("CDA_HOST")
LOOKBACK_DAYS = int(os.getenv("CDA_LOOKBACK_DAYS", 5))  # Default to 5 days if not set

if [API_KEY, OFFICE, HOST].count(None) > 0:
    raise ValueError(
        "Environment variables CDA_API_KEY, CDA_OFFICE, and CDA_HOST must be set."
    )


def parse_file(file_path, begin_time, lookback, timezone="GMT"):
def parse_file(file_path, begin_time, date_format, timezone="GMT"):
    csv_data = load_csv(file_path)
    header = csv_data[0]
    data = csv_data[1:]
    ts_data = {}
    lookback_datetime = begin_time - timedelta(hours=lookback)
    logger.debug(f"Begin time: {begin_time}")
    logger.debug(f"Lookback datetime: {lookback_datetime}")
    for row in data:
        # Skip empty rows or rows without a timestamp
        if not row:
            continue
        row_datetime = parse_date(row[0], tz_str=timezone)
        # Skip rows that are before/older than the lookback period and after the begin time
        logger.debug(f"Row datetime: {row_datetime}")
        if row_datetime < lookback_datetime or row_datetime > begin_time:
            continue
        row_datetime = parse_date(row[0], tz_str=timezone, date_format=date_format)
        # Guarantee only one entry per timestamp
        ts_data[int(row_datetime.timestamp())] = row
    return {"header": header, "data": ts_data}


def load_timeseries(file_data, project, config):
def load_timeseries(file_data, file_key, config):
    header = file_data.get("header", [])
    data = file_data.get("data", {})

    if not header or not data:
        raise ValueError(
            "No data found in the CSV file for the range selected: check the --lookback period and/or --begin time. You will also want to ensure you set the timezone of the CSV file with --tz America/Chicago or similar."
            "No data found in the CSV file for the range selected. Please ensure you set the timezone of the CSV file with --tz America/Chicago or similar."
        )

    ts_config = config["projects"][project]["timeseries"]
    project_ts = []
    ts_config = config["input_files"][file_key]["timeseries"]
    file_ts = []

    # Interval in seconds
    interval = config.get("interval")
@@ -138,9 +130,9 @@ def load_timeseries(file_data, project, config):
        logger.debug(
            f"Timeseries {name} data range: {colorize(datetime.fromtimestamp(start_epoch), 'blue')} to {colorize(datetime.fromtimestamp(end_epoch), 'blue')}"
        )
        project_ts.append(ts_obj)
        file_ts.append(ts_obj)

    return project_ts
    return file_ts


def config_check(config):
@@ -149,20 +141,25 @@ def config_check(config):
        logger.warning(
            "Configuration file does not contain an 'interval' key (value in seconds); setting one per CSV file is recommended to avoid ambiguity."
        )
    if not config.get("projects"):
        raise ValueError("Configuration file must contain a 'projects' key.")
    for proj, proj_data in config.get("projects").items():
        # Only check the specified project or if all projects are specified
        if proj != "all" and proj != proj.lower():
    if config.get("projects"):
        logger.warning(
            "Configuration file contains a 'projects' key; this has been renamed to 'input_files' for clarity. Continuing for backwards compatibility."
        )
        config["input_files"] = config.pop("projects")
    if not config.get("input_files"):
        raise ValueError("Configuration file must contain an 'input_files' key.")
    for file_key, file_data in config.get("input_files").items():
        # Only check the specified keys or if all keys are specified
        if file_key != "all" and file_key != file_key.lower():
            continue
        if not proj_data.get("timeseries"):
        if not file_data.get("timeseries"):
            raise ValueError(
                f"Configuration file must contain a 'timeseries' key for project '{proj}'."
                f"Configuration file must contain a 'timeseries' key for file '{file_key}'."
            )
        for ts_name, ts_data in proj_data.get("timeseries").items():
        for ts_name, ts_data in file_data.get("timeseries").items():
            if not ts_data.get("columns"):
                raise ValueError(
                    f"Configuration file must contain a 'columns' key for timeseries '{ts_name}' in project '{proj}'."
                    f"Configuration file must contain a 'columns' key for timeseries '{ts_name}' in file '{file_key}'."
                )
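
To illustrate the backwards-compatibility shim in `config_check` above, a hypothetical snippet (assumes `config_check` is importable from this module):

```python
# A legacy config that still uses the old "projects" key.
legacy = {
    "interval": 900,
    "projects": {
        "BROK": {
            "timeseries": {
                "BROK.Elev.Inst.15Minutes.0.Rev-SCADA-cda": {"columns": "Headwater"}
            }
        }
    },
}
config_check(legacy)  # logs a warning, then pops "projects" into "input_files"
assert "input_files" in legacy and "projects" not in legacy
```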


@@ -190,71 +187,74 @@ def main(*args, **kwargs):
    setup_logger(kwargs.get("log"), verbose=kwargs.get("verbose"))
    logger.info(f"Begin time: {begin_time}")
    logger.debug(f"Timezone: {tz}")
    logger.debug(f"Lookback period: {kwargs.get("lookback")} hours")
    # Override environment variables if provided in CLI
    if kwargs.get("coop"):
        HOST = os.getenv("CDA_COOP_HOST")
        if not HOST:
            raise ValueError(
                "Environment variable CDA_COOP_HOST must be set to use --coop flag."
            )
    config = read_config(kwargs.get("config_path"))
    config_path = kwargs.get("config_path")
    config = read_config(config_path)
    config_check(config)
    PROJECTS = config.get("projects")
    # Override projects if one is specified in CLI
    if kwargs.get("project"):
        if kwargs.get("project") == "all":
            PROJECTS = config.get("projects", {}).keys()
    INPUT_FILES = config.get("input_files", {})
    # Override file names if one is specified in CLI
    if kwargs.get("input_keys"):
        if kwargs.get("input_keys") == "all":
            INPUT_FILES = config.get("input_files", {}).keys()
        else:
            PROJECTS = [kwargs.get("project")]
    if not PROJECTS:
        raise ValueError("Configuration file must contain a 'projects' key.")
    logger.info(f"Started for {','.join(PROJECTS)} projects.")
            INPUT_FILES = kwargs.get("input_keys").split(",")
    logger.info(f"Started for {','.join(INPUT_FILES)} input files.")
    # Input checks
    # if kwargs.get("project") != "all" and kwargs.get("project") not in PROJECTS:
    # if kwargs.get("file_name") != "all" and kwargs.get("file_name") not in INPUT_FILES:
    #     raise ValueError(
    #         f"Invalid project name '{kwargs.get("project")}'. Valid options are: {', '.join(PROJECTS)}"
    #         f"Invalid file name '{kwargs.get("file_name")}'. Valid options are: {', '.join(INPUT_FILES)}"
    #     )
    if kwargs.get("lookback") < 0:
        raise ValueError("Lookback period must be a non-negative integer.")

    # Loop the projects and post the data
    for proj in PROJECTS:
        HYDRO_DIR = config.get("projects", {}).get(proj, {}).get("dir", "")

        # Check if the user wants to override the data file name from what is in the config
        DATA_FILE = kwargs.get("data_file") or config.get("projects", {}).get(
            proj, {}
        ).get("file", "")
    # Loop the file names and post the data
    for file_name in INPUT_FILES:
        # Grab the csv file path from the config
        CONFIG_ITEM = config.get("input_files", {}).get(file_name, {})
        DATA_FILE = CONFIG_ITEM.get("data_path", "")
        if not DATA_FILE:
            logger.warning(
                f"No data file specified for project '{proj}'. {colorize(f'Skipping {proj}', 'red')}. Please provide a valid CSV file path using --data_file or ensure the 'file' key is set in the config."
                # TODO: List URL to example in doc site once available
                f"No data file specified for input-keys '{file_name}' in {config_path}. {colorize(f'Skipping {file_name}', 'red')}. Please provide a valid CSV file path by ensuring the 'data_path' key is set in the config."
            )
            continue
        csv_data = parse_file(
            os.path.join(kwargs.get("data_path"), HYDRO_DIR, DATA_FILE),
            DATA_FILE,
            begin_time,
            kwargs.get("lookback"),
            CONFIG_ITEM.get("date_format"),
            kwargs.get("tz"),
        )
        ts_min_data = load_timeseries(csv_data, proj, config)
        try:
            ts_min_data = load_timeseries(csv_data, file_name, config)
        except ValueError as e:
            logger.error(f"Error loading timeseries for {file_name}: {e}")
            continue

        if kwargs.get("dry_run"):
            logger.info("DRY RUN enabled. No data will be posted")
        for ts_object in ts_min_data:
            try:
                ts_object.update({"office-id": kwargs.get("office")})
                logger.info(
                    "Store Rule: " + CONFIG_ITEM.get("store_rule", "")
                    if CONFIG_ITEM.get("store_rule", "")
                    else f"No Store Rule specified, will default to REPLACE_ALL in {config_path}."
                )
                if kwargs.get("dry_run"):
                    logger.info(f"DRY RUN: {ts_object}")
                else:
                    cwms.store_timeseries(
                        data=ts_object,
                        store_rule=kwargs.get("store_rule", "REPLACE_ALL"),
                        store_rule=CONFIG_ITEM.get("store_rule", "REPLACE_ALL"),
                    )
                    logger.info(f"Stored {ts_object['name']} values")
            except Exception as e:
                logger.error(
                    f"Error posting data for {proj}: {e}\n{traceback.format_exc()}"
                    f"Error posting data for {file_name}: {e}\n{traceback.format_exc()}"
                )

    logger.debug(f"\tExecution time: {round(time.time() - start_time, 3)} seconds.")
19 changes: 19 additions & 0 deletions cwmscli/commands/csv2cwms/examples/complete_config.json
@@ -0,0 +1,19 @@
{
"interval": 3600,
"input_files": {
"BROK": {
"data_path": "cwmscli/commands/csv2cwms/tests/data/sample_brok.csv",
"date_format": [
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M"
],
"timeseries": {
"BROK.Elev.Inst.15Minutes.0.Rev-SCADA-cda": {
"columns": "Headwater",
"units": "ft",
"precision": 2
}
}
}
}
}
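
The `date_format` list above lets a single CSV mix timestamp styles. A minimal sketch of first-match-wins parsing, as a hypothetical stand-in for the real `parse_date` in `utils/dateutils.py`:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def parse_date_multi(value, formats, tz_str="GMT"):
    """Try each configured format in order; the first successful parse wins."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=ZoneInfo(tz_str))
        except ValueError:
            continue
    raise ValueError(f"{value!r} matched none of {formats}")

# Both styles from the example config parse to the same instant:
fmts = ["%m/%d/%Y %H:%M:%S", "%m/%d/%Y %H:%M"]
assert parse_date_multi("01/02/2025 13:15:00", fmts) == parse_date_multi("01/02/2025 13:15", fmts)
```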
8 changes: 7 additions & 1 deletion cwmscli/commands/csv2cwms/tests/data/sample_config.json
@@ -1,7 +1,13 @@
{
"interval": null,
"projects": {
"input_files": {
"BROK": {
"data_path": "cwmscli/commands/csv2cwms/tests/data/sample_brok.csv",
"store_rule": "REPLACE_ALL",
"date_format": [
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M"
],
"timeseries": {
"BROK.Elev.Inst.15Minutes.0.Rev-SCADA-cda": {
"columns": "Headwater",
3 changes: 2 additions & 1 deletion cwmscli/commands/csv2cwms/tests/test_dateutils.py
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta

import pytest
from utils.dateutils import determine_interval, parse_date, safe_zoneinfo

from ..utils.dateutils import determine_interval, parse_date, safe_zoneinfo


def test_parse_date_valid_formats():
3 changes: 2 additions & 1 deletion cwmscli/commands/csv2cwms/tests/test_expressions.py
@@ -1,5 +1,6 @@
import pytest
from utils.expression import eval_expression

from ..utils.expression import eval_expression


@pytest.mark.parametrize(
7 changes: 4 additions & 3 deletions cwmscli/commands/csv2cwms/tests/test_fileio.py
@@ -1,7 +1,8 @@
import os

import pytest
from utils.fileio import load_csv, read_config

from ..utils.fileio import load_csv, read_config


def test_load_csv_valid():
@@ -31,8 +32,8 @@ def test_read_config_valid():
    path = os.path.join(os.path.dirname(__file__), "data", "sample_config.json")
    config = read_config(path)
    assert isinstance(config, dict)
    assert "projects" in config
    assert "BROK" in config["projects"]
    assert "input_files" in config
    assert "BROK" in config["input_files"]


def test_read_config_invalid_json(tmp_path):