Skip to content

Commit 640f554

Browse files
committed
fixup
1 parent 4020ed1 commit 640f554

File tree

2 files changed

+38
-35
lines changed

2 files changed

+38
-35
lines changed

pycsa/core/fourier.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ def get_freq_grid(self, a_m):
312312
cos_terms = a_m[: len(self.k_idx)]
313313
sin_terms = a_m[len(self.k_idx) :]
314314

315-
fourier_coeff = np.zeros((nhar_i, nhar_j), dtype=np.complex_)
315+
fourier_coeff = np.zeros((nhar_i, nhar_j), dtype=np.complex128)
316316

317317
for cnt, (row, col) in enumerate(zip(self.k_idx, self.l_idx)):
318318
fourier_coeff[row, col] = cos_terms[cnt] + 1.0j * sin_terms[cnt]

pycsa/core/io.py

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import numpy as np
77
import h5py
88
import os
9+
import threading
910

1011
from datetime import datetime
1112
from scipy import interpolate
@@ -149,6 +150,7 @@ def __init__(self, cell, params, verbose=False, is_parallel=False):
149150
self.verbose = verbose
150151
self.opened_dfs = []
151152
self.file_cache = {} # Cache for opened NetCDF files: {filepath: Dataset}
153+
self._cache_lock = threading.Lock() # Thread-safe cache access
152154

153155
self.fn_lon = np.array(
154156
[
@@ -186,12 +188,14 @@ def _get_cached_file(self, filepath):
186188
"""
187189
Get a cached NetCDF file handle, or open and cache it if not already open.
188190
This dramatically speeds up parallel processing by avoiding repeated file opens.
191+
Thread-safe with locking to prevent race conditions.
189192
"""
190-
if filepath not in self.file_cache:
191-
if self.verbose:
192-
print(f"Opening and caching: {filepath}")
193-
self.file_cache[filepath] = nc.Dataset(filepath, "r")
194-
return self.file_cache[filepath]
193+
with self._cache_lock:
194+
if filepath not in self.file_cache:
195+
if self.verbose:
196+
print(f"Opening and caching: {filepath}")
197+
self.file_cache[filepath] = nc.Dataset(filepath, "r")
198+
return self.file_cache[filepath]
195199

196200
def close_cached_files(self):
197201
"""Close all cached NetCDF files."""
@@ -624,6 +628,7 @@ def __init__(self, cell, params, verbose=False, is_parallel=False):
624628
self.verbose = verbose
625629
self.opened_dfs = []
626630
self.file_cache = {} # Cache for opened NetCDF files: {filepath: Dataset}
631+
self._cache_lock = threading.Lock() # Thread-safe cache access
627632

628633
# ETOPO 2022 tiles are at 15 degree intervals
629634
self.fn_lon = np.array([
@@ -648,36 +653,34 @@ def _get_cached_file(self, filepath):
648653
Get a cached NetCDF file handle, or open and cache if not already open.
649654
This dramatically speeds up parallel processing by avoiding repeated file opens.
650655
651-
Uses diskless=True to avoid HDF5 file locking issues in parallel/distributed environments.
656+
Thread-safe with locking. Uses standard file mode (not diskless) to avoid
657+
memory explosion when multiple threads load large files simultaneously.
652658
"""
653-
if filepath not in self.file_cache:
654-
if self.verbose:
655-
print(f"Opening and caching: {filepath}")
656-
657-
import time
658-
max_retries = 3
659-
retry_delay = 0.5
660-
661-
for attempt in range(max_retries):
662-
try:
663-
# diskless=True loads file into memory, avoiding HDF5 multiprocess locking issues
664-
self.file_cache[filepath] = nc.Dataset(filepath, "r", diskless=True, persist=False)
665-
break
666-
except (OSError, RuntimeError, TypeError) as e:
667-
if attempt < max_retries - 1:
668-
# Retry with exponential backoff
669-
if self.verbose:
670-
print(f"Warning: Attempt {attempt+1} failed for {filepath}, retrying: {e}")
671-
time.sleep(retry_delay * (2 ** attempt))
672-
else:
673-
# Final attempt: try without diskless
674-
if self.verbose:
675-
print(f"Warning: diskless mode failed after {max_retries} attempts, trying normal mode: {e}")
676-
try:
677-
self.file_cache[filepath] = nc.Dataset(filepath, "r")
678-
except Exception as e2:
679-
raise RuntimeError(f"Failed to open {filepath} with both diskless and normal modes: {e2}")
680-
return self.file_cache[filepath]
659+
with self._cache_lock:
660+
if filepath not in self.file_cache:
661+
if self.verbose:
662+
print(f"Opening and caching: {filepath}")
663+
664+
import time
665+
max_retries = 3
666+
retry_delay = 0.5
667+
668+
for attempt in range(max_retries):
669+
try:
670+
# Use standard file access (not diskless) to avoid memory issues
671+
# with multiple threads loading 60GB files into RAM simultaneously.
672+
# The OS file cache handles caching efficiently across threads.
673+
self.file_cache[filepath] = nc.Dataset(filepath, "r")
674+
break
675+
except (OSError, RuntimeError, TypeError) as e:
676+
if attempt < max_retries - 1:
677+
# Retry with exponential backoff
678+
if self.verbose:
679+
print(f"Warning: Attempt {attempt+1} failed for {filepath}, retrying: {e}")
680+
time.sleep(retry_delay * (2 ** attempt))
681+
else:
682+
raise RuntimeError(f"Failed to open {filepath} after {max_retries} attempts: {e}")
683+
return self.file_cache[filepath]
681684

682685
def close_cached_files(self):
683686
"""Close all cached NetCDF files."""

0 commit comments

Comments (0)