From 0ceb2f2eba519d33f5aced88d2fe7ce9a96c8e18 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 29 May 2025 00:28:15 -0400 Subject: [PATCH] progress --- src/pt_backtest.py | 570 ++++++++++---------------------------- src/results.py | 315 +++++++++++++++++++++ src/tools/data_loader.py | 125 ++++++++- src/tools/trading_pair.py | 37 +++ 4 files changed, 620 insertions(+), 427 deletions(-) create mode 100644 src/results.py create mode 100644 src/tools/trading_pair.py diff --git a/src/pt_backtest.py b/src/pt_backtest.py index 96dd56e..2e11a90 100644 --- a/src/pt_backtest.py +++ b/src/pt_backtest.py @@ -1,8 +1,6 @@ -import datetime import sys -import json -from typing import Any, Dict, List, Tuple, Optional +from typing import Any, Dict, List, Optional import pandas as pd import numpy as np @@ -10,6 +8,10 @@ import numpy as np # ============= statsmodels =================== from statsmodels.tsa.vector_ar.vecm import VECM +from tools.data_loader import get_datasets, load_market_data, transform_dataframe +from tools.trading_pair import TradingPair +from results import BacktestResult + NanoPerMin = 1e9 UNSET_FLOAT: float = sys.float_info.max UNSET_INT: int = sys.maxsize @@ -17,43 +19,43 @@ UNSET_INT: int = sys.maxsize # ------------------------ Configuration ------------------------ # Default configuration CRYPTO_CONFIG: Dict = { + "security_type": "CRYPTO", # --- Data retrieval "data_directory": "./data/crypto", "datafiles": [ "20250519.mktdata.ohlcv.db", + # "20250519.mktdata.ohlcv.db", ], "db_table_name": "bnbspot_ohlcv_1min", - # ----- Instruments "exchange_id": "BNBSPOT", "instrument_id_pfx": "PAIR-", - "instruments": [ "BTC-USDT", - "ETH-USDT", + # "ETH-USDT", "LTC-USDT", - ], - + ], "trading_hours": { - "begin_session": "00:00:00", - "end_session": "23:59:00", - "timezone": "UTC" + "begin_session": "00:00:00", + "end_session": "23:59:00", + "timezone": "UTC", }, - # ----- Model Settings "price_column": "close", "min_required_points": 30, "zero_threshold": 1e-10, - "equilibrium_threshold_open": 5.0, - "equilibrium_threshold_close": 1.0, - "training_minutes": 120, + "disequilibrium_open_trshld": 2, + "disequilibrium_close_trshld": 0.5, + + "training_minutes": 120, # ----- Validation "funding_per_pair": 2000.0, # USD } # ========================== EQUITIES EQT_CONFIG: Dict = { # --- Data retrieval + "security_type": "EQUITY", "data_directory": "./data/equity", "datafiles": [ "20250508.alpaca_sim_md.db", @@ -67,11 +69,9 @@ EQT_CONFIG: Dict = { # "20250520.alpaca_sim_md.db" ], "db_table_name": "md_1min_bars", - - # ----- Instruments - "exchange_id": "ALPACA", + # ----- Instruments + "exchange_id": "ALPACA", "instrument_id_pfx": "STOCK-", - "instruments": [ "COIN", "GBTC", @@ -79,149 +79,35 @@ EQT_CONFIG: Dict = { "MSTR", "PYPL", ], - "trading_hours": { - "begin_session": "9:30:00", - "end_session": "16:00:00", - "timezone": "America/New_York" + "begin_session": "9:30:00", + "end_session": "16:00:00", + "timezone": "America/New_York", }, - # ----- Model Settings "price_column": "close", "min_required_points": 30, "zero_threshold": 1e-10, - "equilibrium_threshold_open": 5.0, - "equilibrium_threshold_close": 1.0, + "disequilibrium_open_trshld": 5.0, + "disequilibrium_close_trshld": 1.0, "training_minutes": 120, - # ----- Validation "funding_per_pair": 2000.0, } + # ========================================================================== + +# CONFIG = CRYPTO_CONFIG CONFIG = EQT_CONFIG -TRADES = {} -TOTAL_UNREALIZED_PNL = 0.0 # Global variable to track total unrealized PnL -TOTAL_REALIZED_PNL = 0.0 # Global variable to track total realized PnL -OUTSTANDING_POSITIONS = [] # Global list to track outstanding positions with share quantities -class TradingPair: - symbol_a_: str - symbol_b_: str - price_column_: str +BacktestResults = BacktestResult(config=CONFIG) - def __init__(self, symbol_a: str, symbol_b: str, price_column: str): - self.symbol_a_ = symbol_a - self.symbol_b_ = symbol_b - self.price_column_ = price_column - - def colnames(self) -> List[str]: - return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"] - - def __repr__(self) ->str: - return f"{self.symbol_a_} & {self.symbol_b_}" - -def convert_time_to_UTC(value: str, timezone: str): - - from zoneinfo import ZoneInfo - from datetime import datetime - - # Parse it to naive datetime object - local_dt = datetime.strptime(value, '%Y-%m-%d %H:%M:%S') - - zinfo = ZoneInfo(timezone) - result = local_dt.replace(tzinfo=zinfo) - - result = result.astimezone(ZoneInfo('UTC')) - result = result.strftime('%Y-%m-%d %H:%M:%S') - - return result - - - pass - -def load_market_data(datafile: str, config: Dict) -> pd.DataFrame: - from tools.data_loader import load_sqlite_to_dataframe - - instrument_ids = ["\"" + config["instrument_id_pfx"] + instrument + "\"" for instrument in config["instruments"]] - exchange_id = config["exchange_id"] - - query = "select tstamp" - query += ", tstamp_ns as time_ns" - query += ", substr(instrument_id, 7) as symbol" - query += ", open" - query += ", high" - query += ", low" - query += ", close" - query += ", volume" - query += ", num_trades" - query += ", vwap" - - query += f" from {config['db_table_name']}" - query += f" where exchange_id ='{exchange_id}'" - query += f" and instrument_id in ({','.join(instrument_ids)})" - - df = load_sqlite_to_dataframe(db_path=datafile, query=query) - - # Trading Hours - date_str = df["tstamp"][0][0:10] - trading_hours = CONFIG['trading_hours'] - start_time = f"{date_str} {trading_hours['begin_session']}" - end_time = f"{date_str} {trading_hours['end_session']}" - - start_time = convert_time_to_UTC(start_time, trading_hours["timezone"]) - end_time = convert_time_to_UTC(end_time, trading_hours["timezone"]) - - # Perform boolean selection - df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)] - df["tstamp"] = pd.to_datetime(df["tstamp"]) - - return df - -def transform_dataframe(df: pd.DataFrame, price_column: str): - # Select only the columns we need - df_selected = df[["tstamp", "symbol", price_column]] - - # Start with unique timestamps - result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) - - # For each unique symbol, add a corresponding close price column - for symbol in df_selected["symbol"].unique(): - # Filter rows for this symbol - df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) - - # Create column name like "close-COIN" - new_price_column = f"{price_column}_{symbol}" - - # Create temporary dataframe with timestamp and price - temp_df = pd.DataFrame({ - "tstamp": df_symbol["tstamp"], - new_price_column: df_symbol[price_column] - }) - - # Join with our result dataframe - result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") - result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset - - return result_df - -def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]: - # Training dataset - colname_a, colname_b = pair.colnames() - df = df[["tstamp", colname_a, colname_b]] - df = df.dropna() - - training_df = df.iloc[:training_minutes - 1, :].copy() - training_df.reset_index(drop=True).dropna().reset_index(drop=True) - - # Testing dataset - testing_df = df.iloc[training_minutes:, :].copy() - testing_df.reset_index(drop=True).dropna().reset_index(drop=True) - - return (training_df, testing_df) def fit_VECM(training_pair_df, pair: TradingPair): - vecm_model = VECM(training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1) + vecm_model = VECM( + training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1 + ) vecm_fit = vecm_model.fit() # Check if the model converged properly @@ -230,13 +116,18 @@ def fit_VECM(training_pair_df, pair: TradingPair): return vecm_fit -def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.DataFrame: + + +def create_trading_signals( + vecm_fit, testing_pair_df, pair: TradingPair +) -> pd.DataFrame: result_columns = [ "time", "action", "symbol", "price", - "equilibrium", + "disequilibrium", + "scaled_disequilibrium", "pair", ] @@ -248,22 +139,23 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D beta = vecm_fit.beta - predicted_df["equilibrium_term"] = ( - beta[0] * predicted_df[colname_a] - + beta[1] * predicted_df[colname_b] - ) - pair_result_df = pd.merge( - testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred') + testing_pair_df.reset_index(drop=True), + predicted_df, + left_index=True, + right_index=True, + suffixes=("", "_pred"), ).dropna() - pair_result_df["equilibrium"] = ( - beta[0] * pair_result_df[colname_a] - + beta[1] * pair_result_df[colname_b] - ) + pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta + + pair_mu = pair.disequilibrium_mu_ + pair_std = pair.disequilibrium_std_ + + pair_result_df["scaled_disequilibrium"] = abs( + pair_result_df["disequilibrium"] - pair_mu + ) / pair_std - pair_result_df["abs_equilibrium"] = np.abs(pair_result_df["equilibrium"]) - pair_result_df["scaled_equilibrium"] = pair_result_df["abs_equilibrium"] / (abs(beta[0]) * pair_result_df[colname_a] + abs(beta[1]) * pair_result_df[colname_b]) # Reset index to ensure proper indexing pair_result_df = pair_result_df.reset_index() @@ -272,26 +164,24 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_row_index = None initial_abs_term = None + open_threshold = CONFIG["disequilibrium_open_trshld"] + close_threshold = CONFIG["disequilibrium_close_trshld"] for row_idx in range(len(pair_result_df)): - current_abs_term = pair_result_df["abs_equilibrium"][row_idx] + curr_disequilibrium = pair_result_df["scaled_disequilibrium"][row_idx] - # Check if current row has sufficient equilibrium (not near-zero) - if current_abs_term >= CONFIG["equilibrium_threshold_open"]: + # Check if current row has sufficient disequilibrium (not near-zero) + if curr_disequilibrium >= open_threshold: open_row_index = row_idx - initial_abs_term = current_abs_term + initial_abs_term = curr_disequilibrium break - # If no row with sufficient equilibrium found, skip this pair + # If no row with sufficient disequilibrium found, skip this pair if open_row_index is None: - print(f"{pair}: Insufficient divergence in testing dataset. Skipping.") + print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.") return pd.DataFrame() # Look for close signal starting from the open position - trading_signals_df = ( - pair_result_df["abs_equilibrium"][open_row_index:] - # < initial_abs_term / CONFIG["equilibrium_threshold_close"] - < CONFIG["equilibrium_threshold_close"] - ) + trading_signals_df = (pair_result_df["scaled_disequilibrium"][open_row_index:] < close_threshold) # Adjust indices to account for the offset from open_row_index close_row_index = None @@ -302,7 +192,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_row = pair_result_df.loc[open_row_index] open_tstamp = open_row["tstamp"] - open_eqlbrm = open_row["equilibrium"] + open_disequilibrium = open_row["disequilibrium"] + open_scaled_disequilibrium = open_row["scaled_disequilibrium"] open_px_a = open_row[f"{colname_a}"] open_px_b = open_row[f"{colname_b}"] @@ -323,70 +214,23 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D # If no close signal found, print position and unrealized PnL if close_row_index is None: - global TOTAL_UNREALIZED_PNL, OUTSTANDING_POSITIONS last_row_index = len(pair_result_df) - 1 - last_row = pair_result_df.loc[last_row_index] - last_tstamp = last_row["tstamp"] - last_px_a = last_row[f"{colname_a}"] - last_px_b = last_row[f"{colname_b}"] - # Calculate share quantities based on $1000 funding per pair - # Split $1000 equally between the two positions ($500 each) - funding_per_position = CONFIG["funding_per_pair"] / 2 - shares_a = funding_per_position / open_px_a - shares_b = funding_per_position / open_px_b - - # Calculate unrealized PnL for each position - if open_side_a == "BUY": - unrealized_pnl_a = (last_px_a - open_px_a) / open_px_a * 100 - unrealized_dollar_a = shares_a * (last_px_a - open_px_a) - else: # SELL - unrealized_pnl_a = (open_px_a - last_px_a) / open_px_a * 100 - unrealized_dollar_a = shares_a * (open_px_a - last_px_a) - - if open_side_b == "BUY": - unrealized_pnl_b = (last_px_b - open_px_b) / open_px_b * 100 - unrealized_dollar_b = shares_b * (last_px_b - open_px_b) - else: # SELL - unrealized_pnl_b = (open_px_b - last_px_b) / open_px_b * 100 - unrealized_dollar_b = shares_b * (open_px_b - last_px_b) - - total_unrealized_pnl = unrealized_pnl_a + unrealized_pnl_b - total_unrealized_dollar = unrealized_dollar_a + unrealized_dollar_b - - # Add to global total - TOTAL_UNREALIZED_PNL += total_unrealized_pnl - - # Store outstanding positions - OUTSTANDING_POSITIONS.append({ - 'pair': str(pair), - 'symbol_a': pair.symbol_a_, - 'symbol_b': pair.symbol_b_, - 'side_a': open_side_a, - 'side_b': open_side_b, - 'shares_a': shares_a, - 'shares_b': shares_b, - 'open_px_a': open_px_a, - 'open_px_b': open_px_b, - 'current_px_a': last_px_a, - 'current_px_b': last_px_b, - 'unrealized_dollar_a': unrealized_dollar_a, - 'unrealized_dollar_b': unrealized_dollar_b, - 'total_unrealized_dollar': total_unrealized_dollar, - 'open_time': open_tstamp, - 'last_time': last_tstamp, - 'initial_abs_term': initial_abs_term, - 'current_abs_term': pair_result_df.loc[last_row_index, "abs_equilibrium"], - 'closing_threshold': initial_abs_term / CONFIG["equilibrium_threshold_close"], - 'equilibrium_ratio': pair_result_df.loc[last_row_index, "abs_equilibrium"] / (initial_abs_term / CONFIG["equilibrium_threshold_close"]) - }) - - print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session") - print(f" Open: {open_tstamp} | Last: {last_tstamp}") - print(f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Unrealized: ${unrealized_dollar_a:.2f} ({unrealized_pnl_a:.2f}%)") - print(f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Unrealized: ${unrealized_dollar_b:.2f} ({unrealized_pnl_b:.2f}%)") - print(f" Total Unrealized: ${total_unrealized_dollar:.2f} ({total_unrealized_pnl:.2f}%)") + # Use the new method from BacktestResult to handle outstanding positions + BacktestResults.handle_outstanding_position( + pair=pair, + pair_result_df=pair_result_df, + last_row_index=last_row_index, + open_side_a=open_side_a, + open_side_b=open_side_b, + open_px_a=open_px_a, + open_px_b=open_px_b, + open_tstamp=open_tstamp, + initial_abs_term=initial_abs_term, + colname_a=colname_a, + colname_b=colname_b + ) # Return only open trades (no close trades) trd_signal_tuples = [ @@ -395,7 +239,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_side_a, pair.symbol_a_, open_px_a, - open_eqlbrm, + open_disequilibrium, + open_scaled_disequilibrium, pair, ), ( @@ -403,7 +248,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_side_b, pair.symbol_b_, open_px_b, - open_eqlbrm, + open_disequilibrium, + open_scaled_disequilibrium, pair, ), ] @@ -411,7 +257,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D # Close signal found - create complete trade close_row = pair_result_df.loc[close_row_index] close_tstamp = close_row["tstamp"] - close_eqlbrm = close_row["equilibrium"] + close_disequilibrium = close_row["disequilibrium"] + close_scaled_disequilibrium = close_row["scaled_disequilibrium"] close_px_a = close_row[f"{colname_a}"] close_px_b = close_row[f"{colname_b}"] @@ -423,7 +270,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_side_a, pair.symbol_a_, open_px_a, - open_eqlbrm, + open_disequilibrium, + open_scaled_disequilibrium, pair, ), ( @@ -431,7 +279,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D open_side_b, pair.symbol_b_, open_px_b, - open_eqlbrm, + open_disequilibrium, + open_scaled_disequilibrium, pair, ), ( @@ -439,7 +288,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D close_side_a, pair.symbol_a_, close_px_a, - close_eqlbrm, + close_disequilibrium, + close_scaled_disequilibrium, pair, ), ( @@ -447,7 +297,8 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D close_side_b, pair.symbol_b_, close_px_b, - close_eqlbrm, + close_disequilibrium, + close_scaled_disequilibrium, pair, ), ] @@ -458,10 +309,13 @@ def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.D columns=result_columns, ) -def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: TradingPair) -> Optional[pd.DataFrame]: - colname_a = f"{price_column}_{pair.symbol_a_}" - colname_b = f"{price_column}_{pair.symbol_b_}" - training_pair_df, testing_pair_df = get_datasets(df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair) + +def run_single_pair( + market_data: pd.DataFrame, price_column: str, pair: TradingPair +) -> Optional[pd.DataFrame]: + training_pair_df, testing_pair_df = get_datasets( + df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair + ) # Check if we have enough data points for a meaningful analysis min_required_points = CONFIG[ @@ -491,12 +345,21 @@ def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: TradingPa ): # Small threshold to avoid division by very small numbers print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}") return None + diseqlbrm_series = training_pair_df[pair.colnames()] @ vecm_fit.beta + diseqlbrm_series_mu: float = diseqlbrm_series.mean().iloc[0] + diseqlbrm_series_std: float = diseqlbrm_series.std().iloc[0] + pair.set_training_disequilibrium(diseqlbrm_series_mu, diseqlbrm_series_std) + + # Normalize the disequilibrium + training_pair_df["scaled_disequilibrium"] = ( + diseqlbrm_series - diseqlbrm_series_mu + ) / diseqlbrm_series_std try: pair_trades = create_trading_signals( - vecm_fit=vecm_fit, - testing_pair_df=testing_pair_df, - pair=pair, + vecm_fit=vecm_fit, + testing_pair_df=testing_pair_df, + pair=pair, ) except Exception as e: print(f"{pair}: Prediction failed: {str(e)}") @@ -504,136 +367,35 @@ def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: TradingPa return pair_trades -def add_trade(pair_nm, symbol, action, price): - pair_nm = str(pair_nm) +def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> None: - if pair_nm not in TRADES: - TRADES[pair_nm] = {symbol: []} - if symbol not in TRADES[pair_nm]: - TRADES[pair_nm][symbol] = [] - TRADES[pair_nm][symbol].append((action, price)) + def _create_pairs(config: Dict) -> List[TradingPair]: + instruments = config["instruments"] + all_indexes = range(len(instruments)) + unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j] + pairs = [] + for a_index, b_index in unique_index_pairs: + symbol_a = instruments[a_index] + symbol_b = instruments[b_index] + pair = TradingPair(symbol_a, symbol_b, price_column) + pairs.append(pair) + return pairs -def collect_single_day_results(result): - if result is None: - return - - print("\n -------------- Suggested Trades ") - print(result) - - for row in result.itertuples(): - action = row.action - symbol = row.symbol - price = row.price - add_trade(pair_nm=row.pair, action=action, symbol=symbol, price=price) - -def print_single_day_results(result): - for pair, symbols in TRADES.items(): - print(f"\n--- {pair} ---") - for symbol, trades in symbols.items(): - for side, price in trades: - print(f"{symbol} {side} at ${price}") - -def print_results_suummary(all_results): - # Summary of all processed files - print("\n====== Summary of All Processed Files ======") - for filename, data in all_results.items(): - trade_count = sum( - len(trades) - for symbol_trades in data["trades"].values() - for trades in symbol_trades.values() - ) - print(f"{filename}: {trade_count} trades") - - -def calculate_returns(all_results: Dict): - global TOTAL_REALIZED_PNL - print("\n====== Returns By Day and Pair ======") - - for filename, data in all_results.items(): - day_return = 0 - print(f"\n--- {filename} ---") - - # Process each pair - for pair, symbols in data["trades"].items(): - pair_return = 0 - pair_trades = [] - - # Calculate individual symbol returns in the pair - for symbol, trades in symbols.items(): - if len(trades) >= 2: # Need at least entry and exit - # Get entry and exit trades - entry_action, entry_price = trades[0] - exit_action, exit_price = trades[1] - - # Calculate return based on action - symbol_return = 0 - if entry_action == "BUY" and exit_action == "SELL": - # Long position - symbol_return = (exit_price - entry_price) / entry_price * 100 - elif entry_action == "SELL" and exit_action == "BUY": - # Short position - symbol_return = (entry_price - exit_price) / entry_price * 100 - - pair_trades.append( - ( - symbol, - entry_action, - entry_price, - exit_action, - exit_price, - symbol_return, - ) - ) - pair_return += symbol_return - - # Print pair returns - if pair_trades: - print(f" {pair}:") - for ( - symbol, - entry_action, - entry_price, - exit_action, - exit_price, - symbol_return, - ) in pair_trades: - print( - f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%" - ) - print(f" Pair Total Return: {pair_return:.2f}%") - day_return += pair_return - - # Print day total return and add to global realized PnL - if day_return != 0: - print(f" Day Total Return: {day_return:.2f}%") - TOTAL_REALIZED_PNL += day_return - -def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None: - - result_df = transform_dataframe(df=summaries_df, price_column=price_column) - - stock_price_columns = [ - column - for column in result_df.columns - if column.startswith(f"{price_column}_") - ] - - # Find the starting indices for A and B - all_indexes = range(len(stock_price_columns)) - unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j] pairs_trades = [] - for a_index, b_index in unique_index_pairs: + for pair in _create_pairs(config): # Get the actual variable names - colname_a = stock_price_columns[a_index] - colname_b = stock_price_columns[b_index] + # colname_a = stock_price_columns[a_index] + # colname_b = stock_price_columns[b_index] - symbol_a = colname_a[len(f"{price_column}-") :] - symbol_b = colname_b[len(f"{price_column}-") :] - pair = TradingPair(symbol_a, symbol_b, price_column) + # symbol_a = colname_a[len(f"{price_column}-") :] + # symbol_b = colname_b[len(f"{price_column}-") :] + # pair = TradingPair(symbol_a, symbol_b, price_column) - single_pair_trades = run_single_pair(market_data=result_df, price_column=price_column, pair=pair) + single_pair_trades = run_single_pair( + market_data=market_data_df, price_column=price_column, pair=pair + ) if len(single_pair_trades) > 0: pairs_trades.append(single_pair_trades) # Check if result_list has any data before concatenating @@ -645,48 +407,15 @@ def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None: result["time"] = pd.to_datetime(result["time"]) result = result.set_index("time").sort_index() - collect_single_day_results(result) - # print_single_day_results(result) + BacktestResults.collect_single_day_results(result) + # BacktestResults.print_single_day_results() -def print_outstanding_positions(): - """Print all outstanding positions with share quantities and unrealized PnL""" - if not OUTSTANDING_POSITIONS: - print("\n====== NO OUTSTANDING POSITIONS ======") - return - - print(f"\n====== OUTSTANDING POSITIONS ======") - print(f"{'Pair':<15} {'Symbol':<6} {'Side':<4} {'Shares':<10} {'Open $':<8} {'Current $':<10} {'Unrealized $':<12} {'%':<8} {'Close Eq':<10}") - print("-" * 105) - - total_unrealized_dollar = 0.0 - - for pos in OUTSTANDING_POSITIONS: - # Print position A - print(f"{pos['pair']:<15} {pos['symbol_a']:<6} {pos['side_a']:<4} {pos['shares_a']:<10.2f} {pos['open_px_a']:<8.2f} {pos['current_px_a']:<10.2f} {pos['unrealized_dollar_a']:<12.2f} {pos['unrealized_dollar_a']/500*100:<8.2f} {'':<10}") - - # Print position B - print(f"{'':<15} {pos['symbol_b']:<6} {pos['side_b']:<4} {pos['shares_b']:<10.2f} {pos['open_px_b']:<8.2f} {pos['current_px_b']:<10.2f} {pos['unrealized_dollar_b']:<12.2f} {pos['unrealized_dollar_b']/500*100:<8.2f} {'':<10}") - - # Print pair totals with equilibrium info - equilibrium_status = "CLOSE" if pos['current_abs_term'] < pos['closing_threshold'] else f"{pos['equilibrium_ratio']:.2f}x" - print(f"{'':<15} {'PAIR':<6} {'TOT':<4} {'':<10} {'':<8} {'':<10} {pos['total_unrealized_dollar']:<12.2f} {pos['total_unrealized_dollar']/1000*100:<8.2f} {equilibrium_status:<10}") - - # Print equilibrium details - print(f"{'':<15} {'EQ':<6} {'INFO':<4} {'':<10} {'':<8} {'':<10} {'Curr:':<6}{pos['current_abs_term']:<6.4f} {'Thresh:':<7}{pos['closing_threshold']:<6.4f} {'':<10}") - print("-" * 105) - - total_unrealized_dollar += pos['total_unrealized_dollar'] - - print(f"{'TOTAL OUTSTANDING':<80} ${total_unrealized_dollar:<12.2f}") if __name__ == "__main__": # Initialize a dictionary to store all trade results - all_results = {} + all_results: Dict[str, Dict[str, Any]] = {} # Initialize global PnL tracking variables - TOTAL_REALIZED_PNL = 0.0 - TOTAL_UNREALIZED_PNL = 0.0 - OUTSTANDING_POSITIONS = [] # Process each data file price_column = CONFIG["price_column"] @@ -694,39 +423,36 @@ if __name__ == "__main__": print(f"\n====== Processing {datafile} ======") # Clear the TRADES global dictionary and reset unrealized PnL for the new file - TRADES.clear() - TOTAL_UNREALIZED_PNL = 0.0 - TOTAL_REALIZED_PNL = 0.0 + BacktestResults.clear_trades() # Process data for this file try: - run_pairs( - summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG), - price_column=price_column + market_data_df = load_market_data( + f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG ) + market_data_df = transform_dataframe( + df=market_data_df, price_column=price_column + ) + run_pairs(config=CONFIG, market_data_df=market_data_df, price_column=price_column) # Store results with file name as key filename = datafile.split("/")[-1] - all_results[filename] = {"trades": TRADES.copy()} + all_results[filename] = {"trades": BacktestResults.trades.copy()} print(f"Successfully processed {filename}") # Print total unrealized PnL for this file - if TOTAL_UNREALIZED_PNL != 0: - print(f"\n====== TOTAL UNREALIZED PnL for {filename}: {TOTAL_UNREALIZED_PNL:.2f}% ======") - else: - print(f"\n====== No unrealized positions for {filename} ======") + print( + f"\n====== TOTAL UNREALIZED PnL for {filename}: {BacktestResults.get_total_unrealized_pnl():.2f}% ======" + ) except Exception as e: print(f"Error processing {datafile}: {str(e)}") - # print_results_suummary(all_results) - calculate_returns(all_results) + # BacktestResults.print_results_summary(all_results) + BacktestResults.calculate_returns(all_results) # Print grand totals - print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======") - print(f"Total Realized PnL: {TOTAL_REALIZED_PNL:.2f}%") - print(f"Total Unrealized PnL: {TOTAL_UNREALIZED_PNL:.2f}%") - print(f"Combined Total PnL: {TOTAL_REALIZED_PNL + TOTAL_UNREALIZED_PNL:.2f}%") + BacktestResults.print_grand_totals() - print_outstanding_positions() + BacktestResults.print_outstanding_positions() diff --git a/src/results.py b/src/results.py new file mode 100644 index 0000000..e5623ea --- /dev/null +++ b/src/results.py @@ -0,0 +1,315 @@ +from typing import Any, Dict, List +import pandas as pd + + +class BacktestResult: + """ + Class to handle backtest results, trades tracking, PnL calculations, and reporting. + """ + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.trades: Dict[str, Dict[str, Any]] = {} + self.total_unrealized_pnl = 0.0 + self.total_realized_pnl = 0.0 + self.outstanding_positions: List[Dict[str, Any]] = [] + + def add_trade(self, pair_nm, symbol, action, price): + """Add a trade to the results tracking.""" + pair_nm = str(pair_nm) + + if pair_nm not in self.trades: + self.trades[pair_nm] = {symbol: []} + if symbol not in self.trades[pair_nm]: + self.trades[pair_nm][symbol] = [] + self.trades[pair_nm][symbol].append((action, price)) + + def add_unrealized_pnl(self, unrealized_pnl: float): + """Add unrealized PnL to the total.""" + self.total_unrealized_pnl += unrealized_pnl + + def add_outstanding_position(self, position: Dict[str, Any]): + """Add an outstanding position to tracking.""" + self.outstanding_positions.append(position) + + def add_realized_pnl(self, realized_pnl: float): + """Add realized PnL to the total.""" + self.total_realized_pnl += realized_pnl + + def get_total_unrealized_pnl(self) -> float: + """Get total unrealized PnL.""" + return self.total_unrealized_pnl + + def get_total_realized_pnl(self) -> float: + """Get total realized PnL.""" + return self.total_realized_pnl + + def get_outstanding_positions(self) -> List[Dict[str, Any]]: + """Get all outstanding positions.""" + return self.outstanding_positions + + def get_trades(self) -> Dict[str, Dict[str, Any]]: + """Get all trades.""" + return self.trades + + def clear_trades(self): + """Clear all trades (used when processing new files).""" + self.trades.clear() + + def collect_single_day_results(self, result): + """Collect and process single day trading results.""" + if result is None: + return + + print("\n -------------- Suggested Trades ") + print(result) + + for row in result.itertuples(): + action = row.action + symbol = row.symbol + price = row.price + self.add_trade( + pair_nm=row.pair, action=action, symbol=symbol, price=price + ) + + def print_single_day_results(self): + """Print single day results summary.""" + for pair, symbols in self.trades.items(): + print(f"\n--- {pair} ---") + for symbol, trades in symbols.items(): + for side, price in trades: + print(f"{symbol} {side} at ${price}") + + def print_results_summary(self, all_results): + """Print summary of all processed files.""" + print("\n====== Summary of All Processed Files ======") + for filename, data in all_results.items(): + trade_count = sum( + len(trades) + for symbol_trades in data["trades"].values() + for trades in symbol_trades.values() + ) + print(f"{filename}: {trade_count} trades") + + def calculate_returns(self, all_results: Dict): + """Calculate and print returns by day and pair.""" + print("\n====== Returns By Day and Pair ======") + + for filename, data in all_results.items(): + day_return = 0 + print(f"\n--- {filename} ---") + + # Process each pair + for pair, symbols in data["trades"].items(): + pair_return = 0 + pair_trades = [] + + # Calculate individual symbol returns in the pair + for symbol, trades in symbols.items(): + if len(trades) >= 2: # Need at least entry and exit + # Get entry and exit trades + entry_action, entry_price = trades[0] + exit_action, exit_price = trades[1] + + # Calculate return based on action + symbol_return = 0 + if entry_action == "BUY" and exit_action == "SELL": + # Long position + symbol_return = (exit_price - entry_price) / entry_price * 100 + elif entry_action == "SELL" and exit_action == "BUY": + # Short position + symbol_return = (entry_price - exit_price) / entry_price * 100 + + pair_trades.append( + ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) + ) + pair_return += symbol_return + + # Print pair returns + if pair_trades: + print(f" {pair}:") + for ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) in pair_trades: + print( + f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%" + ) + print(f" Pair Total Return: {pair_return:.2f}%") + day_return += pair_return + + # Print day total return and add to global realized PnL + if day_return != 0: + print(f" Day Total Return: {day_return:.2f}%") + self.add_realized_pnl(day_return) + + def print_outstanding_positions(self): + """Print all outstanding positions with share quantities and unrealized PnL.""" + if not self.get_outstanding_positions(): + print("\n====== NO OUTSTANDING POSITIONS ======") + return + + print(f"\n====== OUTSTANDING POSITIONS ======") + print( + f"{'Pair':<15}" + f" {'Symbol':<10}" + f" {'Side':<4}" + f" {'Shares':<10}" + f" {'Open $':<8}" + f" {'Current $':<10}" + f" {'Unrealized $':<12}" + f" {'%':<8}" + f" {'Close Eq':<10}" + ) + print("-" * 105) + + total_unrealized_dollar = 0.0 + + for pos in self.get_outstanding_positions(): + # Print position A + print( + f"{pos['pair']:<15} {pos['symbol_a']:<6} {pos['side_a']:<4} {pos['shares_a']:<10.2f} {pos['open_px_a']:<8.2f} {pos['current_px_a']:<10.2f} {pos['unrealized_dollar_a']:<12.2f} {pos['unrealized_dollar_a']/500*100:<8.2f} {'':<10}" + ) + + # Print position B + print( + f"{'':<15} {pos['symbol_b']:<6} {pos['side_b']:<4} {pos['shares_b']:<10.2f} {pos['open_px_b']:<8.2f} {pos['current_px_b']:<10.2f} {pos['unrealized_dollar_b']:<12.2f} {pos['unrealized_dollar_b']/500*100:<8.2f} {'':<10}" + ) + + # Print pair totals with disequilibrium info + disequilibrium_status = ( + "CLOSE" + if pos["current_abs_term"] < pos["closing_threshold"] + else f"{pos['disequilibrium_ratio']:.2f}x" + ) + print( + f"{'':<15} {'PAIR':<6} {'TOT':<4} {'':<10} {'':<8} {'':<10} {pos['total_unrealized_dollar']:<12.2f} {pos['total_unrealized_dollar']/1000*100:<8.2f} {disequilibrium_status:<10}" + ) + + # Print disequilibrium details + print( + f"{'':<15} {'EQ':<6} {'INFO':<4} {'':<10} {'':<8} {'':<10} {'Curr:':<6}{pos['current_abs_term']:<6.4f} {'Thresh:':<7}{pos['closing_threshold']:<6.4f} {'':<10}" + ) + print("-" * 105) + + total_unrealized_dollar += pos["total_unrealized_dollar"] + + print(f"{'TOTAL OUTSTANDING':<80} ${total_unrealized_dollar:<12.2f}") + + def print_grand_totals(self): + """Print grand totals across all pairs.""" + print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======") + print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%") + print(f"Total Unrealized PnL: {self.get_total_unrealized_pnl():.2f}%") + print( + f"Combined Total PnL: {self.get_total_realized_pnl() + self.get_total_unrealized_pnl():.2f}%" + ) + + def handle_outstanding_position(self, pair, pair_result_df, last_row_index, + open_side_a, open_side_b, open_px_a, open_px_b, + open_tstamp, initial_abs_term, colname_a, colname_b): + """ + Handle calculation and tracking of outstanding positions when no close signal is found. + + Args: + pair: TradingPair object + pair_result_df: DataFrame with pair results + last_row_index: Index of the last row in the data + open_side_a, open_side_b: Trading sides for symbols A and B + open_px_a, open_px_b: Opening prices for symbols A and B + open_tstamp: Opening timestamp + initial_abs_term: Initial absolute disequilibrium term + colname_a, colname_b: Column names for the price data + + Returns: + tuple: (unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b) + """ + last_row = pair_result_df.loc[last_row_index] + last_tstamp = last_row["tstamp"] + last_px_a = last_row[colname_a] + last_px_b = last_row[colname_b] + + # Calculate share quantities based on funding per pair + # Split funding equally between the two positions + funding_per_position = self.config["funding_per_pair"] / 2 + shares_a = funding_per_position / open_px_a + shares_b = funding_per_position / open_px_b + + # Calculate unrealized PnL for each position + if open_side_a == "BUY": + unrealized_pnl_a = (last_px_a - open_px_a) / open_px_a * 100 + unrealized_dollar_a = shares_a * (last_px_a - open_px_a) + else: # SELL + unrealized_pnl_a = (open_px_a - last_px_a) / open_px_a * 100 + unrealized_dollar_a = shares_a * (open_px_a - last_px_a) + + if open_side_b == "BUY": + unrealized_pnl_b = (last_px_b - open_px_b) / open_px_b * 100 + unrealized_dollar_b = shares_b * (last_px_b - open_px_b) + else: # SELL + unrealized_pnl_b = (open_px_b - last_px_b) / open_px_b * 100 + unrealized_dollar_b = shares_b * (open_px_b - last_px_b) + + total_unrealized_pnl = unrealized_pnl_a + unrealized_pnl_b + total_unrealized_dollar = unrealized_dollar_a + unrealized_dollar_b + + # Add to global total + self.add_unrealized_pnl(total_unrealized_pnl) + + # Store outstanding positions + self.add_outstanding_position( + { + "pair": str(pair), + "symbol_a": pair.symbol_a_, + "symbol_b": pair.symbol_b_, + "side_a": open_side_a, + "side_b": open_side_b, + "shares_a": shares_a, + "shares_b": shares_b, + "open_px_a": open_px_a, + "open_px_b": open_px_b, + "current_px_a": last_px_a, + "current_px_b": last_px_b, + "unrealized_dollar_a": unrealized_dollar_a, + "unrealized_dollar_b": unrealized_dollar_b, + "total_unrealized_dollar": total_unrealized_dollar, + "open_time": open_tstamp, + "last_time": last_tstamp, + "initial_abs_term": initial_abs_term, + "current_abs_term": pair_result_df.loc[ + last_row_index, "scaled_disequilibrium" + ], + "closing_threshold": initial_abs_term + / self.config["disequilibrium_close_trshld"], + "disequilibrium_ratio": pair_result_df.loc[ + last_row_index, "scaled_disequilibrium" + ] + / (initial_abs_term / self.config["disequilibrium_close_trshld"]), + } + ) + + # Print position details + print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session") + print(f" Open: {open_tstamp} | Last: {last_tstamp}") + print( + f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Unrealized: ${unrealized_dollar_a:.2f} ({unrealized_pnl_a:.2f}%)" + ) + print( + f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Unrealized: ${unrealized_dollar_b:.2f} ({unrealized_pnl_b:.2f}%)" + ) + print( + f" Total Unrealized: ${total_unrealized_dollar:.2f} ({total_unrealized_pnl:.2f}%)" + ) + + return unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b \ No newline at end of file diff --git a/src/tools/data_loader.py b/src/tools/data_loader.py index 67f76a6..144154b 100644 --- a/src/tools/data_loader.py +++ b/src/tools/data_loader.py @@ -1,8 +1,11 @@ - import sys import sqlite3 +from typing import Dict, Tuple import pandas as pd +from tools.trading_pair import TradingPair + + def load_sqlite_to_dataframe(db_path, query): try: conn = sqlite3.connect(db_path) @@ -16,10 +19,122 @@ def load_sqlite_to_dataframe(db_path, query): print(f"Error: {excpt}") raise finally: - if 'conn' in locals(): + if "conn" in locals(): conn.close() -if __name__ == "__main__": - df1 = load_sqlite_to_dataframe(sys.argv[1], table_name='md_1min_bars') - print(df1) \ No newline at end of file +def convert_time_to_UTC(value: str, timezone: str): + + from zoneinfo import ZoneInfo + from datetime import datetime + + # Parse it to naive datetime object + local_dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + + zinfo = ZoneInfo(timezone) + result = local_dt.replace(tzinfo=zinfo) + + result = result.astimezone(ZoneInfo("UTC")) + result = result.strftime("%Y-%m-%d %H:%M:%S") + + return result + + +def load_market_data(datafile: str, config: Dict) -> pd.DataFrame: + from tools.data_loader import load_sqlite_to_dataframe + + instrument_ids = [ + '"' + config["instrument_id_pfx"] + instrument + '"' + for instrument in config["instruments"] + ] + security_type = config["security_type"] + exchange_id = config["exchange_id"] + + query = "select" + if security_type == "CRYPTO": + query += " strftime('%Y-%m-%d %H:%M:%S', tstamp/1000000000, 'unixepoch') as tstamp" + query += ", tstamp as time_ns" + else: + query += " tstamp" + query += ", tstamp_ns as time_ns" + + query += f", substr(instrument_id, {len(config['instrument_id_pfx']) + 1}) as symbol" + query += ", open" + query += ", high" + query += ", low" + query += ", close" + query += ", volume" + query += ", num_trades" + query += ", vwap" + + query += f" from {config['db_table_name']}" + query += f" where exchange_id ='{exchange_id}'" + query += f" and instrument_id in ({','.join(instrument_ids)})" + + df = load_sqlite_to_dataframe(db_path=datafile, query=query) + + # Trading Hours + date_str = df["tstamp"][0][0:10] + trading_hours = config["trading_hours"] + + start_time = convert_time_to_UTC( + f"{date_str} {trading_hours['begin_session']}", trading_hours["timezone"] + ) + end_time = convert_time_to_UTC( + f"{date_str} {trading_hours['end_session']}", trading_hours["timezone"] + ) + + # Perform boolean selection + df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)] + df["tstamp"] = pd.to_datetime(df["tstamp"]) + + return df + + +def transform_dataframe(df: pd.DataFrame, price_column: str): + # Select only the columns we need + df_selected = df[["tstamp", "symbol", price_column]] + + # Start with unique timestamps + result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) + + # For each unique symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + new_price_column = f"{price_column}_{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + new_price_column: df_symbol[price_column] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset + + return result_df + +def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]: + # Training dataset + colname_a, colname_b = pair.colnames() + df = df[["tstamp", colname_a, colname_b]] + df = df.dropna() + + training_df = df.iloc[:training_minutes - 1, :].copy() + training_df.reset_index(drop=True).dropna().reset_index(drop=True) + + # Testing dataset + testing_df = df.iloc[training_minutes:, :].copy() + testing_df.reset_index(drop=True).dropna().reset_index(drop=True) + + return (training_df, testing_df) + + +if __name__ == "__main__": + df1 = load_sqlite_to_dataframe(sys.argv[1], table_name="md_1min_bars") + + print(df1) diff --git a/src/tools/trading_pair.py b/src/tools/trading_pair.py new file mode 100644 index 0000000..2818123 --- /dev/null +++ b/src/tools/trading_pair.py @@ -0,0 +1,37 @@ + +from typing import List, Optional + + +class TradingPair: + symbol_a_: str + symbol_b_: str + price_column_: str + + disequilibrium_mu_: Optional[float] + disequilibrium_std_: Optional[float] + + def __init__(self, symbol_a: str, symbol_b: str, price_column: str): + self.symbol_a_ = symbol_a + self.symbol_b_ = symbol_b + self.price_column_ = price_column + self.disequilibrium_mu_ = None + self.disequilibrium_std_ = None + + def colnames(self) -> List[str]: + return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"] + + def set_training_disequilibrium(self, disequilibrium_mu: float, disequilibrium_std: float): + self.disequilibrium_mu_ = disequilibrium_mu + self.disequilibrium_std_ = disequilibrium_std + + def mu(self) -> float: + assert self.disequilibrium_mu_ is not None + return self.disequilibrium_mu_ + + def std(self) -> float: + assert self.disequilibrium_std_ is not None + return self.disequilibrium_std_ + + def __repr__(self) ->str: + return f"{self.symbol_a_} & {self.symbol_b_}" +