# This script generates all RascalC covariance matrices (covs)

import os
from datetime import datetime
import pickle
import hashlib
from typing import Callable
import traceback
import desi_y3_files.file_manager as desi_y3_file_manager
from RascalC.raw_covariance_matrices import cat_raw_covariance_matrices, collect_raw_covariance_matrices
from RascalC.post_process.legendre import post_process_legendre
from RascalC.post_process.legendre_mix_jackknife import post_process_legendre_mix_jackknife
from RascalC.convergence_check_extra import convergence_check_extra
from RascalC.cov_utils import export_cov_legendre
from RascalC.comb.combine_covs_legendre import combine_covs_legendre

max_l = 4
nbin = 50  # radial bins for output cov
rmax = 200  # maximum output cov radius in Mpc/h

jackknife = 1
njack = 60 if jackknife else 0
if jackknife: mbin = 100
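# mbin appears in the xi_jack filename below (the m{mbin} part); presumably the number of angular (mu) bins of the jackknife correlation function estimates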

skip_r_bins = 5
skip_l = 0

r_step = rmax // nbin
rmin_real = r_step * skip_r_bins
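# with the settings above, r_step = 200 // 50 = 4 Mpc/h and rmin_real = 4 * 5 = 20 Mpc/h,
# i.e. the first skip_r_bins radial bins are excluded from the output covariance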

xilabel = "".join([str(i) for i in range(0, max_l+1, 2)])
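# xilabel lists the even multipoles, e.g. "024" for max_l = 4; it is used in the output covariance filenames below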

# Settings for filenames
version = "v2"

# Set DESI CFS before creating the file manager
os.environ["DESICFS"] = "/dvs_ro/cfs/cdirs/desi"  # read-only path

fm = desi_y3_file_manager.get_abacus_file_manager()

regs = ('SGC', 'NGC')  # regions for filenames
reg_comb = "GCcomb"

tracers = ['LRG'] * 3 + ['ELG_LOPnotqso'] * 2 + ['LRG+ELG_LOPnotqso', 'BGS_BRIGHT-21.5'] + ['BGS_BRIGHT-21.35'] * 2 + ['BGS_BRIGHT-20.2'] * 2 + ['QSO', 'BGS_ANY-02']
zs = [(0.4, 0.6), (0.6, 0.8), (0.8, 1.1), (0.8, 1.1), (1.1, 1.6), (0.8, 1.1), (0.1, 0.4), (0.1, 0.4), (0.25, 0.4), (0.1, 0.25), (0.1, 0.4), (0.8, 2.1), (0.1, 0.4)]
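# tracers and zs are paired element-wise (13 entries each) and zipped in the main loop below,
# so each pair defines one tracer + redshift-range combination to process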

hash_dict_file = "make_covs.hash_dict.pkl"
if os.path.isfile(hash_dict_file):
    # Load hash dictionary from file
    with open(hash_dict_file, "rb") as f:
        hash_dict = pickle.load(f)
else:
    # Initialize hash dictionary as empty
    hash_dict = {}
# Hash dict keys are goal filenames; each value is another dictionary mapping the dependency/source filenames to their hashes
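# Illustrative structure (hypothetical filenames):
# hash_dict = {"cov_txt/.../xi024_..._cov_RascalC.txt": {"outdirs/.../Rescaled_Covariance_Matrices_....npz": "<sha256 hex digest>"}}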

# Set up logging
logfile = "make_covs.log.txt"

def print_log(l: object = "") -> None:
    # append to the log file directly instead of shelling out via echo, which could break on quotes and other special characters
    with open(logfile, "a") as f:
        f.write(f"{l}\n")

def print_and_log(s: object = "") -> None:
    print(s)
    print_log(s)

print_and_log(datetime.now())
print_and_log(f"Executing {__file__}")

def my_make(goal: str, deps: list[str], recipe: Callable, force: bool = False, verbose: bool = False) -> None:
    need_make, current_dep_hashes = hash_check(goal, deps, force=force, verbose=verbose)
    if need_make:
        print_and_log(f"Making {goal} from {deps}")
        try:
            # make sure the directory exists
            goal_dir = os.path.dirname(goal)
            if goal_dir: os.makedirs(goal_dir, exist_ok = True)  # skip when the goal is in the current directory: os.makedirs("") would throw an error
            # now can actually run
            recipe()
        except Exception as e:
            traceback.print_exc()
            print_and_log(f"{goal} not built: {e}")
            return
        hash_dict[goal] = current_dep_hashes  # update the dependency hashes only if the make was performed successfully
        print_and_log()

def hash_check(goal: str, srcs: list[str], force: bool = False, verbose: bool = False) -> tuple[bool, dict]:
    # The first output indicates whether we need to/should execute the recipe to make goal from srcs
    # Also returns the src hashes in the dictionary current_src_hashes
    current_src_hashes = {}
    for src in srcs:
        if not os.path.exists(src):
            if verbose: print_and_log(f"Cannot make {goal} from {srcs}: {src} missing\n")  # the remaining checks can be skipped
            return False, current_src_hashes
        current_src_hashes[src] = sha256sum(src)
    if not os.path.exists(goal) or force: return True, current_src_hashes  # need to make if the goal is missing or we are forcing; the hashes had to be collected beforehand anyway, which also ensured the sources exist
    try:
        if set(current_src_hashes.values()) == set(hash_dict[goal].values()):  # compare to the hashes of the sources used to build the goal last time, regardless of order and names; collisions seem unlikely
            if verbose: print_and_log(f"{goal} uses the same {srcs} as previously, no need to make\n")
            return False, current_src_hashes
    except KeyError: pass  # if the goal is not in the hash dict, we need to make; just proceed
    return True, current_src_hashes

def sha256sum(filename: str, buffer_size: int = 128*1024) -> str:  # from https://stackoverflow.com/a/44873382
    h = hashlib.sha256()
    b = bytearray(buffer_size)
    mv = memoryview(b)
    with open(filename, 'rb', buffering=0) as f:
        while n := f.readinto(mv):
            h.update(mv[:n])
    return h.hexdigest()
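# For example (hypothetical filenames): my_make("cov.txt", ["results.npz"], recipe) re-runs recipe only when
# results.npz exists and its SHA-256 differs from the hash recorded in hash_dict when cov.txt was last built
# (or when cov.txt itself is missing, or force=True); this gives minimal Makefile-like dependency tracking.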

# Make steps for making covs
for tracer, z_range in zip(tracers, zs):
    tlabels = [tracer]
    z_min, z_max = z_range
    reg_results = []
    # get options automatically
    xi_setup = desi_y3_file_manager.get_baseline_2pt_setup(tlabels[0], z_range, recon = True)
    xi_setup.update({"version": version, "tracer": tracer, "region": regs, "zrange": z_range, "cut": None, "njack": 0})  # specify regions, version, z range and no cut; no need for jackknives
    recon_spec = 'recon_sm{smoothing_radius:.0f}_{algorithm}_{mode}'.format_map(xi_setup)  # recon specifier string
    recon_spec += '' if (zr := xi_setup['recon_zrange']) is None else '_z{zrange[0]:.1f}-{zrange[1]:.1f}'.format(zrange = zr)
    recon_spec += '' if (w := xi_setup['recon_weighting']) == 'default' else '_{}'.format(w)
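    # e.g. recon_spec might end up as "recon_sm15_IFT_recsym" (hypothetical values; the actual smoothing radius, algorithm and mode come from get_baseline_2pt_setup)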
    if jackknife: reg_results_jack = []
    for reg in regs:
        outdir = os.path.join("outdirs", version, recon_spec, "_".join(tlabels + [reg]) + f"_z{z_min}-{z_max}")  # output file directory
        if not os.path.isdir(outdir):  # try to find the dirs with suffixes and concatenate samples from them
            outdirs_w_suffixes = [outdir + "_" + str(i) for i in range(11)]  # append suffixes
            outdirs_w_suffixes = [dirname for dirname in outdirs_w_suffixes if os.path.isdir(dirname)]  # filter only existing dirs
            if len(outdirs_w_suffixes) == 0: continue  # can't really do anything else
            cat_raw_covariance_matrices(nbin, f"l{max_l}", outdirs_w_suffixes, [None] * len(outdirs_w_suffixes), outdir, print_function = print_and_log)  # concatenate subsamples

        raw_name = os.path.join(outdir, f"Raw_Covariance_Matrices_n{nbin}_l{max_l}.npz")

        # detect the per-file dirs if any
        outdirs_perfile = [int(name) for name in os.listdir(outdir) if name.isdigit()]  # per-file dir names are pure integers
        if len(outdirs_perfile) > 0:  # if such dirs are found, need to cat the raw covariance matrices
            outdirs_perfile = [os.path.join(outdir, str(index)) for index in sorted(outdirs_perfile)]  # sort the integers, transform back to strings and prepend the parent directory
            cat_raw_covariance_matrices(nbin, f"l{max_l}", outdirs_perfile, [None] * len(outdirs_perfile), outdir, print_function = print_and_log)  # concatenate the subsamples
        elif not os.path.isfile(raw_name):  # run the raw matrix collection, which creates this file; a non-existing file breaks the logic in my_make()
            collect_raw_covariance_matrices(outdir, print_function = print_and_log)

        # Gaussian covariances

        results_name = os.path.join(outdir, 'Rescaled_Covariance_Matrices_Legendre_n%d_l%d.npz' % (nbin, max_l))
        reg_results.append(results_name)

        cov_dir = f"cov_txt/{version}/{recon_spec}"
        cov_name = f"{cov_dir}/xi" + xilabel + "_" + "_".join(tlabels + [reg]) + f"_z{z_min}-{z_max}_default_FKP_lin{r_step}_s{rmin_real}-{rmax}_cov_RascalC_Gaussian.txt"
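        # e.g. for the first tracer (LRG, z 0.4-0.6) and reg = 'SGC' this becomes
        # "{cov_dir}/xi024_LRG_SGC_z0.4-0.6_default_FKP_lin4_s20-200_cov_RascalC_Gaussian.txt"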

        def make_gaussian_cov():
            results = post_process_legendre(outdir, nbin, max_l, outdir, skip_r_bins = skip_r_bins, skip_l = skip_l, print_function = print_and_log)
            convergence_check_extra(results, print_function = print_and_log)

        # RascalC results depend on full output (most straightforwardly)
        my_make(results_name, [raw_name], make_gaussian_cov)
        # Recipe: run post-processing
        # Also perform convergence check (optional but nice)

        # Individual cov file depends on RascalC results
        my_make(cov_name, [results_name], lambda: export_cov_legendre(results_name, max_l, cov_name))
        # Recipe: export cov

        # Jackknife post-processing
        if jackknife:
            results_name_jack = os.path.join(outdir, 'Rescaled_Covariance_Matrices_Legendre_Jackknife_n%d_l%d_j%d.npz' % (nbin, max_l, njack))
            reg_results_jack.append(results_name_jack)
            xi_jack_name = os.path.join(outdir, f"xi_jack/xi_jack_n{nbin}_m{mbin}_j{njack}_11.dat")

            def make_rescaled_cov():
                results = post_process_legendre_mix_jackknife(xi_jack_name, os.path.join(outdir, 'weights'), outdir, mbin, max_l, outdir, skip_r_bins = skip_r_bins, skip_l = skip_l, print_function = print_and_log)
                convergence_check_extra(results, print_function = print_and_log)

            # RascalC results depend on full output (most straightforwardly)
            my_make(results_name_jack, [raw_name], make_rescaled_cov)
            # Recipe: run post-processing
            # Also perform convergence check (optional but nice)

            cov_name_jack = f"{cov_dir}/xi" + xilabel + "_" + "_".join(tlabels + [reg]) + f"_z{z_min}-{z_max}_default_FKP_lin{r_step}_s{rmin_real}-{rmax}_cov_RascalC.txt"
            # Individual cov file depends on RascalC results
            my_make(cov_name_jack, [results_name_jack], lambda: export_cov_legendre(results_name_jack, max_l, cov_name_jack))
            # Recipe: export cov

    # obtain the counts names
    reg_pycorr_names = [f.filepath for f in fm.select(id = 'correlation_recon_abacus_y3', **xi_setup)]
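    # these are the per-region pycorr (pair count / correlation function) files; combine_covs_legendre presumably uses them to weight the per-region results when forming the GCcomb covariance below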

    if len(reg_pycorr_names) == len(regs):  # if we have pycorr files for all regions
        if len(reg_results) == len(regs):  # if we have RascalC results for all regions
            # Combined Gaussian cov

            cov_name = f"{cov_dir}/xi" + xilabel + "_" + "_".join(tlabels + [reg_comb]) + f"_z{z_min}-{z_max}_default_FKP_lin{r_step}_s{rmin_real}-{rmax}_cov_RascalC_Gaussian.txt"  # combined cov name

            # Comb cov depends on the region RascalC results
            my_make(cov_name, reg_results, lambda: combine_covs_legendre(*reg_results, *reg_pycorr_names, cov_name, max_l, r_step = r_step, skip_r_bins = skip_r_bins, print_function = print_and_log))
            # Recipe: run combine covs

        if jackknife and len(reg_results_jack) == len(regs):  # if jackknife and we have RascalC jack results for all regions
            # Combined rescaled cov
            cov_name_jack = f"{cov_dir}/xi" + xilabel + "_" + "_".join(tlabels + [reg_comb]) + f"_z{z_min}-{z_max}_default_FKP_lin{r_step}_s{rmin_real}-{rmax}_cov_RascalC.txt"  # combined cov name

            # Comb cov depends on the region RascalC results
            my_make(cov_name_jack, reg_results_jack, lambda: combine_covs_legendre(*reg_results_jack, *reg_pycorr_names, cov_name_jack, max_l, r_step = r_step, skip_r_bins = skip_r_bins, print_function = print_and_log))
            # Recipe: run combine covs

# Save the updated hash dictionary
with open(hash_dict_file, "wb") as f:
    pickle.dump(hash_dict, f)

print_and_log(datetime.now())
print_and_log("Finished execution.")