Skip to content

Commit 418ce1d

Browse files
authored
Merge pull request #3 from qsor27/execution-storage-fix
Refactor execution storage to store individual executions instead of …
2 parents 23a118a + c98337b commit 418ce1d

File tree

4 files changed

+451
-189
lines changed

4 files changed

+451
-189
lines changed

scripts/ExecutionProcessing.py

Lines changed: 74 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -104,20 +104,37 @@ def invalidate_cache_after_import(processed_trades: List[Dict]) -> None:
104104
# Don't raise - cache invalidation failure shouldn't stop trade processing
105105

106106
def process_trades(df, multipliers):
107-
"""Process trades from a NinjaTrader DataFrame using proper account-based position tracking"""
107+
"""
108+
Process executions from a NinjaTrader DataFrame and output individual execution records.
109+
110+
This function outputs each execution (Entry or Exit) as a separate record without
111+
performing FIFO pairing. The position builder handles FIFO matching and P&L calculation.
112+
113+
Args:
114+
df: DataFrame containing execution data with columns:
115+
ID, Account, Instrument, Time, Action, E/X, Quantity, Price, Commission
116+
multipliers: Dict mapping instrument names to their point multipliers
117+
118+
Returns:
119+
List of individual execution records with the following structure:
120+
- For Entry executions: execution_id, Account, Instrument, action, entry_exit,
121+
quantity, entry_price, entry_time, exit_price=None, exit_time=None, commission
122+
- For Exit executions: execution_id, Account, Instrument, action, entry_exit,
123+
quantity, exit_price, exit_time, entry_price=None, entry_time=None, commission
124+
"""
108125
# Validate DataFrame size
109126
if len(df) > MAX_ROWS:
110127
raise ValueError(f"DataFrame too large: {len(df)} rows > {MAX_ROWS} limit")
111-
128+
112129
# Create an explicit copy of the DataFrame
113130
ninja_trades_df = df.copy()
114-
131+
115132
# Validate required columns
116133
required_columns = ['ID', 'Account', 'Instrument', 'Time', 'Action', 'E/X', 'Quantity', 'Price', 'Commission']
117134
missing_columns = [col for col in required_columns if col not in ninja_trades_df.columns]
118135
if missing_columns:
119136
raise ValueError(f"Missing required columns: {missing_columns}")
120-
137+
121138
# Convert Commission to float with error handling
122139
try:
123140
ninja_trades_df.loc[:, 'Commission'] = ninja_trades_df['Commission'].str.replace('$', '', regex=False).astype(float)
@@ -134,17 +151,15 @@ def process_trades(df, multipliers):
134151
except Exception as e:
135152
raise ValueError(f"Failed to parse Time column: {str(e)}")
136153

137-
# Initialize list to store processed trades
138-
processed_trades = []
154+
# Initialize list to store individual execution records
155+
individual_executions = []
139156

