Merged
Commits
48 commits
ac27606
add cas04 test outline for merged runs
kkappler Jun 6, 2022
23d0529
Add handling for estimator_engine
kkappler Jun 7, 2022
c918726
minor changes
kkappler Jun 11, 2022
01e632f
added placeholder for clock zero
kkappler Jun 11, 2022
2d80928
change xr.diff to xr.differentiate in prewhitening
kkappler Jun 11, 2022
918dd5e
First Implementation of clock-zero
kkappler Jun 12, 2022
20ae2f3
added a clock-zero test to parkfield
kkappler Jun 12, 2022
45c5ee2
added another test to pkd for clock_zero_type= to satisfy codecov
kkappler Jun 12, 2022
7badc8e
uncomment normal pkd tests
kkappler Jun 12, 2022
ee45943
add some doc strings
kkappler Jun 12, 2022
86510b7
rename main to test to see if codecov will stop complaining
kkappler Jun 12, 2022
81a13c0
Merge pull request #185 from simpeg/fix_issue_42
kkappler Jun 12, 2022
11df046
Fix issue 178
kkappler Jun 12, 2022
58961aa
added a test for estimate_per_channel in config
kkappler Jun 12, 2022
d3227b8
Replace matrix multiplication with einsum
kkappler Jun 12, 2022
48ab653
Merge pull request #186 from simpeg/fix_issue_78
kkappler Jun 13, 2022
e332774
Debugging operate_aurora
kkappler Jun 18, 2022
8ba79c3
Address handling of multiple stations in mth5
kkappler Jun 19, 2022
4621282
Update Nomenclature for TF Kernek Dataset
kkappler Jun 19, 2022
8f9019e
add method to look at channel summary while working on test
kkappler Jun 19, 2022
d5d4406
remove tab
kkappler Jun 19, 2022
e97b8a5
suppress inf/nan in stft obj
kkappler Jun 19, 2022
5df2271
Factor method for run_summary from dataset.py, into tf_kernel/helpers.py
kkappler Jun 19, 2022
fea9213
cleanup doc (a bit)
kkappler Jun 22, 2022
8f92815
KernelDataset Introduced
kkappler Jun 24, 2022
ef5ca41
oops - add file
kkappler Jun 24, 2022
4e3ac6c
bug fix for python 3.8 only
kkappler Jun 24, 2022
2fdf720
move run_summary wrangling into KernelDataset
kkappler Jun 24, 2022
6609f56
towards fixing operate_aurora to use kernel_dataset
kkappler Jun 25, 2022
caae96e
update operate aurora to use KernelDataset
kkappler Jun 25, 2022
e362296
Multiple runs now entered into TF XML
kkappler Jun 25, 2022
5bc935c
Merge pull request #187 from simpeg/fix_issue_181
kkappler Jun 25, 2022
cef7738
remove base, add doc to dataset
kkappler Jun 25, 2022
9c805cc
rm unused imports
kkappler Jun 25, 2022
3cf4879
modify test to use KernelDataset
kkappler Jun 25, 2022
bf95472
remove call to extract_run_summaries_from_mth5s, replace with RunSummary
kkappler Jun 25, 2022
3575381
merge run_summary helpers into run_summary module
kkappler Jun 25, 2022
ddfe5ea
tidy doc (a little)
kkappler Jun 25, 2022
3d4250a
move tf_kernel/dataset.py to transfer_function/kernel_dataset.py
kkappler Jun 25, 2022
db97e7e
move run_summary from tf_kernel to pipelines
kkappler Jun 25, 2022
6bf76e2
factor issue out of synthetic tests to its own test
kkappler Jun 25, 2022
f324c52
Major cleanup of synthetic data make method
kkappler Jun 25, 2022
fc3f1f3
added multirun test to synthetic
kkappler Jun 26, 2022
80bb4a7
remove duplicate defintion of filters
kkappler Jun 26, 2022
f42b66d
replace reference_station_id by remote_station_id so consistent langu…
kkappler Jun 26, 2022
a84a169
Fix duration bug & change sortby columns
kkappler Jun 26, 2022
da8e175
restrict_run_intervals_to_simultaneous
kkappler Jun 26, 2022
9671db8
bug fix
kkappler Jun 27, 2022
.github/workflows/tests.yml (2 changes: 1 addition & 1 deletion)
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
python-version: [3.8, 3.7, 3.6]
python-version: [3.8,]# 3.7, 3.6]

steps:
- uses: actions/checkout@v2
aurora/config/config_creator.py (19 changes: 13 additions & 6 deletions)
@@ -22,7 +22,8 @@ def __init__(self, **kwargs):

def create_run_processing_object(
self, station_id=None, run_id=None, mth5_path=None, sample_rate=1,
input_channels=["hx", "hy"], output_channels=["hz", "ex", "ey"],
input_channels=["hx", "hy"], output_channels=["hz", "ex", "ey"],
estimator=None,
emtf_band_file=BANDS_DEFAULT_FILE, **kwargs):
"""
Create a default processing object
@@ -60,11 +61,17 @@ def create_run_processing_object(
else:
d = 4
sr = sample_rate / (d ** int(key))
processing_obj.decimations_dict[key].decimation.factor = d
processing_obj.decimations_dict[key].decimation.sample_rate = sr
processing_obj.decimations_dict[key].input_channels = input_channels
processing_obj.decimations_dict[key].output_channels = output_channels

decimation_obj = processing_obj.decimations_dict[key]
decimation_obj.decimation.factor = d
decimation_obj.decimation.sample_rate = sr
decimation_obj.input_channels = input_channels
decimation_obj.output_channels = output_channels
#set estimator if provided as kwarg
if estimator:
try:
decimation_obj.estimator.engine = estimator["engine"]
except KeyError:
pass
return processing_obj

def to_json(self, processing_obj, path=None, nested=True, required=False):
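For orientation, here is a minimal sketch of calling create_run_processing_object with the new estimator keyword; the station id, sample rate, and engine name below are illustrative assumptions, not values taken from this PR.

from aurora.config.config_creator import ConfigCreator

cc = ConfigCreator()
# estimator is passed as a dict keyed by "engine"; if that key is absent, the
# default engine from the standards json is kept (the try/except KeyError above).
processing_obj = cc.create_run_processing_object(
    station_id="PKD",        # hypothetical station id
    sample_rate=40.0,        # hypothetical sample rate
    estimator={"engine": "RME"},
)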
aurora/config/metadata/decimation_level.py (13 changes: 12 additions & 1 deletion)
@@ -131,7 +131,18 @@ def frequency_bands_obj(self):
self.window.num_samples)
return frequency_bands



@property
def windowing_scheme(self):
from aurora.time_series.windowing_scheme import WindowingScheme
windowing_scheme = WindowingScheme(
taper_family=self.window.type,
num_samples_window=self.window.num_samples,
num_samples_overlap=self.window.overlap,
taper_additional_args=self.window.additional_args,
sample_rate=self.decimation.sample_rate,
)
return windowing_scheme

# def to_stft_config_dict(self):
# """
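A short sketch of consuming the new windowing_scheme property, reusing the processing_obj from the sketch above; the decimation-level key and the commented sliding-window call are assumptions added for illustration, not part of this diff.

# The property assembles a WindowingScheme from the decimation level's own
# window and decimation metadata, so callers no longer build one by hand.
dec_level = processing_obj.decimations_dict[0]   # hypothetical key
ws = dec_level.windowing_scheme
# Assuming WindowingScheme exposes a sliding-window method (not shown here):
# windowed = ws.apply_sliding_window(run_xrts)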
aurora/config/metadata/processing.py (22 changes: 21 additions & 1 deletion)
@@ -180,4 +180,24 @@ def num_decimation_levels(self):
def drop_reference_channels(self):
for decimation in self.decimations:
decimation.reference_channels = []
return
return

def validate(self):
"""
Placeholder. Some of the checks and methods here maybe better placed in
TFKernel, which would validate the dataset against the processing config.

The reason the validator is being created is that the default estimation
engine from the json file is "RME_RR", which is fine (we expect to in general
do more RR processing than SS) but if there is only one station (no remote)
then the RME_RR should be replaced by default with "RME".

Returns
-------

"""
# Make sure a RR method is not being called for a SS config
if not self.stations.remote:
for decimation in self.decimations:
if decimation.estimator.engine == "RME_RR":
decimation.estimator.engine = "RME"
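A minimal sketch of the fallback that validate() implements: with no remote station configured, any decimation level asking for the remote-reference engine is downgraded to single-station RME. The construction below reuses the ConfigCreator call from earlier in this PR; the station id and sample rate are assumptions.

from aurora.config.config_creator import ConfigCreator

cc = ConfigCreator()
p = cc.create_run_processing_object(station_id="PKD", sample_rate=40.0)
# No remote station was supplied, so p.stations.remote is assumed empty here
# (the single-station case the docstring describes).
p.validate()
# No decimation level is left requesting the remote-reference estimator
assert all(dec.estimator.engine != "RME_RR" for dec in p.decimations)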
aurora/config/metadata/standards/window.json (25 changes: 25 additions & 0 deletions)
@@ -48,5 +48,30 @@
"alias": [],
"example": "hamming",
"default": "boxcar"
},
"clock_zero_type": {
"type": "string",
"required": true,
"units": null,
"style": "controlled vocabulary",
"description": "how the clock-zero is specified",
"options": [
"user specified",
"data start",
"ignore"],
"alias": [],
"example": "user specified",
"default": "ignore"
},
"clock_zero": {
"type": "string",
"required": false,
"units": null,
"style": "time",
"description": "Start date and time of the first data window",
"options": [],
"alias": [],
"example": "2020-02-01T09:23:45.453670+00:00",
"default": "1980-01-01T00:00:00+00:00"
}
}
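The two new attributes are meant to be used together; a small sketch of the intended combinations (the window attribute names follow the standard above, while the decimation key and timestamp are made up for illustration).

# "ignore" (default): clock_zero is not used when aligning windows.
# "user specified": windows are aligned to the supplied clock_zero timestamp.
# "data start": the pipeline fills clock_zero from the earliest sample in the
#               dataset (see the process_mth5.py change later in this PR).
window = processing_obj.decimations_dict[0].window   # hypothetical key
window.clock_zero_type = "user specified"
window.clock_zero = "2004-09-28T00:00:00+00:00"      # made-up timestamp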
aurora/config/metadata/stations.py (2 changes: 1 addition & 1 deletion)
@@ -110,7 +110,7 @@ def from_dataset_dataframe(self, df):

station = df[df.remote==False].station_id.unique()[0]
rr_stations = df[df.remote==True].station_id.unique()

self.local.from_dataset_dataframe(df[df.station_id==station])

for rr_station in rr_stations:
aurora/pipelines/process_mth5.py (102 changes: 50 additions & 52 deletions)
@@ -107,13 +107,9 @@ def make_stft_objects(processing_config, i_dec_level, run_obj, run_xrts, units,
"""
Operates on a "per-run" basis

Note 1: CHECK DATA COVERAGE IS THE SAME IN BOTH LOCAL AND RR
This should be pushed into a previous validator before pipeline starts
# if config.reference_station_id:
# local_run_xrts = local_run_xrts.where(local_run_xrts.time <=
# remote_run_xrts.time[-1]).dropna(
# dim="time")

This method could be modifed in a multiple station code so that it doesn't care
if the station is "local" or "remote" but rather uses scale factors keyed by
station_id

Parameters
----------
@@ -138,18 +134,15 @@
"""
stft_config = processing_config.get_decimation_level(i_dec_level)
stft_obj = run_ts_to_stft(stft_config, run_xrts)

print("fix this so that it gets from config based on station_id, without caring "
"if local or remote")
# stft_obj = run_ts_to_stft_scipy(stft_config, run_xrts)
run_id = run_obj.metadata.id
if station_id==processing_config.stations.local.id:
scale_factors = processing_config.stations.local.run_dict[
run_id].channel_scale_factors
#Need to add logic here to look through list of remote ids
elif station_id==processing_config.stations.remote[0].id:
scale_factors = processing_config.stations.remote[0].run_dict[
run_id].channel_scale_factors
# local_stft_obj = run_ts_to_stft_scipy(config, local_run_xrts)

stft_obj = calibrate_stft_obj(
stft_obj,
run_obj,
@@ -204,9 +197,17 @@ def export_tf(tf_collection, station_metadata_dict={}, survey_dict={}):
This method may wind up being embedded in the TF class
Assign transfer_function, residual_covariance, inverse_signal_power, station, survey

Parameters
----------
tf_collection: aurora.transfer_function.transfer_function_collection
.TransferFunctionCollection
station_metadata_dict: dict
survey_dict: dict

Returns
-------

tf_cls: mt_metadata.transfer_functions.core.TF
Transfer function container
"""
merged_tf_dict = tf_collection.get_merged_dict()
tf_cls = TF()
@@ -233,7 +234,7 @@ def export_tf(tf_collection, station_metadata_dict={}, survey_dict={}):

def populate_dataset_df(i_dec_level, config, dataset_df):
"""
Move this into a method of DatasetDefinition, self.populate_with_data()
Move this into a method of TFKDataset, self.populate_with_data()

Notes:
1. When iterating over dataframe, (i)ndex must run from 0 to len(df), otherwise
@@ -313,24 +314,29 @@ def close_mths_objs(df):

def process_mth5(
config,
dataset_definition=None,
tfk_dataset=None,
units="MT",
show_plot=False,
z_file_path=None,
return_collection=True,
):
"""
1. Read in the config and figure out how many decimation levels there are
2. ToDo: Based on the run durations, and sampling rates, determined which runs
2. ToDo TFK: Based on the run durations, and sampling rates, determined which runs
are valid for which decimation levels, or for which effective sample rates. This
action should be taken before we get here. The dataset_definition should already
action should be taken before we get here. The tfk_dataset should already
be trimmed to exactly what will be processed.
3. ToDo TFK Check that data coverage is the same in both local and RR data
# if config.remote_station_id:
# local_run_xrts = local_run_xrts.where(local_run_xrts.time <=
# remote_run_xrts.time[-1]).dropna(
# dim="time")

Parameters
----------
config: aurora.config.metadata.processing.Processing or path to json
All processing parameters
dataset_definition: aurora.tf_kernel.dataset.DatasetDefinition or None
tfk_dataset: aurora.tf_kernel.dataset.Dataset or None
Specifies what datasets to process according to config
units: string
"MT" or "SI". To be deprecated once data have units embedded
@@ -348,13 +354,16 @@
"""

processing_config, mth5_objs = initialize_pipeline(config)
dataset_df = dataset_definition.df
dataset_df = tfk_dataset.df

# Here is where any checks that would be done by TF Kernel would be applied
#see notes labelled with ToDo TFK above

#Assign additional columns to dataset_df, populate with mth5_objs
all_mth5_objs = len(dataset_df) * [None]
mth5_obj_column = len(dataset_df) * [None]
for i, station_id in enumerate(dataset_df["station_id"]):
all_mth5_objs[i] = mth5_objs[station_id]
dataset_df["mth5_obj"] = all_mth5_objs
mth5_obj_column[i] = mth5_objs[station_id]
dataset_df["mth5_obj"] = mth5_obj_column
dataset_df["run"] = None
dataset_df["run_dataarray"] = None
dataset_df["stft"] = None
@@ -369,15 +378,22 @@
dataset_df = populate_dataset_df(i_dec_level, dec_level_config, dataset_df)
#ANY MERGING OF RUNS IN TIME DOMAIN WOULD GO HERE

#TFK 1: get clock-zero from data if needed
if dec_level_config.window.clock_zero_type == "data start":
dec_level_config.window.clock_zero = str(dataset_df.start.min())

# Apply STFT to all runs
local_stfts = []
remote_stfts = []
for i,row in dataset_df.iterrows():
run_xrts = row["run_dataarray"].to_dataset("channel")
run_obj = row["run"]
station_id = row.station_id
stft_obj = make_stft_objects(processing_config, i_dec_level, run_obj,
run_xrts, units, station_id)
stft_obj = make_stft_objects(processing_config,
i_dec_level,
run_obj,
run_xrts,
units,
row.station_id)

if row.station_id == processing_config.stations.local.id:
local_stfts.append(stft_obj)
@@ -390,7 +406,7 @@
# Could mute bad FCs here - Not implemented yet.
# RETURN FC_OBJECT

if processing_config.stations.remote:#reference_station_id:
if processing_config.stations.remote:
remote_merged_stft_obj = xr.concat(remote_stfts, "time")
else:
remote_merged_stft_obj = None
@@ -422,39 +438,21 @@
close_mths_objs(dataset_df)
return tf_collection
else:
# intended to be the default in future

#See ISSUE #181: Uncomment this once we have a mature multi-run test
# #dataset_definition.get_station_metadata_for_tf_archive()
# #get a list of local runs:
# cond1 = dataset_df["station_id"]==processing_config.stations.local.id
# sub_df = dataset_df[cond1]
# #sanity check:
# run_ids = sub_df.run_id.unique()
# assert(len(run_ids) == len(sub_df))
# # iterate over these runs, packing metadata into
# station_metadata = None
# for i,row in sub_df.iterrows():
# local_run_obj = row.run
# if station_metadata is None:
# station_metadata = local_run_obj.station_group.metadata
# station_metadata._runs = []
# run_metadata = local_run_obj.metadata
# station_metadata.add_run(run_metadata)

station_metadata = local_run_obj.station_group.metadata
station_metadata._runs = []
run_metadata = local_run_obj.metadata
station_metadata.add_run(run_metadata)
# intended to be the default in future (return tf_cls, not tf_collection)

local_station_id = processing_config.stations.local.id
station_metadata = tfk_dataset.get_station_metadata(local_station_id)

# Need to create an issue for this as well
if len(mth5_objs) == 1:
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
else:
print("We do not currently handle multiple mth5 objs for "
print("WARNING We do not currently handle multiple mth5 objs for "
"non-tf_collection output")
raise Exception
key = list(mth5_objs.keys())[0]
survey_dict = mth5_objs[key].survey_group.metadata.to_dict()
#raise Exception

print(station_metadata.run_list)
tf_cls = export_tf(
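Putting the pieces together, a hedged end-to-end sketch of the new calling convention, with KernelDataset taking the place of the old dataset_definition. The file path and station id are placeholders, and the RunSummary/KernelDataset method names (from_mth5s, from_run_summary) are assumptions based on the commit messages above rather than on this diff.

from aurora.config.config_creator import ConfigCreator
from aurora.pipelines.process_mth5 import process_mth5
from aurora.pipelines.run_summary import RunSummary
from aurora.transfer_function.kernel_dataset import KernelDataset

# Summarize the runs available in the mth5 file(s), then trim that summary to
# exactly what should be processed (step 2 of the docstring above).
run_summary = RunSummary()
run_summary.from_mth5s(["example.h5"])                # hypothetical path
kernel_dataset = KernelDataset()
kernel_dataset.from_run_summary(run_summary, "PKD")   # hypothetical local station

cc = ConfigCreator()
config = cc.create_run_processing_object(station_id="PKD", sample_rate=40.0)

# return_collection=False requests the mt_metadata TF container built by export_tf()
tf_cls = process_mth5(
    config,
    tfk_dataset=kernel_dataset,
    units="MT",
    return_collection=False,
)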