diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index 4763a1ad..cd43d445 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -52,6 +52,14 @@ def main():
     fileset = runner_utils.load_json(args.fileset)
     logger.info(f"Loaded fileset from {args.fileset}")
 
+    if args.limit == 0:
+        raise ValueError("Limit cannot be 0")
+    if args.limit is not None:
+        logger.info(f"Limiting each dataset of the fileset to the first {args.limit} files")
+        for dataset in fileset.keys():
+            files = fileset[dataset]["files"]
+            fileset[dataset]["files"] = dict(list(files.items())[: args.limit])
+
     if args.executor == "dask/casa" or args.executor.startswith("tls://"):
         # use xcache for coffea-casa
         xrootd_pfx = "root://"
@@ -239,11 +247,6 @@ def main():
     out = runner_utils.process_out(out, args.output)
     logger.info(f"Final output after post-processing:\n{out}")
     logger.info("Finished the E/Gamma Tag and Probe workflow")
-    logger.info("Shutting down the client and cluster if any")
-    if client:
-        client.shutdown()
-    if cluster:
-        cluster.close()
 
 
 if __name__ == "__main__":
diff --git a/src/egamma_tnp/utils/runner_utils.py b/src/egamma_tnp/utils/runner_utils.py
index e06cb8e6..49134b62 100644
--- a/src/egamma_tnp/utils/runner_utils.py
+++ b/src/egamma_tnp/utils/runner_utils.py
@@ -408,6 +408,8 @@ def get_main_parser():
         Path to the output directory. The default is the current working directory.
     --executor: str, optional
         The executor to use for the computations. The default is None and lets dask decide.
+    --limit: int, optional
+        Limit to the first N files of each dataset in the fileset. The default is None.
     --preprocess: bool, optional
         Preprocess the fileset before running the workflow. The default is False.
     --skip_report: bool, optional
@@ -458,6 +460,7 @@ def get_main_parser():
     parser.add_argument("--binning", type=str, help="Path to a JSON file specifying the binning. Default is None.")
     parser.add_argument("--output", type=str, help="Path to the output directory. Default is None.")
     parser.add_argument("--executor", type=str, help="The executor to use for the computations. Default is None and lets dask decide.")
+    parser.add_argument("--limit", type=int, help="Limit to the first N files of each dataset in the fileset. Default is None.")
    parser.add_argument("--preprocess", action="store_true", default=False, help="Preprocess the fileset before running the workflow. Default is False.")
    parser.add_argument("--skip_report", action="store_true", default=False, help="Skip computing and saving the report. Default is False.")
    parser.add_argument("--repartition_n_to_one", type=int, help="The number of partitions to merge during saving. Default is None.")
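
For reviewers, here is a minimal standalone sketch of what the new --limit slicing does to a fileset mapping. The dataset names, file paths, and the local `limit` variable standing in for `args.limit` are made up for illustration; in the script the mapping comes from runner_utils.load_json(args.fileset).

# Standalone sketch of the --limit behaviour added above.
# Dataset names and file paths below are hypothetical.
fileset = {
    "DatasetA": {"files": {"a1.root": "Events", "a2.root": "Events", "a3.root": "Events"}},
    "DatasetB": {"files": {"b1.root": "Events", "b2.root": "Events"}},
}

limit = 2  # stand-in for args.limit
if limit == 0:
    raise ValueError("Limit cannot be 0")
if limit is not None:
    # dicts preserve insertion order (Python 3.7+), so slicing the items
    # keeps the first N files of each dataset
    for dataset in fileset:
        files = fileset[dataset]["files"]
        fileset[dataset]["files"] = dict(list(files.items())[:limit])

print(fileset)
# {'DatasetA': {'files': {'a1.root': 'Events', 'a2.root': 'Events'}},
#  'DatasetB': {'files': {'b1.root': 'Events', 'b2.root': 'Events'}}}

One observation on the guard: since only limit == 0 is rejected, a negative value falls through to Python's slice semantics and would drop files from the end of each dataset rather than keep the first N.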