140-
# Process each account separately to handle copied trades
157+
# Process each account separately to maintain account separation
141158
for account in ninja_trades_df['Account'].unique():
142159
account_df = ninja_trades_df[ninja_trades_df['Account'] == account].copy()
143160
print(f"Processing account: {account}")
144-
145-
# Track open positions for this account using FIFO
146-
open_positions = [] # List of open entry executions
147-
161+
162+
# Process each execution as an individual record
148163
for _, execution in account_df.iterrows():
149164
try:
150165
qty = int(execution['Quantity'])
@@ -153,96 +168,59 @@ def process_trades(df, multipliers):
153168
exec_id = execution['ID']
154169
commission = float(execution['Commission'])
155170
instrument = execution['Instrument']
171+
action = execution['Action'] # Buy or Sell
172+
entry_exit = execution['E/X'] # Entry or Exit
156173
except (ValueError, TypeError) as e:
157174
logger.warning(f"Skipping invalid execution row {execution['ID']}: {str(e)}")
158175
continue
159-
160-
print(f" Processing {execution['E/X']}: {execution['Action']} {qty} at {price} (ID: {exec_id})")
161-
162-
if execution['E/X'] == 'Entry':
163-
# Opening new position - add to open positions
164-
open_positions.append({
165-
'price': price,
176+
177+
print(f" Processing {entry_exit}: {action} {qty} at {price} (ID: {exec_id})")
178+
179+
# Create individual execution record
180+
if entry_exit == 'Entry':
181+
# Entry execution: set entry fields, leave exit fields as None
182+
execution_record = {
183+
'execution_id': exec_id,
184+
'Account': account,
185+
'Instrument': instrument,
186+
'action': action,
187+
'entry_exit': entry_exit,
166188
'quantity': qty,
167-
'time': time,
168-
'id': exec_id,
169-
'commission': commission,
170-
'side': execution['Action'] # Preserve Buy/Sell terminology
171-
})
172-
print(f" Added to open positions: {qty} contracts at {price}")
173-
174-
elif execution['E/X'] == 'Exit':
175-
# Closing position - match against open positions using FIFO
176-
remaining_to_close = qty
177-
178-
# Process open positions in FIFO order (oldest first)
179-
positions_to_remove = []
180-
181-
for i, open_pos in enumerate(open_positions):
182-
if remaining_to_close <= 0:
183-
break
184-
185-
# Determine how much of this position to close
186-
close_qty = min(remaining_to_close, open_pos['quantity'])
187-
188-
# Calculate P&L for this portion
189-
# Calculate P&L for this portion based on actual market actions
190-
if open_pos['side'] == 'Buy':
191-
points_pl = price - open_pos['price'] # Long position: exit - entry
192-
else: # open_pos['side'] == 'Sell'
193-
points_pl = open_pos['price'] - price # Short position: entry - exit
194-
195-
# Get multiplier for this instrument
196-
multiplier = float(multipliers.get(instrument, 1))
197-
198-
# Calculate commission (entry + proportional exit commission)
199-
entry_commission = open_pos['commission'] * (close_qty / open_pos['quantity'])
200-
exit_commission = commission * (close_qty / qty)
201-
total_commission = entry_commission + exit_commission
202-
203-
# Calculate dollar P&L
204-
dollar_pl = (points_pl * multiplier * close_qty) - total_commission
205-
206-
# Create unique trade ID
207-
unique_id = f"{open_pos['id']}_to_{exec_id}_{len(processed_trades)+1}"
208-
209-
# Create completed trade record
210-
trade = {
211-
'Instrument': instrument,
212-
'Side of Market': open_pos['side'],
213-
'Quantity': close_qty,
214-
'Entry Price': open_pos['price'],
215-
'Entry Time': open_pos['time'],
216-
'Exit Time': time,
217-
'Exit Price': price,
218-
'Result Gain/Loss in Points': round(points_pl, 2),
219-
'Gain/Loss in Dollars': round(dollar_pl, 2),
220-
'ID': unique_id,
221-
'Commission': round(total_commission, 2),
222-
'Account': account
223-
}
224-
225-
processed_trades.append(trade)
226-
print(f" Created trade: {close_qty} contracts, P&L: ${dollar_pl:.2f}")
227-
228-
# Update the open position
229-
open_pos['quantity'] -= close_qty
230-
remaining_to_close -= close_qty
231-
232-
# Mark for removal if fully closed
233-
if open_pos['quantity'] <= 0:
234-
positions_to_remove.append(i)
235-
236-
# Remove fully closed positions (in reverse order to maintain indices)
237-
for i in reversed(positions_to_remove):
238-
open_positions.pop(i)
239-
240-
if remaining_to_close > 0:
241-
print(f" WARNING: Could not match {remaining_to_close} contracts for exit")
242-
243-
print(f" Account {account} completed with {len(open_positions)} open positions remaining")
189+
'entry_price': price,
190+
'entry_time': time,
191+
'exit_price': None,
192+
'exit_time': None,
193+
'commission': commission
194+
}
195+
print(f" Created Entry execution record: {qty} contracts at {price}")
196+
197+
elif entry_exit == 'Exit':
198+
# Exit execution: set exit fields, leave entry fields as None
199+
execution_record = {
200+
'execution_id': exec_id,
201+
'Account': account,
202+
'Instrument': instrument,
203+
'action': action,
204+
'entry_exit': entry_exit,
205+
'quantity': qty,
206+
'entry_price': None,
207+
'entry_time': None,
208+
'exit_price': price,
209+
'exit_time': time,
210+
'commission': commission
211+
}
212+
print(f" Created Exit execution record: {qty} contracts at {price}")
213+
214+
else:
215+
logger.warning(f"Unknown execution type {entry_exit} for ID {exec_id}")
216+
continue
217+
218+
individual_executions.append(execution_record)
219+
220+
print(f" Account {account} completed: {len([e for e in individual_executions if e['Account'] == account])} executions")
244221

