47 commits
eca9e0a
fixes and add demo notebook
MoAly98 Oct 7, 2025
e4cc23a
improve
MoAly98 Oct 7, 2025
0c6ee18
fixes and clean
MoAly98 Oct 8, 2025
d10e89e
add example-demo folder to separate from more complete example
MoAly98 Oct 8, 2025
2cf470d
dask
MoAly98 Oct 9, 2025
10b27b3
pip install for coffea casa setup
MoAly98 Oct 9, 2025
195aa76
intermediate prep for full run-2 dataset
MoAly98 Oct 11, 2025
b9064a5
missing file
MoAly98 Oct 11, 2025
4a5b91d
feat: implement Dataset class and multi-directory support
MoAly98 Oct 12, 2025
ec710b8
chore: update demo_workflow notebook for Dataset class
MoAly98 Oct 12, 2025
8df719b
fix: allow variable-length tuples in DatasetConfig schema
MoAly98 Oct 12, 2025
3ee8cde
fix: use correct run periods for 2018 data
MoAly98 Oct 12, 2025
d9d31b6
feat: add detailed failure tracking for skimming
MoAly98 Oct 12, 2025
537a604
feat: log failures after each retry and make max_retries configurable
MoAly98 Oct 12, 2025
43fb1b7
debug: disable workitem output writing for testing
MoAly98 Oct 12, 2025
281add0
feat: replace ROOT with parquet for skimmed output
MoAly98 Oct 12, 2025
5b16bb8
feat: add processes filter to all pipeline stages
MoAly98 Oct 12, 2025
b2b72ba
feat: upgrade coffea to 2025.10.0 and fix parquet schema handling
MoAly98 Oct 12, 2025
5258e06
chore: update demo workflow and dev test for coffea 2025.10.0
MoAly98 Oct 13, 2025
ed91b17
fix: change muon trigger name
MoAly98 Oct 13, 2025
ec7b4bd
fix: correct nevts lookup for multi-directory datasets
MoAly98 Oct 13, 2025
b0d2d88
debug: some prints and subset of workitems processing to debug lack o…
MoAly98 Oct 13, 2025
7fe2916
check: try to support writing via S3
MoAly98 Oct 13, 2025
e356d40
remove extra files from repo
MoAly98 Oct 15, 2025
a74e2fb
debug: checkpoint
MoAly98 Oct 15, 2025
5296bb5
add dataset sizes summaries to queries
MoAly98 Oct 22, 2025
5bfb218
cleanup pt1
MoAly98 Oct 22, 2025
da096fe
gitattributes to filterout notebook outputs
MoAly98 Oct 22, 2025
e04c685
wrap pip insall in demoworkflow notebook inside COFFEA_CASA logic
MoAly98 Oct 22, 2025
6a98957
remove output files from repo
MoAly98 Oct 22, 2025
c88a702
renames
MoAly98 Oct 22, 2025
bcb4dc1
remove duplicate
MoAly98 Oct 22, 2025
cbe9bbb
separate cms from opendata datasets and corrections
MoAly98 Oct 22, 2025
32cf62c
re-add dataset sizes in queries.py
MoAly98 Oct 22, 2025
3901d58
move configs into example/ directories for more organisation
MoAly98 Oct 22, 2025
f74c2bf
fix imports
MoAly98 Oct 22, 2025
48cac1d
remove user/ directory since everything been organised into examples
MoAly98 Oct 22, 2025
60a4323
make data handling more robust
MoAly98 Oct 22, 2025
6c503ea
feat: improve handling skimming outputs to support local, remote, pro…
MoAly98 Oct 22, 2025
bc8d16d
clean up some configs
MoAly98 Oct 22, 2025
d578906
more clean up after changes
MoAly98 Oct 22, 2025
8954a67
Update README
MoAly98 Oct 22, 2025
ea7817c
docstrings and functio names
MoAly98 Oct 22, 2025
47abf23
reorder skimming
MoAly98 Oct 22, 2025
89cdaef
allow lazy callables in config to be able to run functions on workers…
MoAly98 Oct 23, 2025
db15dc2
fixes
MoAly98 Oct 23, 2025
549a99e
remove old files
MoAly98 Oct 23, 2025
1 change: 1 addition & 0 deletions cms/.gitattributes
@@ -0,0 +1 @@
*.ipynb filter=nbstripout
38 changes: 27 additions & 11 deletions cms/README.md
@@ -16,24 +16,40 @@ cms/
analysis.py # Main analysis script
pixi.toml, pixi.lock # Environment configuration (pixi)
analysis/ # Analysis logic and base classes
corrections/ # Correction files (JSON, text, etc.)
example/ # Example datasets and outputs
user/ # User analysis configuration, cuts, observables
utils/ # Utility modules (output, plotting, stats, etc.)
example_opendata/ # Open-data example configs, cuts, datasets
example_cms/ # CMS internal-style example configs
utils/ # Utility modules (output manager, skimming, schema, etc.)
```
- Configuration files for the analysis are found in `cms/user/` (e.g., `configuration.py`).
- Start from the example configurations in `example_opendata/configs/` or `example_cms/configs/`—they provide complete analysis dictionaries (datasets, skimming, channels) that you can copy and adapt for your own campaign.
- Main scripts and entry points are in `cms/`.

## Metadata and preprocessing
Metadata extraction and preprocessing are handled before the main analysis. Metadata includes information about datasets, event counts, and cross sections, and is used to configure the analysis and normalization. Preprocessing steps may include filtering, object selection, and preparing input files for skimming and analysis.
Metadata extraction and preprocessing are handled before the main analysis. Metadata includes information about datasets, event counts, and cross-sections, and is used to configure the analysis and normalization. Preprocessing steps may include filtering, object selection, and preparing input files for skimming and analysis.
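
For illustration, the sketch below shows how such metadata typically feeds MC normalization; the field names (`xsec`, `nevts`) and the luminosity value are assumptions for the example, not the repository's actual schema.

```python
# Hypothetical metadata for one dataset; field names are illustrative only.
metadata = {
    "ttbar_semilep": {"xsec": 831.76, "nevts": 144_722_000},  # cross-section in pb, generated events
}

lumi = 59_830.0  # integrated luminosity in /pb (assumed 2018-like value)

def normalization_weight(dataset: str) -> float:
    """Scale factor that normalizes an MC dataset to the data luminosity."""
    info = metadata[dataset]
    return lumi * info["xsec"] / info["nevts"]

print(f"{normalization_weight('ttbar_semilep'):.4f}")
```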

## Skimming
To skim NanoAOD datasets, use the provided scripts and configuration files in the `analysis/` and `user/` directories. Adjust the configuration as needed for your analysis channels and observables.
Preprocessing is controlled by the `preprocess` block in the configuration. The `skimming` subsection now uses a single `output` stanza to steer how skimmed NanoAOD chunks are persisted:

Currently, the code writes out skimmed files as intermediate outputs. The plan is to integrate the workflow so that all steps, including skimming, are performed on-the-fly without writing intermediate files, streamlining the analysis process.
```python
preprocess = {
    "skimming": {
        "function": default_skim_selection,
        "use": [("PuppiMET", None), ("HLT", None)],
        "output": {
            "format": "parquet",       # other options: root_ttree, rntuple, safetensors (stubs)
            "protocol": "local",       # or "s3", "xrootd", ...
            "base_uri": "s3://bucket", # used when protocol != "local"
            "to_kwargs": {"compression": "zstd"},       # forwarded to ak.to_parquet
            "from_kwargs": {"storage_options": {...}},  # forwarded to NanoEventsFactory.from_parquet
        },
    },
}
```

The file suffix is fixed to `{dataset}/file_{index}/part_{chunk}.{ext}`, so switching between local and remote storage only requires changing `protocol`/`base_uri`.
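
As a rough illustration, the helper below shows how a destination URI could be assembled under that pattern; it is a sketch, not the repository's actual writer, and the mapping from `format` to file extension is assumed.

```python
# Illustrative only: build the destination URI for one skimmed chunk.
def skim_output_uri(output_cfg: dict, dataset: str, file_index: int, chunk: int) -> str:
    ext = {"parquet": "parquet", "root_ttree": "root", "rntuple": "root"}.get(
        output_cfg["format"], output_cfg["format"]
    )
    suffix = f"{dataset}/file_{file_index}/part_{chunk}.{ext}"
    if output_cfg.get("protocol", "local") == "local":
        return suffix  # resolved relative to the local output area
    return f"{output_cfg['base_uri'].rstrip('/')}/{suffix}"

cfg = {"format": "parquet", "protocol": "s3", "base_uri": "s3://bucket"}
print(skim_output_uri(cfg, "ttbar_semilep", 3, 0))  # s3://bucket/ttbar_semilep/file_3/part_0.parquet
```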

If you need pre-skimmed data, it is available on CERNBox upon request. Please contact Mohamed Aly (mohamed.aly@cern.ch) for access.
If you want to reproduce the skimmed files yourself, set the option `general.run_skimming=True` in the configuration file `cms/user/configuration.py`. This takes roughly 1-1.5 hours for the whole set of data. If you want only a subset, you can specify the maximum number of files to process per dataset using the `datasets.max_files` option in the same configuration file under the dataset configuration section.
- Set `general.run_skimming=True` to regenerate skims. Use `datasets.max_files` to limit input size when experimenting.
- Downstream steps load from the same path, so no separate cache copy is needed; cached Awkward objects are still produced automatically for faster reruns.
- Dataset-level options such as lumi masks live next to each dataset definition (for example `lumi_mask`: `{ "function": cuts.lumi_mask, "use": [...], "static_kwargs": {"lumifile": "...json"} }`); a sketch of a full dataset entry follows this list.
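
A sketch of what a dataset entry combining these options could look like; the dataset name, paths, the `use` field contents, and the golden-JSON filename are placeholders, and the `lumi_mask` stub stands in for the user's actual cuts function.

```python
# Illustrative dataset entry; names, paths, and files are placeholders.
def lumi_mask(run, luminosity_block, lumifile):
    """Stand-in for the user's cuts.lumi_mask; returns a per-event boolean mask."""
    ...

datasets = {
    "SingleMuon_2018A": {
        "directories": ["root://eospublic.cern.ch//eos/opendata/cms/.../SingleMuon/"],
        "is_data": True,
        "max_files": 5,  # limit input size while experimenting
        "lumi_mask": {
            "function": lumi_mask,
            "use": [("run", None), ("luminosityBlock", None)],  # guessed to mirror the skimming "use" format
            "static_kwargs": {"lumifile": "corrections/golden_2018.json"},
        },
    },
}
```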

## Running code
To run the main analysis chain, execute the relevant Python scripts or notebooks. Outputs such as histograms and fit results will be saved in the `outputs/` directory. For example:
@@ -48,4 +64,4 @@ The following is guaranteed to produce a result if skimming is already performed

```sh
python3 analysis.py general.run_skimming=False general.read_from_cache=True general.run_mva_training=False general.run_plots_only=False general.run_metadata_generation=False
```
```
17 changes: 10 additions & 7 deletions cms/analysis.py
@@ -10,12 +10,12 @@
import warnings

from analysis.nondiff import NonDiffAnalysis
from user.configuration import config as ZprimeConfig
from example_opendata.configs.configuration import config as ZprimeConfig
from utils.datasets import ConfigurableDatasetManager
from utils.logging import setup_logging, log_banner
from utils.schema import Config, load_config_with_restricted_cli
from utils.metadata_extractor import NanoAODMetadataGenerator
from utils.skimming import process_workitems_with_skimming
from utils.skimming import process_and_load_events
from utils.output_manager import OutputDirectoryManager

# -----------------------------
@@ -56,17 +56,20 @@ def main():
# Generate metadata and fileset from NanoAODs
generator = NanoAODMetadataGenerator(dataset_manager=dataset_manager, output_manager=output_manager)
generator.run(generate_metadata=config.general.run_metadata_generation)
fileset = generator.fileset
datasets = generator.datasets
workitems = generator.workitems
if not workitems:
logger.error("No workitems available. Please ensure metadata generation completed successfully.")
sys.exit(1)
if not datasets:
logger.error("No datasets available. Please ensure metadata generation completed successfully.")
sys.exit(1)

logger.info(log_banner("SKIMMING AND PROCESSING"))
logger.info(f"Processing {len(workitems)} workitems")
logger.info(f"Processing {len(workitems)} workitems across {len(datasets)} datasets")

# Process workitems with dask-awkward
processed_datasets = process_workitems_with_skimming(workitems, config, output_manager, fileset, generator.nanoaods_summary)
# Process workitems and populate Dataset objects with events
datasets = process_and_load_events(workitems, config, output_manager, datasets, generator.nanoaods_summary)


analysis_mode = config.general.analysis
@@ -77,7 +80,7 @@ def main():
return
elif analysis_mode == "nondiff":
logger.info(log_banner("Running Non-Differentiable Analysis"))
nondiff_analysis = NonDiffAnalysis(config, processed_datasets, output_manager)
nondiff_analysis = NonDiffAnalysis(config, datasets, output_manager)
nondiff_analysis.run_analysis_chain()


46 changes: 29 additions & 17 deletions cms/analysis/base.py
@@ -63,7 +63,7 @@ class Analysis:
def __init__(
self,
config: Dict[str, Any],
processed_datasets: Dict[str, List[Tuple[Any, Dict[str, Any]]]],
datasets: List,
output_manager: OutputDirectoryManager,
) -> None:
"""
@@ -78,16 +78,16 @@ def __init__(
- 'corrections': Correction configurations
- 'channels': Analysis channel definitions
- 'general': General settings including output directory
processed_datasets : Dict[str, List[Tuple[Any, Dict[str, Any]]]]
Pre-processed datasets from skimming (required)
datasets : List[Dataset]
List of Dataset objects with populated events from skimming (required)
output_manager : OutputDirectoryManager
Centralized output directory manager (required)
"""
self.config = config
self.channels = config.channels
self.systematics = config.systematics
self.corrections = config.corrections
self.processed_datasets = processed_datasets
self.datasets = datasets
self.output_manager = output_manager
self.corrlib_evaluators = self._load_correctionlib()

@@ -161,13 +161,16 @@ def get_good_objects(
"""
good_objects = {}
for mask_config in masks:
mask_args = get_function_arguments(
mask_args, mask_static_kwargs = get_function_arguments(
mask_config.use,
object_copies,
function_name=mask_config.function.__name__,
static_kwargs=mask_config.get("static_kwargs"),
)

selection_mask = mask_config.function(*mask_args)
selection_mask = mask_config.function(
*mask_args, **mask_static_kwargs
)
if not isinstance(selection_mask, ak.Array):
raise TypeError(
f"Mask must be an awkward array. Got {type(selection_mask)}"
@@ -301,6 +304,7 @@ def apply_syst_function(
function_args: List[ak.Array],
affected_arrays: Union[ak.Array, List[ak.Array]],
operation: str,
static_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[ak.Array, List[ak.Array]]:
"""
Apply function-based systematic variation.
@@ -312,7 +316,9 @@
syst_function : Callable[..., ak.Array]
Variation function
function_args : List[ak.Array]
Function arguments
Positional arguments for the variation function
static_kwargs : Optional[Dict[str, Any]]
Static keyword arguments for the variation function
affected_arrays : Union[ak.Array, List[ak.Array]]
Array(s) to modify
operation : str
@@ -324,7 +330,8 @@
Modified array(s)
"""
logger.debug("Applying function-based systematic: %s", syst_name)
variation = syst_function(*function_args)
kwargs = static_kwargs or {}
variation = syst_function(*function_args, **kwargs)

if isinstance(affected_arrays, list):
return [
@@ -459,10 +466,11 @@ def apply_object_corrections(
continue

# Prepare arguments and targets
args = get_function_arguments(
corr_args, corr_static_kwargs = get_function_arguments(
correction.use,
object_copies,
function_name=f"correction::{correction.name}",
static_kwargs=correction.get("static_kwargs"),
)
targets = self._get_target_arrays(
correction.target,
@@ -487,7 +495,7 @@
correction_name=correction.name,
correction_key=key,
direction=corr_direction,
correction_args=args,
correction_args=corr_args,
target=targets,
operation=operation,
transform=transform,
@@ -498,9 +506,10 @@
corrected_values = self.apply_syst_function(
syst_name=correction.name,
syst_function=syst_func,
function_args=args,
function_args=corr_args,
affected_arrays=targets,
operation=operation,
static_kwargs=corr_static_kwargs,
)
else:
corrected_values = targets
@@ -542,10 +551,11 @@ def apply_event_weight_correction(
return weights

# Prepare arguments
args = get_function_arguments(
weight_args, weight_static_kwargs = get_function_arguments(
systematic.use,
object_copies,
function_name=f"systematic::{systematic.name}",
static_kwargs=systematic.get("static_kwargs"),
)
operation = systematic.op
key = systematic.key
@@ -559,7 +569,7 @@
correction_name=systematic.name,
correction_key=key,
direction=corr_direction,
correction_args=args,
correction_args=weight_args,
target=weights,
operation=operation,
transform=transform,
@@ -570,9 +580,10 @@
return self.apply_syst_function(
syst_name=systematic.name,
syst_function=syst_func,
function_args=args,
function_args=weight_args,
affected_arrays=weights,
operation=operation,
static_kwargs=weight_static_kwargs,
)
return weights

Expand All @@ -596,10 +607,11 @@ def compute_ghost_observables(
for ghost in self.config.ghost_observables:

logger.debug("Computing ghost observables: %s", ghost.names)
args = get_function_arguments(
ghost.use, object_copies, function_name=ghost.function.__name__
ghost_args, ghost_static_kwargs = get_function_arguments(
ghost.use, object_copies, function_name=ghost.function.__name__,
static_kwargs=ghost.get("static_kwargs")
)
outputs = ghost.function(*args)
outputs = ghost.function(*ghost_args, **ghost_static_kwargs)

# Normalize outputs to list
if not isinstance(outputs, (list, tuple)):