
Commit 5dbda3d

improved pyptv_batch_parallel

1 parent cbde753

File tree

3 files changed: +932 -95 lines


demo_parallel_batch.py

Lines changed: 223 additions & 0 deletions
@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
Demonstration script for the improved pyptv_batch_parallel.py functionality.

This script shows how to use the improved parallel batch processing with
proper logging, error handling, and CPU optimization.
"""

import sys
import tempfile
import shutil
import multiprocessing
from pathlib import Path
import logging

# Import our improved pyptv_batch_parallel components
from pyptv.pyptv_batch_parallel import (
    main,
    chunk_ranges,
    validate_experiment_directory,
    ProcessingError,
    AttrDict,
    logger
)

def create_test_experiment_directory():
    """Create a temporary test experiment directory with required structure."""
    temp_dir = tempfile.mkdtemp()
    exp_path = Path(temp_dir) / "test_experiment"
    exp_path.mkdir()

    # Create required directories
    for dirname in ["parameters", "img", "cal", "res"]:
        (exp_path / dirname).mkdir()

    # Create ptv.par file with camera count
    ptv_par = exp_path / "parameters" / "ptv.par"
    ptv_par.write_text("4\n")  # 4 cameras for test

    logger.info(f"Created test experiment directory: {exp_path}")
    return exp_path, temp_dir

def demonstrate_chunk_ranges():
    """Demonstrate frame range chunking functionality."""
    logger.info("=== Demonstrating Frame Range Chunking ===")

    test_cases = [
        (1000, 1019, 4),  # 20 frames, 4 processes
        (1000, 1010, 3),  # 11 frames, 3 processes
        (1000, 1005, 8),  # 6 frames, 8 processes (more processes than frames)
        (1000, 1000, 2),  # 1 frame, 2 processes
    ]

    for first, last, n_processes in test_cases:
        total_frames = last - first + 1
        logger.info(f"Chunking {total_frames} frames ({first}-{last}) into {n_processes} processes:")

        try:
            ranges = chunk_ranges(first, last, n_processes)
            for i, (chunk_first, chunk_last) in enumerate(ranges):
                chunk_size = chunk_last - chunk_first + 1
                logger.info(f"  Process {i+1}: frames {chunk_first}-{chunk_last} ({chunk_size} frames)")
        except Exception as e:
            logger.error(f"  Error: {e}")

        logger.info("")

def demonstrate_cpu_optimization():
    """Demonstrate CPU count detection and optimization recommendations."""
    logger.info("=== CPU Optimization Demonstration ===")

    cpu_count = multiprocessing.cpu_count()
    logger.info(f"Available CPU cores: {cpu_count}")

    # Demonstrate different process count scenarios
    scenarios = [
        ("Conservative (50% of cores)", max(1, cpu_count // 2)),
        ("Moderate (75% of cores)", max(1, int(cpu_count * 0.75))),
        ("Aggressive (100% of cores)", cpu_count),
        ("Over-subscription (150% of cores)", int(cpu_count * 1.5)),
    ]

    for description, n_processes in scenarios:
        logger.info(f"{description}: {n_processes} processes")
        if n_processes > cpu_count:
            logger.warning("  ⚠️ Over-subscription may reduce performance")
        elif n_processes == cpu_count:
            logger.info("  ✓ Optimal for CPU-bound tasks")
        else:
            logger.info("  ✓ Conservative, leaves resources for system")

    logger.info("")

def demonstrate_error_handling():
    """Demonstrate error handling in parallel processing."""
    logger.info("=== Error Handling Demonstration ===")

    try:
        # Test invalid frame range
        chunk_ranges(2000, 1000, 4)
    except ValueError as e:
        logger.info(f"✓ Caught invalid frame range: {e}")

    try:
        # Test invalid process count
        chunk_ranges(1000, 2000, 0)
    except ValueError as e:
        logger.info(f"✓ Caught invalid process count: {e}")

    # Test directory validation
    try:
        nonexistent_path = Path("/nonexistent/directory")
        validate_experiment_directory(nonexistent_path)
    except ProcessingError as e:
        logger.info(f"✓ Caught directory validation error: {e}")

    logger.info("")

def simulate_parallel_processing():
    """Simulate the parallel processing workflow."""
    logger.info("=== Simulating Parallel Processing Workflow ===")

    exp_path, temp_dir = create_test_experiment_directory()

    try:
        logger.info("Starting simulated parallel batch processing...")

        # Validate directory (should succeed)
        validate_experiment_directory(exp_path)
        logger.info("✓ Directory validation completed")

        # Demonstrate chunking for different scenarios
        test_scenarios = [
            (1000, 1019, 4, "Optimal chunking: 20 frames, 4 processes"),
            (1000, 1050, 8, "Large dataset: 51 frames, 8 processes"),
            (1000, 1005, 2, "Small dataset: 6 frames, 2 processes"),
        ]

        for seq_first, seq_last, n_processes, description in test_scenarios:
            logger.info(f"\n{description}")
            total_frames = seq_last - seq_first + 1

            ranges = chunk_ranges(seq_first, seq_last, n_processes)
            logger.info(f"  Total frames: {total_frames}")
            logger.info(f"  Processes: {n_processes}")
            logger.info(f"  Chunks: {len(ranges)}")

            for i, (chunk_first, chunk_last) in enumerate(ranges):
                chunk_size = chunk_last - chunk_first + 1
                logger.info(f"    Process {i+1}: {chunk_first}-{chunk_last} ({chunk_size} frames)")

        logger.info("\n✓ Simulated processing setup completed")

    except Exception as e:
        logger.error(f"Simulation failed: {e}")
    finally:
        shutil.rmtree(temp_dir)

def demonstrate_performance_considerations():
    """Demonstrate performance considerations for parallel processing."""
    logger.info("=== Performance Considerations ===")

    cpu_count = multiprocessing.cpu_count()

    logger.info("Guidelines for choosing number of processes:")
    logger.info("1. CPU-bound tasks (like image processing):")
    logger.info(f"   - Optimal: {cpu_count} processes (one per core)")
    logger.info(f"   - Conservative: {max(1, cpu_count // 2)} processes (50% of cores)")

    logger.info("\n2. I/O-bound tasks (reading many files):")
    logger.info(f"   - Can use more: {cpu_count * 2} processes")
    logger.info("   - Limited by storage speed, not CPU")

    logger.info("\n3. Memory considerations:")
    logger.info("   - Each process loads full experiment data")
    logger.info("   - Monitor memory usage with many processes")
    logger.info("   - Reduce processes if memory becomes limiting factor")

    logger.info("\n4. Frame range considerations:")
    frame_scenarios = [
        (100, "Very small dataset - consider sequential processing"),
        (1000, "Small dataset - 2-4 processes optimal"),
        (10000, "Medium dataset - 4-8 processes optimal"),
        (100000, "Large dataset - 8+ processes beneficial"),
    ]

    for frames, recommendation in frame_scenarios:
        logger.info(f"  - {frames} frames: {recommendation}")

    logger.info("")

def main_demo():
    """Run all demonstrations."""
    logger.info("PyPTV Parallel Batch Processing Demonstration")
    logger.info("=" * 60)

    # Run all demonstrations
    demonstrate_chunk_ranges()
    demonstrate_cpu_optimization()
    demonstrate_error_handling()
    simulate_parallel_processing()
    demonstrate_performance_considerations()

    logger.info("=" * 60)
    logger.info("Demonstration completed successfully!")

    # Show environment information
    logger.info(f"Python version: {sys.version}")
    logger.info(f"CPU cores available: {multiprocessing.cpu_count()}")
    logger.info(f"Running from: {sys.executable}")

if __name__ == "__main__":
    # Configure logging to show all messages
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

    try:
        main_demo()
    except Exception as e:
        logger.error(f"Demo execution failed: {e}")
        sys.exit(1)
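
For context, below is a minimal sketch of how a frame-range chunker with the call signature the demo uses (chunk_ranges(first, last, n_processes) returning a list of (first, last) tuples and raising ValueError on bad input) might look, plus a hypothetical dispatch of those chunks to worker processes. This is an assumption based only on how the demo calls these functions, not the actual pyptv.pyptv_batch_parallel implementation; run_sequence and the example experiment path are placeholders.

import multiprocessing
from pathlib import Path

def chunk_ranges(first, last, n_processes):
    """Split frames first..last (inclusive) into near-equal contiguous chunks."""
    if last < first:
        raise ValueError(f"Invalid frame range: {first}-{last}")
    if n_processes < 1:
        raise ValueError(f"Invalid number of processes: {n_processes}")
    total = last - first + 1
    n_chunks = min(n_processes, total)      # never create more chunks than frames
    base, extra = divmod(total, n_chunks)   # spread the remainder over the first chunks
    ranges, start = [], first
    for i in range(n_chunks):
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges

def run_sequence(exp_path, first, last):
    """Placeholder worker; the real batch code would process these frames."""
    print(f"{exp_path}: processing frames {first}-{last}")

if __name__ == "__main__":
    exp_path = Path("path/to/experiment")   # placeholder experiment directory
    chunks = chunk_ranges(10001, 10040, n_processes=4)
    with multiprocessing.Pool(processes=len(chunks)) as pool:
        pool.starmap(run_sequence, [(exp_path, a, b) for a, b in chunks])

Running the demo script itself would presumably just be `python demo_parallel_batch.py` from a checkout where the pyptv package is importable.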
