From cf58e66586da0e6139ac280a6c1c5953947253f8 Mon Sep 17 00:00:00 2001
From: Giacomo Galloni
Date: Tue, 27 Aug 2024 17:00:34 +0200
Subject: [PATCH] Initial type-hinting attempt

---
 soliket/bandpass/bandpass.py                  |  24 +-
 soliket/bias/bias.py                          |  19 +-
 soliket/cash/cash.py                          |  13 +-
 soliket/cash/cash_data.py                     |  15 +-
 soliket/ccl/ccl.py                            |  11 +-
 soliket/clusters/clusters.py                  |  42 ++--
 soliket/clusters/massfunc.py                  |  24 +-
 soliket/clusters/survey.py                    |  44 ++--
 soliket/clusters/sz_utils.py                  |  56 ++---
 soliket/clusters/tinker.py                    |  33 ++-
 .../cross_correlation/cross_correlation.py    | 238 ++++++++----------
 soliket/foreground/foreground.py              |  59 +++--
 soliket/gaussian/gaussian.py                  |  43 ++--
 soliket/gaussian/gaussian_data.py             |  74 +++---
 soliket/halo_model/halo_model.py              |  34 ++-
 soliket/lensing/lensing.py                    |  41 +--
 soliket/mflike/mflike.py                      |  28 ++-
 soliket/mflike/theoryforge_MFLike.py          |  31 +--
 soliket/poisson/poisson.py                    |  19 +-
 soliket/poisson/poisson_data.py               |  36 ++-
 soliket/ps/ps.py                              |  28 ++-
 soliket/utils.py                              |   9 +-
 soliket/xcorr/limber.py                       |  35 ++-
 soliket/xcorr/xcorr.py                        |  39 +--
 24 files changed, 530 insertions(+), 465 deletions(-)

diff --git a/soliket/bandpass/bandpass.py b/soliket/bandpass/bandpass.py
index 149a53b3..e76e9c00 100644
--- a/soliket/bandpass/bandpass.py
+++ b/soliket/bandpass/bandpass.py
@@ -87,7 +87,7 @@
 """

 import os
-from typing import Optional
+from typing import List, Optional, Union

 import numpy as np
 from cobaya.log import LoggedError
@@ -100,7 +100,7 @@
 # Numerical factors not included, it needs proper normalization when used.


-def _cmb2bb(nu):
+def _cmb2bb(nu: np.ndarray) -> np.ndarray:
     r"""
     Computes the conversion factor :math:`\frac{\partial B_{\nu}}{\partial T}`
     from CMB thermodynamic units to differential source intensity.
@@ -132,9 +132,9 @@ def initialize(self):
                              "bandint_shift_LAT_145",
                              "bandint_shift_LAT_225"]

-        self.exp_ch = None
-        # self.eff_freqs = None
-        self.bands = None
+        self.exp_ch: Optional[List[str]] = None
+        # self.eff_freqs: Optional[list] = None
+        self.bands: Optional[dict] = None

         # To read passbands stored in the sacc files
         # default for mflike
@@ -171,7 +171,7 @@ def initialize_with_params(self):
                               self.log, "Configuration error in parameters: %r.",
                               differences)

-    def must_provide(self, **requirements):
+    def must_provide(self, **requirements: dict):
         # bandint_freqs is required by Foreground
         # and requires some params to be computed
         # Assign those from Foreground
@@ -180,7 +180,7 @@ def must_provide(self, **requirements):
             self.exp_ch = [k.replace("_s0", "") for k in self.bands.keys()
                            if "_s0" in k]

-    def calculate(self, state, want_derived=False, **params_values_dict):
+    def calculate(self, state: dict, want_derived=False, **params_values_dict: dict):
         r"""
         Adds the bandpass transmission to the ``state`` dictionary of the
         BandPass Theory class.
@@ -198,7 +198,7 @@ def calculate(self, state, want_derived=False, **params_values_dict):

         state["bandint_freqs"] = self.bandint_freqs

-    def get_bandint_freqs(self):
+    def get_bandint_freqs(self) -> dict:
         """
         Returns the ``state`` dictionary of bandpass transmissions
         """

 # Takes care of the bandpass construction. It returns a list of nu-transmittance for
 # each frequency or an array with the effective freqs.
- def _bandpass_construction(self, **params): + def _bandpass_construction( + self, **params: dict + ) -> Union[np.ndarray, List[np.ndarray]]: r""" Builds the bandpass transmission :math:`\frac{\frac{\partial B_{\nu+\Delta \nu}}{\partial T} @@ -277,7 +279,7 @@ def _bandpass_construction(self, **params): return bandint_freqs - def _init_external_bandpass_construction(self, path, exp_ch): + def _init_external_bandpass_construction(self, path: str, exp_ch: List[str]): """ Initializes the passband reading for ``_external_bandpass_construction``. @@ -290,7 +292,7 @@ def _init_external_bandpass_construction(self, path, exp_ch): nu_ghz, bp = np.loadtxt(path + "/" + expc, usecols=(0, 1), unpack=True) self.external_bandpass.append([expc, nu_ghz, bp]) - def _external_bandpass_construction(self, **params): + def _external_bandpass_construction(self, **params: dict) -> List[np.ndarray]: r""" Builds bandpass transmission :math:`\frac{\frac{\partial B_{\nu+\Delta \nu}}{\partial T} diff --git a/soliket/bias/bias.py b/soliket/bias/bias.py index 94a14258..b4b1bcd0 100644 --- a/soliket/bias/bias.py +++ b/soliket/bias/bias.py @@ -26,24 +26,27 @@ function (have a look at the linear bias model for ideas). """ +from typing import Dict, Set, Tuple + import numpy as np -from cobaya.theory import Theory +from cobaya.theory import Provider, Theory class Bias(Theory): """Parent class for bias models.""" + provider: Provider _logz = np.linspace(-3, np.log10(1100), 150) _default_z_sampling = 10 ** _logz _default_z_sampling[0] = 0 - def initialize(self): - self._var_pairs = set() + def initialize(self) -> None: + self._var_pairs: Set[Tuple[str, str]] = set() - def get_requirements(self): + def get_requirements(self) -> Dict[str, dict]: return {} - def must_provide(self, **requirements): + def must_provide(self, **requirements: dict) -> Dict[str, dict]: options = requirements.get("linear_bias") or {} self.kmax = max(self.kmax, options.get("kmax", self.kmax)) @@ -52,7 +55,7 @@ def must_provide(self, **requirements): np.atleast_1d(self.z)))) # Dictionary of the things needed from CAMB/CLASS - needs = {} + needs: Dict[str, dict] = {} self.nonlinear = self.nonlinear or options.get("nonlinear", False) self._var_pairs.update( @@ -69,7 +72,7 @@ def must_provide(self, **requirements): assert len(self._var_pairs) < 2, "Bias doesn't support other Pk yet" return needs - def _get_Pk_mm(self): + def _get_Pk_mm(self) -> np.ndarray: self.k, self.z, Pk_mm = \ self.provider.get_Pk_grid(var_pair=list(self._var_pairs)[0], nonlinear=self.nonlinear) @@ -90,7 +93,7 @@ class Linear_bias(Bias): """ def calculate(self, state: dict, want_derived: bool = True, - **params_values_dict): + **params_values_dict) -> None: Pk_mm = self._get_Pk_mm() state["Pk_gg_grid"] = params_values_dict["b_lin"] ** 2. 
* Pk_mm diff --git a/soliket/cash/cash.py b/soliket/cash/cash.py index 3f071a93..a0ba06f1 100644 --- a/soliket/cash/cash.py +++ b/soliket/cash/cash.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple import numpy as np from cobaya.likelihood import Likelihood from .cash_data import CashCData @@ -9,25 +9,24 @@ class CashCLikelihood(Likelihood): name: str = "Cash-C" - datapath = Optional[str] - - def initialize(self): + datapath: Optional[str] = None + def initialize(self) -> None: x, N = self._get_data() self.data = CashCData(self.name, N) - def _get_data(self): + def _get_data(self) -> Tuple[np.ndarray, np.ndarray]: data = np.loadtxt(self.datapath, unpack=False) N = data[:, -1] # assume data stored like column_stack([z, q, N]) x = data[:, :-1] return x, N - def _get_theory(self, **kwargs): + def _get_theory(self, **kwargs: dict) -> np.ndarray: if "cash_test_logp" in kwargs: return np.arange(kwargs["cash_test_logp"]) else: raise NotImplementedError - def logp(self, **params_values): + def logp(self, **params_values: dict) -> float: theory = self._get_theory(**params_values) return self.data.loglike(theory) diff --git a/soliket/cash/cash_data.py b/soliket/cash/cash_data.py index 4075c122..55146b1a 100644 --- a/soliket/cash/cash_data.py +++ b/soliket/cash/cash_data.py @@ -1,8 +1,13 @@ +from typing import Union import numpy as np from scipy.special import factorial -def cash_c_logpdf(theory, data, usestirling=True): +def cash_c_logpdf( + theory: Union[np.ndarray, float], + data: Union[np.ndarray, float], + usestirling: bool = True +) -> float: data = np.asarray(data, dtype=int) ln_fac = np.zeros_like(data, dtype=float) @@ -24,13 +29,15 @@ class CashCData: """Named multi-dimensional Cash-C distributed data """ - def __init__(self, name, N, usestirling=True): + def __init__( + self, name: str, N: Union[np.ndarray, float], usestirling: bool = True + ) -> None: self.name = str(name) self.data = N self.usestirling = usestirling - def __len__(self): + def __len__(self) -> int: return len(self.data) - def loglike(self, theory): + def loglike(self, theory: Union[np.ndarray, float]) -> float: return cash_c_logpdf(theory, self.data) diff --git a/soliket/ccl/ccl.py b/soliket/ccl/ccl.py index ec34bbeb..8766ffe8 100644 --- a/soliket/ccl/ccl.py +++ b/soliket/ccl/ccl.py @@ -79,8 +79,8 @@ # https://cobaya.readthedocs.io/en/devel/theories_and_dependencies.html import numpy as np -from typing import Sequence -from cobaya.theory import Theory +from typing import Dict, Sequence +from cobaya.theory import Provider, Theory from cobaya.tools import LoggedError @@ -92,6 +92,7 @@ class CCL(Theory): kmax: float z: np.ndarray nonlinear: bool + provider: Provider def initialize(self) -> None: try: @@ -125,7 +126,7 @@ def must_provide(self, **requirements) -> dict: np.atleast_1d(self.z)))) # Dictionary of the things CCL needs from CAMB/CLASS - needs = {} + needs: Dict[str, dict] = {} if self.kmax: self.nonlinear = self.nonlinear or options.get('nonlinear', False) @@ -156,7 +157,7 @@ def get_can_support_params(self) -> Sequence[str]: return [] def calculate(self, state: dict, want_derived: bool = True, - **params_values_dict): + **params_values_dict) -> None: # calculate the general CCL cosmo object which likelihoods can then use to get # what they need (likelihoods should cache results appropriately) # get our requirements from self.provider @@ -231,5 +232,5 @@ def calculate(self, state: dict, want_derived: bool = True, for required_result, method in self._required_results.items(): 
state['CCL'][required_result] = method(cosmo) - def get_CCL(self): + def get_CCL(self) -> dict: return self._current_state['CCL'] diff --git a/soliket/clusters/clusters.py b/soliket/clusters/clusters.py index ae7a5390..21c6b783 100644 --- a/soliket/clusters/clusters.py +++ b/soliket/clusters/clusters.py @@ -18,18 +18,19 @@ p """ import os +from typing import Dict import numpy as np import pandas as pd from scipy.interpolate import interp1d +from soliket.constants import C_KM_S from soliket.clusters import massfunc as mf from soliket.poisson import PoissonLikelihood from .survey import SurveyData from .sz_utils import szutils, trapezoid from cobaya import LoggedError - -C_KM_S = 2.99792e5 +from cobaya.theory import Provider class SZModel: @@ -42,6 +43,7 @@ class ClusterLikelihood(PoissonLikelihood): """ name = "Clusters" columns = ["tsz_signal", "z", "tsz_signal_err"] + provider: Provider # data_name = resource_filename("soliket", # "clusters/data/MFMF_WebSkyHalos_A10tSZ_3freq_tiles_mass.fits") @@ -96,45 +98,44 @@ def get_requirements(self): # # model.szk = SZTracer(cosmo) # return model - def _get_catalog(self): + def _get_catalog(self) -> pd.DataFrame: self.survey = SurveyData( self.data_path, self.data_name ) # , MattMock=False,tiles=False) self.szutils = szutils(self.survey) - df = pd.DataFrame( + return pd.DataFrame( { "z": self.survey.clst_z.byteswap().newbyteorder(), "tsz_signal": self.survey.clst_y0.byteswap().newbyteorder(), "tsz_signal_err": self.survey.clst_y0err.byteswap().newbyteorder(), } ) - return df - def _get_om(self): + def _get_om(self) -> float: return (self.provider.get_param("omch2") + self.provider.get_param("ombh2")) / ( (self.provider.get_param("H0") / 100.0) ** 2 ) - def _get_ob(self): + def _get_ob(self) -> float: return (self.provider.get_param("ombh2")) / ( (self.provider.get_param("H0") / 100.0) ** 2 ) - def _get_Ez(self): + def _get_Ez(self) -> np.ndarray: return self.provider.get_Hubble(self.zarr) / self.provider.get_param("H0") - def _get_Ez_interpolator(self): + def _get_Ez_interpolator(self) -> interp1d: return interp1d(self.zarr, self._get_Ez()) - def _get_DAz(self): + def _get_DAz(self) -> np.ndarray: return self.provider.get_angular_diameter_distance(self.zarr) - def _get_DAz_interpolator(self): + def _get_DAz_interpolator(self) -> interp1d: return interp1d(self.zarr, self._get_DAz()) - def _get_HMF(self): + def _get_HMF(self) -> mf.HMF: h = self.provider.get_param("H0") / 100.0 Pk_interpolator = self.provider.get_Pk_interpolator( @@ -149,11 +150,9 @@ def _get_HMF(self): ) # self.provider.get_Hubble(self.zarr) / self.provider.get_param("H0") om = self._get_om() - hmf = mf.HMF(om, Ez, pk=pks * h ** 3, kh=self.k / h, zarr=self.zarr) - - return hmf + return mf.HMF(om, Ez, pk=pks * h ** 3, kh=self.k / h, zarr=self.zarr) - def _get_param_vals(self, **kwargs): + def _get_param_vals(self, **kwargs) -> Dict[str, float]: # Read in scaling relation parameters # scat = kwargs['scat'] # massbias = kwargs['massbias'] @@ -190,7 +189,7 @@ def _get_rate_fn(self, **kwargs): h = self.provider.get_param("H0") / 100.0 - def Prob_per_cluster(z, tsz_signal, tsz_signal_err): + def Prob_per_cluster(z, tsz_signal, tsz_signal_err) -> np.ndarray: c_y = tsz_signal c_yerr = tsz_signal_err c_z = z @@ -201,13 +200,12 @@ def Prob_per_cluster(z, tsz_signal, tsz_signal_err): dn_dzdm = 10 ** np.squeeze(dn_dzdm_interp((np.log10(HMF.M), c_z))) * h ** 4.0 - ans = trapezoid(dn_dzdm * Pfunc_ind, dx=np.diff(HMF.M, axis=0), axis=0) - return ans + return trapezoid(dn_dzdm * Pfunc_ind, 
dx=np.diff(HMF.M, axis=0), axis=0) return Prob_per_cluster # Implement a function that returns a rate function (function of (tsz_signal, z)) - def _get_dVdz(self): + def _get_dVdz(self) -> np.ndarray: DA_z = self.provider.get_angular_diameter_distance(self.zarr) dV_dz = ( @@ -219,7 +217,7 @@ def _get_dVdz(self): # dV_dz *= (self.provider.get_param("H0") / 100.0) ** 3.0 # was h0 return dV_dz - def _get_n_expected(self, **kwargs): + def _get_n_expected(self, **kwargs) -> float: """ Calculates expected number of clusters at the current parameter values. """ @@ -253,7 +251,7 @@ def _get_n_expected(self, **kwargs): return Ntot - def _test_n_tot(self, **kwargs): + def _test_n_tot(self, **kwargs) -> float: HMF = self._get_HMF() # param_vals = self._get_param_vals(**kwargs) # Ez_fn = self._get_Ez_interpolator() diff --git a/soliket/clusters/massfunc.py b/soliket/clusters/massfunc.py index 0f944391..62a80959 100644 --- a/soliket/clusters/massfunc.py +++ b/soliket/clusters/massfunc.py @@ -1,7 +1,7 @@ """ .. module:: massfunction -The ``HMF`` class build the halo mass function internally required for the cluster +The ``HMF`` class build the halo mass function internally required for the cluster likelihood. Calculates the Halo Mass Function as in Tinker et al (2008) [2]_ . """ @@ -51,25 +51,22 @@ def __init__(self, om, Ez, pk=None, kh=None, zarr=None): self.kh = kh # self.kh, self.pk = self._pk(self.zarr) - def rhoc(self): + def rhoc(self) -> np.ndarray: """ Critical density as a function of z """ - ans = self.rho_crit0H100 * self.E_z ** 2. - return ans + return self.rho_crit0H100 * self.E_z ** 2. - def rhom(self): + def rhom(self) -> np.ndarray: """ Mean matter density as a function of z """ - ans = self.rhoc0om * (1.0 + self.zarr) ** 3 - return ans + return self.rhoc0om * (1.0 + self.zarr) ** 3 - def critdensThreshold(self, deltac): - rho_treshold = deltac * self.rhoc() / self.rhom() - return rho_treshold + def critdensThreshold(self, deltac) -> np.ndarray: + return deltac * self.rhoc() / self.rhom() - def dn_dM(self, M, delta): + def dn_dM(self, M, delta) -> np.ndarray: """ dN/dmdV Mass Function @@ -82,13 +79,12 @@ def dn_dM(self, M, delta): dn_dm = dn_dlnm / M[:, None] return dn_dm - def inter_dndmLogm(self, delta, M=None): + def inter_dndmLogm(self, delta, M=None) -> RegularGridInterpolator: """ Interpolating over M and z for faster calculations """ if M is None: M = self.M dndM = self.dn_dM(M, delta) - ans = RegularGridInterpolator((np.log10(M), self.zarr), + return RegularGridInterpolator((np.log10(M), self.zarr), np.log10(dndM), method='cubic', fill_value=0) - return ans diff --git a/soliket/clusters/survey.py b/soliket/clusters/survey.py index 33617028..22eda035 100644 --- a/soliket/clusters/survey.py +++ b/soliket/clusters/survey.py @@ -8,6 +8,7 @@ """ import os +from typing import Any, List, Optional, Tuple, TypeAlias, Union import astropy.io.fits as pyfits import astropy.table as atpy @@ -17,8 +18,13 @@ from astropy.wcs import WCS from scipy import interpolate +# TODO: Placeholder for the NemoConfig type +NemoConfig: TypeAlias = Any -def read_clust_cat(fitsfile, qmin): + +def read_clust_cat( + fitsfile: str, qmin: float +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: list = fits.open(fitsfile) data = list[1].data SNR = data.field("SNR2p4") @@ -31,7 +37,9 @@ def read_clust_cat(fitsfile, qmin): return z[ind], zerr[ind], Y0[ind], Y0err[ind] -def read_mock_cat(fitsfile, qmin): +def read_mock_cat( + fitsfile: str, qmin: float +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, 
np.ndarray]: list = fits.open(fitsfile) data = list[1].data SNR = data.field("fixed_SNR") @@ -43,7 +51,9 @@ def read_mock_cat(fitsfile, qmin): return z[ind], zerr[ind], Y0[ind], Y0err[ind] -def read_matt_mock_cat(fitsfile, qmin): +def read_matt_mock_cat( + fitsfile: str, qmin: float +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: list = fits.open(fitsfile) data = list[1].data # ra = data.field("RADeg") @@ -58,7 +68,9 @@ def read_matt_mock_cat(fitsfile, qmin): return z[ind], zerr[ind], Y0[ind], Y0err[ind] -def read_matt_cat(fitsfile, qmin): +def read_matt_cat( + fitsfile: str, qmin: float +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: list = fits.open(fitsfile) data = list[1].data z = data.field("redshift") @@ -70,12 +82,12 @@ def read_matt_cat(fitsfile, qmin): return z[ind], zerr[ind], Y0[ind], Y0err[ind] -def loadAreaMask(extName, DIR): +def loadAreaMask(extName: str, DIR: str) -> Tuple[np.ndarray, WCS]: """Loads the survey area mask (i.e., after edge-trimming and point source masking, produced by nemo). Returns map array, wcs """ - areaImg = pyfits.open(os.path.join(DIR, "areaMask%s.fits.gz" % extName)) + areaImg = pyfits.open(os.path.join(DIR, f"areaMask{extName}.fits.gz")) areaMap = areaImg[0].data wcs = WCS(areaImg[0].header) # , mode="pyfits") areaImg.close() @@ -83,12 +95,12 @@ def loadAreaMask(extName, DIR): return areaMap, wcs -def loadRMSmap(extName, DIR): +def loadRMSmap(extName: str, DIR: str) -> Tuple[np.ndarray, WCS]: """Loads the survey RMS map (produced by nemo). Returns map array, wcs """ areaImg = pyfits.open( - os.path.join(DIR, "RMSMap_Arnaud_M2e14_z0p4%s.fits.gz" % extName) + os.path.join(DIR, f"RMSMap_Arnaud_M2e14_z0p4{extName}.fits.gz") ) areaMap = areaImg[0].data wcs = WCS(areaImg[0].header) # , mode="pyfits") @@ -97,7 +109,9 @@ def loadRMSmap(extName, DIR): return areaMap, wcs -def loadQ(source, tileNames=None): +def loadQ( + source: Union[NemoConfig, str], tileNames: Optional[List[str]] = None +) -> dict: """Load the filter mismatch function Q as a dictionary of spline fits. Args: source (NemoConfig or str): Either the path to a .fits table (containing Q fits @@ -109,10 +123,10 @@ def loadQ(source, tileNames=None): A dictionary (with tile names as keys), containing spline knots for the Q function for each tile. 
""" - if type(source) == str: + if isinstance(source, str): combinedQTabFileName = source else: - # We should add a check to confirm this is actually a NemoConfig object + # TODO: We should add a check to confirm this is actually a NemoConfig object combinedQTabFileName = os.path.join(source.selFnDir, "QFit.fits") tileNames = source.tileNames tckDict = {} @@ -143,10 +157,10 @@ def __init__( nemoOutputDir, ClusterCat, qmin=5.6, + num_noise_bins=20, szarMock=False, MattMock=False, tiles=False, - num_noise_bins=20, ): self.nemodir = nemoOutputDir @@ -174,10 +188,10 @@ def __init__( if tiles: self.filetile = self.nemodir + "/tileAreas.txt" self.tilenames = np.loadtxt( - self.filetile, dtype=np.str, usecols=0, unpack=True + self.filetile, dtype=str, usecols=0, unpack=True ) self.tilearea = np.loadtxt( - self.filetile, dtype=np.float, usecols=1, unpack=True + self.filetile, dtype=float, usecols=1, unpack=True ) self.fsky = [] @@ -212,7 +226,7 @@ def __init__( self.Ythresh = 10 ** ((bin_edge[:-1] + bin_edge[1:]) / 2.0) @property - def Q(self): + def Q(self) -> np.ndarray: if self.tiles: return self.tckQFit["Q"] else: diff --git a/soliket/clusters/sz_utils.py b/soliket/clusters/sz_utils.py index 800ba181..0102d405 100644 --- a/soliket/clusters/sz_utils.py +++ b/soliket/clusters/sz_utils.py @@ -26,7 +26,7 @@ / G_CGS * MPC2CM / MSUN_CGS -def gaussian(xx, mu, sig, noNorm=False): +def gaussian(xx, mu, sig, noNorm=False) -> np.ndarray: if noNorm: return np.exp(-1.0 * (xx - mu) ** 2 / (2.0 * sig ** 2.0)) else: @@ -42,7 +42,7 @@ def __init__(self, Survey): # self.rho_crit0H100 = (3. / (8. * np.pi) * \ # (100. * 1.e5)**2.) / G_in_cgs * Mpc_in_cm / MSun_in_g - def P_Yo(self, LgY, M, z, param_vals, Ez_fn, Da_fn): + def P_Yo(self, LgY, M, z, param_vals, Ez_fn, Da_fn) -> np.ndarray: H0 = param_vals["H0"] Ma = np.outer(M, np.ones(len(LgY[0, :]))) @@ -65,13 +65,12 @@ def P_Yo(self, LgY, M, z, param_vals, Ez_fn, Da_fn): # print ("M,z,y~",M[ind],z,Ytilde[ind,0]) numer = -1.0 * (np.log(Y / Ytilde)) ** 2 - ans = ( + return ( 1.0 / (param_vals["scat"] * np.sqrt(2 * np.pi)) * np.exp(numer / (2.0 * param_vals["scat"] ** 2)) ) - return ans - def P_Yo_vec(self, LgY, M, z, param_vals, Ez_fn, Da_fn): + def P_Yo_vec(self, LgY, M, z, param_vals, Ez_fn, Da_fn) -> np.ndarray: H0 = param_vals["H0"] # Ma = np.outer(M, np.ones(len(LgY[0, :]))) @@ -90,19 +89,18 @@ def P_Yo_vec(self, LgY, M, z, param_vals, Ez_fn, Da_fn): Ytilde = np.repeat(Ytilde[:, :, np.newaxis], LgY.shape[2], axis=2) numer = -1.0 * (np.log(Y / Ytilde)) ** 2 - ans = ( + return ( 1.0 / (param_vals["scat"] * np.sqrt(2 * np.pi)) * np.exp(numer / (2.0 * param_vals["scat"] ** 2)) ) - return ans - def Y_erf(self, Y, Ynoise): + def Y_erf(self, Y, Ynoise) -> np.ndarray: qmin = self.Survey.qmin ans = Y * 0.0 ans[Y - qmin * Ynoise > 0] = 1.0 return ans - def P_of_gt_SN(self, LgY, MM, zz, Ynoise, param_vals, Ez_fn, Da_fn): + def P_of_gt_SN(self, LgY, MM, zz, Ynoise, param_vals, Ez_fn, Da_fn) -> np.ndarray: Y = 10 ** LgY sig_tr = np.outer(np.ones([MM.shape[0], MM.shape[1]]), self.Y_erf(Y, Ynoise)) @@ -114,46 +112,42 @@ def P_of_gt_SN(self, LgY, MM, zz, Ynoise, param_vals, Ez_fn, Da_fn): P_Y = np.nan_to_num(self.P_Yo_vec(LgYa2, MM, zz, param_vals, Ez_fn, Da_fn)) - ans = trapezoid(P_Y * sig_thresh, x=LgY, axis=2) * np.log(10) - return ans + return trapezoid(P_Y * sig_thresh, x=LgY, axis=2) * np.log(10) - def PfuncY(self, YNoise, M, z_arr, param_vals, Ez_fn, Da_fn): + def PfuncY(self, YNoise, M, z_arr, param_vals, Ez_fn, Da_fn) -> np.ndarray: LgY = self.LgY - P_func = 
np.outer(M, np.zeros([len(z_arr)])) M_arr = np.outer(M, np.ones([len(z_arr)])) - P_func = self.P_of_gt_SN(LgY, M_arr, z_arr, YNoise, param_vals, Ez_fn, Da_fn) - return P_func + return self.P_of_gt_SN(LgY, M_arr, z_arr, YNoise, param_vals, Ez_fn, Da_fn) - def P_of_Y_per(self, LgY, MM, zz, Y_c, Y_err, param_vals): + def P_of_Y_per(self, LgY, MM, zz, Y_c, Y_err, param_vals) -> np.ndarray: P_Y_sig = np.outer(np.ones(len(MM)), self.Y_prob(Y_c, LgY, Y_err)) LgYa = np.outer(np.ones([MM.shape[0], MM.shape[1]]), LgY) LgYa2 = np.reshape(LgYa, (MM.shape[0], MM.shape[1], len(LgY))) P_Y = np.nan_to_num(self.P_Yo(LgYa2, MM, zz, param_vals)) - ans = trapezoid(P_Y * P_Y_sig, LgY, np.diff(LgY), axis=1) * np.log(10) - return ans + return trapezoid(P_Y * P_Y_sig, LgY, np.diff(LgY), axis=1) * np.log(10) - def Y_prob(self, Y_c, LgY, YNoise): + def Y_prob(self, Y_c, LgY, YNoise) -> np.ndarray: Y = 10 ** LgY - ans = gaussian(Y, Y_c, YNoise) - return ans + return gaussian(Y, Y_c, YNoise) - def Pfunc_per(self, MM, zz, Y_c, Y_err, param_vals, Ez_fn, Da_fn): + def Pfunc_per(self, MM, zz, Y_c, Y_err, param_vals, Ez_fn, Da_fn) -> np.ndarray: LgY = self.LgY LgYa = np.outer(np.ones(len(MM)), LgY) P_Y_sig = self.Y_prob(Y_c, LgY, Y_err) P_Y = np.nan_to_num(self.P_Yo(LgYa, MM, zz, param_vals, Ez_fn, Da_fn)) - ans = trapezoid(P_Y * P_Y_sig, LgY, np.diff(LgY), axis=1) - return ans + return trapezoid(P_Y * P_Y_sig, LgY, np.diff(LgY), axis=1) - def Pfunc_per_parallel(self, Marr, zarr, Y_c, Y_err, param_vals, Ez_fn, Da_fn): + def Pfunc_per_parallel( + self, Marr, zarr, Y_c, Y_err, param_vals, Ez_fn, Da_fn + ) -> np.ndarray: # LgY = self.LgY # LgYa = np.outer(np.ones(Marr.shape[0]), LgY) @@ -172,11 +166,9 @@ def Pfunc_per_parallel(self, Marr, zarr, Y_c, Y_err, param_vals, Ez_fn, Da_fn): P_Y_sig = self.Y_prob(Y_c, self.LgY, Y_err) P_Y = np.nan_to_num(self.P_Yo(self.LgY, Marr, zarr, param_vals, Ez_fn, Da_fn)) - ans = trapezoid(P_Y * P_Y_sig, x=self.LgY, axis=2) - - return ans + return trapezoid(P_Y * P_Y_sig, x=self.LgY, axis=2) - def Pfunc_per_zarr(self, MM, z_c, Y_c, Y_err, int_HMF, param_vals): + def Pfunc_per_zarr(self, MM, z_c, Y_c, Y_err, param_vals) -> np.ndarray: LgY = self.LgY # old was z_arr @@ -185,9 +177,7 @@ def Pfunc_per_zarr(self, MM, z_c, Y_c, Y_err, int_HMF, param_vals): # M200 = np.outer(MM, np.zeros([len(z_arr)])) # zarr = np.outer(np.ones([len(M)]), z_arr) - P_func = self.P_of_Y_per(LgY, MM, z_c, Y_c, Y_err, param_vals) - - return P_func + return self.P_of_Y_per(LgY, MM, z_c, Y_c, Y_err, param_vals) ### @@ -389,7 +379,7 @@ def y0FromLogM500( """ - if type(Mpivot) == str: + if isinstance(Mpivot, str): raise Exception( "Mpivot is a string - check Mpivot in your .yml config file:\ use, e.g., 3.0e+14 (not 3e14 or 3e+14)" diff --git a/soliket/clusters/tinker.py b/soliket/clusters/tinker.py index 55d160bb..13af58b0 100644 --- a/soliket/clusters/tinker.py +++ b/soliket/clusters/tinker.py @@ -5,6 +5,7 @@ """ +from typing import Optional, Tuple, Union import numpy as np from scipy.integrate import simpson from scipy.interpolate import InterpolatedUnivariateSpline as iuSpline @@ -26,7 +27,9 @@ tinker_splines = None -def tinker_params_spline(delta, z=None): +def tinker_params_spline( + delta: float, z: Optional[Union[float, np.ndarray]] = None +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: global tinker_splines if tinker_splines is None: tinker_splines = [] @@ -50,7 +53,9 @@ def tinker_params_spline(delta, z=None): return A, a, b, c -def tinker_params_analytic(delta, z=None): +def tinker_params_analytic( + 
delta: Union[float, np.ndarray], z: Optional[Union[float, np.ndarray]] = None +) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: alpha = None if np.asarray(delta).ndim == 0: # scalar delta. A0, a0, b0, c0 = [p[0] for p in @@ -87,21 +92,25 @@ def tinker_params_analytic(delta, z=None): tinker_params = tinker_params_spline -def tinker_f(sigma, params): +def tinker_f( + sigma: Union[float, np.ndarray], params: Tuple[float, float, float, float] +) -> Union[float, np.ndarray]: A, a, b, c = params return A * ((sigma / b) ** -a + 1) * np.exp(-c / sigma ** 2) # Sigma-evaluation, and top-hat functions. -def radius_from_mass(M, rho): +def radius_from_mass( + M: Union[float, np.ndarray], rho: Union[float, np.ndarray] +) -> Union[float, np.ndarray]: """ Convert mass M to radius R assuming density rho. """ return (3. * M / (4. * np.pi * rho)) ** (1 / 3.) -def top_hatf(kR): +def top_hatf(kR: Union[float, np.ndarray]) -> Union[float, np.ndarray]: """ Returns the Fourier transform of the spherical top-hat function evaluated at a given k*R. @@ -114,7 +123,9 @@ def top_hatf(kR): return out -def sigma_sq_integral(R_grid, power_spt, k_val): +def sigma_sq_integral( + R_grid: np.ndarray, power_spt: np.ndarray, k_val: np.ndarray +) -> np.ndarray: """ Determines the sigma^2 parameter over the m-z grid by integrating over k. Notes: @@ -132,7 +143,15 @@ def sigma_sq_integral(R_grid, power_spt, k_val): return simpson(to_integ / (2 * np.pi ** 2), x=k_val, axis=0) -def dn_dlogM(M, z, rho, delta, k, P, comoving=False): +def dn_dlogM( + M: np.ndarray, + z: np.ndarray, + rho: np.ndarray, + delta: Union[float, np.ndarray], + k: np.ndarray, + P: np.ndarray, + comoving: bool = False +) -> np.ndarray: """ M is (nM) or (nM, nz) z is (nz) diff --git a/soliket/cross_correlation/cross_correlation.py b/soliket/cross_correlation/cross_correlation.py index b6cbccba..fb29d827 100644 --- a/soliket/cross_correlation/cross_correlation.py +++ b/soliket/cross_correlation/cross_correlation.py @@ -5,14 +5,19 @@ :Authors: Pablo Lemos, Ian Harrison. """ +from typing import ClassVar, Dict, List, Optional, Tuple import numpy as np + try: from numpy import trapezoid except ImportError: from numpy import trapz as trapezoid import sacc from cobaya.log import LoggedError +from cobaya.theory import Provider +from pyccl import Tracer +from soliket import CCL from soliket.gaussian import GaussianData, GaussianLikelihood @@ -21,19 +26,21 @@ class CrossCorrelationLikelihood(GaussianLikelihood): Generic likelihood for cross-correlations of CCL tracer objects. """ + provider: Provider + def initialize(self): self._get_sacc_data() self._check_tracers() - def get_requirements(self): + def get_requirements(self) -> Dict[str, dict]: return {"CCL": {"kmax": 10, "nonlinear": True}, "zstar": None} - def _get_CCL_results(self): + def _get_CCL_results(self) -> Tuple[CCL, dict]: cosmo_dict = self.provider.get_CCL() return cosmo_dict["ccl"], cosmo_dict["cosmo"] - def _check_tracers(self): + def _check_tracers(self) -> None: # check correct tracers for tracer_comb in self.sacc_data.get_tracer_combinations(): @@ -41,33 +48,38 @@ def _check_tracers(self): if (self.sacc_data.tracers[tracer_comb[0]].quantity == self.sacc_data.tracers[tracer_comb[1]].quantity): raise LoggedError(self.log, - 'You have tried to use {0} to calculate an \ - autocorrelation, but it is a cross-correlation \ - likelihood. 
Please check your tracer selection in the \ - ini file.'.format(self.__class__.__name__)) + f'You have tried to use {self.__class__.__name__} \ + to calculate an autocorrelation, but it is a \ + cross-correlation likelihood. Please check your \ + tracer selection in the ini file.') for tracer in tracer_comb: if self.sacc_data.tracers[tracer].quantity not in self._allowable_tracers: - raise LoggedError(self.log, - 'You have tried to use a {0} tracer in \ - {1}, which only allows {2}. Please check your \ - tracer selection in the ini file.\ - '.format(self.sacc_data.tracers[tracer].quantity, - self.__class__.__name__, - self._allowable_tracers)) - - def _get_nz(self, z, tracer, tracer_name, **params_values): - + raise LoggedError( + self.log, + f'You have tried to use a \ + {self.sacc_data.tracers[tracer].quantity} tracer in \ + {self.__class__.__name__}, which only allows \ + {self._allowable_tracers}. Please check your \ + tracer selection in the ini file.' + ) + + def _get_nz( + self, + z: np.ndarray, + tracer: Tracer, + tracer_name: str, + **params_values: dict + ) -> np.ndarray: if self.z_nuisance_mode == 'deltaz': - bias = params_values['{}_deltaz'.format(tracer_name)] + bias = params_values[f'{tracer_name}_deltaz'] nz_biased = tracer.get_dndz(z - bias) # nz_biased /= np.trapezoid(nz_biased, z) return nz_biased - def _get_sacc_data(self, **params_values): - + def _get_sacc_data(self, **params_values: dict) -> None: self.sacc_data = sacc.Sacc.load_fits(self.datapath) if self.use_spectra == 'all': @@ -83,8 +95,7 @@ def _get_sacc_data(self, **params_values): self.data = GaussianData(self.name, self.x, self.y, self.cov, self.ncovsims) - def _construct_ell_bins(self): - + def _construct_ell_bins(self) -> np.ndarray: ell_eff = [] for tracer_comb in self.sacc_data.get_tracer_combinations(): @@ -94,7 +105,9 @@ def _construct_ell_bins(self): return np.concatenate(ell_eff) - def _get_data(self, **params_values): + def _get_data( + self, **params_values: dict + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: data_auto = np.loadtxt(self.auto_file) data_cross = np.loadtxt(self.cross_file) @@ -113,8 +126,7 @@ def _get_data(self, **params_values): return x, y, dy - def get_binning(self, tracer_comb): - + def get_binning(self, tracer_comb: tuple) -> Tuple[np.ndarray, np.ndarray]: bpw_idx = self.sacc_data.indices(tracers=tracer_comb) bpw = self.sacc_data.get_bandpower_windows(bpw_idx) ells_theory = bpw.values @@ -123,7 +135,7 @@ def get_binning(self, tracer_comb): return ells_theory, w_bins - def logp(self, **params_values): + def logp(self, **params_values: dict) -> float: theory = self._get_theory(**params_values) return self.data.loglike(theory) @@ -132,16 +144,14 @@ class GalaxyKappaLikelihood(CrossCorrelationLikelihood): r""" Likelihood for cross-correlations of galaxy and CMB lensing data. """ - _allowable_tracers = ['cmb_convergence', 'galaxy_density'] - - def _get_theory(self, **params_values): + _allowable_tracers: ClassVar[List[str]] = ['cmb_convergence', 'galaxy_density'] + def _get_theory(self, **params_values: dict) -> np.ndarray: ccl, cosmo = self._get_CCL_results() tracer_comb = self.sacc_data.get_tracer_combinations() for tracer in np.unique(tracer_comb): - if self.sacc_data.tracers[tracer].quantity == "cmb_convergence": cmbk_tracer = tracer elif self.sacc_data.tracers[tracer].quantity == "galaxy_density": @@ -151,16 +161,13 @@ def _get_theory(self, **params_values): nz_gal_tracer = self.sacc_data.tracers[gal_tracer].nz # this should use the bias theory! 
- tracer_g = ccl.NumberCountsTracer(cosmo, - has_rsd=False, - dndz=(z_gal_tracer, nz_gal_tracer), - bias=(z_gal_tracer, - params_values["b1"] * - np.ones(len(z_gal_tracer))), - mag_bias=(z_gal_tracer, - params_values["s1"] * - np.ones(len(z_gal_tracer))) - ) + tracer_g = ccl.NumberCountsTracer( + cosmo, + has_rsd=False, + dndz=(z_gal_tracer, nz_gal_tracer), + bias=(z_gal_tracer, params_values["b1"] * np.ones(len(z_gal_tracer))), + mag_bias=(z_gal_tracer, params_values["s1"] * np.ones(len(z_gal_tracer))), + ) tracer_k = ccl.CMBLensingTracer(cosmo, z_source=self.provider.get_param('zstar')) ells_theory_gk, w_bins_gk = self.get_binning((gal_tracer, cmbk_tracer)) @@ -176,115 +183,78 @@ class ShearKappaLikelihood(CrossCorrelationLikelihood): r""" Likelihood for cross-correlations of galaxy weak lensing shear and CMB lensing data. """ - _allowable_tracers = ["cmb_convergence", "galaxy_shear"] - - def _get_theory(self, **params_values): + _allowable_tracers: ClassVar[List[str]] = ["cmb_convergence", "galaxy_shear"] + def _get_theory(self, **params_values: dict) -> np.ndarray: ccl, cosmo = self._get_CCL_results() - - cl_binned_list = [] + cl_binned_list: List[np.ndarray] = [] for tracer_comb in self.sacc_data.get_tracer_combinations(): - - if self.sacc_data.tracers[tracer_comb[0]].quantity == "cmb_convergence": - tracer1 = ccl.CMBLensingTracer(cosmo, - z_source=self.provider.get_param('zstar')) - - elif self.sacc_data.tracers[tracer_comb[0]].quantity == "galaxy_shear": - - sheartracer_name = tracer_comb[0] - - z_tracer1 = self.sacc_data.tracers[tracer_comb[0]].z - nz_tracer1 = self.sacc_data.tracers[tracer_comb[0]].nz - - if self.ia_mode is None: - ia_z = None - elif self.ia_mode == 'nla': - A_IA = params_values['A_IA'] - eta_IA = params_values['eta_IA'] - z0_IA = trapezoid(z_tracer1 * nz_tracer1) - - ia_z = (z_tracer1, A_IA * ((1 + z_tracer1) / (1 + z0_IA)) ** eta_IA) - elif self.ia_mode == 'nla-perbin': - A_IA = params_values['{}_A_IA'.format(sheartracer_name)] - ia_z = (z_tracer1, A_IA * np.ones_like(z_tracer1)) - elif self.ia_mode == 'nla-noevo': - A_IA = params_values['A_IA'] - ia_z = (z_tracer1, A_IA * np.ones_like(z_tracer1)) - - tracer1 = ccl.WeakLensingTracer(cosmo, - dndz=(z_tracer1, nz_tracer1), - ia_bias=ia_z) - - if self.z_nuisance_mode is not None: - nz_tracer1 = self._get_nz(z_tracer1, - tracer1, - tracer_comb[0], - **params_values) - - tracer1 = ccl.WeakLensingTracer(cosmo, - dndz=(z_tracer1, nz_tracer1), - ia_bias=ia_z) - - if self.sacc_data.tracers[tracer_comb[1]].quantity == "cmb_convergence": - tracer2 = ccl.CMBLensingTracer(cosmo, - z_source=self.provider.get_param('zstar')) - - elif self.sacc_data.tracers[tracer_comb[1]].quantity == "galaxy_shear": - - sheartracer_name = tracer_comb[1] - - z_tracer2 = self.sacc_data.tracers[tracer_comb[1]].z - nz_tracer2 = self.sacc_data.tracers[tracer_comb[1]].nz - - if self.ia_mode is None: - ia_z = None - elif self.ia_mode == 'nla': - A_IA = params_values['A_IA'] - eta_IA = params_values['eta_IA'] - z0_IA = trapezoid(z_tracer2 * nz_tracer2) - - ia_z = (z_tracer2, A_IA * ((1 + z_tracer2) / (1 + z0_IA)) ** eta_IA) - elif self.ia_mode == 'nla-perbin': - A_IA = params_values['{}_A_IA'.format(sheartracer_name)] - ia_z = (z_tracer2, A_IA * np.ones_like(z_tracer2)) - elif self.ia_mode == 'nla-noevo': - A_IA = params_values['A_IA'] - ia_z = (z_tracer2, A_IA * np.ones_like(z_tracer2)) - - tracer2 = ccl.WeakLensingTracer(cosmo, - dndz=(z_tracer2, nz_tracer2), - ia_bias=ia_z) - - if self.z_nuisance_mode is not None: - nz_tracer2 = 
self._get_nz(z_tracer2,
-                                               tracer2,
-                                               tracer_comb[1],
-                                               **params_values)
-
-                    tracer2 = ccl.WeakLensingTracer(cosmo,
-                                                    dndz=(z_tracer2, nz_tracer2),
-                                                    ia_bias=ia_z)
+            tracer1 = self._get_tracer(ccl, cosmo, tracer_comb[0], params_values)
+            tracer2 = self._get_tracer(ccl, cosmo, tracer_comb[1], params_values)

             bpw_idx = self.sacc_data.indices(tracers=tracer_comb)
             bpw = self.sacc_data.get_bandpower_windows(bpw_idx)
-            ells_theory = bpw.values
-            ells_theory = np.asarray(ells_theory, dtype=int)
+            ells_theory = np.asarray(bpw.values, dtype=int)
             w_bins = bpw.weight.T

             cl_unbinned = ccl.cells.angular_cl(cosmo, tracer1, tracer2, ells_theory)

             if self.m_nuisance_mode is not None:
-                # note this allows wrong calculation, as we can do
-                # shear x shear if the spectra are in the sacc
-                # but then we would want (1 + m1) * (1 + m2)
-                m_bias = params_values['{}_m'.format(sheartracer_name)]
+                sheartracer_name = (
+                    tracer_comb[1]
+                    if self.sacc_data.tracers[tracer_comb[1]].quantity == "galaxy_shear"
+                    else tracer_comb[0]
+                )
+                m_bias = params_values[f'{sheartracer_name}_m']
                 cl_unbinned = (1 + m_bias) * cl_unbinned

             cl_binned = np.dot(w_bins, cl_unbinned)
-
             cl_binned_list.append(cl_binned)

         cl_binned_total = np.concatenate(cl_binned_list)
-
         return cl_binned_total
+
+    def _get_tracer(
+        self, ccl: CCL, cosmo: dict, tracer_name: str, params_values: dict
+    ) -> Optional[Tracer]:
+        tracer_data = self.sacc_data.tracers[tracer_name]
+        if tracer_data.quantity == "cmb_convergence":
+            return ccl.CMBLensingTracer(cosmo, z_source=self.provider.get_param('zstar'))
+        elif tracer_data.quantity == "galaxy_shear":
+            z_tracer = self.sacc_data.tracers[tracer_name].z
+            nz_tracer = self.sacc_data.tracers[tracer_name].nz
+            ia_z = self._get_ia_bias(z_tracer, nz_tracer, tracer_name, params_values)
+
+            tracer = ccl.WeakLensingTracer(
+                cosmo, dndz=(z_tracer, nz_tracer), ia_bias=ia_z
+            )
+            if self.z_nuisance_mode is not None:
+                nz_tracer = self._get_nz(z_tracer, tracer, tracer_name, **params_values)
+                tracer = ccl.WeakLensingTracer(
+                    cosmo, dndz=(z_tracer, nz_tracer), ia_bias=ia_z
+                )
+            return tracer
+        return None
+
+    def _get_ia_bias(
+        self,
+        z_tracer: np.ndarray,
+        nz_tracer: np.ndarray,
+        tracer_name: str,
+        params_values: dict
+    ) -> Optional[Tuple[np.ndarray, np.ndarray]]:
+        if self.ia_mode is None:
+            return None
+        elif self.ia_mode == 'nla':
+            A_IA = params_values['A_IA']
+            eta_IA = params_values['eta_IA']
+            z0_IA = trapezoid(z_tracer * nz_tracer)
+            return (z_tracer, A_IA * ((1 + z_tracer) / (1 + z0_IA)) ** eta_IA)
+        elif self.ia_mode == 'nla-perbin':
+            A_IA = params_values[f'{tracer_name}_A_IA']
+            return (z_tracer, A_IA * np.ones_like(z_tracer))
+        elif self.ia_mode == 'nla-noevo':
+            A_IA = params_values['A_IA']
+            return (z_tracer, A_IA * np.ones_like(z_tracer))
+        return None

diff --git a/soliket/foreground/foreground.py b/soliket/foreground/foreground.py
index bb9fbac7..abb42cdd 100644
--- a/soliket/foreground/foreground.py
+++ b/soliket/foreground/foreground.py
@@ -48,11 +48,11 @@
 """

 import os
-from typing import Optional
+from typing import Dict, List, Optional

 import numpy as np
 from cobaya.log import LoggedError
-from cobaya.theory import Theory
+from cobaya.theory import Provider, Theory
 from cobaya.tools import are_different_params_lists
@@ -61,6 +61,7 @@ class Foreground(Theory):
     foregrounds: dict
     eff_freqs: Optional[list]
     exp_ch: Optional[list]
+    provider: Provider

 # Initializes the foreground model.
It sets the SED and reads the templates def initialize(self): @@ -77,11 +78,11 @@ def initialize(self): "a_c", "beta_c", "a_s", "a_gtt", "a_gte", "a_gee", "a_psee", "a_pste", "xi", "T_d"] - self.requested_cls = self.spectra["polarizations"] - self.lmin = self.spectra["lmin"] - self.lmax = self.spectra["lmax"] - self.exp_ch = self.spectra["exp_ch"] - self.eff_freqs = self.spectra["eff_freqs"] + self.requested_cls: List[str] = self.spectra["polarizations"] + self.lmin: int = self.spectra["lmin"] + self.lmax: int = self.spectra["lmax"] + self.exp_ch: List[str] = self.spectra["exp_ch"] + self.eff_freqs: List[float] = self.spectra["eff_freqs"] if hasattr(self.eff_freqs, "__len__"): if not len(self.exp_ch) == len(self.eff_freqs): @@ -92,16 +93,19 @@ def initialize(self): # self.bands to be filled with passbands read from sacc file # if mflike is used - self.bands = {f"{expc}_s0": {'nu': [self.eff_freqs[iexpc]], 'bandpass': [1.]} - for iexpc, expc in enumerate(self.exp_ch)} + self.bands: Dict[str, Dict[str, List[float]]] = { + f"{expc}_s0": {'nu': [self.eff_freqs[iexpc]], 'bandpass': [1.]} + for iexpc, expc in enumerate(self.exp_ch) + } - template_path = os.path.join(os.path.dirname(os.path.abspath(fgp.__file__)), - 'data') - cibc_file = os.path.join(template_path, 'cl_cib_Choi2020.dat') + template_path: str = os.path.join( + os.path.dirname(os.path.abspath(fgp.__file__)), 'data' + ) + cibc_file: str = os.path.join(template_path, 'cl_cib_Choi2020.dat') # set pivot freq and multipole - self.fg_nu_0 = self.foregrounds["normalisation"]["nu_0"] - self.fg_ell_0 = self.foregrounds["normalisation"]["ell_0"] + self.fg_nu_0: float = self.foregrounds["normalisation"]["nu_0"] + self.fg_ell_0: float = self.foregrounds["normalisation"]["ell_0"] # We don't seem to be using this # cirrus = fgc.FactorizedCrossSpectrum(fgf.PowerLaw(), fgp.PowerLaw()) @@ -114,7 +118,7 @@ def initialize(self): self.dust = fgc.FactorizedCrossSpectrum(fgf.ModifiedBlackBody(), fgp.PowerLaw()) self.tSZ_and_CIB = fgc.SZxCIB_Choi2020() - self.components = self.foregrounds["components"] + self.components: List[str] = self.foregrounds["components"] def initialize_with_params(self): # Check that the parameters are the right ones @@ -127,13 +131,15 @@ def initialize_with_params(self): differences) # Gets the actual power spectrum of foregrounds given the passed parameters - def _get_foreground_model(self, - requested_cls=None, - ell=None, - exp_ch=None, - bandint_freqs=None, - eff_freqs=None, - **fg_params): + def _get_foreground_model( + self, + requested_cls: Optional[List[str]] = None, + ell: Optional[np.ndarray] = None, + exp_ch: Optional[List[str]] = None, + bandint_freqs: Optional[np.ndarray] = None, + eff_freqs: Optional[np.ndarray] = None, + **fg_params: dict + ) -> dict: r""" Gets the foreground power spectra for each component computed by ``fgspectra``. 
The computation assumes the bandpass transmissions from the ``BandPass`` class
@@ -296,7 +302,7 @@ def _get_foreground_model(self,
                     fg_dict[s, "all", f1, f2] += fg_dict[s, comp, f1, f2]
         return fg_dict

-    def must_provide(self, **requirements):
+    def must_provide(self, **requirements: dict) -> dict:
         # fg_dict is required by theoryforge
         # and requires some params to be computed
         # Assign those from theoryforge
@@ -311,14 +317,15 @@ def must_provide(self, **requirements):
             self.bands = req.get("bands", self.bands)
             self.exp_ch = req.get("exp_ch", self.exp_ch)
             return {"bandint_freqs": {"bands": self.bands}}
+        return {}

-    def get_bandpasses(self, **params):
+    def get_bandpasses(self, **params: dict) -> np.ndarray:
         """
         Gets bandpass transmissions from the ``BandPass`` class.
         """
         return self.provider.get_bandint_freqs()

-    def calculate(self, state, want_derived=False, **params_values_dict):
+    def calculate(self, state: dict, want_derived=False, **params_values_dict: dict):
         """
         Fills the ``state`` dictionary of the ``Foreground`` Theory class
         with the foreground spectra, computed using the bandpass
@@ -349,7 +356,7 @@ def calculate(self, state, want_derived=False, **params_values_dict):
                                                        bandint_freqs=self.bandint_freqs,
                                                        **fg_params)

-    def get_fg_dict(self):
+    def get_fg_dict(self) -> dict:
         """
         Returns the ``state`` dictionary of foreground spectra
         """

diff --git a/soliket/gaussian/gaussian.py b/soliket/gaussian/gaussian.py
index 012f2ce1..a0af0f78 100644
--- a/soliket/gaussian/gaussian.py
+++ b/soliket/gaussian/gaussian.py
@@ -1,8 +1,9 @@
-from typing import Optional, Sequence
+from typing import Dict, List, Optional, Sequence, Tuple

 import numpy as np
 from cobaya.input import merge_info
 from cobaya.likelihood import Likelihood
+from cobaya.theory import Provider, Theory
 from cobaya.tools import recursive_update
 from cobaya.typing import empty_dict
@@ -16,34 +17,35 @@ class GaussianLikelihood(Likelihood):
     datapath: Optional[str] = None
     covpath: Optional[str] = None
     ncovsims: Optional[int] = None
+    provider: Provider

-    def initialize(self):
+    def initialize(self) -> None:
         x, y = self._get_data()
         cov = self._get_cov()
         self.data = GaussianData(self.name, x, y, cov, self.ncovsims)

-    def _get_data(self):
+    def _get_data(self) -> Tuple[np.ndarray, np.ndarray]:
         x, y = np.loadtxt(self.datapath, unpack=True)
         return x, y

-    def _get_cov(self):
+    def _get_cov(self) -> np.ndarray:
         cov = np.loadtxt(self.covpath)
         return cov

-    def _get_theory(self, **kwargs):
+    def _get_theory(self, **kwargs: dict) -> np.ndarray:
         raise NotImplementedError

-    def logp(self, **params_values):
+    def logp(self, **params_values: dict) -> float:
         theory = self._get_theory(**params_values)
         return self.data.loglike(theory)


 class CrossCov(dict):
-    def save(self, path):
+    def save(self, path: str) -> None:
         np.savez(path, **{str(k): v for k, v in self.items()})

     @classmethod
-    def load(cls, path):
+    def load(cls, path: Optional[str]) -> Optional["CrossCov"]:
         if path is None:
             return None
         return cls({eval(k): v for k, v in np.load(path).items()})
@@ -54,41 +56,44 @@ class MultiGaussianLikelihood(GaussianLikelihood):
     options: Optional[Sequence] = None
     cross_cov_path: Optional[str] = None

-    def __init__(self, info=empty_dict, **kwargs):
+    def __init__(self, info: dict = empty_dict, **kwargs) -> None:
         if 'components' in info:
-            self.likelihoods = [get_likelihood(*kv) for kv in zip(info['components'],
-                                                                  info['options'])]
+            self.likelihoods: List[Likelihood] = [
+                get_likelihood(*kv) for kv in zip(info['components'], info['options'])
+            ]

-            default_info =
merge_info(*[like.get_defaults() for like in self.likelihoods]) + default_info: dict = merge_info( + *[like.get_defaults() for like in self.likelihoods] + ) default_info.update(info) super().__init__(info=default_info, **kwargs) - def initialize(self): - self.cross_cov = CrossCov.load(self.cross_cov_path) + def initialize(self) -> None: + self.cross_cov: Optional[CrossCov] = CrossCov.load(self.cross_cov_path) data_list = [like.data for like in self.likelihoods] self.data = MultiGaussianData(data_list, self.cross_cov) self.log.info('Initialized.') - def initialize_with_provider(self, provider): # pragma: no cover + def initialize_with_provider(self, provider: Provider) -> None: # pragma: no cover for like in self.likelihoods: like.initialize_with_provider(provider) # super().initialize_with_provider(provider) - def get_helper_theories(self): # pragma: no cover - helpers = {} + def get_helper_theories(self) -> Dict[str, Theory]: # pragma: no cover + helpers: Dict[str, Theory] = {} for like in self.likelihoods: helpers.update(like.get_helper_theories()) return helpers - def _get_theory(self, **kwargs): + def _get_theory(self, **kwargs) -> np.ndarray: return np.concatenate([like._get_theory(**kwargs) for like in self.likelihoods]) - def get_requirements(self): # pragma: no cover + def get_requirements(self) -> dict: # pragma: no cover # Reqs with arguments like 'lmax', etc. may have to be carefully treated here to # merge diff --git a/soliket/gaussian/gaussian_data.py b/soliket/gaussian/gaussian_data.py index aa98b99b..9d959b0d 100644 --- a/soliket/gaussian/gaussian_data.py +++ b/soliket/gaussian/gaussian_data.py @@ -1,4 +1,4 @@ -from typing import Optional, Sequence +from typing import Dict, List, Optional, Sequence, Tuple import numpy as np from cobaya.likelihoods.base_classes import _fast_chi_square @@ -16,34 +16,34 @@ class GaussianData: _fast_chi_squared = _fast_chi_square() - def __init__(self, name, x: Sequence, y: Sequence[float], cov: np.ndarray, - ncovsims: Optional[int] = None): + def __init__(self, name: str, x: Sequence[float], y: Sequence[float], cov: np.ndarray, + ncovsims: Optional[int] = None) -> None: - self.name = str(name) - self.ncovsims = ncovsims + self.name: str = str(name) + self.ncovsims: Optional[int] = ncovsims if not (len(x) == len(y) and cov.shape == (len(x), len(x))): raise ValueError(f"Incompatible shapes! x={len(x)}, y={len(y)}, \ cov={cov.shape}") - self.x = x - self.y = np.ascontiguousarray(y) - self.cov = cov - self.eigenevalues = np.linalg.eigvalsh(cov) + self.x: Sequence[float] = x + self.y: np.ndarray = np.ascontiguousarray(y) + self.cov: np.ndarray = cov + self.eigenevalues: np.ndarray = np.linalg.eigvalsh(cov) if self.eigenevalues.min() <= 0: raise ValueError("Covariance is not positive definite!") - self.inv_cov = np.linalg.inv(self.cov) + self.inv_cov: np.ndarray = np.linalg.inv(self.cov) if ncovsims is not None: hartlap_factor = (self.ncovsims - len(x) - 2) / (self.ncovsims - 1) self.inv_cov *= hartlap_factor log_det = np.log(self.eigenevalues).sum() self.norm_const = -(np.log(2 * np.pi) * len(x) + log_det) / 2 - def __len__(self): + def __len__(self) -> int: return len(self.x) - def loglike(self, theory): + def loglike(self, theory: np.ndarray) -> float: delta = self.y - theory return -0.5 * self._fast_chi_squared(self.inv_cov, delta) + self.norm_const @@ -60,7 +60,11 @@ class MultiGaussianData(GaussianData): Cross-covariances, keyed by (name1, name2) tuples. 
""" - def __init__(self, data_list, cross_covs=None): + def __init__( + self, + data_list: List[GaussianData], + cross_covs: Optional[Dict[Tuple[str, str], np.ndarray]] = None, + ) -> None: if cross_covs is None: cross_covs = {} @@ -86,44 +90,47 @@ def __init__(self, data_list, cross_covs=None): else: cross_covs[key] = np.zeros((len(d1), len(d2))) - self.data_list = data_list - self.lengths = [len(d) for d in data_list] - self.names = [d.name for d in data_list] - self.cross_covs = cross_covs + self.data_list: List[GaussianData] = data_list + self.lengths: List[int] = [len(d) for d in data_list] + self.names: List[str] = [d.name for d in data_list] + self.cross_covs: Dict[Tuple[str, str], np.ndarray] = cross_covs - self._data = None + self._data: Optional[np.ndarray] = None @property - def data(self): + def data(self) -> GaussianData: if self._data is None: self._assemble_data() return self._data - def loglike(self, theory): + def loglike(self, theory: np.ndarray) -> float: return self.data.loglike(theory) @property - def name(self): + def name(self) -> str: return self.data.name @property - def inv_cov(self): + def inv_cov(self) -> np.ndarray: return self.data.inv_cov @property - def cov(self): + def cov(self) -> np.ndarray: return self.data.cov @property - def norm_const(self): + def norm_const(self) -> float: return self.data.norm_const @property - def labels(self): - return [x for y in [[name] * len(d) for - name, d in zip(self.names, self.data_list)] for x in y] + def labels(self) -> List[str]: + return [ + x + for y in [[name] * len(d) for name, d in zip(self.names, self.data_list)] + for x in y + ] - def _index_range(self, name): + def _index_range(self, name: str) -> Tuple[int, int]: if name not in self.names: raise ValueError(f"{name} not in {self.names}!") @@ -135,13 +142,13 @@ def _index_range(self, name): i0 += length return i0, i1 - def _slice(self, *names): + def _slice(self, *names: str) -> slice: if isinstance(names, str): names = [names] return np.s_[tuple(slice(*self._index_range(n)) for n in names)] - def _assemble_data(self): + def _assemble_data(self) -> None: x = np.concatenate([d.x for d in self.data_list]) y = np.concatenate([d.y for d in self.data_list]) @@ -154,7 +161,7 @@ def _assemble_data(self): self._data = GaussianData(" + ".join(self.names), x, y, cov) - def plot_cov(self, **kwargs): + def plot_cov(self, **kwargs) -> None: import holoviews as hv data = [ @@ -163,5 +170,6 @@ def plot_cov(self, **kwargs): for j, lj in zip(range(len(self.data)), self.labels) ] - return hv.HeatMap(data).opts(tools=["hover"], width=800, height=800, - invert_yaxis=True, xrotation=90) + return hv.HeatMap(data).opts( + tools=["hover"], width=800, height=800, invert_yaxis=True, xrotation=90 + ) diff --git a/soliket/halo_model/halo_model.py b/soliket/halo_model/halo_model.py index 6cb4719a..fce72b7a 100644 --- a/soliket/halo_model/halo_model.py +++ b/soliket/halo_model/halo_model.py @@ -32,9 +32,10 @@ function (have a look at the simple pyhalomodel model for ideas). 
""" +from typing import Dict, Optional import numpy as np import pyhalomodel as halo -from cobaya.theory import Theory +from cobaya.theory import Provider, Theory # from cobaya.theories.cosmo.boltzmannbase import PowerSpectrumInterpolator from scipy.interpolate import RectBivariateSpline @@ -45,31 +46,29 @@ class HaloModel(Theory): _logz = np.linspace(-3, np.log10(1100), 150) _default_z_sampling = 10**_logz _default_z_sampling[0] = 0 + provider: Provider - def initialize(self): + def initialize(self) -> None: self._var_pairs = set() self._required_results = {} # def must_provide(self, **requirements): # options = requirements.get("halo_model") or {} - def _get_Pk_mm_lin(self): + def _get_Pk_mm_lin(self) -> np.ndarray: for pair in self._var_pairs: self.k, self.z, Pk_mm = \ self.provider.get_Pk_grid(var_pair=pair, nonlinear=False) return Pk_mm - def get_Pk_mm_grid(self): - + def get_Pk_mm_grid(self) -> np.ndarray: return self.current_state["Pk_mm_grid"] - def get_Pk_gg_grid(self): - + def get_Pk_gg_grid(self) -> np.ndarray: return self.current_state["Pk_gg_grid"] - def get_Pk_gm_grid(self): - + def get_Pk_gm_grid(self) -> np.ndarray: return self.current_state["Pk_gm_grid"] @@ -81,17 +80,15 @@ class HaloModel_pyhm(HaloModel): `_ code. """ - def initialize(self): + def initialize(self) -> None: super().initialize() self.Ms = np.logspace(np.log10(self.Mmin), np.log10(self.Mmax), self.nM) - def get_requirements(self): - + def get_requirements(self) -> Dict[str, Optional[None]]: return {"omegam": None} - def must_provide(self, **requirements): - - options = requirements.get("halo_model") or {} + def must_provide(self, **requirements) -> dict: + options: dict = requirements.get("halo_model") or {} self._var_pairs.update( set((x, y) for x, y in options.get("vars_pairs", [("delta_tot", "delta_tot")]))) @@ -116,13 +113,12 @@ def must_provide(self, **requirements): "R": np.linspace(0.14, 66, 256) # list of radii required } - return needs def calculate(self, state: dict, want_derived: bool = True, - **params_values_dict): + **params_values_dict) -> None: - Pk_mm_lin = self._get_Pk_mm_lin() + Pk_mm_lin: np.ndarray = self._get_Pk_mm_lin() # now wish to interpolate sigma_R to these Rs zinterp, rinterp, sigmaRinterp = self.provider.get_sigma_R() @@ -157,7 +153,7 @@ def calculate(self, state: dict, want_derived: bool = True, # state['Pk_gm_grid'] = Pk_hm['g-m'] # state['Pk_gg_grid'] = Pk_hm['g-g'] - def _win_NFW(self, k, rv, c): + def _win_NFW(self, k: np.ndarray, rv: np.ndarray, c: np.ndarray) -> np.ndarray: from scipy.special import sici rs = rv / c kv = np.outer(k, rv) diff --git a/soliket/lensing/lensing.py b/soliket/lensing/lensing.py index e17eeac3..6bac2533 100644 --- a/soliket/lensing/lensing.py +++ b/soliket/lensing/lensing.py @@ -12,13 +12,16 @@ """ import os +from typing import ClassVar, Tuple import numpy as np import sacc from cobaya.likelihoods.base_classes import InstallableLikelihood from cobaya.log import LoggedError from cobaya.model import get_model +from cobaya.theory import Provider +from soliket import CCL from soliket.ps import BinnedPSLikelihood @@ -44,19 +47,23 @@ class LensingLikelihood(BinnedPSLikelihood, InstallableLikelihood): information about installable likelihoods. 
""" - _url = "https://portal.nersc.gov/project/act/jia_qu/lensing_like/likelihood.tar.gz" - install_options = {"download_url": _url} - data_folder = "LensingLikelihood/" - data_filename = "clkk_reconstruction_sim.fits" + _url: str = ( + "https://portal.nersc.gov/project/act/jia_qu/lensing_like/" + "likelihood.tar.gz" + ) + install_options: ClassVar = {"download_url": _url} + data_folder: str = "LensingLikelihood/" + data_filename: str = "clkk_reconstruction_sim.fits" - kind = "pp" - sim_number = 0 - lmax = 3000 - theory_lmax = 10000 + kind: str = "pp" + sim_number: int = 0 + lmax: int = 3000 + theory_lmax: int = 10000 # flag about whether CCL should be used to compute the cmb lensing power spectrum - pp_ccl = False + pp_ccl: bool = False + provider: Provider - fiducial_params = { + fiducial_params: ClassVar = { "ombh2": 0.02219218, "omch2": 0.1203058, "H0": 67.02393, @@ -130,7 +137,7 @@ def initialize(self): super().initialize() - def _get_fiducial_Cls(self): + def _get_fiducial_Cls(self) -> dict: """ Obtain a set of fiducial ``Cls`` from theory provider (e.g. ``camb``). Fiducial ``Cls`` are used to compute correction terms for the theory vector. @@ -148,7 +155,7 @@ def _get_fiducial_Cls(self): Cls = model_fiducial.provider.get_Cl(ell_factor=False) return Cls - def get_requirements(self): + def get_requirements(self) -> dict: """ Set ``lmax`` for theory ``Cls`` @@ -178,24 +185,24 @@ def get_requirements(self): "zstar": None } - def _get_CCL_results(self): + def _get_CCL_results(self) -> Tuple[CCL, dict]: cosmo_dict = self.provider.get_CCL() return cosmo_dict["ccl"], cosmo_dict["cosmo"] - def _get_data(self): + def _get_data(self) -> Tuple[np.ndarray, np.ndarray]: bin_centers, bandpowers, cov = \ self.sacc.get_ell_cl(None, 'ck', 'ck', return_cov=True) self.x = bin_centers self.y = bandpowers return bin_centers, self.y - def _get_cov(self): + def _get_cov(self) -> np.ndarray: bin_centers, bandpowers, cov = \ self.sacc.get_ell_cl(None, 'ck', 'ck', return_cov=True) self.cov = cov return cov - def _get_binning_matrix(self): + def _get_binning_matrix(self) -> np.ndarray: bin_centers, bandpowers, cov, ind = \ self.sacc.get_ell_cl(None, 'ck', 'ck', return_cov=True, return_ind=True) @@ -204,7 +211,7 @@ def _get_binning_matrix(self): self.binning_matrix = binning_matrix return binning_matrix - def _get_theory(self, **params_values): + def _get_theory(self, **params_values: dict) -> np.ndarray: r""" Generate binned theory vector of :math:`\kappa \kappa` with correction terms. 
diff --git a/soliket/mflike/mflike.py b/soliket/mflike/mflike.py index 3993c729..49b19b98 100644 --- a/soliket/mflike/mflike.py +++ b/soliket/mflike/mflike.py @@ -25,11 +25,12 @@ :width: 400 """ import os -from typing import Optional +from typing import List, Optional, Tuple import numpy as np from cobaya.likelihoods.base_classes import InstallableLikelihood from cobaya.log import LoggedError +from cobaya.theory import Provider from soliket.gaussian import GaussianData, GaussianLikelihood @@ -44,11 +45,12 @@ class MFLike(GaussianLikelihood, InstallableLikelihood): cov_Bbl_file: Optional[str] data: dict defaults: dict + provider: Provider def initialize(self): # Set default values to data member not initialized via yaml file - self.l_bpws = None - self.spec_meta = [] + self.l_bpws: Optional[np.ndarray] = None + self.spec_meta: Optional[list] = [] # Set path to data if ((not getattr(self, "path", None)) and @@ -75,19 +77,21 @@ def initialize(self): self.data_folder, ) - self.requested_cls = [p.lower() for p in self.defaults["polarizations"]] + self.requested_cls: List[str] = [ + p.lower() for p in self.defaults["polarizations"] + ] for x in ["et", "eb", "bt"]: if x in self.requested_cls: self.requested_cls.remove(x) # Read data self.prepare_data() - self.lmax_theory = self.lmax_theory or 9000 + self.lmax_theory: int = self.lmax_theory or 9000 self.log.debug(f"Maximum multipole value: {self.lmax_theory}") self.log.info("Initialized!") - def get_requirements(self): + def get_requirements(self) -> dict: r""" Passes the fields ``ell``, ``requested_cls``, ``lcuts``, ``exp_ch`` (list of array names) and ``bands`` @@ -106,15 +110,15 @@ def get_requirements(self): "bands": self.bands} return reqs - def _get_theory(self, **params_values): + def _get_theory(self, **params_values: dict) -> np.ndarray: cmbfg_dict = self.provider.get_cmbfg_dict() return self._get_power_spectra(cmbfg_dict) - def logp(self, **params_values): + def logp(self, **params_values: dict) -> float: cmbfg_dict = self.provider.get_cmbfg_dict() return self.loglike(cmbfg_dict) - def loglike(self, cmbfg_dict): + def loglike(self, cmbfg_dict: dict) -> float: r""" Computes the gaussian log-likelihood @@ -177,7 +181,7 @@ def prepare_data(self, verbose=False): "BT": "tb", "BB": "bb"} - def get_cl_meta(spec): + def get_cl_meta(spec: dict) -> Tuple[str, str, List[str], dict, bool]: """ Lower-level function of `prepare_data`. For each of the entries of the `spectra` section of the @@ -214,7 +218,7 @@ def get_cl_meta(spec): return exp_1, exp_2, pols, scls, symm - def get_sacc_names(pol, exp_1, exp_2): + def get_sacc_names(pol: str, exp_1: str, exp_2: str) -> Tuple[str, str, str]: r""" Lower-level function of `prepare_data`. 
Translates the polarization combination and channel @@ -387,7 +391,7 @@ def get_sacc_names(pol, exp_1, exp_2): self.data = GaussianData("mflike", self.ell_vec, self.data_vec, self.cov) - def _get_power_spectra(self, cmbfg): + def _get_power_spectra(self, cmbfg: dict) -> np.ndarray: r""" Get :math:`D_{\ell}` from the theory component already modified by ``theoryforge_MFLike`` diff --git a/soliket/mflike/theoryforge_MFLike.py b/soliket/mflike/theoryforge_MFLike.py index 04c8bd72..7579c4f8 100644 --- a/soliket/mflike/theoryforge_MFLike.py +++ b/soliket/mflike/theoryforge_MFLike.py @@ -53,11 +53,11 @@ """ -from typing import Optional +from typing import List, Optional import numpy as np from cobaya.log import LoggedError -from cobaya.theory import Theory +from cobaya.theory import Provider, Theory from cobaya.tools import are_different_params_lists @@ -68,24 +68,25 @@ class TheoryForge_MFLike(Theory): eff_freqs: list spectra: dict systematics_template: dict + provider: Provider def initialize(self): - self.lmin = self.spectra["lmin"] - self.lmax = self.spectra["lmax"] + self.lmin: int = self.spectra["lmin"] + self.lmax: int = self.spectra["lmax"] self.ell = np.arange(self.lmin, self.lmax + 1) # State requisites to the theory code # Which lmax for theory CMB # Note this must be greater than lmax above to avoid approx errors - self.lmax_boltzmann = self.lmax + 500 + self.lmax_boltzmann: int = self.lmax + 500 # Which lmax for theory FG # This can be larger than lmax boltzmann - self.lmax_fg = self.lmax + 500 + self.lmax_fg: int = self.lmax + 500 # Which spectra to consider - self.requested_cls = self.spectra["polarizations"] + self.requested_cls: List[str] = self.spectra["polarizations"] # Set lmax for theory CMB requirements self.lcuts = {k: self.lmax_boltzmann for k in self.requested_cls} @@ -128,7 +129,7 @@ def initialize_with_params(self): self.log, "Configuration error in parameters: %r.", differences) - def must_provide(self, **requirements): + def must_provide(self, **requirements: dict) -> dict: # cmbfg_dict is required by mflike # and requires some params to be computed # Assign required params from mflike @@ -155,10 +156,10 @@ def must_provide(self, **requirements): "exp_ch": self.exp_ch, "bands": self.bands} return reqs - def get_cmb_theory(self, **params): + def get_cmb_theory(self, **params: dict) -> dict: return self.provider.get_Cl(ell_factor=True) - def get_foreground_theory(self, **params): + def get_foreground_theory(self, **params: dict) -> dict: return self.provider.get_fg_dict() def calculate(self, state, want_derived=False, **params_values_dict): @@ -171,10 +172,10 @@ def calculate(self, state, want_derived=False, **params_values_dict): state["cmbfg_dict"] = self.get_modified_theory(Dls_cut, fg_dict, **params_values_nocosmo) - def get_cmbfg_dict(self): + def get_cmbfg_dict(self) -> dict: return self.current_state["cmbfg_dict"] - def get_modified_theory(self, Dls, fg_dict, **params): + def get_modified_theory(self, Dls: dict, fg_dict: dict, **params: dict) -> dict: r""" Takes the theory :math:`D_{\ell}`, sums it to the total foreground power spectrum (possibly computed with bandpass @@ -212,7 +213,7 @@ def get_modified_theory(self, Dls, fg_dict, **params): return cmbfg_dict - def _get_calibrated_spectra(self, dls_dict, **nuis_params): + def _get_calibrated_spectra(self, dls_dict: dict, **nuis_params: dict) -> dict: r""" Calibrates the spectra through calibration factors at the map level: @@ -266,7 +267,7 @@ def _get_calibrated_spectra(self, dls_dict, **nuis_params): # 
alpha_LAT_145, etc..
     ###########################################################################

-    def _get_rotated_spectra(self, dls_dict, **nuis_params):
+    def _get_rotated_spectra(self, dls_dict: dict, **nuis_params: dict) -> dict:
         r"""
         Rotates the polarization spectra through polarization angles:

@@ -319,7 +320,7 @@ def _init_template_from_file(self):
             syl.ReadTemplateFromFile(rootname=self.systematics_template["rootname"])
         self.dltempl_from_file = templ_from_file(ell=self.ell)

-    def _get_template_from_file(self, dls_dict, **nuis_params):
+    def _get_template_from_file(self, dls_dict: dict, **nuis_params: dict) -> dict:
         r"""
         Adds the systematics template, modulated by ``nuis_params['templ_freq']``
         parameters, to the :math:`D_{\ell}`.
diff --git a/soliket/poisson/poisson.py b/soliket/poisson/poisson.py
index db2b8fdd..b8804a71 100644
--- a/soliket/poisson/poisson.py
+++ b/soliket/poisson/poisson.py
@@ -1,3 +1,4 @@
+from typing import Callable, List, Optional
 import pandas as pd

 from cobaya.likelihood import Likelihood
@@ -5,34 +6,34 @@


 class PoissonLikelihood(Likelihood):
-    name = "Poisson"
-    data_path = None
-    columns = None
+    name: str = "Poisson"
+    data_path: Optional[str] = None
+    columns: Optional[List[str]] = None

-    def initialize(self):
+    def initialize(self) -> None:
         catalog = self._get_catalog()
         if self.columns is None:
             self.columns = catalog.columns
         self.data = PoissonData(self.name, catalog, self.columns)

-    def get_requirements(self):
+    def get_requirements(self) -> dict:
         return {}

-    def _get_catalog(self):
+    def _get_catalog(self) -> pd.DataFrame:
         catalog = pd.read_csv(self.data_path)
         return catalog

-    def _get_rate_fn(self, **kwargs):
+    def _get_rate_fn(self, **kwargs: dict) -> Callable:
         """Returns a callable rate function that takes each of 'columns' as kwargs.
         """
         raise NotImplementedError

-    def _get_n_expected(self, **kwargs):
+    def _get_n_expected(self, **kwargs: dict) -> float:
         """Computes and returns the integral of the rate function
         """
         raise NotImplementedError

-    def logp(self, **params_values):
+    def logp(self, **params_values: dict) -> float:
         rate_fn = self._get_rate_fn(**params_values)
         n_expected = self._get_n_expected(**params_values)
         return self.data.loglike(rate_fn, n_expected)
diff --git a/soliket/poisson/poisson_data.py b/soliket/poisson/poisson_data.py
index c10bc572..5734eb3a 100644
--- a/soliket/poisson/poisson_data.py
+++ b/soliket/poisson/poisson_data.py
@@ -1,23 +1,30 @@
-import numpy as np
+from typing import Callable, Dict, List, Optional
 import pandas as pd
+import numpy as np


 class PoissonData:
     """Poisson-process-generated data.

     Parameters
     ----------
     catalog : pd.DataFrame
         Catalog of observed data.
     columns : list
         Columns of catalog relevant for computing poisson rate.
     samples : dict, optional
         Each entry is an N_cat x N_samples array of posterior samples;
         plus, should have a 'prior' entry of the same shape that is the value of the
         interim prior for each sample.
     """

-    def __init__(self, name, catalog, columns, samples=None):
+    def __init__(
+        self,
+        name: str,
+        catalog: pd.DataFrame,
+        columns: List[str],
+        samples: Optional[Dict[str, np.ndarray]] = None,
+    ) -> None:
         self.name = str(name)

         self.catalog = pd.DataFrame(catalog)[columns]
@@ -34,7 +41,7 @@ def __init__(self, name, catalog, columns, samples=None):
                 for all samples, under "prior" key!')

             assert all(
-                [samples[k].shape == samples["prior"].shape for k in samples]
+                samples[k].shape == samples["prior"].shape for k in samples
             ), "Samples all need same shape!"
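For context on the `_get_rate_fn` / `_get_n_expected` hooks typed in poisson.py above, a minimal sketch of a concrete subclass; the class, column, and rate model are invented for illustration, and the import path is assumed from the file layout in this patch:

    from typing import Callable

    import numpy as np

    from soliket.poisson.poisson import PoissonLikelihood  # path assumed


    class ToyCountsLikelihood(PoissonLikelihood):
        # Hypothetical subclass for illustration; not part of SOLikeT.
        name: str = "ToyCounts"
        columns = ["mass"]

        def _get_rate_fn(self, **params_values: dict) -> Callable:
            norm = params_values["toy_norm"]  # invented sampled parameter

            def rate_fn(mass: np.ndarray) -> np.ndarray:
                # Observed rate as a function of the catalog columns.
                return norm * (mass / 1e14) ** -2.0

            return rate_fn

        def _get_n_expected(self, **params_values: dict) -> float:
            # Integral of the rate function over the (toy) observable range.
            mass_grid = np.logspace(13.0, 15.5, 512)
            rate = self._get_rate_fn(**params_values)(mass_grid)
            return float(np.trapz(rate, mass_grid))
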
self.N_k = samples["prior"].shape[1] self._len = samples["prior"].shape[0] @@ -44,10 +37,15 @@ def __init__(self, name, catalog, columns, samples=None): self.samples = samples - def __len__(self): + def __len__(self) -> int: return self._len - def loglike(self, rate_fn, n_expected, broadcastable=False): + def loglike( + self, + rate_fn: Callable, + n_expected: float, + broadcastable: bool = False, + ) -> float: """Computes log-likelihood of data under poisson process model rate_fn returns the *observed rate* as a function of self.columns diff --git a/soliket/ps/ps.py b/soliket/ps/ps.py index cb97538f..a6b38d19 100644 --- a/soliket/ps/ps.py +++ b/soliket/ps/ps.py @@ -1,3 +1,5 @@ +from typing import Dict, Tuple +from cobaya.theory import Provider import numpy as np from soliket import utils @@ -8,14 +10,15 @@ class PSLikelihood(GaussianLikelihood): name: str = "TT" kind: str = "tt" lmax: int = 6000 + provider: Provider - def get_requirements(self): + def get_requirements(self) -> Dict[str, Dict[str, int]]: return {"Cl": {self.kind: self.lmax}} - def _get_Cl(self): + def _get_Cl(self) -> Dict[str, np.ndarray]: return self.provider.get_Cl(ell_factor=True) - def _get_theory(self, **params_values): + def _get_theory(self, **params_values: dict) -> np.ndarray: cl_theory = self._get_Cl() return cl_theory[self.kind][:self.lmax] @@ -23,22 +26,25 @@ def _get_theory(self, **params_values): class BinnedPSLikelihood(PSLikelihood): binning_matrix_path: str = "" - def initialize(self): + def initialize(self) -> None: self.binning_matrix = self._get_binning_matrix() - self.bin_centers = \ - self.binning_matrix.dot(np.arange(self.binning_matrix.shape[1])) + self.bin_centers = self.binning_matrix.dot( + np.arange(self.binning_matrix.shape[1]) + ) super().initialize() @classmethod - def binner(cls, x, y, bin_edges): + def binner( + cls, x: np.ndarray, y: np.ndarray, bin_edges: np.ndarray + ) -> Tuple[np.ndarray, np.ndarray]: return utils.binner(x, y, bin_edges) - def _get_binning_matrix(self): + def _get_binning_matrix(self) -> np.ndarray: return np.loadtxt(self.binning_matrix_path) - def _get_data(self): + def _get_data(self) -> Tuple[np.ndarray, np.ndarray]: return self.bin_centers, np.loadtxt(self.datapath) - def _get_theory(self, **params_values): - cl_theory = self._get_Cl() + def _get_theory(self, **params_values: dict) -> np.ndarray: + cl_theory: Dict[str, np.ndarray] = self._get_Cl() return self.binning_matrix.dot(cl_theory[self.kind][:self.lmax]) diff --git a/soliket/utils.py b/soliket/utils.py index de923fb1..21c13129 100644 --- a/soliket/utils.py +++ b/soliket/utils.py @@ -5,6 +5,7 @@ """ +from typing import Optional, Tuple, Dict from importlib import import_module import numpy as np @@ -13,7 +14,9 @@ from scipy.stats import binned_statistic as binnedstat -def binner(ls, cls, bin_edges): +def binner( + ls: np.ndarray, cls: np.ndarray, bin_edges: np.ndarray +) -> Tuple[np.ndarray, np.ndarray]: r""" Simple function intended for binning :math:`\ell`-by-:math:`\ell` data into band powers with a top hat window function. 
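A quick usage sketch of the newly typed `binner` (toy inputs; it returns the band centres and band means, as the hunk below shows):

    import numpy as np

    from soliket.utils import binner

    ls = np.arange(2, 100)                  # toy multipole range
    cls = 1.0 / (ls * (ls + 1.0))           # toy power spectrum
    bin_edges = np.array([2, 30, 60, 100])  # top-hat band edges
    cents, bin_means = binner(ls, cls, bin_edges)
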
@@ -42,7 +45,7 @@ def binner(ls, cls, bin_edges): return cents, bin_means -def get_likelihood(name, options=None): +def get_likelihood(name: str, options: Optional[dict] = None) -> Likelihood: parts = name.split(".") module = import_module(".".join(parts[:-1])) t = getattr(module, parts[-1]) @@ -64,7 +67,7 @@ class OneWithCls(one): """ lmax = 10000 - def get_requirements(self): + def get_requirements(self) -> Dict[str, Dict[str, int]]: return {"Cl": {"pp": self.lmax, "tt": self.lmax, "te": self.lmax, diff --git a/soliket/xcorr/limber.py b/soliket/xcorr/limber.py index da8f3fef..3fbda2f2 100644 --- a/soliket/xcorr/limber.py +++ b/soliket/xcorr/limber.py @@ -5,6 +5,8 @@ probes under the Limber approximation. """ +from typing import Callable, Dict, Optional, Tuple +from cobaya.theory import Provider import numpy as np try: @@ -16,7 +18,15 @@ oneover_chmpc = 1. / C_HMPC -def mag_bias_kernel(provider, dndz, s1, zatchi, chi_arr, chiprime_arr, zprime_arr): +def mag_bias_kernel( + provider: Provider, + dndz: np.ndarray, + s1: float, + zatchi: Callable[[np.ndarray], np.ndarray], + chi_arr: np.ndarray, + chiprime_arr: np.ndarray, + zprime_arr: np.ndarray +) -> np.ndarray: """Calculates magnification bias kernel.""" dndzprime = np.interp(zprime_arr, dndz[:, 0], dndz[:, 1], left=0, right=0) @@ -38,11 +48,24 @@ def mag_bias_kernel(provider, dndz, s1, zatchi, chi_arr, chiprime_arr, zprime_ar return W_mu -def do_limber(ell_arr, provider, dndz1, dndz2, s1, s2, pk, b1_HF, b2_HF, - alpha_auto, alpha_cross, - chi_grids, - # use_zeff=True, - Nchi=50, dndz1_mag=None, dndz2_mag=None, normed=False): +def do_limber( + ell_arr: np.ndarray, + provider: Provider, + dndz1: np.ndarray, + dndz2: np.ndarray, + s1: float, + s2: float, + pk: Callable[[float, np.ndarray], np.ndarray], + b1_HF: float, + b2_HF: float, + alpha_auto: float, + alpha_cross: float, + chi_grids: Dict[str, np.ndarray], + Nchi: int = 50, + dndz1_mag: Optional[np.ndarray] = None, + dndz2_mag: Optional[np.ndarray] = None, + normed: bool = False +) -> Tuple[np.ndarray, np.ndarray]: zatchi = chi_grids['zatchi'] # chiatz = chi_grids['chiatz'] chi_arr = chi_grids['chival'] diff --git a/soliket/xcorr/xcorr.py b/soliket/xcorr/xcorr.py index 6134119c..6a5e71a0 100644 --- a/soliket/xcorr/xcorr.py +++ b/soliket/xcorr/xcorr.py @@ -8,6 +8,8 @@ """ +from typing import Optional, Tuple +from cobaya.theory import Provider import numpy as np import sacc from scipy.interpolate import InterpolatedUnivariateSpline as Spline @@ -60,6 +62,8 @@ class XcorrLikelihood(GaussianLikelihood): """ + provider: Provider + def initialize(self): name: str = "Xcorr" # noqa F841 self.log.info('Initialising.') @@ -81,8 +85,8 @@ def initialize(self): else: - self.k_tracer_name: Optional[str] # noqa F821 - self.gc_tracer_name: Optional[str] # noqa F821 + self.k_tracer_name: Optional[str] + self.gc_tracer_name: Optional[str] # tracer_combinations: Optional[str] # TODO: implement with keep_selection self.sacc_data = self._get_sacc_data() @@ -94,18 +98,18 @@ def initialize(self): self.ngal = self.sacc_data['ngal'] # TODO is this resolution limit on zarray a CAMB problem? 
-        self.nz: Optional[int]  # noqa F821
+        self.nz: Optional[int]
         assert self.nz <= 149, "CAMB limitations requires nz <= 149"
         self.zarray = np.linspace(self.dndz[:, 0].min(), self.dndz[:, 0].max(),
                                   self.nz)
         self.zbgdarray = np.concatenate([self.zarray, [1100]])  # TODO: unfix zstar

-        self.Nchi: Optional[int]  # noqa F821
-        self.Nchi_mag: Optional[int]  # noqa F821
+        self.Nchi: Optional[int]
+        self.Nchi_mag: Optional[int]

-        #self.use_zeff: Optional[bool]  # noqa F821
+        #self.use_zeff: Optional[bool]

-        self.Pk_interp_kmax: Optional[float]  # noqa F821
+        self.Pk_interp_kmax: Optional[float]

-        self.high_ell: Optional[float]  # noqa F821
+        self.high_ell: Optional[float]
         self.ell_range = np.linspace(1, self.high_ell, int(self.high_ell + 1))

         # TODO expose these defaults
@@ -115,7 +119,7 @@ def initialize(self):

         self.data = GaussianData(self.name, self.x, self.y, self.cov)

-    def get_requirements(self):
+    def get_requirements(self) -> dict:
         return {
             'Cl': {'lmax': self.high_ell,
                    'pp': self.high_ell},
@@ -141,15 +145,17 @@ def get_requirements(self):
             'ns': None
         }

-    def _bin(self, theory_cl, lmin, lmax):
-        binned_theory_cl = np.zeros_like(lmin)
+    def _bin(
+        self, theory_cl: np.ndarray, lmin: np.ndarray, lmax: np.ndarray
+    ) -> np.ndarray:
+        # float dtype: the band means below are floats, and an int accumulator
+        # (np.zeros_like of an integer lmin) would silently truncate them
+        binned_theory_cl: np.ndarray = np.zeros_like(lmin, dtype=float)
         for i in range(len(lmin)):
             binned_theory_cl[i] = np.mean(theory_cl[(self.ell_range >= lmin[i]) &
                                                     (self.ell_range < lmax[i])])
         return binned_theory_cl

-    def _get_sacc_data(self, **params_values):
-
+    def _get_sacc_data(self, **params_values: dict) -> dict:
         data_sacc = sacc.Sacc.load_fits(self.datapath)

         # TODO: would be better to use keep_selection
@@ -179,7 +184,9 @@ def _get_sacc_data(self, **params_values):

         return data

-    def _get_data(self, **params_values):
+    def _get_data(
+        self, **params_values: dict
+    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:

         data_auto = np.loadtxt(self.auto_file)
         data_cross = np.loadtxt(self.cross_file)
@@ -199,7 +206,7 @@ def _get_data(self, **params_values):

         return x, y, dy

-    def _setup_chi(self):
+    def _setup_chi(self) -> dict:

         chival = self.provider.get_comoving_radial_distance(self.zarray)
         zatchi = Spline(chival, self.zarray)
@@ -225,7 +232,7 @@ def _setup_chi(self):

         return chi_result

-    def _get_theory(self, **params_values):
+    def _get_theory(self, **params_values: dict) -> np.ndarray:

         setup_chi_out = self._setup_chi()
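To make the `_bin` accumulator change above concrete (an int array would silently truncate the band means), a standalone numeric check of the same masking logic with toy numbers:

    import numpy as np

    ell_range = np.linspace(1, 10, 10)  # mimics self.ell_range for high_ell = 10
    theory_cl = ell_range ** 2.0
    lmin = np.array([1, 5])
    lmax = np.array([5, 10])

    binned = np.zeros_like(lmin, dtype=float)  # float dtype, as in the fix above
    for i in range(len(lmin)):
        mask = (ell_range >= lmin[i]) & (ell_range < lmax[i])
        binned[i] = np.mean(theory_cl[mask])
    # binned -> [7.5, 51.0]: means of ell**2 over [1, 5) and [5, 10)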