Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
python-version: [3.8, 3.7, 3.6]
python-version: [3.8,]# 3.7, 3.6]

steps:
- uses: actions/checkout@v2
Expand Down
24 changes: 24 additions & 0 deletions aurora/general_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@
MT_METADATA_DATA = Path(mt_metadata_init).parent.parent.joinpath("data")


def count_lines(file_name):
    """
    Count the number of lines in a file; acts like ``wc -l`` in unix.

    Parameters
    ----------
    file_name: str or pathlib.Path
        The file to apply line counting to.

    Returns
    -------
    num_lines: int
        Number of lines present in file_name (0 for an empty file).

    Raises
    ------
    FileNotFoundError
        If file_name does not exist (propagated from ``open``).
    """
    # sum over an iterator of 1s counts lines without loading the file into memory
    with open(file_name) as f:
        num_lines = sum(1 for _ in f)
    return num_lines


def execute_subprocess(cmd, **kwargs):
"""

Expand Down
22 changes: 15 additions & 7 deletions aurora/pipelines/process_mth5.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
extracting the metadata at the start of this method. It would be a good idea in
general to run a pre-check on the data that identifies which decimation levels are
valid for each run. (see Issue #182)

"""
# =============================================================================
# Imports
Expand Down Expand Up @@ -285,6 +286,7 @@ def populate_dataset_df(i_dec_level, config, dataset_df):
config.decimation.sample_rate,
start=fix_time(row.start),
end=fix_time(row.end),
survey=row.survey,
)
dataset_df["run"].at[i] = run_dict["run"]
# see Note 2 in this function doc notes
Expand All @@ -298,22 +300,20 @@ def populate_dataset_df(i_dec_level, config, dataset_df):
run_xrts = row["run_dataarray"].to_dataset("channel")
input_dict = {"run": row["run"], "mvts": run_xrts}
run_dict = prototype_decimate(config.decimation, input_dict)
dataset_df["run"].loc[i] = run_dict["run"]
dataset_df["run_dataarray"].loc[i] = run_dict["mvts"].to_array("channel")
dataset_df["run"].at[i] = run_dict["run"]
dataset_df["run_dataarray"].at[i] = run_dict["mvts"].to_array("channel")

return dataset_df


