From cee1cef3bc1cf3e013231ff9c0b39af71e5fb31e Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Wed, 15 May 2024 11:46:34 +0000 Subject: [PATCH 1/6] refactoring of the code to avoid duplication of code for M1 and M2 --- .../semi_automatic_scripts/merging_runs.py | 149 +++++++----------- 1 file changed, 61 insertions(+), 88 deletions(-) diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py index 604b2e5b..4ca77211 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -16,16 +16,16 @@ Workingdir/DL1/MC/PARTICLE/Merged Usage: -$ python merging_runs_and_splitting_training_samples.py (-c config.yaml) +$ merging_runs (-c config.yaml) If you want to merge only the MAGIC or only the MC data, you can do as follows: Only MAGIC: -$ python merging_runs_and_splitting_training_samples.py --analysis-type onlyMAGIC (-c config.yaml) +$ merging_runs --analysis-type onlyMAGIC (-c config.yaml) Only MC: -$ python merging_runs_and_splitting_training_samples.py --analysis-type onlyMC (-c config.yaml) +$ merging_runs --analysis-type onlyMC (-c config.yaml) """ import argparse @@ -142,48 +142,46 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): """ process_name = f"merging_{source}" + + MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source}/DL1/" if not NSB_match: - MAGIC_DL1_dir = f"{target_dir}/{source}/DL1/Observations" + MAGIC_DL1_dir += "Observations/" - with open(f"{source}_Merge_MAGIC_{identification}.sh", "w") as f: - lines = slurm_lines( - queue="short", - job_name=process_name, - out_name=f"{MAGIC_DL1_dir}/Merged/slurm-%x.%j", - ) - f.writelines(lines) + lines = slurm_lines( + queue="short", + job_name=process_name, + mem="2g", + out_name=f"{MAGIC_DL1_dir}/Merged/slurm-%x.%j", + ) + with open(f"{source}_Merge_MAGIC_{identification}.sh", 
"w") as f: + f.writelines(lines) + if not NSB_match: if identification == "0_subruns": if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( f"{MAGIC_DL1_dir}/M2" ): for i in MAGIC_runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}" + f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}", exist_ok=True ) # Creating a merged directory for the respective run - os.system( - f'find {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} -type f -name "*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/merge_M1_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log\n" - ) - - os.system( - f'find {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} -type f -name "*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/merge_M2_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log\n" - ) + for magic in [1, 2]: + os.system( + f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' + ) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/merge_M{magic}_{i[0]}_{i[1]}_" + + "${SLURM_JOB_ID}.log\n" + ) elif identification == "1_M1M2": if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( f"{MAGIC_DL1_dir}/M2" ): for i in MAGIC_runs: - os.makedirs(f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged") + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged", exist_ok=True + ) f.write( f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/merge_{i[0]}_{[1]}_" + "${SLURM_JOB_ID}.log\n" @@ -191,75 +189,43 @@ def 
merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): else: for i in MAGIC_runs: os.makedirs( - f"{MAGIC_DL1_dir}/Merged/Merged_{i[0]}" + f"{MAGIC_DL1_dir}/Merged/Merged_{i[0]}", exist_ok=True ) # Creating a merged directory for each night f.write( f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i[0]} >{MAGIC_DL1_dir}/Merged/Merged_{i[0]}/merge_night_{i[0]}_" + "${SLURM_JOB_ID}.log \n" ) - else: - - process_name = f"merging_{source}" - - MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source}/DL1/" - - lines = slurm_lines( - queue="short", - job_name=process_name, - mem="2g", - out_name=f"{MAGIC_DL1_dir}/Merged/slurm-%x.%j", - ) - with open(f"{source}_Merge_MAGIC_{identification}.sh", "w") as f: - f.writelines(lines) + else: if identification == "0_subruns": - if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( f"{MAGIC_DL1_dir}/M2" ): - dates = [ - os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/M1/*") - ] - for i in dates: - runs = [ + for magic in [1, 2]: + dates = [ os.path.basename(x) - for x in glob.glob(f"{MAGIC_DL1_dir}/M1/{i}/*") + for x in glob.glob(f"{MAGIC_DL1_dir}/M{magic}/*") ] - for r in runs: - - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i}/{r}/logs" - ) # Creating a merged directory for the respective run - os.system( - f'find {MAGIC_DL1_dir}/M1/{i}/{r} -type f -name "*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M1/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/{r} >{MAGIC_DL1_dir}/Merged/{i}/{r}/logs/merge_M1_{i}_{r}_" - + "${SLURM_JOB_ID}.log \n" - ) - - dates = [ - os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/M2/*") - ] - - for i in dates: - runs = [ - os.path.basename(x) - for x in glob.glob(f"{MAGIC_DL1_dir}/M2/{i}/*") - ] + for i in dates: + runs = [ + os.path.basename(x) + for x in glob.glob(f"{MAGIC_DL1_dir}/M{2}/{i}/*") + ] + + for r in 
runs: + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/{i}/{r}/logs", + exist_ok=True, + ) # Creating a merged directory for the respective run + os.system( + f'find {MAGIC_DL1_dir}/M{magic}/{i}/{r} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' + ) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/{r} >{MAGIC_DL1_dir}/Merged/{i}/{r}/logs/merge_M{magic}_{i}_{r}_" + + "${SLURM_JOB_ID}.log \n" + ) - for r in runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i}/{r}/logs" - ) # Creating a merged directory for the respective run - os.system( - f'find {MAGIC_DL1_dir}/M2/{i}/{r} -type f -name "*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M2/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/{r} >{MAGIC_DL1_dir}/Merged/{i}/{r}/logs/merge_M2_{i}_{r}_" - + "${SLURM_JOB_ID}.log \n" - ) elif identification == "1_M1M2": if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( f"{MAGIC_DL1_dir}/M2" @@ -276,7 +242,10 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): if (len(glob.glob(f"{MAGIC_DL1_dir}/M1/{i}/{r}")) > 0) and ( len(glob.glob(f"{MAGIC_DL1_dir}/M2/{i}/{r}")) > 0 ): - os.makedirs(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/logs") + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/{i}/Merged/logs", + exist_ok=True, + ) f.write( f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i}/Merged/logs/merge_{i}_{r}_" + "${SLURM_JOB_ID}.log \n" @@ -296,7 +265,9 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): > 0 ): - os.makedirs(f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs") + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True + ) f.write( f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir 
{MAGIC_DL1_dir}/Merged/Merged_{i} >{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" + "${SLURM_JOB_ID}.log \n" @@ -325,7 +296,7 @@ def mergeMC(target_dir, identification, env_name, cwd, source_name): process_name = f"merging_{source_name}" MC_DL1_dir = f"{target_dir}/{source_name}/DL1/MC" - os.makedirs(f"{MC_DL1_dir}/{identification}/Merged") + os.makedirs(f"{MC_DL1_dir}/{identification}/Merged", exist_ok=True) if identification == "protons": list_of_nodes = np.sort(glob.glob(f"{MC_DL1_dir}/{identification}/train/node*")) @@ -408,7 +379,9 @@ def main(): for source_name in source_list: # Below we run the analysis on the MC data MAGIC_runs_and_dates = f"{source_name}_MAGIC_runs.txt" - MAGIC_runs = np.genfromtxt(MAGIC_runs_and_dates, dtype=str, delimiter=",") + MAGIC_runs = np.genfromtxt( + MAGIC_runs_and_dates, dtype=str, delimiter=",", ndmin=2 + ) if not NSB_match: if (args.analysis_type == "onlyMC") or ( args.analysis_type == "doEverything" From adae025d4a29b5feb9462e6ed7b9a4244dc7c8ff Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Wed, 15 May 2024 14:55:00 +0000 Subject: [PATCH 2/6] refactoring for the case of NSB_match set to true --- .../semi_automatic_scripts/merging_runs.py | 116 ++++++++---------- 1 file changed, 49 insertions(+), 67 deletions(-) diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py index 4ca77211..734d4a35 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -170,7 +170,7 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' ) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} 
>{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/merge_M{magic}_{i[0]}_{i[1]}_" + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" + "${SLURM_JOB_ID}.log\n" ) @@ -198,80 +198,62 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): else: if identification == "0_subruns": - if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( - f"{MAGIC_DL1_dir}/M2" - ): - for magic in [1, 2]: - dates = [ - os.path.basename(x) - for x in glob.glob(f"{MAGIC_DL1_dir}/M{magic}/*") - ] - - for i in dates: - runs = [ - os.path.basename(x) - for x in glob.glob(f"{MAGIC_DL1_dir}/M{2}/{i}/*") - ] - - for r in runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i}/{r}/logs", - exist_ok=True, - ) # Creating a merged directory for the respective run - os.system( - f'find {MAGIC_DL1_dir}/M{magic}/{i}/{r} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/{r} >{MAGIC_DL1_dir}/Merged/{i}/{r}/logs/merge_M{magic}_{i}_{r}_" - + "${SLURM_JOB_ID}.log \n" - ) + for magic in [1, 2]: + for i in MAGIC_runs: + # Here is a difference w.r.t. original code. 
If only one telescope data are available they will be merged now for this telescope + if os.path.exists(f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}"): + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs", + exist_ok=True, + ) # Creating a merged directory for the respective run + os.system( + f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' + ) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" + + "${SLURM_JOB_ID}.log\n" + ) + else: + print( + f"ERROR: {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} does not exist" + ) elif identification == "1_M1M2": - if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( - f"{MAGIC_DL1_dir}/M2" - ): - dates = [ - os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/M1/*") - ] - for i in dates: - runs = [ - os.path.basename(x) - for x in glob.glob(f"{MAGIC_DL1_dir}/M2/{i}/*") - ] - for r in runs: - if (len(glob.glob(f"{MAGIC_DL1_dir}/M1/{i}/{r}")) > 0) and ( - len(glob.glob(f"{MAGIC_DL1_dir}/M2/{i}/{r}")) > 0 - ): - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i}/Merged/logs", - exist_ok=True, - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/{r} --output-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i}/Merged/logs/merge_{i}_{r}_" - + "${SLURM_JOB_ID}.log \n" - ) + for i in MAGIC_runs: + if os.path.exists( + f"{MAGIC_DL1_dir}/M1/{i[0]}/{i[1]}" + ) & os.path.exists(f"{MAGIC_DL1_dir}/M2/{i[0]}/{i[1]}"): + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs", + exist_ok=True, + ) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs/merge_{i[0]}_{i[1]}_" + + 
"${SLURM_JOB_ID}.log \n" + ) + else: + print( + f"ERROR {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} or {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} does not exist" + ) else: - dates = [ - os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/M1/*") - ] + dates = np.unique(MAGIC_runs.T[0]) for i in dates: - if not os.path.exists(f"{MAGIC_DL1_dir}/Merged/{i}/Merged"): - continue - if ( - len(glob.glob(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/*MAGIC*.h5")) - > 0 - ): + # if ( + # len(glob.glob(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/*MAGIC*.h5")) + # > 0 + # ): # this is strange, those files should not be there yet at the moment of creating of this script + # runs = MAGIC_runs.T[1][MAGIC_runs.T[0]==i] - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i} >{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" - + "${SLURM_JOB_ID}.log \n" - ) + os.makedirs( + f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True + ) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i} >{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" + + "${SLURM_JOB_ID}.log \n" + ) def mergeMC(target_dir, identification, env_name, cwd, source_name): From cec4833ed95facdfc8ee905caab9f377b2b65f50 Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Wed, 15 May 2024 15:02:18 +0000 Subject: [PATCH 3/6] refactoring to unify the code for NSB_match yes and no --- .../semi_automatic_scripts/merging_runs.py | 123 ++++++------------ 1 file changed, 40 insertions(+), 83 deletions(-) diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py index 734d4a35..d5cff4ba 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py +++ 
b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -156,104 +156,61 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): with open(f"{source}_Merge_MAGIC_{identification}.sh", "w") as f: f.writelines(lines) - if not NSB_match: - if identification == "0_subruns": - if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( - f"{MAGIC_DL1_dir}/M2" - ): - for i in MAGIC_runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}", exist_ok=True - ) # Creating a merged directory for the respective run - for magic in [1, 2]: - os.system( - f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log\n" - ) - - elif identification == "1_M1M2": - if os.path.exists(f"{MAGIC_DL1_dir}/M1") & os.path.exists( - f"{MAGIC_DL1_dir}/M2" - ): - for i in MAGIC_runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged", exist_ok=True - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/merge_{i[0]}_{[1]}_" - + "${SLURM_JOB_ID}.log\n" - ) - else: + if identification == "0_subruns": + for magic in [1, 2]: for i in MAGIC_runs: - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/Merged_{i[0]}", exist_ok=True - ) # Creating a merged directory for each night - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i[0]} >{MAGIC_DL1_dir}/Merged/Merged_{i[0]}/merge_night_{i[0]}_" - + "${SLURM_JOB_ID}.log \n" - ) - - else: - if identification == "0_subruns": - for magic in [1, 2]: - for i in MAGIC_runs: - # 
Here is a difference w.r.t. original code. If only one telescope data are available they will be merged now for this telescope - if os.path.exists(f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}"): - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs", - exist_ok=True, - ) # Creating a merged directory for the respective run - os.system( - f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' - ) - f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log\n" - ) - else: - print( - f"ERROR: {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} does not exist" - ) - - elif identification == "1_M1M2": - for i in MAGIC_runs: - if os.path.exists( - f"{MAGIC_DL1_dir}/M1/{i[0]}/{i[1]}" - ) & os.path.exists(f"{MAGIC_DL1_dir}/M2/{i[0]}/{i[1]}"): + # Here is a difference w.r.t. original code. 
If only one telescope data are available they will be merged now for this telescope + if os.path.exists(f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}"): os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs", + f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs", exist_ok=True, + ) # Creating a merged directory for the respective run + os.system( + f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' ) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs/merge_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log \n" + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" + + "${SLURM_JOB_ID}.log\n" ) else: print( - f"ERROR {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} or {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} does not exist" + f"ERROR: {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} does not exist" ) - else: - dates = np.unique(MAGIC_runs.T[0]) - for i in dates: - if not os.path.exists(f"{MAGIC_DL1_dir}/Merged/{i}/Merged"): - continue - - # if ( - # len(glob.glob(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/*MAGIC*.h5")) - # > 0 - # ): # this is strange, those files should not be there yet at the moment of creating of this script - # runs = MAGIC_runs.T[1][MAGIC_runs.T[0]==i] + elif identification == "1_M1M2": + for i in MAGIC_runs: + if os.path.exists(f"{MAGIC_DL1_dir}/M1/{i[0]}/{i[1]}") & os.path.exists( + f"{MAGIC_DL1_dir}/M2/{i[0]}/{i[1]}" + ): os.makedirs( - f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True + f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs", + exist_ok=True, ) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i} 
>{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs/merge_{i[0]}_{i[1]}_" + "${SLURM_JOB_ID}.log \n" ) + else: + print( + f"ERROR {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} or {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} does not exist" + ) + else: + dates = np.unique(MAGIC_runs.T[0]) + for i in dates: + if not os.path.exists(f"{MAGIC_DL1_dir}/Merged/{i}/Merged"): + continue + + # if ( + # len(glob.glob(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/*MAGIC*.h5")) + # > 0 + # ): # this is strange, those files should not be there yet at the moment of creating of this script + # runs = MAGIC_runs.T[1][MAGIC_runs.T[0]==i] + + os.makedirs(f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True) + f.write( + f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i} >{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" + + "${SLURM_JOB_ID}.log \n" + ) def mergeMC(target_dir, identification, env_name, cwd, source_name): From 12bd08f06231a60534c5d59ae1aefee586cdc124 Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Thu, 16 May 2024 10:10:49 +0000 Subject: [PATCH 4/6] added error management for merging scripts also some refactoring in the setting up, mergi and coincidence scripts --- .../semi_automatic_scripts/clusters.py | 28 +++++++++- .../coincident_events.py | 14 ++--- .../semi_automatic_scripts/merging_runs.py | 56 +++++++++++-------- .../setting_up_config_and_dir.py | 50 +++++++---------- 4 files changed, 86 insertions(+), 62 deletions(-) diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py index 8f66417b..54d14922 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py +++ 
b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/clusters.py @@ -1,7 +1,7 @@ """ Module for generating bash script lines for running analysis in different clusters """ -__all__ = ["slurm_lines"] +__all__ = ["slurm_lines", "rc_lines"] def slurm_lines(queue, job_name, array=None, mem=None, out_name=None): @@ -40,3 +40,29 @@ def slurm_lines(queue, job_name, array=None, mem=None, out_name=None): "ulimit -a\n\n", ] return lines + + +def rc_lines(store, out): + """ + Function for creating the general lines for error tracking. + + Parameters + ---------- + store : str + String what to store in addition to $rc + out : str + Base name for the log files with return codes, all output will go into {out}_return.log, only errors to {out}_failed.log + + Returns + ------- + list + List of strings to attach to a shell script + """ + lines = [ + "rc=$?\n", + 'if [ "$rc" -ne "0" ]; then\n', + f" echo {store} $rc >> {out}_failed.log\n", + "fi\n", + f"echo {store} $rc >> {out}_return.log\n", + ] + return lines diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py index 27c48da4..b81cc932 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/coincident_events.py @@ -104,19 +104,15 @@ def linking_bash_lst( If real data are matched to pre-processed MCs or not """ - if (len(LST_runs) == 2) and (len(LST_runs[0]) == 10): - LST = LST_runs - - LST_runs = [] - LST_runs.append(LST) - if NSB_match: coincidence_DL1_dir = f"{target_dir}/v{__version__}/{source_name}" MAGIC_DL1_dir = f"{target_dir}/v{__version__}/{source_name}/DL1" else: - coincidence_DL1_dir = f"{target_dir}/{source_name}/DL1/Observations" - MAGIC_DL1_dir = f"{target_dir}/{source_name}/DL1/Observations/" + coincidence_DL1_dir = ( + f"{target_dir}/v{__version__}/{source_name}/DL1/Observations" + ) + MAGIC_DL1_dir = 
f"{target_dir}/v{__version__}/{source_name}/DL1/Observations/" dates = [os.path.basename(x) for x in glob.glob(f"{MAGIC_DL1_dir}/Merged/Merged_*")] @@ -218,7 +214,7 @@ def main(): configfile_coincidence(telescope_ids, target_dir, source_name, NSB_match) LST_runs_and_dates = f"{source_name}_LST_runs.txt" - LST_runs = np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",") + LST_runs = np.genfromtxt(LST_runs_and_dates, dtype=str, delimiter=",", ndmin=2) try: diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py index d5cff4ba..a043f9f1 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -40,7 +40,10 @@ from tqdm import tqdm from magicctapipe import __version__ -from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import slurm_lines +from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) __all__ = ["cleaning", "split_train_test", "merge", "mergeMC"] @@ -151,8 +154,9 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): queue="short", job_name=process_name, mem="2g", - out_name=f"{MAGIC_DL1_dir}/Merged/slurm-%x.%j", + out_name=f"{MAGIC_DL1_dir}/Merged/logs/slurm-%x.%j", ) + os.makedirs(f"{MAGIC_DL1_dir}/Merged/logs", exist_ok=True) with open(f"{source}_Merge_MAGIC_{identification}.sh", "w") as f: f.writelines(lines) @@ -160,36 +164,38 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): for magic in [1, 2]: for i in MAGIC_runs: # Here is a difference w.r.t. original code. 
If only one telescope data are available they will be merged now for this telescope - if os.path.exists(f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}"): - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs", - exist_ok=True, - ) # Creating a merged directory for the respective run + indir = f"{MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]}" + if os.path.exists(f"{indir}"): + outdir = f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}" + os.makedirs(f"{outdir}/logs", exist_ok=True) os.system( - f'find {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' + f'find {indir} -type f -name "dl1_M{magic}.Run*.h5" -size -3k -delete' ) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} >{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}/logs/merge_M{magic}_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log\n" + f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} >{outdir}/logs/merge_M{magic}_{i[0]}_{i[1]}_${{SLURM_JOB_ID}}.log\n" ) - else: - print( - f"ERROR: {MAGIC_DL1_dir}/M{magic}/{i[0]}/{i[1]} does not exist" + rc = rc_lines( + store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" ) + f.writelines(rc) + else: + print(f"ERROR: {indir} does not exist") elif identification == "1_M1M2": for i in MAGIC_runs: if os.path.exists(f"{MAGIC_DL1_dir}/M1/{i[0]}/{i[1]}") & os.path.exists( f"{MAGIC_DL1_dir}/M2/{i[0]}/{i[1]}" ): - os.makedirs( - f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs", - exist_ok=True, - ) + indir = f"{MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]}" + outdir = f"{MAGIC_DL1_dir}/Merged/{i[0]}/Merged" + os.makedirs(f"{outdir}/logs", exist_ok=True) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i[0]}/{i[1]} --output-dir {MAGIC_DL1_dir}/Merged/{i[0]}/Merged --run-wise >{MAGIC_DL1_dir}/Merged/{i[0]}/Merged/logs/merge_{i[0]}_{i[1]}_" - + "${SLURM_JOB_ID}.log \n" + f"conda run -n {env_name} 
merge_hdf_files --input-dir {indir} --output-dir {outdir} --run-wise >{outdir}/logs/merge_{i[0]}_{i[1]}_${{SLURM_JOB_ID}}.log\n" ) + rc = rc_lines( + store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" + ) + f.writelines(rc) else: print( f"ERROR {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} or {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} does not exist" @@ -205,12 +211,16 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): # > 0 # ): # this is strange, those files should not be there yet at the moment of creating of this script # runs = MAGIC_runs.T[1][MAGIC_runs.T[0]==i] - - os.makedirs(f"{MAGIC_DL1_dir}/Merged/Merged_{i}/logs", exist_ok=True) + indir = f"{MAGIC_DL1_dir}/Merged/{i}/Merged" + outdir = f"{MAGIC_DL1_dir}/Merged/Merged_{i}" + os.makedirs(f"{outdir}/logs", exist_ok=True) f.write( - f"conda run -n {env_name} merge_hdf_files --input-dir {MAGIC_DL1_dir}/Merged/{i}/Merged --output-dir {MAGIC_DL1_dir}/Merged/Merged_{i} >{MAGIC_DL1_dir}/Merged/Merged_{i}/logs/merge_night_{i}_" - + "${SLURM_JOB_ID}.log \n" + f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} >{outdir}/logs/merge_night_{i}_${{SLURM_JOB_ID}}.log\n" + ) + rc = rc_lines( + store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" ) + f.writelines(rc) def mergeMC(target_dir, identification, env_name, cwd, source_name): diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/setting_up_config_and_dir.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/setting_up_config_and_dir.py index 4d6110b7..994cf21d 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/setting_up_config_and_dir.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/setting_up_config_and_dir.py @@ -27,7 +27,10 @@ from magicctapipe import __version__ from magicctapipe.io import resource_file -from magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import slurm_lines +from 
magicctapipe.scripts.lst1_magic.semi_automatic_scripts.clusters import ( + rc_lines, + slurm_lines, +) __all__ = [ "config_file_gen", @@ -206,8 +209,7 @@ def lists_and_bash_generator( f"SAMPLE_LIST=($(<$INF/list_folder_{particle_type}.txt))\n", "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n", "cd $SAMPLE\n\n", - f"export LOG={dir1}/DL1/MC/{particle_type}/logs" - + "/simtel_{$SAMPLE}_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}_all.log\n", + f"export LOG={dir1}/DL1/MC/{particle_type}/logs/simtel_{{$SAMPLE}}_${{SLURM_ARRAY_JOB_ID}}_${{SLURM_ARRAY_TASK_ID}}_all.log\n", "cat logs/list_dl0_ok.txt | while read line\n", "do\n", f" cd {dir1}/../\n", @@ -250,14 +252,6 @@ def lists_and_bash_gen_MAGIC( obs_tag = "" if NSB_match else "Observations" with open(f"{source}_linking_MAGIC_data_paths.sh", "w") as f: f.writelines(lines) - if NSB_match: - - if (len(MAGIC_runs) == 2) and (len(MAGIC_runs[0]) == 10): - MAGIC = MAGIC_runs - - MAGIC_runs = [] - MAGIC_runs.append(MAGIC) - for i in MAGIC_runs: for magic in [1, 2]: # if 1 then magic is second from last, if 2 then last @@ -286,18 +280,21 @@ def lists_and_bash_gen_MAGIC( mem="2g", out_name=f"{target_dir}/v{__version__}/{source}/DL1/{obs_tag}/M{magic}/{i[0]}/{i[1]}/logs/slurm-%x.%A_%a", # without version for no NSB_match ) - lines = slurm + [ # without version for no NSB_match - f"export OUTPUTDIR={target_dir}/v{__version__}/{source}/DL1/{obs_tag}/M{magic}/{i[0]}/{i[1]}\n", - "SAMPLE_LIST=($(<$OUTPUTDIR/logs/list_dl0.txt))\n", - "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n\n", - "export LOG=$OUTPUTDIR/logs/real_0_1_task_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n", - f"conda run -n {env_name} magic_calib_to_dl1 --input-file $SAMPLE --output-dir $OUTPUTDIR --config-file {target_dir}/v{__version__}/{source}/config_DL0_to_DL1.yaml >$LOG 2>&1\n", - "rc=$?\n", - 'if [ "$rc" -ne "0" ]; then\n', - " echo $SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID} $rc >> $OUTPUTDIR/logs/list_failed.log\n", - "fi\n", - "echo $SAMPLE 
${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID} $rc >> $OUTPUTDIR/logs/list_return.log\n", - ] + rc = rc_lines( + store="$SAMPLE ${SLURM_ARRAY_JOB_ID} ${SLURM_ARRAY_TASK_ID}", + out="$OUTPUTDIR/logs/list", + ) + lines = ( + slurm + + [ # without version for no NSB_match + f"export OUTPUTDIR={target_dir}/v{__version__}/{source}/DL1/{obs_tag}/M{magic}/{i[0]}/{i[1]}\n", + "SAMPLE_LIST=($(<$OUTPUTDIR/logs/list_dl0.txt))\n", + "SAMPLE=${SAMPLE_LIST[${SLURM_ARRAY_TASK_ID}]}\n\n", + "export LOG=$OUTPUTDIR/logs/real_0_1_task_${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}.log\n", + f"conda run -n {env_name} magic_calib_to_dl1 --input-file $SAMPLE --output-dir $OUTPUTDIR --config-file {target_dir}/v{__version__}/{source}/config_DL0_to_DL1.yaml >$LOG 2>&1\n", + ] + + rc + ) with open( f"{source}_MAGIC-" + "I" * magic + f"_dl0_to_dl1_run_{i[1]}.sh", "w", @@ -360,11 +357,6 @@ def directories_generator( ########################################### # MAGIC ########################################### - if (len(MAGIC_runs) == 2) and (len(MAGIC_runs[0]) == 10): - MAGIC = MAGIC_runs - - MAGIC_runs = [] - MAGIC_runs.append(MAGIC) for i in MAGIC_runs: for magic in [1, 2]: if telescope_ids[magic - 3] > 0: @@ -428,7 +420,7 @@ def main(): MAGIC_runs_and_dates = f"{source_name}_MAGIC_runs.txt" MAGIC_runs = np.genfromtxt( - MAGIC_runs_and_dates, dtype=str, delimiter="," + MAGIC_runs_and_dates, dtype=str, delimiter=",", ndmin=2 ) # READ LIST OF DATES AND RUNS: format table where each line is like "2020_11_19,5093174" noise_value = [0, 0, 0] From c829703920f922db678202e8d199ec8b1a2306cc Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Thu, 16 May 2024 12:33:03 +0000 Subject: [PATCH 5/6] added support for job_accounting running over merging files --- .../semi_automatic_scripts/job_accounting.py | 34 ++++++++++++++----- .../semi_automatic_scripts/merging_runs.py | 15 ++++---- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git 
a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py index f9c54d9f..b8114eeb 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py @@ -89,14 +89,17 @@ def main(): work_dir = config["directories"]["workspace_dir"] print(f"Checking progress of jobs stored in {work_dir}") - dirs = sorted(glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/*/*")) + dirs = sorted( + glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/[0-9]*/[M0-9]*") + + glob.glob(f"{work_dir}/v{args.version}/*/{args.data_level}/Merged_[0-9]*") + ) if dirs == []: versions = [x.split("/v")[-1] for x in glob.glob(f"{work_dir}/v*")] print("Error, no directories found") print(f"for path {work_dir} found in {args.config_file} this is available") print(f"Versions {versions}") tag = "" if NSB_matching else "/Observations" - print(f"Supported data types: DL1{tag}/M1, DL1{tag}/M2") + print(f"Supported data types: DL1{tag}/M1, DL1{tag}/M2, DL1{tag}/Merged") exit(1) all_todo = 0 @@ -124,21 +127,31 @@ def main(): returns = fp.readlines() this_return = len(returns) for line in returns: - file_in, slurm_id, task_id, rc = line.split() + line = line.split() + file_in = line[0] + slurm_id = f"{line[1]}_{line[2]}" if len(line) == 4 else line[1] + rc = line[-1] if rc == "0": this_good += 1 # now check accounting if not args.no_accounting: out = run_shell( - f'sacct --format="JobID,CPUTime,MaxRSS" --units=M -j {slurm_id}_{task_id}| tail -1' - ) - _, cpu, mem = out.split() + f'sacct --format="JobID,CPUTime,MaxRSS" --units=M -j {slurm_id}| tail -1' + ).split() + if len(out) == 3: + _, cpu, mem = out + else: + cpu = out[1] + mem = None hh, mm, ss = (int(x) for x in str(cpu).split(":")) delta = timedelta( days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss ) this_cpu.append(delta) - 
this_mem.append(float(mem[0:-1])) + if mem is not None: + this_mem.append(float(mem[0:-1])) + else: + print("Memory usage information is missing") else: print(f"file {file_in} failed with error {rc}") if len(this_cpu) > 0: @@ -146,8 +159,13 @@ def main(): all_mem += this_mem this_cpu = np.array(this_cpu) this_mem = np.array(this_mem) + mem_info = ( + f"memory [M]: median={np.median(this_mem)}, max={this_mem.max()}" + if len(this_mem) + else "" + ) print( - f"CPU: median={np.median(this_cpu)}, max={this_cpu.max()}; memory [M]: median={np.median(this_mem)}, max={this_mem.max()}" + f"CPU: median={np.median(this_cpu)}, max={this_cpu.max()}; {mem_info}" ) except IOError: diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py index a043f9f1..abe653a1 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/merging_runs.py @@ -175,9 +175,11 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} >{outdir}/logs/merge_M{magic}_{i[0]}_{i[1]}_${{SLURM_JOB_ID}}.log\n" ) rc = rc_lines( - store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" + store=f"{indir} ${{SLURM_JOB_ID}}", + out=f"{outdir}/logs/list", ) f.writelines(rc) + os.system(f"echo {indir} >> {outdir}/logs/list_dl0.txt") else: print(f"ERROR: {indir} does not exist") @@ -193,9 +195,10 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} --run-wise >{outdir}/logs/merge_{i[0]}_{i[1]}_${{SLURM_JOB_ID}}.log\n" ) rc = rc_lines( - store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" + store=f"{indir} ${{SLURM_JOB_ID}}", out=f"{outdir}/logs/list" ) f.writelines(rc) + os.system(f"echo {indir} >> 
{outdir}/logs/list_dl0.txt") else: print( f"ERROR {MAGIC_DL1_dir}/M1/{i[0]}/{i[1]} or {MAGIC_DL1_dir}/M2/{i[0]}/{i[1]} does not exist" @@ -206,11 +209,6 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): if not os.path.exists(f"{MAGIC_DL1_dir}/Merged/{i}/Merged"): continue - # if ( - # len(glob.glob(f"{MAGIC_DL1_dir}/Merged/{i}/Merged/*MAGIC*.h5")) - # > 0 - # ): # this is strange, those files should not be there yet at the moment of creating of this script - # runs = MAGIC_runs.T[1][MAGIC_runs.T[0]==i] indir = f"{MAGIC_DL1_dir}/Merged/{i}/Merged" outdir = f"{MAGIC_DL1_dir}/Merged/Merged_{i}" os.makedirs(f"{outdir}/logs", exist_ok=True) @@ -218,9 +216,10 @@ def merge(target_dir, identification, MAGIC_runs, env_name, source, NSB_match): f"conda run -n {env_name} merge_hdf_files --input-dir {indir} --output-dir {outdir} >{outdir}/logs/merge_night_{i}_${{SLURM_JOB_ID}}.log\n" ) rc = rc_lines( - store=f"{indir} ${{SLURM_JOB_ID}}", out="{outdir}/logs/list" + store=f"{indir} ${{SLURM_JOB_ID}}", out=f"{outdir}/logs/list" ) f.writelines(rc) + os.system(f"echo {indir} >> {outdir}/logs/list_dl0.txt") def mergeMC(target_dir, identification, env_name, cwd, source_name): From 2111e953926c8c42c104d9a5dc6dbe08a3d0692b Mon Sep 17 00:00:00 2001 From: Julian Sitarek Date: Thu, 16 May 2024 16:58:27 +0000 Subject: [PATCH 6/6] added counting of total CPU time it works even if some files were produced with the same job --- .../semi_automatic_scripts/job_accounting.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py index b8114eeb..9b6df96b 100644 --- a/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py +++ b/magicctapipe/scripts/lst1_magic/semi_automatic_scripts/job_accounting.py @@ -107,6 +107,8 @@ def main(): all_good = 0 all_cpu = [] all_mem = [] + 
total_time = 0 + all_jobs = [] for dir in dirs: print(dir) # fixme list_dl0.txt is only available for DL1/M[12] processing @@ -140,18 +142,25 @@ def main(): ).split() if len(out) == 3: _, cpu, mem = out - else: + elif ( + len(out) == 2 + ): # MaxRSS sometimes is missing in the output cpu = out[1] mem = None - hh, mm, ss = (int(x) for x in str(cpu).split(":")) - delta = timedelta( - days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss - ) - this_cpu.append(delta) + print("Memory usage information is missing") + else: + print(f"Unexpected sacct output: {out}") + if cpu is not None: + hh, mm, ss = (int(x) for x in str(cpu).split(":")) + delta = timedelta( + days=hh // 24, hours=hh % 24, minutes=mm, seconds=ss + ) + if slurm_id not in all_jobs: + total_time += delta.total_seconds() / 3600 + all_jobs += [slurm_id] + this_cpu.append(delta) if mem is not None: this_mem.append(float(mem[0:-1])) - else: - print("Memory usage information is missing") else: print(f"file {file_in} failed with error {rc}") if len(this_cpu) > 0: @@ -204,7 +213,7 @@ def main(): all_cpu = np.array(all_cpu) all_mem = np.array(all_mem) print( - f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}" + f"CPU: median={np.median(all_cpu)}, max={all_cpu.max()}, total={total_time:.2f} CPU hrs; memory [M]: median={np.median(all_mem)}, max={all_mem.max()}" )