Merge pull request #221 from cta-observatory/auto_MCP_errors_coin
error tracking in merging in automatic MCP processing
jsitarek authored May 16, 2024
2 parents 87423b3 + 2111e95 commit 584b632
Showing 5 changed files with 168 additions and 206 deletions.
@@ -1,7 +1,7 @@
 """
 Module for generating bash script lines for running analysis in different clusters
 """
-__all__ = ["slurm_lines"]
+__all__ = ["slurm_lines", "rc_lines"]
 
 
 def slurm_lines(queue, job_name, array=None, mem=None, out_name=None):
@@ -40,3 +40,29 @@ def slurm_lines(queue, job_name, array=None, mem=None, out_name=None):
         "ulimit -a\n\n",
     ]
     return lines
+
+
+def rc_lines(store, out):
+    """
+    Function for creating the generic shell lines for error tracking.
+
+    Parameters
+    ----------
+    store : str
+        String to store in addition to $rc
+    out : str
+        Base name for the log files with return codes; all return codes go to {out}_return.log, only errors to {out}_failed.log
+
+    Returns
+    -------
+    list
+        List of strings to attach to a shell script
+    """
+    lines = [
+        "rc=$?\n",
+        'if [ "$rc" -ne "0" ]; then\n',
+        f"    echo {store} $rc >> {out}_failed.log\n",
+        "fi\n",
+        f"echo {store} $rc >> {out}_return.log\n",
+    ]
+    return lines
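
For orientation, a minimal usage sketch of the new helper; the import path and the store/out values here are hypothetical, chosen to match the four-field log format that the updated job_accounting.py parses below:

from clusters import rc_lines  # assumed import path

# Lines appended to a generated bash script right after a command:
tail = rc_lines(store="$input $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID", out="list")
print("".join(tail))
# rc=$?
# if [ "$rc" -ne "0" ]; then
#     echo $input $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID $rc >> list_failed.log
# fi
# echo $input $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID $rc >> list_return.log

Every command's return code lands in list_return.log, while list_failed.log collects only the failures.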
@@ -104,19 +104,15 @@ def linking_bash_lst(
         If real data are matched to pre-processed MCs or not
     """
 
-    if (len(LST_runs) == 2) and (len(LST_runs[0]) == 10):
-        LST = LST_runs
-
-        LST_runs = []
-        LST_runs.append(LST)
-
     if NSB_match:
         coincidence_DL1_dir = f"{target_dir}/v{__version__}/{source_name}"
 
         MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source_name}/DL1"
     else:
-        coincidence_DL1_dir = f"{target_dir}/{source_name}/DL1/Observations"
-        MAGIC_DL1_dir = f"{target_dir}/{source_name}/DL1/Observations/"
+        coincidence_DL1_dir = (
+            f"{target_dir}/v{__version__}/{source_name}/DL1/Observations"
+        )
+        MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source_name}/DL1/Observations/"
 
     dates = [os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/Merged/Merged_*")]

@@ -218,7 +214,7 @@ def main():
     configfile_coincidence(telescope_ids, target_dir, source_name, NSB_match)
 
     LST_runs_and_dates = f"{source_name}_LST_runs.txt"
-    LST_runs = np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",")
+    LST_runs = np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",", ndmin=2)
 
     try:

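The new ndmin=2 argument replaces the removed single-run special case above: without it, a run list with only one row parses as a 1-D array and the (date, run) row structure is lost. A quick illustration with hypothetical file contents:

import io

import numpy as np

# Hypothetical one-run LST file: "date,run" with a 10-character date.
single_run = io.StringIO("2020_12_15,2923\n")

# Without ndmin, the single row collapses to a 1-D array of shape (2,),
# which is what the removed special case used to patch up by hand.
print(np.genfromtxt(single_run, dtype=str, delimiter=",").shape)  # (2,)

single_run.seek(0)
# ndmin=2 keeps one row per run: shape (1, 2).
print(np.genfromtxt(single_run, dtype=str, delimiter=",", ndmin=2).shape)  # (1, 2)

Note that ndmin was only added to np.genfromtxt in NumPy 1.23, so that becomes the effective minimum version here.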
@@ -89,21 +89,26 @@ def main():
     work_dir = config["directories"]["workspace_dir"]
 
     print(f"Checking progress of jobs stored in {work_dir}")
-    dirs = sorted(glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/*/*"))
+    dirs = sorted(
+        glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/[0-9]*/[M0-9]*")
+        + glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/Merged_[0-9]*")
+    )
     if dirs == []:
         versions = [x.split("/v")[-1] for x in glob.glob(f"{work_dir}/v*")]
         print("Error, no directories found")
         print(f"for path {work_dir} found in {args.config_file} this is available")
         print(f"Versions {versions}")
         tag = "" if NSB_matching else "/Observations"
-        print(f"Supported data types: DL1{tag}/M1, DL1{tag}/M2")
+        print(f"Supported data types: DL1{tag}/M1, DL1{tag}/M2, DL1{tag}/Merged")
         exit(1)
 
     all_todo = 0
     all_return = 0
     all_good = 0
     all_cpu = []
     all_mem = []
+    total_time = 0
+    all_jobs = []
     for dir in dirs:
         print(dir)
         # fixme list_dl0.txt is only available for DL1/M[12] processing
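As a rough sketch of what the tightened patterns now select, using fnmatch (which shares glob's character-class syntax; its * also crosses "/", which does not matter for these names) and hypothetical directory names:

from fnmatch import fnmatch

# Hypothetical entries under {work_dir}/v{version}/{source}/{data_level}/
names = ["20201215/M1", "20201215/M2", "Merged_20201215", "logs/tmp"]

for name in names:
    # a date-like dir with an M1/M2/merged-style subdir, or a Merged_<date> dir
    selected = fnmatch(name, "[0-9]*/[M0-9]*") or fnmatch(name, "Merged_[0-9]*")
    print(name, "selected" if selected else "skipped")

Unlike the old */* pattern, stray directories such as logs/tmp no longer end up in dirs.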
@@ -124,30 +129,52 @@ def main():
                 returns = fp.readlines()
                 this_return = len(returns)
                 for line in returns:
-                    file_in, slurm_id, task_id, rc = line.split()
+                    line = line.split()
+                    file_in = line[0]
+                    slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1]
+                    rc = line[-1]
                     if rc == "0":
                         this_good += 1
                         # now check accounting
                         if not args.no_accounting:
                             out = run_shell(
-                                f'sacct --format="JobID,CPUTime,MaxRSS" --units=M -j {slurm_id}_{task_id}| tail -1'
-                            )
-                            _, cpu, mem = out.split()
-                            hh, mm, ss = (int(x) for x in str(cpu).split(":"))
-                            delta = timedelta(
-                                days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss
-                            )
-                            this_cpu.append(delta)
-                            this_mem.append(float(mem[0:-1]))
+                                f'sacct --format="JobID,CPUTime,MaxRSS" --units=M -j {slurm_id}| tail -1'
+                            ).split()
+                            if len(out) == 3:
+                                _, cpu, mem = out
+                            elif (
+                                len(out) == 2
+                            ):  # MaxRSS sometimes is missing in the output
+                                cpu = out[1]
+                                mem = None
+                                print("Memory usage information is missing")
+                            else:
+                                print(f"Unexpected sacct output: {out}")
+                            if cpu is not None:
+                                hh, mm, ss = (int(x) for x in str(cpu).split(":"))
+                                delta = timedelta(
+                                    days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss
+                                )
+                                if slurm_id not in all_jobs:
+                                    total_time += delta.total_seconds() / 3600
+                                    all_jobs += [slurm_id]
+                                this_cpu.append(delta)
+                            if mem is not None:
+                                this_mem.append(float(mem[0:-1]))
                     else:
                         print(f"file {file_in} failed with error {rc}")
                 if len(this_cpu) > 0:
                     all_cpu += this_cpu
                     all_mem += this_mem
                     this_cpu = np.array(this_cpu)
                     this_mem = np.array(this_mem)
+                    mem_info = (
+                        f"memory [M]: median={np.median(this_mem)}, max={this_mem.max()}"
+                        if len(this_mem)
+                        else ""
+                    )
                     print(
-                        f"CPU: median={np.median(this_cpu)}, max={this_cpu.max()}; memory [M]: median={np.median(this_mem)}, max={this_mem.max()}"
+                        f"CPU: median={np.median(this_cpu)}, max={this_cpu.max()}; {mem_info}"
                     )
 
         except IOError:
@@ -186,7 +213,7 @@ def main():
     all_cpu = np.array(all_cpu)
     all_mem = np.array(all_mem)
     print(
-        f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}"
+        f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}, total={total_time:.2f} CPU hrs; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}"
     )
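
A self-contained sketch of the bookkeeping the updated loop performs, with made-up return-log lines and sacct rows standing in for the real shell calls; note how CPU time is totalled once per Slurm job even when several input files map to the same array task:

from datetime import timedelta

# Hypothetical {out}_return.log lines: "file slurm_id task_id rc" for
# array jobs, "file slurm_id rc" otherwise.
returns = [
    "dl1_a.h5 12345 7 0",
    "dl1_b.h5 12345 8 0",
    "dl1_b2.h5 12345 8 0",  # same array task as dl1_b.h5
    "dl1_c.h5 12346 0",
]

# Hypothetical `sacct ... | tail -1` outputs; MaxRSS is sometimes missing.
sacct = {
    "12345_7": "12345_7.batch 00:12:34 850M".split(),
    "12345_8": "12345_8.batch 01:02:03 900M".split(),
    "12346": "12346.batch 00:05:00".split(),
}

total_time = 0.0
all_jobs, mems = [], []
for line in returns:
    line = line.split()
    slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1]
    if line[-1] != "0":  # skip failed files
        continue
    out = sacct[slurm_id]
    cpu, mem = out[1], out[2] if len(out) == 3 else None
    hh, mm, ss = (int(x) for x in cpu.split(":"))
    delta = timedelta(days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss)
    if slurm_id not in all_jobs:  # count each job's CPU time only once
        total_time += delta.total_seconds() / 3600
        all_jobs.append(slurm_id)
    if mem is not None:
        mems.append(float(mem[:-1]))  # strip the trailing "M"

print(f"total={total_time:.2f} CPU hrs, mem={mems}")
# total=1.33 CPU hrs, mem=[850.0, 900.0, 900.0]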


