Commit 35ad5ac

(#13) Threadsafe I/O support
Requires a thread-safe HDF5 build, e.g.:
    module load hdf5/1.12.1-threadsafe-gcc-11.2.0
    module load netcdf-c/4.8.1-gcc-11.2.0
then reinstall the Python library against these libraries: pip install --no-binary=...
1 parent 0f7759e commit 35ad5ac
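
A quick sanity check that the reinstalled netCDF4 is actually linked against the thread-safe build is to print its library version strings and attempt concurrent writes. A minimal sketch, assuming only the netCDF4 package and the standard library (file names, thread count, and variable contents are arbitrary; the concurrent-write test mirrors the failure mode removed from the script below, where opening different NetCDF files from different threads crashed a non-threadsafe build):

import tempfile
import threading
from pathlib import Path

import netCDF4

print("HDF5:", netCDF4.__hdf5libversion__)          # e.g. 1.12.1
print("netcdf-c:", netCDF4.__netcdf4libversion__)   # e.g. 4.8.1

def write_dummy(path: Path) -> None:
    # Each thread writes its own file; a non-threadsafe HDF5 build
    # tends to crash even on independent files opened concurrently.
    with netCDF4.Dataset(str(path), "w") as ds:
        ds.createDimension("x", 4)
        ds.createVariable("v", "f4", ("x",))[:] = range(4)

tmp = Path(tempfile.mkdtemp())
threads = [threading.Thread(target=write_dummy, args=(tmp / f"t{i}.nc",))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("OK: concurrent NetCDF writes completed")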

File tree: 1 file changed (+75 −14 lines)


runs/icon_etopo_global.py

Lines changed: 75 additions & 14 deletions
@@ -538,25 +538,48 @@ def parallel_wrapper(grid, params, reader, writer, chunk_output_dir, clat_rad, c
 SYSTEM_CONFIG = 'laptop_performance'  # ← Edit this line to switch configs
 # ========================================================================
 
+# ========================================================================
+# QUICK START GUIDE - Processing Specific Cell Ranges
+# ========================================================================
+# To process specific cell ranges (e.g., to regenerate corrupted chunks):
+#
+# 1. Scroll down to the "CELL RANGE CONFIGURATION" section (around line 690)
+# 2. Set cell_start and cell_end:
+#
+#    Examples:
+#      cell_start = 0,    cell_end = 100  → Process cells 0-99 only
+#      cell_start = 2900, cell_end = 3000 → Process cells 2900-2999 only
+#      cell_start = 0,    cell_end = None → Process all cells from 0 to end
+#      cell_start = 3000, cell_end = None → Process from 3000 to end
+#
+# 3. Run the script - it will create appropriately named NetCDF files
+#
+# Note: Files are created in chunks of netcdf_chunk_size (default: 100)
+#       Example: cells 0-99 → icon_etopo_global_cells_00000-00099.nc
+# ========================================================================
+
 CONFIGS = {
     'generic_laptop': {
         'total_cores': 12,            # Conservative: use 12 of 16 threads
         'total_memory_gb': 12.0,
         'netcdf_chunk_size': 100,
+        'threads_per_worker': 1,      # Set to None for auto-compute
         'memory_per_cpu_mb': None,    # Will calculate dynamically
         'description': 'Generic laptop (16 threads, 16GB RAM)'
     },
     'dkrz_hpc': {
         'total_cores': 250,
         'total_memory_gb': 240.0,
         'netcdf_chunk_size': 100,
+        'threads_per_worker': None,   # Auto-compute based on worker memory
         'memory_per_cpu_mb': None,    # SLURM quota on interactive partition
         'description': 'DKRZ HPC interactive partition (standard memory node)'
     },
     'laptop_performance': {
         'total_cores': 20,            # Use 20 of 24 threads (leave 4 for background)
         'total_memory_gb': 80.0,
         'netcdf_chunk_size': 100,
+        'threads_per_worker': None,   # Auto-compute based on worker memory
         'memory_per_cpu_mb': None,    # Will calculate dynamically
         'description': 'AMD Ryzen AI 9 HX 370 (24 threads, 94GB RAM)'
     }
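
For reference, the chunk naming described in the quick-start comment above can be sketched as a one-liner. This is a hypothetical helper, not part of the committed script; the icon_etopo_global_cells_ prefix comes from the comment, and the zero-padded range matches the cells_{start:05d}-{end-1:05d} pattern used for plot subdirectories later in the diff:

# Hypothetical helper illustrating the chunk file naming convention;
# not part of the committed script.
def chunk_filename(start: int, end_exclusive: int) -> str:
    return f"icon_etopo_global_cells_{start:05d}-{end_exclusive - 1:05d}.nc"

print(chunk_filename(0, 100))      # icon_etopo_global_cells_00000-00099.nc
print(chunk_filename(2900, 3000))  # icon_etopo_global_cells_02900-02999.nc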
@@ -654,11 +677,15 @@ def parallel_wrapper(grid, params, reader, writer, chunk_output_dir, clat_rad, c
     logger.info(f"  Available cores: {total_cores}")
     logger.info(f"  Available memory: {total_memory_gb} GB")
     logger.info(f"  NetCDF chunk size: {netcdf_chunk_size} cells")
+
+    # Threading configuration display
+    if config['threads_per_worker'] is not None:
+        logger.info(f"  Threading mode: MANUAL (threads_per_worker = {config['threads_per_worker']})")
+    else:
+        logger.info(f"  Threading mode: AUTO (will compute based on worker count)")
+
     if config['memory_per_cpu_mb'] is not None:
         logger.info(f"  SLURM quota: {config['memory_per_cpu_mb']} MB per CPU")
-        logger.info(f"  Mode: HPC (threads scale with worker memory)")
-    else:
-        logger.info(f"  Mode: Laptop (threads distributed evenly)")
     logger.info("=" * 80)
 
     # Group cells by memory requirements for dynamic worker allocation
@@ -676,13 +703,33 @@ def parallel_wrapper(grid, params, reader, writer, chunk_output_dir, clat_rad, c
     client = None
     current_batch_idx = None
 
-    logger.info(f"Total cells to process: {n_cells}")
+    logger.info(f"Total cells in grid: {n_cells}")
+
+    # ========================================================================
+    # CELL RANGE CONFIGURATION
+    # ========================================================================
+    # Set cell_start and cell_end to process specific ranges
+    # Examples:
+    #   cell_start = 0,    cell_end = None → Process all cells (0 to n_cells-1)
+    #   cell_start = 2900, cell_end = 3000 → Process cells 2900-2999 only
+    #   cell_start = 0,    cell_end = 100  → Process cells 0-99 only
+    cell_start = 0    # First cell to process (inclusive)
+    cell_end = None   # Last cell to process (exclusive), None means process to end
+    # ========================================================================
+
+    # Validate and set cell_end
+    if cell_end is None:
+        cell_end = n_cells
+    else:
+        cell_end = min(cell_end, n_cells)  # Don't exceed total cells
 
-    cell_start = 0  # Start from beginning (can be modified for restart)
+    if cell_start >= cell_end:
+        raise ValueError(f"Invalid cell range: cell_start ({cell_start}) >= cell_end ({cell_end})")
 
     # Progress tracking
-    total_netcdf_chunks = (n_cells - cell_start + netcdf_chunk_size - 1) // netcdf_chunk_size
-    logger.info(f"\nProcessing {n_cells - cell_start} cells:")
+    cells_to_process = cell_end - cell_start
+    total_netcdf_chunks = (cells_to_process + netcdf_chunk_size - 1) // netcdf_chunk_size
+    logger.info(f"\nProcessing cell range: {cell_start} to {cell_end-1} ({cells_to_process} cells)")
     logger.info(f"  NetCDF chunks: {total_netcdf_chunks} files ({netcdf_chunk_size} cells each)\n")
 
     # Statistics
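
The chunk count here is an integer ceiling division. A quick worked example with illustrative values (not taken from any actual grid):

# Illustrative values only: 250 cells in chunks of 100 → 3 NetCDF files.
cell_start, cell_end = 0, 250
netcdf_chunk_size = 100
cells_to_process = cell_end - cell_start
total_netcdf_chunks = (cells_to_process + netcdf_chunk_size - 1) // netcdf_chunk_size
assert total_netcdf_chunks == 3  # chunks of 100, 100, and 50 cells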
@@ -703,11 +750,11 @@ def parallel_wrapper(grid, params, reader, writer, chunk_output_dir, clat_rad, c
 
     # Outer loop: NetCDF file creation (one file per netcdf_chunk_size cells)
     for netcdf_chunk_idx, netcdf_chunk_start in enumerate(tqdm(
-        range(cell_start, n_cells, netcdf_chunk_size),
+        range(cell_start, cell_end, netcdf_chunk_size),
         desc="NetCDF chunks",
         total=total_netcdf_chunks
     )):
-        netcdf_chunk_end = min(netcdf_chunk_start + netcdf_chunk_size, n_cells)
+        netcdf_chunk_end = min(netcdf_chunk_start + netcdf_chunk_size, cell_end)
 
         # Create subdirectory for this NetCDF chunk's plots
         chunk_output_dir = base_output_dir / f"cells_{netcdf_chunk_start:05d}-{netcdf_chunk_end-1:05d}"
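
Clamping each chunk end to cell_end rather than n_cells is what keeps a partial range from spilling past the requested window. With the same illustrative numbers as above:

# Illustrative: cell_start=0, cell_end=250, netcdf_chunk_size=100
for netcdf_chunk_start in range(0, 250, 100):
    netcdf_chunk_end = min(netcdf_chunk_start + 100, 250)
    print(netcdf_chunk_start, netcdf_chunk_end)  # (0, 100), (100, 200), (200, 250)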
@@ -744,15 +791,29 @@ def parallel_wrapper(grid, params, reader, writer, chunk_output_dir, clat_rad, c
             n_workers = batch_config['n_workers']
             memory_per_worker = f"{int(batch_config['memory_per_worker_gb'])}GB"
 
-            # CRITICAL: threads_per_worker MUST be 1 because HDF5 is not thread-safe
-            # HDF5 was not compiled with --enable-threadsafe on this system.
-            # Even opening different NetCDF files from different threads causes crashes.
-            # Use more workers instead of threads for parallelism.
-            threads_per_worker = 1
+            # ============================================================
+            # THREADS PER WORKER CONFIGURATION
+            # ============================================================
+            # If threads_per_worker is explicitly set in config, use that value
+            # Otherwise, auto-compute based on available cores and workers
+            if config['threads_per_worker'] is not None:
+                threads_per_worker = config['threads_per_worker']
+                logger.info(f"\n  Using manual threads_per_worker: {threads_per_worker}")
+            else:
+                # Auto-compute: distribute available cores among workers
+                # Reserve at least 1 thread per worker, and cap at reasonable maximum
+                threads_per_worker = max(1, min(4, total_cores // n_workers))
+                logger.info(f"\n  Auto-computed threads_per_worker: {threads_per_worker}")
+                logger.info(f"  (Based on {total_cores} cores / {n_workers} workers)")
+
+            # Note: Thread-safe HDF5 is required for threads_per_worker > 1
+            # Verify with: python3 -c "import netCDF4; print(netCDF4.__hdf5libversion__)"
+            # ============================================================
 
             logger.info(f"\n  Starting Dask client for memory batch {mem_batch_idx}:")
             logger.info(f"  Workers: {n_workers} × {memory_per_worker}")
             logger.info(f"  Threads per worker: {threads_per_worker}")
+            logger.info(f"  Total parallel threads: {n_workers * threads_per_worker}")
             logger.info(f"  Expected memory per cell: {batch_config['memory_per_cell_gb']:.1f} GB")
 
             client = Client(
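
The Client(...) argument list is cut off by the diff, so the following is only a sketch of how the computed values would plausibly feed a dask.distributed.Client: the keyword names are standard Dask (they create a LocalCluster under the hood), but the script's actual arguments may differ, and the memory_limit value here is a placeholder for memory_per_worker:

# Sketch only - the real Client(...) call is truncated in the diff above.
from dask.distributed import Client

total_cores, n_workers = 20, 5  # e.g. a 'laptop_performance' memory batch
threads_per_worker = max(1, min(4, total_cores // n_workers))  # → 4 (capped at 4)

client = Client(
    n_workers=n_workers,
    threads_per_worker=threads_per_worker,  # >1 requires thread-safe HDF5
    memory_limit="16GB",                    # placeholder for memory_per_worker
)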