245-
return processed_trades
222+
print(f"\nTotal individual executions created: {len(individual_executions)}")
223+
return individual_executions
246224

247225
def main():
248226
# Change to the script's directory

services/unified_csv_import_service.py

Lines changed: 77 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -372,95 +372,128 @@ def _process_csv_file(self, file_path: Path) -> List[Dict[str, Any]]:
372372
return []
373373

374374
def _log_processing_summary(self, trades: List[Dict[str, Any]], filename: str) -> None:
375-
"""Log summary of processed trades"""
375+
"""Log summary of processed trades/executions"""
376376
try:
377377
account_summary = {}
378378
instrument_summary = {}
379-
379+
380+
# Detect format
381+
is_individual_execution = trades and 'execution_id' in trades[0]
382+
380383
for trade in trades:
381384
# Account summary
382385
account = trade.get('Account', 'Unknown')
383386
if account not in account_summary:
384387
account_summary[account] = {'count': 0, 'total_pnl': 0}
385388
account_summary[account]['count'] += 1
386-
account_summary[account]['total_pnl'] += trade.get('Gain/Loss in Dollars', 0)
387-
389+
390+
# For individual executions, P&L is not calculated yet
391+
if not is_individual_execution:
392+
account_summary[account]['total_pnl'] += trade.get('Gain/Loss in Dollars', 0)
393+
388394
# Instrument summary
389395
instrument = trade.get('Instrument', 'Unknown')
390396
if instrument not in instrument_summary:
391397
instrument_summary[instrument] = 0
392398
instrument_summary[instrument] += 1
393-
399+
400+
record_type = "executions" if is_individual_execution else "trades"
394401
self.logger.info(f"Processing summary for {filename}:")
395-
self.logger.info(f" Total trades: {len(trades)}")
396-
402+
self.logger.info(f" Total {record_type}: {len(trades)}")
403+
397404
for account, summary in account_summary.items():
398-
self.logger.info(
399-
f" {account}: {summary['count']} trades, "
400-
f"P&L: ${summary['total_pnl']:.2f}"
401-
)
402-
405+
if is_individual_execution:
406+
self.logger.info(f" {account}: {summary['count']} executions")
407+
else:
408+
self.logger.info(
409+
f" {account}: {summary['count']} trades, "
410+
f"P&L: ${summary['total_pnl']:.2f}"
411+
)
412+
403413
for instrument, count in instrument_summary.items():
404-
self.logger.info(f" {instrument}: {count} trades")
414+
self.logger.info(f" {instrument}: {count} {record_type}")
405415

406416
except Exception as e:
407417
self.logger.error(f"Error creating processing summary: {e}")
408418

