0.14.0 Release (#200)
* try getting just stage name, but fall back to str representation of stage; close #197

* version 0.13.3a1 for pipestat 0.6.0a1

* updated to pipestat 0.6.0

* updated requirements

* testing, drop python 3.7

* fix f-string quote issue for python 3.10

* minor refactor to use pipestat properties instead of cfg dict

* update changelog and version number

* update v0.13.3 and changelog

* fix _refresh_stats bug and change version to 0.14.0

* potential fix for #201

* changelog

* v0.14.0a1 prerelease

* report_object -> change message_raw to be a values dict to conform with pipestat output schemas

* self.pipestat_results_file should take priority over self.pipeline_stats_file related to databio/pepatac#257

* make pipestat_results_file = pipeline_stats_file if it is not provided

* set pipeline_stats_file if pipestat_results_file IS provided, remove checking for the first record_identifier during get_stat

* add pipestat_pipeline_type, defaulting to sample

* pipestat req version bump, v0.14.0a2 bump for pre-release

* v0.14.0 release prep

---------

Co-authored-by: Vince Reuter <vince.reuter@gmail.com>
Co-authored-by: Khoroshevskyi <sasha99250@gmail.com>
3 people authored Dec 22, 2023
1 parent 8aaede5 commit 3a8465a
Showing 9 changed files with 103 additions and 77 deletions.
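
Taken together, the changes below reshape how a pipeline talks to pipestat: the sample name becomes a record identifier, a pipeline type can be declared, pipestat_results_file and pipeline_stats_file are unified, and report_result/report_object accept force_overwrite. A minimal usage sketch under those assumptions (the pipeline name, output folder, and result keys are hypothetical, and a pipestat output schema defining those keys is assumed to sit next to the script):

import pypiper

# Hypothetical pipeline; only the pipestat-related arguments are new or changed in 0.14.0.
pm = pypiper.PipelineManager(
    name="example_pipeline",
    outfolder="results/",
    pipestat_sample_name="sample1",  # stored as self.pipestat_record_identifier
    pipestat_results_file="results/pipestat_results.yaml",  # now also sets pipeline_stats_file
    pipestat_pipeline_type="sample",  # new argument; defaults to "sample"
)

# force_overwrite is new for both report_result and report_object.
pm.report_result("aligned_reads", 1000000, force_overwrite=True)

# report_object now reports a values dict (path, thumbnail_path, title, annotation)
# instead of a tab-separated string, to conform with pipestat output schemas.
pm.report_object(
    "alignment_plot",
    "results/alignment.pdf",
    anchor_image="results/alignment.png",
    force_overwrite=True,
)

pm.stop_pipeline()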
2 changes: 1 addition & 1 deletion .github/workflows/run-pytest.yml
@@ -11,7 +11,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
python-version: ["3.8", "3.10"]
os: [ubuntu-latest]

steps:
12 changes: 12 additions & 0 deletions docs/changelog.md
@@ -1,5 +1,17 @@
# Changelog

## [0.14.0] -- 2023-12-22
### Changed
- refactor for pipestat v0.6.0 release
- drop python 3.7
- updated requirements
- changed message_raw to be a value_dict when reporting to conform to pipestat
### Fixed
- fixed #196 and #197
### Added
- added `force_overwrite` to `report_result` and `report_object`
- added pipestat_pipeline_type, defaulting to sample-level

## [0.13.2] -- 2023-08-02
### Fixed
- fixed self.new_start overriding checkpoints.
2 changes: 1 addition & 1 deletion pypiper/_version.py
@@ -1 +1 @@
__version__ = "0.13.2"
__version__ = "0.14.0"
138 changes: 74 additions & 64 deletions pypiper/manager.py
@@ -141,6 +141,7 @@ def __init__(
pipestat_schema=None,
pipestat_results_file=None,
pipestat_config=None,
pipestat_pipeline_type=None,
pipestat_result_formatter=None,
**kwargs,
):
@@ -329,32 +330,35 @@ def __init__(
signal.signal(signal.SIGTERM, self._signal_term_handler)

# pipestat setup
self.pipestat_sample_name = pipestat_sample_name or DEFAULT_SAMPLE_NAME
# getattr(self, "sample_name", DEFAULT_SAMPLE_NAME)
self.pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME
self.pipestat_pipeline_type = pipestat_pipeline_type or "sample"

# don't force default pipestat_results_file value unless
# pipestat config not provided
if pipestat_config is None and pipestat_results_file is None:
pipestat_results_file = pipeline_filepath(
self, filename="pipestat_results.yaml"
)
self.pipestat_results_file = self.pipeline_stats_file
elif pipestat_results_file:
self.pipestat_results_file = pipestat_results_file
self.pipeline_stats_file = self.pipestat_results_file

def _get_arg(args_dict, arg_name):
"""safely get argument from arg dict -- return None if doesn't exist"""
return None if arg_name not in args_dict else args_dict[arg_name]

self._pipestat_manager = PipestatManager(
sample_name=self.pipestat_sample_name
record_identifier=self.pipestat_record_identifier
or _get_arg(args_dict, "pipestat_sample_name")
or DEFAULT_SAMPLE_NAME,
pipeline_name=self.name,
schema_path=pipestat_schema
or _get_arg(args_dict, "pipestat_schema")
or default_pipestat_output_schema(sys.argv[0]),
results_file_path=self.pipeline_stats_file
or _get_arg(args_dict, "pipestat_results_file"),
results_file_path=self.pipestat_results_file
or _get_arg(args_dict, "pipestat_results_file")
or self.pipeline_stats_file,
config_file=pipestat_config or _get_arg(args_dict, "pipestat_config"),
multi_pipelines=multi,
pipeline_type=self.pipestat_pipeline_type,
)

self.start_pipeline(args, multi)
@@ -437,7 +441,7 @@ def _completed(self):
:return bool: Whether the managed pipeline is in a completed state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name)
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== COMPLETE_FLAG
)

@@ -448,7 +452,10 @@ def _failed(self):
:return bool: Whether the managed pipeline is in a failed state.
"""
return self.pipestat.get_status(self._pipestat_manager.sample_name) == FAIL_FLAG
return (
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== FAIL_FLAG
)

@property
def halted(self):
@@ -457,7 +464,8 @@ def halted(self):
:return bool: Whether the managed pipeline is in a paused/halted state.
"""
return (
self.pipestat.get_status(self._pipestat_manager.sample_name) == PAUSE_FLAG
self.pipestat.get_status(self._pipestat_manager.record_identifier)
== PAUSE_FLAG
)

@property
@@ -720,11 +728,12 @@ def start_pipeline(self, args=None, multi=False):
results = self._pipestat_manager.__str__().split("\n")
for i in results:
self.info("* " + i)
self.info("* Sample name: " + self.pipestat_sample_name + "\n")
self.info("* Sample name: " + self.pipestat_record_identifier + "\n")
self.info("\n----------------------------------------\n")
self.status = "running"
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier="running"
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

# Record the start in PIPE_profile and PIPE_commands output files so we
@@ -770,7 +779,8 @@ def _set_status_flag(self, status):
prev_status = self.status
self.status = status
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier=status
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self.debug("\nChanged status from {} to {}.".format(prev_status, self.status))

@@ -786,8 +796,8 @@ def _flag_file_path(self, status=None):
"""

flag_file_name = "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
flag_name(status or self.status),
)
return pipeline_filepath(self, filename=flag_file_name)
@@ -1419,7 +1429,7 @@ def _wait_for_lock(self, lock_file):
)
# self._set_status_flag(WAIT_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="waiting",
)
first_message_flag = True
@@ -1443,7 +1453,7 @@ def _wait_for_lock(self, lock_file):
self.timestamp("File unlocked.")
# self._set_status_flag(RUN_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="running",
)

@@ -1582,7 +1592,9 @@ def _report_profile(
with open(self.pipeline_profile_file, "a") as myfile:
myfile.write(message_raw + "\n")

def report_result(self, key, value, nolog=False, result_formatter=None):
def report_result(
self, key, value, nolog=False, result_formatter=None, force_overwrite=False
):
"""
Writes a key:value pair to self.pipeline_stats_file.
@@ -1592,6 +1604,7 @@ def report_result(self, key, value, nolog=False, result_formatter=None):
logfile. Use sparingly in case you will be printing the result in a
different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.
"""
@@ -1602,13 +1615,19 @@ def report_result(self, key, value, nolog=False, result_formatter=None):

reported_result = self.pipestat.report(
values={key: value},
sample_name=self.pipestat_sample_name,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
force_overwrite=force_overwrite,
)

if not nolog:
for r in reported_result:
self.info(r)
if isinstance(
reported_result, bool
): # Pipestat can return False if results are NOT reported.
self.info("Result successfully reported? " + str(reported_result))
else:
for r in reported_result:
self.info(r)

return reported_result

@@ -1621,6 +1640,7 @@ def report_object(
annotation=None,
nolog=False,
result_formatter=None,
force_overwrite=False,
):
"""
Writes a key:value pair to self.pipeline_stats_file. Note: this function
@@ -1641,6 +1661,7 @@ def report_object(
logfile. Use sparingly in case you will be printing the result in a
different format.
:param str result_formatter: function for formatting via pipestat backend
:param bool force_overwrite: overwrite results if they already exist?
:return str reported_result: the reported result is returned as a list of formatted strings.
"""
warnings.warn(
@@ -1659,37 +1680,30 @@ def report_object(
anchor_text = str(key).strip()
# better to use a relative path in this file
# convert any absolute paths into relative paths
relative_filename = (
os.path.relpath(filename, self.outfolder)
if os.path.isabs(filename)
else filename
)

if anchor_image:
relative_anchor_image = (
os.path.relpath(anchor_image, self.outfolder)
if os.path.isabs(anchor_image)
else anchor_image
)
else:
relative_anchor_image = "None"

message_raw = "{filename}\t{anchor_text}\t{anchor_image}\t{annotation}".format(
filename=relative_filename,
anchor_text=anchor_text,
anchor_image=relative_anchor_image,
annotation=annotation,
)

val = {key: message_raw.replace("\t", " ")}
values = {
"path": filename,
"thumbnail_path": anchor_image,
"title": anchor_text,
"annotation": annotation,
}
val = {key: values}

reported_result = self.pipestat.report(
values=val, sample_name=self.pipestat_sample_name, result_formatter=rf
values=val,
record_identifier=self.pipestat_record_identifier,
result_formatter=rf,
force_overwrite=force_overwrite,
)

if not nolog:
for r in reported_result:
self.info(r)
return reported_result
if isinstance(
reported_result, bool
): # Pipestat can return False if results are NOT reported.
self.info("Result successfully reported? " + str(reported_result))
else:
for r in reported_result:
self.info(r)

def _safe_write_to_file(self, file, message):
"""
@@ -1849,15 +1863,11 @@ def _refresh_stats(self):

if os.path.isfile(self.pipeline_stats_file):
_, data = read_yaml_data(path=self.pipeline_stats_file, what="stats_file")
print(data)
pipeline_key = list(
data[self.pipestat["_pipeline_name"]][self.pipestat["_pipeline_type"]]
)[0]
if self.name == pipeline_key:
for key, value in data[self.pipestat["_pipeline_name"]][
self.pipestat["_pipeline_type"]
][pipeline_key].items():
self.stats_dict[key] = value.strip()

for key, value in data[self._pipestat_manager.pipeline_name][
self._pipestat_manager.pipeline_type
][self._pipestat_manager.record_identifier].items():
self.stats_dict[key] = value

def get_stat(self, key):
"""
@@ -1989,12 +1999,12 @@ def complete(self):
"""Stop a completely finished pipeline."""
self.stop_pipeline(status=COMPLETE_FLAG)

def fail_pipeline(self, exc, dynamic_recover=False):
def fail_pipeline(self, exc: Exception, dynamic_recover: bool = False):
"""
If the pipeline does not complete, this function will stop the pipeline gracefully.
It sets the status flag to failed and skips the normal success completion procedure.
:param Exception e: Exception to raise.
:param Exception exc: Exception to raise.
:param bool dynamic_recover: Whether to recover e.g. for job termination.
"""
# Take care of any active running subprocess
@@ -2024,9 +2034,8 @@ def fail_pipeline(self, exc, dynamic_recover=False):
total_time = datetime.timedelta(seconds=self.time_elapsed(self.starttime))
self.info("Total time: " + str(total_time))
self.info("Failure reason: " + str(exc))
# self._set_status_flag(FAIL_FLAG)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name,
record_identifier=self._pipestat_manager.record_identifier,
status_identifier="failed",
)

@@ -2087,7 +2096,8 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
"""
# self._set_status_flag(status)
self.pipestat.set_status(
sample_name=self._pipestat_manager.sample_name, status_identifier=status
record_identifier=self._pipestat_manager.record_identifier,
status_identifier=status,
)
self._cleanup()
elapsed_time_this_run = str(
@@ -2457,8 +2467,8 @@ def _cleanup(self, dry_run=False):
for fn in glob.glob(self.outfolder + flag_name("*"))
if COMPLETE_FLAG not in os.path.basename(fn)
and not "{}_{}_{}".format(
self._pipestat_manager["_pipeline_name"],
self.pipestat_sample_name,
self._pipestat_manager.pipeline_name,
self.pipestat_record_identifier,
run_flag,
)
== os.path.basename(fn)
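
The _refresh_stats fix above reads results back keyed by pipeline name, pipeline type, and record identifier, and no longer strips values. A sketch of that lookup against an illustrative results file (the layout and values here are hypothetical, not taken from a real run):

import yaml

# Illustrative pipestat results file for a pipeline named "example_pipeline",
# sample-level pipeline type, record "sample1".
example = """
example_pipeline:
  sample:
    sample1:
      aligned_reads: 1000000
      duplication_rate: 0.12
"""

data = yaml.safe_load(example)

pipeline_name = "example_pipeline"      # self._pipestat_manager.pipeline_name
pipeline_type = "sample"                # self._pipestat_manager.pipeline_type
record_identifier = "sample1"           # self._pipestat_manager.record_identifier

# Mirrors the new _refresh_stats body: values are kept as-is (no .strip()).
stats_dict = dict(data[pipeline_name][pipeline_type][record_identifier])
print(stats_dict["aligned_reads"])      # 1000000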
2 changes: 1 addition & 1 deletion pypiper/pipeline.py
@@ -330,7 +330,7 @@ def run(self, start_point=None, stop_before=None, stop_after=None):
# between results from different stages.
skip_mode = False

print("Running stage: {}".format(stage))
print(f"Running stage: {getattr(stage, 'name', str(stage))}")

stage.run()
self.executed.append(stage)
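
The pipeline.py change prefers a stage's name attribute and falls back to str(stage) for stages that lack one (per #197). A small stand-alone illustration of the same pattern — NamedStage is a stand-in, not pypiper's Stage class:

class NamedStage:
    """Stand-in for a pipeline stage that exposes a name."""
    def __init__(self, name):
        self.name = name

def describe(stage):
    # Same pattern as pipeline.py: prefer stage.name, fall back to str(stage).
    return f"Running stage: {getattr(stage, 'name', str(stage))}"

print(describe(NamedStage("align_reads")))  # Running stage: align_reads
print(describe("trim_adapters"))            # falls back to the string representation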
14 changes: 8 additions & 6 deletions pypiper/utils.py
@@ -387,20 +387,20 @@ def split_by_pipes(cmd):
cmdlist = []
newcmd = str()
for char in cmd:
if char is "{":
if char == "{":
stack_brace.append("{")
elif char is "}":
elif char == "}":
stack_brace.pop()
elif char is "(":
elif char == "(":
stack_paren.append("(")
elif char is ")":
elif char == ")":
stack_paren.pop()

if len(stack_brace) > 0 or len(stack_paren) > 0:
# We are inside a parenthetic of some kind; emit character
# no matter what it is
newcmd += char
elif char is "|":
elif char == "|":
# if it's a pipe, finish the command and start a new one
cmdlist.append(newcmd)
newcmd = str()
@@ -1110,9 +1110,11 @@ def _add_args(parser, args, required):
return parser


def result_formatter_markdown(pipeline_name, sample_name, res_id, value) -> str:
def result_formatter_markdown(pipeline_name, record_identifier, res_id, value) -> str:
"""
Returns Markdown formatted value as string
# pipeline_name and record_identifier should be kept because pipestat needs them
"""

message_markdown = "\n> `{key}`\t{value}\t_RES_".format(key=res_id, value=value)
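
The split_by_pipes change swaps identity checks (is) for equality checks (==); comparing characters to string literals with is depends on CPython interning and has emitted a SyntaxWarning since Python 3.8. A minimal re-implementation of the same idea, using depth counters instead of the module's stacks (the function name is hypothetical, not the library API):

def split_top_level_pipes(cmd):
    """Split a shell command on '|' only when outside braces/parentheses."""
    depth_brace = depth_paren = 0
    parts, current = [], ""
    for char in cmd:
        if char == "{":
            depth_brace += 1
        elif char == "}":
            depth_brace -= 1
        elif char == "(":
            depth_paren += 1
        elif char == ")":
            depth_paren -= 1
        if depth_brace or depth_paren:
            current += char        # inside a grouping; keep pipes literal
        elif char == "|":
            parts.append(current)  # top-level pipe starts a new command
            current = ""
        else:
            current += char
    parts.append(current)
    return parts

print(split_top_level_pipes("cat {a|b}.txt | grep foo | wc -l"))
# ['cat {a|b}.txt ', ' grep foo ', ' wc -l']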