def close_mths_objs(df):
"""
Loop over all unique mth5_objs in the df and make sure they are closed

Parameters
----------
df: pd.DataFrame


Returns
-------
usually this is the dataframe associated with an instance of KernelDataset

"""
mth5_objs = df["mth5_obj"].unique()
Expand Down Expand Up @@ -453,11 +453,19 @@ def process_mth5(
station_metadata = tfk_dataset.get_station_metadata(local_station_id)

# https://github.com/kujaku11/mt_metadata/issues/90 (Do we need if/else here?)
#
# Also, assuming mth5 file versions are either 0.1.0 or 0.2.0, and not yet
# looking at mixe versions -- although that could happen. That is something
# to check earlier, like when we populate data dataset_df
if len(mth5_objs) == 1:
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
if mth5_objs[key].file_version == "0.1.0":
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
elif mth5_objs[key].file_version == "0.2.0":
survey_dict = mth5_objs[key].surveys_group.metadata.to_dict()
else:
print("WARN: Need test for multiple mth5 objs for non-tf_collection output")
print("WARN: Also need to add handling of 0.1.0 vs 0.2.0 mth5 file_version")
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
# raise Exception
Expand Down
27 changes: 20 additions & 7 deletions aurora/pipelines/run_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ class RunSummary:
"""

def __init__(self, **kwargs):
self.columns = ["station_id", "run_id", "start", "end"]
self.column_dtypes = [str, str, pd.Timestamp, pd.Timestamp]
self._input_dict = kwargs.get("input_dict", None)
self.df = kwargs.get("df", None)
self._mini_summary_columns = ["survey", "station_id", "run_id", "start", "end"]

def clone(self):
    """Return a deep, independent copy of this RunSummary (df and all state included)."""
    return copy.deepcopy(self)
Expand All @@ -81,6 +81,14 @@ def from_mth5s(self, mth5_list):
run_summary_df = extract_run_summaries_from_mth5s(mth5_list)
self.df = run_summary_df

@property
def mini_summary(self):
    """Return a trimmed view of self.df restricted to the mini-summary columns."""
    wanted_columns = self._mini_summary_columns
    return self.df[wanted_columns]

@property
def print_mini_summary(self):
    # Convenience accessor: prints the trimmed summary df to stdout.
    # NOTE(review): a @property that performs a print side effect and returns
    # None is surprising; a regular method would be more conventional — kept
    # as a property here to preserve the existing interface.
    print(self.mini_summary)

def add_duration(self, df=None):
"""

Expand Down Expand Up @@ -172,8 +180,10 @@ def channel_summary_to_run_summary(
ch_summary_df = ch_summary.to_dataframe()
elif isinstance(ch_summary, pd.DataFrame):
ch_summary_df = ch_summary
grouper = ch_summary_df.groupby(["station", "run"])
group_by_columns = ["survey", "station", "run"]
grouper = ch_summary_df.groupby(group_by_columns)
n_station_runs = len(grouper)
survey_ids = n_station_runs * [None]
station_ids = n_station_runs * [None]
run_ids = n_station_runs * [None]
start_times = n_station_runs * [None]
Expand All @@ -183,11 +193,13 @@ def channel_summary_to_run_summary(
output_channels = n_station_runs * [None]
channel_scale_factors = n_station_runs * [None]
i = 0
for (station_id, run_id), group in grouper:
# print(f"{i} {station_id} {run_id}")
# print(group)
station_ids[i] = station_id
run_ids[i] = run_id
for group_values, group in grouper:
group_info = dict(zip(group_by_columns, group_values)) # handy for debug
# for k, v in group_info.items():
# print(f"{k} = {v}")
survey_ids[i] = group_info["survey"]
station_ids[i] = group_info["station"]
run_ids[i] = group_info["run"]
start_times[i] = group.start.iloc[0]
end_times[i] = group.end.iloc[0]
sample_rates[i] = group.sample_rate.iloc[0]
Expand All @@ -199,6 +211,7 @@ def channel_summary_to_run_summary(
i += 1

data_dict = {}
data_dict["survey"] = survey_ids
data_dict["station_id"] = station_ids
data_dict["run_id"] = run_ids
data_dict["start"] = start_times
Expand Down
10 changes: 8 additions & 2 deletions aurora/pipelines/time_series_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,13 @@ def calibrate_stft_obj(stft_obj, run_obj, units="MT", channel_scale_factors=None


def get_run_run_ts_from_mth5(
mth5_obj, station_id, run_id, expected_sample_rate, start=None, end=None
mth5_obj,
station_id,
run_id,
expected_sample_rate,
start=None,
end=None,
survey=None,
):
"""
ToDo: Review if this method should be moved into mth5.
Expand Down Expand Up @@ -307,7 +313,7 @@ def get_run_run_ts_from_mth5(
"mvts" maps to xarray.core.dataset.Dataset

"""
run_obj = mth5_obj.get_run(station_id, run_id)
run_obj = mth5_obj.get_run(station_id, run_id, survey=survey)
run_ts = run_obj.to_runts(start=start, end=end)
validate_sample_rate(run_ts, expected_sample_rate)
run_run_ts = {"run": run_obj, "mvts": run_ts.dataset}
Expand Down
55 changes: 36 additions & 19 deletions aurora/test_utils/synthetic/make_mth5_from_asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ def create_run_ts_from_synthetic_run(run, df):


def create_mth5_synthetic_file(
station_cfgs,
mth5_path,
plot=False,
add_nan_values=False,
station_cfgs, mth5_path, plot=False, add_nan_values=False, file_version="0.1.0"
):
"""

Expand All @@ -111,11 +108,16 @@ def create_mth5_synthetic_file(
mth5_path = Path(mth5_path.__str__().replace(".h5", "_nan.h5"))

# open output h5
m = MTH5(file_version="0.1.0")
m = MTH5(file_version=file_version)
m.open_mth5(mth5_path, mode="w")
# survey = Survey()
if file_version == "0.2.0":
survey_id = "EMTF Synthetic"
m.add_survey(survey_id)
else:
survey_id = None

for station_cfg in station_cfgs:
station_group = m.add_station(station_cfg.id)
station_group = m.add_station(station_cfg.id, survey=survey_id)
for run in station_cfg.runs:

# read in data
Expand Down Expand Up @@ -147,13 +149,20 @@ def create_mth5_synthetic_file(
# add filters
active_filters = make_filters(as_list=True)
for fltr in active_filters:
m.filters_group.add_filter(fltr)
if file_version == "0.1.0":
m.filters_group.add_filter(fltr)
elif file_version == "0.2.0":
survey = m.get_survey(survey_id)
survey.filters_group.add_filter(fltr)
else:
print(f"unexpected file_version = {file_version}")
raise NotImplementedError

m.close_mth5()
return mth5_path


def create_test1_h5():
def create_test1_h5(file_version="0.1.0"):
station_01_params = make_station_01()
mth5_path = station_01_params.mth5_path # DATA_PATH.joinpath("test1.h5")
mth5_path = create_mth5_synthetic_file(
Expand All @@ -162,11 +171,12 @@ def create_test1_h5():
],
mth5_path,
plot=False,
file_version=file_version,
)
return mth5_path


def create_test2_h5():
def create_test2_h5(file_version="0.1.0"):
station_02_params = make_station_02()
mth5_path = station_02_params.mth5_path
mth5_path = create_mth5_synthetic_file(
Expand All @@ -175,11 +185,12 @@ def create_test2_h5():
],
mth5_path,
plot=False,
file_version=file_version,
)
return mth5_path


def create_test1_h5_with_nan():
def create_test1_h5_with_nan(file_version="0.1.0"):
station_01_params = make_station_01()
mth5_path = station_01_params.mth5_path # DATA_PATH.joinpath("test1.h5")
mth5_path = create_mth5_synthetic_file(
Expand All @@ -189,36 +200,42 @@ def create_test1_h5_with_nan():
mth5_path,
plot=False,
add_nan_values=True,
file_version=file_version,
)
return mth5_path


def create_test12rr_h5():
def create_test12rr_h5(file_version="0.1.0"):
station_01_params = make_station_01()
station_02_params = make_station_02()
station_params = [station_01_params, station_02_params]
mth5_path = station_01_params.mth5_path.__str__().replace("test1.h5", "test12rr.h5")
mth5_path = create_mth5_synthetic_file(station_params, mth5_path)
mth5_path = create_mth5_synthetic_file(
station_params, mth5_path, file_version=file_version
)
return mth5_path


def create_test3_h5():
def create_test3_h5(file_version="0.1.0"):
station_params = make_station_03()
mth5_path = create_mth5_synthetic_file(
[
station_params,
],
station_params.mth5_path,
file_version=file_version,
)
return mth5_path


def main():
create_test1_h5()
create_test1_h5_with_nan()
create_test2_h5()
create_test12rr_h5()
create_test3_h5()
file_version = "0.1.0"
# file_version="0.2.0"
create_test1_h5(file_version=file_version)
create_test1_h5_with_nan(file_version=file_version)
create_test2_h5(file_version=file_version)
create_test12rr_h5(file_version=file_version)
create_test3_h5(file_version=file_version)


if __name__ == "__main__":
Expand Down
10 changes: 8 additions & 2 deletions aurora/test_utils/synthetic/processing_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from aurora.pipelines.process_mth5 import process_mth5


def process_sythetic_data(processing_config, tfk_dataset, units="MT", z_file_path=""):
def process_sythetic_data(
processing_config, tfk_dataset, units="MT", z_file_path="", return_collection=True
):
"""

Parameters
Expand Down Expand Up @@ -37,6 +39,10 @@ class that has a df that describes the runs to be processed.
raise Exception

tf_collection = process_mth5(
config, tfk_dataset, units=units, z_file_path=z_file_path
config,
tfk_dataset,
units=units,
z_file_path=z_file_path,
return_collection=return_collection,
)
return tf_collection
Loading