409419
def _import_trades_to_database(self, trades: List[Dict[str, Any]]) -> bool:
410420
"""
411-
Import processed trades to the database.
412-
421+
Import processed executions to the database.
422+
423+
Handles both individual executions (new format) and complete trades (legacy format).
424+
413425
Args:
414-
trades: List of trade dictionaries to import
415-
426+
trades: List of execution/trade dictionaries to import
427+
416428
Returns:
417429
True if import successful, False otherwise
418430
"""
419431
if not trades:
420432
self.logger.debug("No trades to import")
421433
return True
422-
434+
423435
try:
424436
if not FuturesDB:
425437
self.logger.error("FuturesDB not available - cannot import trades")
426438
return False
427-
439+
428440
imported_count = 0
429-
441+
430442
with FuturesDB() as db:
431443
for trade in trades:
432-
# Convert trade dictionary to database format
433-
trade_data = {
434-
'instrument': trade.get('Instrument', ''),
435-
'side_of_market': trade.get('Side of Market', ''),
436-
'quantity': trade.get('Quantity', 0),
437-
'entry_price': trade.get('Entry Price', 0.0),
438-
'entry_time': trade.get('Entry Time', ''),
439-
'exit_time': trade.get('Exit Time', ''),
440-
'exit_price': trade.get('Exit Price', 0.0),
441-
'points_gain_loss': trade.get('Result Gain/Loss in Points', 0.0),
442-
'dollars_gain_loss': trade.get('Gain/Loss in Dollars', 0.0),
443-
'entry_execution_id': trade.get('ID', ''),
444-
'commission': trade.get('Commission', 0.0),
445-
'account': trade.get('Account', '')
446-
}
447-
444+
# Detect format: new individual execution format vs legacy trade format
445+
is_individual_execution = 'execution_id' in trade and 'entry_exit' in trade
446+
447+
if is_individual_execution:
448+
# New format: Individual execution
449+
trade_data = {
450+
'instrument': trade.get('Instrument', ''),
451+
'side_of_market': trade.get('action', ''), # Buy/Sell
452+
'quantity': trade.get('quantity', 0),
453+
'entry_price': trade.get('entry_price', None),
454+
'entry_time': trade.get('entry_time', None),
455+
'exit_time': trade.get('exit_time', None),
456+
'exit_price': trade.get('exit_price', None),
457+
'points_gain_loss': None, # Position builder calculates this
458+
'dollars_gain_loss': None, # Position builder calculates this
459+
'entry_execution_id': trade.get('execution_id', ''),
460+
'commission': trade.get('commission', 0.0),
461+
'account': trade.get('Account', '')
462+
}
463+
exec_id = trade_data['entry_execution_id']
464+
else:
465+
# Legacy format: Complete trade
466+
trade_data = {
467+
'instrument': trade.get('Instrument', ''),
468+
'side_of_market': trade.get('Side of Market', ''),
469+
'quantity': trade.get('Quantity', 0),
470+
'entry_price': trade.get('Entry Price', 0.0),
471+
'entry_time': trade.get('Entry Time', ''),
472+
'exit_time': trade.get('Exit Time', ''),
473+
'exit_price': trade.get('Exit Price', 0.0),
474+
'points_gain_loss': trade.get('Result Gain/Loss in Points', 0.0),
475+
'dollars_gain_loss': trade.get('Gain/Loss in Dollars', 0.0),
476+
'entry_execution_id': trade.get('ID', ''),
477+
'commission': trade.get('Commission', 0.0),
478+
'account': trade.get('Account', '')
479+
}
480+
exec_id = trade_data['entry_execution_id']
481+
448482
# Insert trade (database handles duplicates)
449-
self.logger.debug(f"Importing trade: {trade_data['entry_execution_id']}")
483+
self.logger.debug(f"Importing execution: {exec_id}")
450484
success = db.add_trade(trade_data)
451-
485+
452486
if success:
453487
imported_count += 1
454-
self.logger.debug(f"Successfully imported trade {trade_data['entry_execution_id']}")
488+
self.logger.debug(f"Successfully imported execution {exec_id}")
455489
else:
456490
self.logger.warning(
457-
f"Failed to import trade {trade_data['entry_execution_id']} "
458-
f"- possibly duplicate"
491+
f"Failed to import execution {exec_id} - possibly duplicate"
459492
)
460-
461-
self.logger.info(f"Successfully imported {imported_count} trades to database")
493+
494+
self.logger.info(f"Successfully imported {imported_count} executions to database")
462495
return True
463-
496+
464497
except Exception as e:
465498
self.logger.error(f"Error importing trades to database: {e}")
466499
return False

0 commit comments

Comments
 (0)