Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cfs3/drs_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def drs_process(contents, collapsed, skipped):


def drs_pretty(processed_drs, skipped):
print('Skipped???', skipped)
with Capturing() as output:
for k,v in processed_drs.items():
print(f'{_i(k)} : {_e(v)}')
Expand Down
130 changes: 129 additions & 1 deletion cfs3/logging_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
import sys
import os
import time
from functools import wraps
from contextlib import contextmanager
from typing import Optional

def get_logger(name: str = None, level: int = logging.DEBUG) -> logging.Logger:
def get_logger(name: Optional[str] = None, level: int = logging.DEBUG) -> logging.Logger:
"""
Return a consistent logger instance.

Expand Down Expand Up @@ -31,3 +35,127 @@ def get_logger(name: str = None, level: int = logging.DEBUG) -> logging.Logger:
logger.propagate = True

return logger


# Module-wide switch: when False, @timed and timing() become no-ops.
_TIMING_ENABLED = False


def enable_timing():
    """Turn timing output on for the whole process."""
    global _TIMING_ENABLED
    _TIMING_ENABLED = True


def disable_timing():
    """Turn timing output off for the whole process."""
    global _TIMING_ENABLED
    _TIMING_ENABLED = False


def is_timing_enabled():
    """Return True when timing output is currently switched on."""
    return _TIMING_ENABLED


def timed(func=None, *, name=None, output_func=None, threshold=None, enabled=None):
    """
    Decorator that reports how long a function call took.

    Args:
        func: Target function (filled in automatically for bare ``@timed``).
        name: Label for the report; defaults to ``module.function`` of the target.
        output_func: Callable receiving the report string; ``print`` when omitted.
        threshold: Suppress reports for calls faster than this many seconds.
        enabled: Per-decorator override of the global timing flag.

    Usage:
        @timed
        def my_function():
            ...

        @timed(name="Custom Operation")
        def my_function():
            ...

        @timed(threshold=0.1)  # report only calls slower than 0.1s
        def my_function():
            ...

        @timed(enabled=True)  # time this function regardless of the global flag
        def my_function():
            ...
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # Re-check per call so flipping the global flag takes effect immediately.
            active = _TIMING_ENABLED if enabled is None else enabled
            if not active:
                return f(*args, **kwargs)

            label = name or f"{f.__module__}.{f.__name__}"
            started = time.perf_counter()
            try:
                return f(*args, **kwargs)
            finally:
                # Report even when f raises, unless under the threshold.
                took = time.perf_counter() - started
                if threshold is None or took >= threshold:
                    (output_func or print)(f"[TIMING] {label}: {took:.4f}s")
        return wrapper

    # Bare @timed hands us the function directly; @timed(...) returns the decorator.
    return decorator if func is None else decorator(func)


@contextmanager
def timing(name, output_func=None, threshold=None, enabled=None):
    """
    Context manager that reports how long the enclosed block took.

    Args:
        name: Label for the report.
        output_func: Callable receiving the report string; ``print`` when omitted.
        threshold: Suppress the report when the block ran faster than this many seconds.
        enabled: Per-use override of the global timing flag.

    Usage:
        with timing("load data"):
            data = load_large_file()

        with timing("process", output_func=logger.info):
            process_data()

        with timing("quick operation", threshold=0.1):
            # reported only when slower than 0.1s
            do_something()

        with timing("important op", enabled=True):
            # timed regardless of the global flag
            do_critical_work()
    """
    # Explicit enabled= wins; otherwise fall back to the module-wide flag.
    active = _TIMING_ENABLED if enabled is None else enabled
    if not active:
        # Timing is off: run the block with no measurement overhead.
        yield
        return

    started = time.perf_counter()
    try:
        yield
    finally:
        # Report even when the block raises, unless under the threshold.
        took = time.perf_counter() - started
        if threshold is None or took >= threshold:
            (output_func or print)(f"[TIMING] {name}: {took:.4f}s")
66 changes: 59 additions & 7 deletions cfs3/p5inspect.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from pyfive.inspect import p5ncdump
from cfs3.s3core import get_user_config, Capturing
import s3fs
import logging

def p5view(alias, bucket, path, object, special=False):
def p5view(alias, bucket, path, object, special=False, log_level=logging.WARNING):
"""
Approximate the use of ncdump -h on the object at path in bucket

Args:
alias: S3 location alias
bucket: S3 bucket name
path: Path within bucket
object: Object/file name
special: Display special attributes (not yet implemented)
log_level: Logging level to use (default: WARNING to suppress INFO logs)
"""
MB = 2**20
credentials = get_user_config(alias)
Expand All @@ -24,10 +33,53 @@ def p5view(alias, bucket, path, object, special=False):

fs = s3fs.S3FileSystem(**storage_options)

with Capturing() as output:
if True:
log_capture = []
pyfive_logger_states = {}

# Only configure pyfive logging when we want to capture it
if log_level <= logging.INFO:
# Find all pyfive loggers and save their state
for name in list(logging.Logger.manager.loggerDict.keys()):
if name.startswith('pyfive'):
logger = logging.getLogger(name)
if isinstance(logger, logging.Logger):
pyfive_logger_states[name] = {
'level': logger.level,
'handlers': logger.handlers.copy(),
'propagate': logger.propagate
}

# Create a handler that captures log messages
class ListHandler(logging.Handler):
def emit(self, record):
log_capture.append(self.format(record))

log_handler = ListHandler()
log_handler.setLevel(log_level)
log_handler.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))

# Configure all pyfive loggers to capture to our handler
for name in pyfive_logger_states.keys():
logger = logging.getLogger(name)
logger.setLevel(log_level)
logger.propagate = False
logger.handlers.clear()
logger.addHandler(log_handler)

try:
with Capturing() as output:
with fs.open(file_uri) as s3file:
p5ncdump(s3file, special=True)
if special:
output.append('(Note that support for the special option is not yet implemented.)')
return output
p5ncdump(s3file, special=special)

finally:
# Restore pyfive logger states if we changed them
if pyfive_logger_states:
for name, state in pyfive_logger_states.items():
logger = logging.getLogger(name)
logger.handlers.clear()
for handler in state['handlers']:
logger.addHandler(handler)
logger.setLevel(state['level'])
logger.propagate = state['propagate']

return output, log_capture
Loading
Loading