import os import sqlite3 from datetime import date, datetime from typing import Any, Dict, List, Optional, Tuple import pandas as pd from pt_trading.trading_pair import TradingPair # Recommended replacement adapters and converters for Python 3.12+ # From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes def adapt_date_iso(val: date) -> str: """Adapt datetime.date to ISO 8601 date.""" return val.isoformat() def adapt_datetime_iso(val: datetime) -> str: """Adapt datetime.datetime to timezone-naive ISO 8601 date.""" return val.isoformat() def convert_date(val: bytes) -> date: """Convert ISO 8601 date to datetime.date object.""" return datetime.fromisoformat(val.decode()).date() def convert_datetime(val: bytes) -> datetime: """Convert ISO 8601 datetime to datetime.datetime object.""" return datetime.fromisoformat(val.decode()) # Register the adapters and converters sqlite3.register_adapter(date, adapt_date_iso) sqlite3.register_adapter(datetime, adapt_datetime_iso) sqlite3.register_converter("date", convert_date) sqlite3.register_converter("datetime", convert_datetime) def create_result_database(db_path: str) -> None: """ Create the SQLite database and required tables if they don't exist. """ try: # Create directory if it doesn't exist db_dir = os.path.dirname(db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) print(f"Created directory: {db_dir}") conn = sqlite3.connect(db_path) cursor = conn.cursor() # Create the pt_bt_results table for completed trades cursor.execute( """ CREATE TABLE IF NOT EXISTS pt_bt_results ( date DATE, pair TEXT, symbol TEXT, open_time DATETIME, open_side TEXT, open_price REAL, open_quantity INTEGER, open_disequilibrium REAL, close_time DATETIME, close_side TEXT, close_price REAL, close_quantity INTEGER, close_disequilibrium REAL, symbol_return REAL, pair_return REAL, close_condition TEXT ) """ ) cursor.execute("DELETE FROM pt_bt_results;") # Create the outstanding_positions table for open positions cursor.execute( """ CREATE TABLE IF NOT EXISTS outstanding_positions ( date DATE, pair TEXT, symbol TEXT, position_quantity REAL, last_price REAL, unrealized_return REAL, open_price REAL, open_side TEXT ) """ ) cursor.execute("DELETE FROM outstanding_positions;") # Create the config table for storing configuration JSON for reference cursor.execute( """ CREATE TABLE IF NOT EXISTS config ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp DATETIME, config_file_path TEXT, config_json TEXT, fit_method_class TEXT, datafiles TEXT, instruments TEXT ) """ ) cursor.execute("DELETE FROM config;") conn.commit() conn.close() except Exception as e: print(f"Error creating result database: {str(e)}") raise def store_config_in_database( db_path: str, config_file_path: str, config: Dict, fit_method_class: str, datafiles: List[Tuple[str, str]], instruments: List[Dict[str, str]], ) -> None: """ Store configuration information in the database for reference. """ import json if db_path.upper() == "NONE": return try: conn = sqlite3.connect(db_path) cursor = conn.cursor() # Convert config to JSON string config_json = json.dumps(config, indent=2, default=str) # Convert lists to comma-separated strings for storage datafiles_str = ", ".join([f"{datafile}" for _, datafile in datafiles]) instruments_str = ", ".join( [ f"{inst['symbol']}:{inst['instrument_type']}:{inst['exchange_id']}" for inst in instruments ] ) # Insert configuration record cursor.execute( """ INSERT INTO config ( run_timestamp, config_file_path, config_json, fit_method_class, datafiles, instruments ) VALUES (?, ?, ?, ?, ?, ?) """, ( datetime.now(), config_file_path, config_json, fit_method_class, datafiles_str, instruments_str, ), ) conn.commit() conn.close() print(f"Configuration stored in database") except Exception as e: print(f"Error storing configuration in database: {str(e)}") import traceback traceback.print_exc() def convert_timestamp(timestamp: Any) -> Optional[datetime]: """Convert pandas Timestamp to Python datetime object for SQLite compatibility.""" if timestamp is None: return None if isinstance(timestamp, pd.Timestamp): return timestamp.to_pydatetime() elif isinstance(timestamp, datetime): return timestamp elif isinstance(timestamp, date): return datetime.combine(timestamp, datetime.min.time()) elif isinstance(timestamp, str): return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") elif isinstance(timestamp, int): return datetime.fromtimestamp(timestamp) else: raise ValueError(f"Unsupported timestamp type: {type(timestamp)}") class BacktestResult: """ Class to handle backtest results, trades tracking, PnL calculations, and reporting. """ def __init__(self, config: Dict[str, Any]): self.config = config self.trades: Dict[str, Dict[str, Any]] = {} self.total_realized_pnl = 0.0 self.outstanding_positions: List[Dict[str, Any]] = [] self.pairs_trades_: Dict[str, List[Dict[str, Any]]] = {} def add_trade( self, pair_nm: str, symbol: str, side: str, action: str, price: Any, disequilibrium: Optional[float] = None, scaled_disequilibrium: Optional[float] = None, timestamp: Optional[datetime] = None, status: Optional[str] = None, ) -> None: """Add a trade to the results tracking.""" pair_nm = str(pair_nm) if pair_nm not in self.trades: self.trades[pair_nm] = {symbol: []} if symbol not in self.trades[pair_nm]: self.trades[pair_nm][symbol] = [] self.trades[pair_nm][symbol].append( { "symbol": symbol, "side": side, "action": action, "price": price, "disequilibrium": disequilibrium, "scaled_disequilibrium": scaled_disequilibrium, "timestamp": timestamp, "status": status, } ) def add_outstanding_position(self, position: Dict[str, Any]) -> None: """Add an outstanding position to tracking.""" self.outstanding_positions.append(position) def add_realized_pnl(self, realized_pnl: float) -> None: """Add realized PnL to the total.""" self.total_realized_pnl += realized_pnl def get_total_realized_pnl(self) -> float: """Get total realized PnL.""" return self.total_realized_pnl def get_outstanding_positions(self) -> List[Dict[str, Any]]: """Get all outstanding positions.""" return self.outstanding_positions def get_trades(self) -> Dict[str, Dict[str, Any]]: """Get all trades.""" return self.trades def clear_trades(self) -> None: """Clear all trades (used when processing new files).""" self.trades.clear() def collect_single_day_results(self, pairs_trades: List[pd.DataFrame]) -> None: """Collect and process single day trading results.""" result = pd.concat(pairs_trades, ignore_index=True) result["time"] = pd.to_datetime(result["time"]) result = result.set_index("time").sort_index() print("\n -------------- Suggested Trades ") print(result) for row in result.itertuples(): side = row.side action = row.action symbol = row.symbol price = row.price disequilibrium = getattr(row, "disequilibrium", None) scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None) if hasattr(row, "time"): timestamp = getattr(row, "time") else: timestamp = convert_timestamp(row.Index) status = row.status self.add_trade( pair_nm=str(row.pair), symbol=str(symbol), side=str(side), action=str(action), price=float(str(price)), disequilibrium=disequilibrium, scaled_disequilibrium=scaled_disequilibrium, timestamp=timestamp, status=str(status) if status is not None else "?", ) def print_single_day_results(self) -> None: """Print single day results summary.""" for pair, symbols in self.trades.items(): print(f"\n--- {pair} ---") for symbol, trades in symbols.items(): for trade_data in trades: if len(trade_data) >= 2: side, price = trade_data[:2] print(f"{symbol} {side} at ${price}") def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None: """Print summary of all processed files.""" print("\n====== Summary of All Processed Files ======") for filename, data in all_results.items(): trade_count = sum( len(trades) for symbol_trades in data["trades"].values() for trades in symbol_trades.values() ) print(f"{filename}: {trade_count} trades") def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None: """Calculate and print returns by day and pair.""" def _symbol_return(trade1_side: str, trade1_px: float, trade2_side: str, trade2_px: float) -> float: if trade1_side == "BUY" and trade2_side == "SELL": return (trade2_px - trade1_px) / trade1_px * 100 elif trade1_side == "SELL" and trade2_side == "BUY": return (trade1_px - trade2_px) / trade1_px * 100 else: return 0 print("\n====== Returns By Day and Pair ======") trades = [] for filename, data in all_results.items(): pairs = list(data["trades"].keys()) for pair in pairs: self.pairs_trades_[pair] = [] trades_dict = data["trades"][pair] for symbol in trades_dict.keys(): trades.extend(trades_dict[symbol]) trades = sorted(trades, key=lambda x: (x["timestamp"], x["symbol"])) print(f"\n--- {filename} ---") self.outstanding_positions = data["outstanding_positions"] day_return = 0.0 for idx in range(0, len(trades), 4): symbol_a = trades[idx]["symbol"] trade_a_1 = trades[idx] trade_a_2 = trades[idx + 2] symbol_b = trades[idx + 1]["symbol"] trade_b_1 = trades[idx + 1] trade_b_2 = trades[idx + 3] symbol_return = 0 assert ( trade_a_1["timestamp"] < trade_a_2["timestamp"] ), f"Trade 1: {trade_a_1['timestamp']} is not less than Trade 2: {trade_a_2['timestamp']}" assert ( trade_a_1["action"] == "OPEN" and trade_a_2["action"] == "CLOSE" ), f"Trade 1: {trade_a_1['action']} and Trade 2: {trade_a_2['action']} are the same" # Calculate return based on action combination trade_return = 0 symbol_a_return = _symbol_return(trade_a_1["side"], trade_a_1["price"], trade_a_2["side"], trade_a_2["price"]) symbol_b_return = _symbol_return(trade_b_1["side"], trade_b_1["price"], trade_b_2["side"], trade_b_2["price"]) pair_return = symbol_a_return + symbol_b_return self.pairs_trades_[pair].append( { "symbol": symbol_a, "open_side": trade_a_1["side"], "open_action": trade_a_1["action"], "open_price": trade_a_1["price"], "close_side": trade_a_2["side"], "close_action": trade_a_2["action"], "close_price": trade_a_2["price"], "symbol_return": symbol_a_return, "open_disequilibrium": trade_a_1["disequilibrium"], "open_scaled_disequilibrium": trade_a_1["scaled_disequilibrium"], "close_disequilibrium": trade_a_2["disequilibrium"], "close_scaled_disequilibrium": trade_a_2["scaled_disequilibrium"], "open_time": trade_a_1["timestamp"], "close_time": trade_a_2["timestamp"], "shares": self.config["funding_per_pair"] / 2 / trade_a_1["price"], "is_completed": True, "close_condition": trade_a_2["status"], "pair_return": pair_return } ) self.pairs_trades_[pair].append( { "symbol": symbol_b, "open_side": trade_b_1["side"], "open_action": trade_b_1["action"], "open_price": trade_b_1["price"], "close_side": trade_b_2["side"], "close_action": trade_b_2["action"], "close_price": trade_b_2["price"], "symbol_return": symbol_b_return, "open_disequilibrium": trade_b_1["disequilibrium"], "open_scaled_disequilibrium": trade_b_1["scaled_disequilibrium"], "close_disequilibrium": trade_b_2["disequilibrium"], "close_scaled_disequilibrium": trade_b_2["scaled_disequilibrium"], "open_time": trade_b_1["timestamp"], "close_time": trade_b_2["timestamp"], "shares": self.config["funding_per_pair"] / 2 / trade_b_1["price"], "is_completed": True, "close_condition": trade_b_2["status"], "pair_return": pair_return } ) # Print pair returns with disequilibrium information day_return = 0.0 if pair in self.pairs_trades_: print(f"{pair}:") pair_return = 0.0 for trd in self.pairs_trades_[pair]: disequil_info = "" if ( trd["open_scaled_disequilibrium"] is not None and trd["open_scaled_disequilibrium"] is not None ): disequil_info = f" | Open Dis-eq: {trd['open_scaled_disequilibrium']:.2f}," f" Close Dis-eq: {trd['open_scaled_disequilibrium']:.2f}" print( f" {trd['open_time'].time()} {trd['symbol']}: " f" {trd['open_side']} @ ${trd['open_price']:.2f}," f" {trd["close_side"]} @ ${trd["close_price"]:.2f}," f" Return: {trd['symbol_return']:.2f}%{disequil_info}" ) pair_return += trd["symbol_return"] print(f" Pair Total Return: {pair_return:.2f}%") day_return += pair_return # Print day total return and add to global realized PnL if day_return != 0: print(f" Day Total Return: {day_return:.2f}%") self.add_realized_pnl(day_return) def print_outstanding_positions(self) -> None: """Print all outstanding positions with share quantities and current values.""" if not self.get_outstanding_positions(): print("\n====== NO OUTSTANDING POSITIONS ======") return print(f"\n====== OUTSTANDING POSITIONS ======") print( f"{'Pair':<15}" f" {'Symbol':<10}" f" {'Side':<4}" f" {'Shares':<10}" f" {'Open $':<8}" f" {'Current $':<10}" f" {'Value $':<12}" f" {'Disequilibrium':<15}" ) print("-" * 100) total_value = 0.0 for pos in self.get_outstanding_positions(): # Print position A print( f"{pos['pair']:<15}" f" {pos['symbol_a']:<10}" f" {pos['side_a']:<4}" f" {pos['shares_a']:<10.2f}" f" {pos['open_px_a']:<8.2f}" f" {pos['current_px_a']:<10.2f}" f" {pos['current_value_a']:<12.2f}" f" {'':<15}" ) # Print position B print( f"{'':<15}" f" {pos['symbol_b']:<10}" f" {pos['side_b']:<4}" f" {pos['shares_b']:<10.2f}" f" {pos['open_px_b']:<8.2f}" f" {pos['current_px_b']:<10.2f}" f" {pos['current_value_b']:<12.2f}" ) # Print pair totals with disequilibrium info print( f"{'':<15}" f" {'PAIR TOTAL':<10}" f" {'':<4}" f" {'':<10}" f" {'':<8}" f" {'':<10}" f" {pos['total_current_value']:<12.2f}" ) # Print disequilibrium details print( f"{'':<15}" f" {'DISEQUIL':<10}" f" {'':<4}" f" {'':<10}" f" {'':<8}" f" {'':<10}" f" Raw: {pos['current_disequilibrium']:<6.4f}" f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}" ) print("-" * 100) total_value += pos["total_current_value"] print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}") def print_grand_totals(self) -> None: """Print grand totals across all pairs.""" print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======") print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%") def handle_outstanding_position( self, pair: TradingPair, pair_result_df: pd.DataFrame, last_row_index: int, open_side_a: str, open_side_b: str, open_px_a: float, open_px_b: float, open_tstamp: datetime, ) -> Tuple[float, float, float]: """ Handle calculation and tracking of outstanding positions when no close signal is found. Args: pair: TradingPair object pair_result_df: DataFrame with pair results last_row_index: Index of the last row in the data open_side_a, open_side_b: Trading sides for symbols A and B open_px_a, open_px_b: Opening prices for symbols A and B open_tstamp: Opening timestamp """ if pair_result_df is None or pair_result_df.empty: return 0, 0, 0 last_row = pair_result_df.loc[last_row_index] last_tstamp = last_row["tstamp"] colname_a, colname_b = pair.colnames() last_px_a = last_row[colname_a] last_px_b = last_row[colname_b] # Calculate share quantities based on funding per pair # Split funding equally between the two positions funding_per_position = self.config["funding_per_pair"] / 2 shares_a = funding_per_position / open_px_a shares_b = funding_per_position / open_px_b # Calculate current position values (shares * current price) current_value_a = shares_a * last_px_a * (-1 if open_side_a == "SELL" else 1) current_value_b = shares_b * last_px_b * (-1 if open_side_b == "SELL" else 1) total_current_value = current_value_a + current_value_b # Get disequilibrium information current_disequilibrium = last_row["disequilibrium"] current_scaled_disequilibrium = last_row["scaled_disequilibrium"] # Store outstanding positions self.add_outstanding_position( { "pair": str(pair), "symbol_a": pair.symbol_a_, "symbol_b": pair.symbol_b_, "side_a": open_side_a, "side_b": open_side_b, "shares_a": shares_a, "shares_b": shares_b, "open_px_a": open_px_a, "open_px_b": open_px_b, "current_px_a": last_px_a, "current_px_b": last_px_b, "current_value_a": current_value_a, "current_value_b": current_value_b, "total_current_value": total_current_value, "open_time": open_tstamp, "last_time": last_tstamp, "current_abs_term": current_scaled_disequilibrium, "current_disequilibrium": current_disequilibrium, "current_scaled_disequilibrium": current_scaled_disequilibrium, } ) # Print position details print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session") print(f" Open: {open_tstamp} | Last: {last_tstamp}") print( f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Value: ${current_value_a:.2f}" ) print( f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Value: ${current_value_b:.2f}" ) print(f" Total Value: ${total_current_value:.2f}") print( f" Disequilibrium: {current_disequilibrium:.4f} | Scaled: {current_scaled_disequilibrium:.4f}" ) return current_value_a, current_value_b, total_current_value def store_results_in_database( self, db_path: str, day: str ) -> None: """ Store backtest results in the SQLite database. """ if db_path.upper() == "NONE": return try: # Extract date from datafile name (assuming format like 20250528.mktdata.ohlcv.db) date_str = day # Convert to proper date format try: date_obj = datetime.strptime(date_str, "%Y%m%d").date() except ValueError: # If date parsing fails, use current date date_obj = datetime.now().date() conn = sqlite3.connect(db_path) cursor = conn.cursor() # Process each trade from bt_result trades = self.get_trades() for pair_name, _ in trades.items(): # Second pass: insert completed trade records into database for trade_pair in sorted(self.pairs_trades_[pair_name], key=lambda x: x["open_time"]): # Only store completed trades in pt_bt_results table cursor.execute( """ INSERT INTO pt_bt_results ( date, pair, symbol, open_time, open_side, open_price, open_quantity, open_disequilibrium, close_time, close_side, close_price, close_quantity, close_disequilibrium, symbol_return, pair_return, close_condition ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( date_obj, pair_name, trade_pair["symbol"], trade_pair["open_time"], trade_pair["open_side"], trade_pair["open_price"], trade_pair["shares"], trade_pair["open_scaled_disequilibrium"], trade_pair["close_time"], trade_pair["close_side"], trade_pair["close_price"], trade_pair["shares"], trade_pair["close_scaled_disequilibrium"], trade_pair["symbol_return"], trade_pair["pair_return"], trade_pair["close_condition"] ), ) # Store outstanding positions in separate table outstanding_positions = self.get_outstanding_positions() for pos in outstanding_positions: # Calculate position quantity (negative for SELL positions) position_qty_a = ( pos["shares_a"] if pos["side_a"] == "BUY" else -pos["shares_a"] ) position_qty_b = ( pos["shares_b"] if pos["side_b"] == "BUY" else -pos["shares_b"] ) # Calculate unrealized returns # For symbol A: (current_price - open_price) / open_price * 100 * position_direction unrealized_return_a = ( (pos["current_px_a"] - pos["open_px_a"]) / pos["open_px_a"] * 100 ) * (1 if pos["side_a"] == "BUY" else -1) unrealized_return_b = ( (pos["current_px_b"] - pos["open_px_b"]) / pos["open_px_b"] * 100 ) * (1 if pos["side_b"] == "BUY" else -1) # Store outstanding position for symbol A cursor.execute( """ INSERT INTO outstanding_positions ( date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( date_obj, pos["pair"], pos["symbol_a"], position_qty_a, pos["current_px_a"], unrealized_return_a, pos["open_px_a"], pos["side_a"], ), ) # Store outstanding position for symbol B cursor.execute( """ INSERT INTO outstanding_positions ( date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( date_obj, pos["pair"], pos["symbol_b"], position_qty_b, pos["current_px_b"], unrealized_return_b, pos["open_px_b"], pos["side_b"], ), ) conn.commit() conn.close() except Exception as e: print(f"Error storing results in database: {str(e)}") import traceback traceback.print_exc()