Status: Closed
Labels: data-pipeline, gex-calculation, research
Overview
Implement a dedicated caching system for pre-calculated Gamma Exposure (GEX) values to enable efficient multi-symbol, multi-timeframe analysis without repetitive heavy calculations. This cache will store GEX computations indexed by symbol, date, strike, and expiration for instant retrieval.
Problem Statement
Computational Challenges
- Heavy Calculations: GEX requires complex aggregations across all strikes and expirations
- Frequent Access: Same GEX levels needed for multiple analyses and pattern detection
- Multi-Symbol Processing: Concurrent analysis of SPY, SPX, and other symbols
- Historical Analysis: Backtesting requires GEX data across years of history
- Real-time Requirements: Live pattern detection needs sub-second GEX lookup
Current Limitations
- No GEX caching - recalculation on every request
- Options data cache separate from computed metrics
- No indexing strategy for fast GEX retrieval
- Missing concurrent processing optimization
Technical Requirements
GEX Cache Architecture
Primary Cache Structure
.cache/
├── gex_data/
│   ├── SPY/
│   │   ├── 2024-01-15/
│   │   │   ├── gex_summary.json        # Daily aggregated GEX
│   │   │   ├── gex_by_strike.parquet   # Strike-level GEX breakdown
│   │   │   ├── gex_by_expiration.json  # Expiry-level aggregations
│   │   │   └── metadata.json           # Calculation timestamp, version
│   │   └── 2024-01-16/
│   ├── SPX/
│   └── index/
│       └── gex_cache_index.sqlite      # Fast lookup index
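A minimal sketch of materializing this layout on disk; the helper name and symbol list below are illustrative, not part of the spec:

```python
from pathlib import Path
from typing import Iterable

def setup_cache_structure(base_cache_dir: str = ".cache",
                          symbols: Iterable[str] = ("SPY", "SPX")) -> Path:
    """Create the gex_data/<symbol> and index/ directories if missing."""
    base = Path(base_cache_dir)
    for symbol in symbols:
        (base / "gex_data" / symbol).mkdir(parents=True, exist_ok=True)
    (base / "index").mkdir(parents=True, exist_ok=True)
    return base
```

Per-date subdirectories (e.g. 2024-01-15/) would then be created lazily at store time.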
Cache Key Strategy
# Hierarchical cache keys
primary_key = f"gex_{symbol}_{trading_date}" # Daily summary
strike_key = f"gex_{symbol}_{trading_date}_{strike}" # Strike-level
expiry_key = f"gex_{symbol}_{trading_date}_{expiry}" # Expiry-level
# Index keys for fast lookup
index_keys = [
    (symbol, trading_date, 'summary'),
    (symbol, trading_date, strike, expiry),
    (symbol, trading_date, 'flip_point'),
    (symbol, trading_date, 'total_exposure')
]

Data Structures
GEX Summary (Daily Aggregate)
{
  "symbol": "SPY",
  "trading_date": "2024-01-15",
  "calculation_timestamp": "2024-01-15T16:30:00Z",
  "underlying_price": 450.25,
  "total_gex": 2.5e9,              // Total dollar gamma exposure
  "net_gex": 1.2e9,                // Net dealer exposure
  "flip_point": 447.50,            // Zero gamma crossing
  "absolute_gex": 3.7e9,           // Sum of absolute exposures
  "call_gex": 1.8e9,               // Total call exposure
  "put_gex": -0.6e9,               // Total put exposure (negative)
  "gex_concentration": {
    "top_5_strikes_pct": 68.5,     // % of GEX in top 5 strikes
    "atm_concentration_pct": 45.2  // % within ±2% of spot
  },
  "key_levels": {
    "resistance_levels": [455.0, 460.0],  // High positive GEX
    "support_levels": [445.0, 440.0],     // High negative GEX
    "gamma_flip_range": [447.0, 448.0]    // Zero crossing zone
  },
  "expiration_breakdown": {
    "2024-01-19": {"gex": 800e6, "weight": 0.32},
    "2024-01-26": {"gex": 600e6, "weight": 0.24},
    "2024-02-16": {"gex": 1.1e9, "weight": 0.44}
  },
  "calculation_metadata": {
    "options_contracts_processed": 1247,
    "calculation_method": "black_scholes_numeric",
    "risk_free_rate": 0.045,
    "calculation_duration_ms": 234
  }
}

Strike-Level GEX (Detailed Breakdown)
# Stored as Parquet for performance
strike_level_data = {
    'strike': [445.0, 446.0, 447.0, ...],
    'call_gex': [50e6, 75e6, 125e6, ...],              # Call gamma exposure
    'put_gex': [-25e6, -45e6, -85e6, ...],             # Put gamma exposure
    'net_gex': [25e6, 30e6, 40e6, ...],                # Net exposure per strike
    'call_volume': [1500, 2200, 3400, ...],            # Supporting volume data
    'put_volume': [800, 1200, 2100, ...],
    'call_oi': [5000, 7500, 12000, ...],               # Supporting OI data
    'put_oi': [3000, 4500, 8500, ...],
    'distance_from_spot': [-5.25, -4.25, -3.25, ...],  # Moneyness
    'time_weighted_gex': [22e6, 28e6, 38e6, ...]       # Time-decay adjusted
}

Implementation Requirements
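One invariant worth enforcing when the strike-level table is written: at every strike, net_gex must equal call_gex + put_gex. A stdlib-only sketch of that check, using the sample rows from the table:

```python
# Sample values from the strike-level table (put exposure stored as negative)
call_gex = [50e6, 75e6, 125e6]
put_gex = [-25e6, -45e6, -85e6]
net_gex = [25e6, 30e6, 40e6]

def check_net_gex(call, put, net, tol=1.0):
    """True if net[i] == call[i] + put[i] for every strike (within tol dollars)."""
    return all(abs((c + p) - n) <= tol for c, p, n in zip(call, put, net))

print(check_net_gex(call_gex, put_gex, net_gex))  # True
```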
Core Cache Manager Class
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd

class GEXCacheManager:
    def __init__(self, base_cache_dir: str = ".cache"):
        self.gex_cache_dir = Path(base_cache_dir) / "gex_data"
        self.index_path = Path(base_cache_dir) / "index" / "gex_cache_index.sqlite"
        self._setup_cache_structure()
        self._setup_index()

    def store_gex_calculation(self,
                              symbol: str,
                              trading_date: str,
                              gex_summary: Dict,
                              strike_breakdown: pd.DataFrame,
                              expiry_breakdown: Dict) -> None:
        """Store complete GEX calculation results."""
        pass

    def get_gex_summary(self, symbol: str, trading_date: str) -> Optional[Dict]:
        """Retrieve daily GEX summary."""
        pass

    def get_gex_by_strike_range(self,
                                symbol: str,
                                trading_date: str,
                                strike_min: float,
                                strike_max: float) -> pd.DataFrame:
        """Retrieve GEX data for specific strike range."""
        pass

    def get_historical_flip_points(self,
                                   symbol: str,
                                   start_date: str,
                                   end_date: str) -> pd.DataFrame:
        """Retrieve historical gamma flip point time series."""
        pass

    def batch_get_gex(self, requests: List["GEXRequest"]) -> Dict[str, Any]:
        """Efficient batch retrieval for multiple symbols/dates."""
        pass

    def invalidate_cache(self, symbol: str, trading_date: str) -> None:
        """Remove cached GEX data (for recalculation)."""
        pass

Cache Indexing Strategy
SQLite Index Schema
CREATE TABLE gex_cache_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    trading_date TEXT NOT NULL,
    calculation_timestamp TEXT NOT NULL,
    data_type TEXT NOT NULL,  -- 'summary', 'strike_detail', 'expiry_breakdown'
    file_path TEXT NOT NULL,
    total_gex REAL,
    flip_point REAL,
    underlying_price REAL,
    contracts_processed INTEGER,
    calculation_duration_ms INTEGER,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for fast lookup
CREATE INDEX idx_symbol_date ON gex_cache_index(symbol, trading_date);
CREATE INDEX idx_flip_point ON gex_cache_index(symbol, flip_point);
CREATE INDEX idx_total_gex ON gex_cache_index(symbol, total_gex);
CREATE INDEX idx_calculation_time ON gex_cache_index(calculation_timestamp);

Fast Lookup Methods
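Fast lookups presuppose that the schema has been materialized; a minimal sqlite3 sketch (in-memory here, file-backed as gex_cache_index.sqlite in practice; only two of the indexes are shown):

```python
import sqlite3

SCHEMA = """
CREATE TABLE gex_cache_index (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    symbol TEXT NOT NULL,
    trading_date TEXT NOT NULL,
    calculation_timestamp TEXT NOT NULL,
    data_type TEXT NOT NULL,
    file_path TEXT NOT NULL,
    total_gex REAL,
    flip_point REAL,
    underlying_price REAL,
    contracts_processed INTEGER,
    calculation_duration_ms INTEGER,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_symbol_date ON gex_cache_index(symbol, trading_date);
CREATE INDEX idx_flip_point ON gex_cache_index(symbol, flip_point);
"""

db = sqlite3.connect(":memory:")
db.executescript(SCHEMA)

# Verify that the lookup indexes exist
names = {row[0] for row in db.execute(
    "SELECT name FROM sqlite_master WHERE type = 'index'")}
print(sorted(n for n in names if n.startswith("idx_")))
# ['idx_flip_point', 'idx_symbol_date']
```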
def find_nearest_flip_point(self, symbol: str, target_price: float,
                            date_range: Tuple[str, str]) -> List[Dict]:
    """Find dates where flip point was near target price."""
    query = """
        SELECT symbol, trading_date, flip_point, total_gex
        FROM gex_cache_index
        WHERE symbol = ?
          AND trading_date BETWEEN ? AND ?
          AND ABS(flip_point - ?) < 5.0
        ORDER BY ABS(flip_point - ?)
    """
    rows = self.index_db.execute(
        query, (symbol, date_range[0], date_range[1], target_price, target_price))
    return [dict(row) for row in rows.fetchall()]  # requires sqlite3.Row row_factory

Integration with Existing Cache
Unified Cache Manager Extension
class UnifiedCacheManager:
    def __init__(self):
        # Existing functionality
        self.market_data_cache = MarketDataCache()
        # Add GEX caching
        self.gex_cache = GEXCacheManager()

    def get_or_calculate_gex(self, symbol: str, trading_date: str) -> Optional[Dict]:
        """Get GEX from cache, or calculate and cache it if missing."""
        # 1. Check GEX cache first
        cached_gex = self.gex_cache.get_gex_summary(symbol, trading_date)
        if cached_gex:
            return cached_gex

        # 2. Get options data (from existing cache)
        options_data = self.get_market_data(
            f"options_{symbol}_{trading_date}",
            trading_date, trading_date, "historical_options"
        )
        if options_data is None or options_data.empty:
            return None

        # 3. Calculate GEX
        from src.gex.gex_calculator import GEXCalculationEngine
        calculator = GEXCalculationEngine()
        gex_results = calculator.calculate_full_gex(options_data)

        # 4. Cache the results
        self.gex_cache.store_gex_calculation(
            symbol, trading_date,
            gex_results['summary'],
            gex_results['strike_breakdown'],
            gex_results['expiry_breakdown']
        )
        return gex_results['summary']

Performance Optimizations
Concurrent Processing Strategy
from concurrent.futures import ThreadPoolExecutor, as_completed

class ConcurrentGEXProcessor:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.cache_manager = UnifiedCacheManager()

    def process_symbol_date_range(self,
                                  symbol: str,
                                  start_date: str,
                                  end_date: str) -> Dict[str, Any]:
        """Process GEX for an entire date range concurrently."""
        trading_dates = self.get_trading_dates(start_date, end_date)

        # Submit all calculations concurrently
        futures = {
            self.executor.submit(
                self.cache_manager.get_or_calculate_gex, symbol, date
            ): date for date in trading_dates
        }

        # Collect results
        results = {}
        for future in as_completed(futures):
            date = futures[future]
            try:
                results[date] = future.result()
            except Exception as e:
                logger.error(f"GEX calculation failed for {symbol} {date}: {e}")
        return results

    def process_multi_symbol(self,
                             symbols: List[str],
                             trading_date: str) -> Dict[str, Any]:
        """Process multiple symbols for the same date concurrently."""
        futures = {
            self.executor.submit(
                self.cache_manager.get_or_calculate_gex, symbol, trading_date
            ): symbol for symbol in symbols
        }
        results = {}
        for future in as_completed(futures):
            symbol = futures[future]
            results[symbol] = future.result()
        return results

Memory Management
import time

class MemoryEfficientGEXCache:
    def __init__(self, max_memory_mb: int = 512):
        self.max_memory = max_memory_mb * 1024 * 1024
        self.loaded_data = {}   # In-memory cache
        self.access_times = {}  # LRU tracking

    def get_with_memory_management(self, cache_key: str) -> Any:
        """Get data with automatic memory management."""
        # Update access time
        self.access_times[cache_key] = time.time()

        # Check if in memory
        if cache_key in self.loaded_data:
            return self.loaded_data[cache_key]

        # Load from disk
        data = self._load_from_disk(cache_key)

        # Manage memory usage
        current_memory = self._estimate_memory_usage()
        if current_memory > self.max_memory:
            self._evict_least_recently_used()

        # Store in memory
        self.loaded_data[cache_key] = data
        return data

    def _evict_least_recently_used(self) -> None:
        """Drop the in-memory entry with the oldest access time."""
        oldest_key = min(self.access_times, key=self.access_times.get)
        self.loaded_data.pop(oldest_key, None)
        self.access_times.pop(oldest_key, None)

Integration Points
GEX Calculator Integration
# The GEX calculator will automatically use caching
from src.gex.gex_calculator import GEXCalculationEngine
from src.cache import UnifiedCacheManager
cache_manager = UnifiedCacheManager()
gex_data = cache_manager.get_or_calculate_gex("SPY", "2024-07-15")
# Automatic cache population during batch processing
processor = ConcurrentGEXProcessor()
historical_gex = processor.process_symbol_date_range("SPY", "2020-01-01", "2024-12-31")

Pattern Analysis Integration
# Pattern analysis can now access pre-computed GEX instantly
from src.tools.options_analyzer import OptionsChainAnalyzer
analyzer = OptionsChainAnalyzer(cache_manager=cache_manager)
# Fast GEX-based pattern detection
patterns = analyzer.detect_gex_patterns(
    symbol="SPY",
    date_range=("2024-06-01", "2024-08-31"),  # Summer analysis
    gex_threshold=1e9
)

Agent Framework Integration
# Agents can now access historical GEX efficiently
class GEXCalculationAgent(BaseGEXAgent):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cache_manager = UnifiedCacheManager()

    async def analyze_gex_patterns(self, request: str) -> str:
        # Fast access to cached GEX data
        spy_gex = self.cache_manager.gex_cache.get_historical_flip_points(
            "SPY", "2024-01-01", "2024-12-31"
        )
        # Process with LLM
        return await self.process_request(
            f"Analyze GEX patterns: {request}",
            context={"gex_data": spy_gex.to_dict()}
        )

Success Criteria
Performance Targets
- Cache Hit Rate: >95% for repeated GEX requests
- Lookup Speed: <50ms for GEX summary retrieval
- Batch Processing: Process 1000 trading days in <10 minutes
- Concurrent Processing: 4x speedup with multi-symbol analysis
- Memory Efficiency: <1GB RAM for typical analysis session
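The hit-rate and latency targets above are measurable with a small instrumentation wrapper around cache lookups; everything below is a hypothetical sketch, not existing API:

```python
import time

class CacheStats:
    """Track hit rate and lookup latency for GEX cache retrievals."""
    def __init__(self):
        self.hits = 0
        self.misses = 0
        self.lookup_ms = []

    def record(self, hit: bool, elapsed_ms: float):
        if hit:
            self.hits += 1
        else:
            self.misses += 1
        self.lookup_ms.append(elapsed_ms)

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

stats = CacheStats()
for i in range(100):
    start = time.perf_counter()
    hit = i >= 4  # pretend the first 4 lookups were cold misses
    elapsed = (time.perf_counter() - start) * 1000
    stats.record(hit, elapsed)
print(stats.hit_rate)  # 0.96
```

A real deployment would call record() inside get_gex_summary and alert when hit_rate drops below the 95% target.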
Functionality Requirements
- Complete Coverage: Cache all GEX calculation components
- Index Performance: Sub-second queries on years of data
- Data Integrity: 100% consistency between cache and calculations
- Concurrent Safety: Thread-safe multi-symbol processing
- Storage Efficiency: <10MB per symbol per trading day
Integration Success
- Seamless Integration: Drop-in replacement for direct calculations
- Agent Compatibility: Full integration with multi-agent framework
- Pattern Analysis: Enable fast historical pattern scanning
- Research Workflows: Support backtesting and validation studies
Testing Requirements
Cache Performance Tests
def test_gex_cache_performance():
    # Test cache hit rates
    # Test concurrent access
    # Test memory usage
    # Test disk I/O performance
    pass

def test_gex_calculation_consistency():
    # Verify cached results match direct calculation
    # Test different market conditions
    # Validate edge cases
    pass

Integration Tests
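One concrete integration test of this kind: store a daily summary, read it back, and compare. The helpers below are toy stand-ins for the cache manager's store/get methods, not the real API:

```python
import json
import tempfile
from pathlib import Path

def store_summary(base: Path, symbol: str, date: str, summary: dict) -> Path:
    """Toy stand-in for GEXCacheManager.store_gex_calculation (summary only)."""
    day_dir = base / "gex_data" / symbol / date
    day_dir.mkdir(parents=True, exist_ok=True)
    path = day_dir / "gex_summary.json"
    path.write_text(json.dumps(summary))
    return path

def load_summary(base: Path, symbol: str, date: str) -> dict:
    """Toy stand-in for GEXCacheManager.get_gex_summary."""
    path = base / "gex_data" / symbol / date / "gex_summary.json"
    return json.loads(path.read_text())

def test_summary_round_trip():
    summary = {"symbol": "SPY", "flip_point": 447.5, "total_gex": 2.5e9}
    with tempfile.TemporaryDirectory() as tmp:
        store_summary(Path(tmp), "SPY", "2024-01-15", summary)
        assert load_summary(Path(tmp), "SPY", "2024-01-15") == summary

test_summary_round_trip()
```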
def test_multi_symbol_processing():
    # Test SPY + SPX concurrent processing
    # Verify resource usage
    # Check result consistency
    pass

def test_historical_analysis():
    # Process 4 years of data
    # Verify performance targets
    # Check memory management
    pass

Documentation Requirements
- Cache Architecture: Design decisions and trade-offs
- Performance Guide: Optimization recommendations
- API Reference: Complete method documentation
- Integration Examples: Common usage patterns
- Troubleshooting: Performance tuning and debugging
Implementation Timeline
Phase 1 (1 week): Core Cache Infrastructure
- Basic GEX cache structure
- SQLite indexing system
- Integration with existing cache manager
Phase 2 (1 week): Performance Optimization
- Concurrent processing framework
- Memory management system
- Batch processing optimizations
Phase 3 (1 week): Integration & Testing
- Agent framework integration
- Pattern analysis integration
- Comprehensive testing suite
This caching system will transform GEX analysis from expensive calculations to fast lookups, enabling real-time pattern detection and large-scale historical analysis.