"""Pairs-trading backtest driver.

For every data file listed in ``CONFIG["datafiles"]`` the script loads market
data, builds all unique instrument pairs, fits a rank-1 VECM on the training
window of each pair and derives open/close trading signals from the scaled
disequilibrium of the testing window.  Results are accumulated in the
module-level ``BacktestResults`` object.
"""
import sys
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd

# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM

from tools.data_loader import get_datasets, load_market_data, transform_dataframe
from tools.trading_pair import TradingPair
from results import BacktestResult

NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize

# ------------------------ Configuration ------------------------
# Default configuration
CRYPTO_CONFIG: Dict = {
    "security_type": "CRYPTO",
    # --- Data retrieval
    "data_directory": "./data/crypto",
    "datafiles": [
        "20250519.mktdata.ohlcv.db",
        # "20250519.mktdata.ohlcv.db",
    ],
    "db_table_name": "bnbspot_ohlcv_1min",
    # ----- Instruments
    "exchange_id": "BNBSPOT",
    "instrument_id_pfx": "PAIR-",
    "instruments": [
        "BTC-USDT",
        # "ETH-USDT",
        "LTC-USDT",
    ],
    "trading_hours": {
        "begin_session": "00:00:00",
        "end_session": "23:59:00",
        "timezone": "UTC",
    },
    # ----- Model Settings
    "price_column": "close",
    "min_required_points": 30,
    "zero_threshold": 1e-10,
    "disequilibrium_open_trshld": 2,
    "disequilibrium_close_trshld": 0.5,
    "training_minutes": 120,
    # ----- Validation
    "funding_per_pair": 2000.0,  # USD
}

# ========================== EQUITIES
EQT_CONFIG: Dict = {
    # --- Data retrieval
    "security_type": "EQUITY",
    "data_directory": "./data/equity",
    "datafiles": [
        "20250508.alpaca_sim_md.db",
        # "20250509.alpaca_sim_md.db",
        # "20250512.alpaca_sim_md.db",
        # "20250513.alpaca_sim_md.db",
        # "20250514.alpaca_sim_md.db",
        # "20250515.alpaca_sim_md.db",
        # "20250516.alpaca_sim_md.db",
        # "20250519.alpaca_sim_md.db",
        # "20250520.alpaca_sim_md.db"
    ],
    "db_table_name": "md_1min_bars",
    # ----- Instruments
    "exchange_id": "ALPACA",
    "instrument_id_pfx": "STOCK-",
    "instruments": [
        "COIN",
        "GBTC",
        "HOOD",
        "MSTR",
        "PYPL",
    ],
    "trading_hours": {
        "begin_session": "9:30:00",
        "end_session": "16:00:00",
        "timezone": "America/New_York",
    },
    # ----- Model Settings
    "price_column": "close",
    "min_required_points": 30,
    "zero_threshold": 1e-10,
    "disequilibrium_open_trshld": 5.0,
    "disequilibrium_close_trshld": 1.0,
    "training_minutes": 120,
    # ----- Validation
    "funding_per_pair": 2000.0,
}

# ==========================================================================
# Active configuration: switch by (un)commenting one of the lines below.
# CONFIG = CRYPTO_CONFIG
CONFIG = EQT_CONFIG

# Module-level accumulator shared by all functions below.
BacktestResults = BacktestResult(config=CONFIG)


def fit_VECM(training_pair_df, pair: TradingPair):
    """Fit a VECM with cointegration rank 1 on the pair's training prices.

    Args:
        training_pair_df: Training frame; only the columns returned by
            ``pair.colnames()`` are used, with the index reset.
        pair: The trading pair being modelled.

    Returns:
        The fitted statsmodels VECM results object.  The object is returned
        even when the ``beta`` check below fails; that case is only reported
        on stdout.
    """
    vecm_model = VECM(
        training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1
    )
    vecm_fit = vecm_model.fit()

    # Check if the model converged properly
    if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
        print(f"{pair}: VECM model failed to converge properly")

    return vecm_fit


def create_trading_signals(
    vecm_fit, testing_pair_df, pair: TradingPair
) -> pd.DataFrame:
    """Derive open (and, if found, close) trade rows for one pair.

    The testing frame is merged with the VECM predictions, the disequilibrium
    (prices @ beta) is scaled with the pair's training mean/std, and the first
    row whose scaled disequilibrium reaches ``disequilibrium_open_trshld``
    opens the position.  The first row at or after the open row whose scaled
    disequilibrium falls below ``disequilibrium_close_trshld`` closes it.

    Args:
        vecm_fit: Fitted VECM results (provides ``predict`` and ``beta``).
        testing_pair_df: Testing frame; must contain a ``tstamp`` column and
            the columns named by ``pair.colnames()``.
        pair: Trading pair whose ``disequilibrium_mu_`` / ``disequilibrium_std_``
            have already been set.

    Returns:
        An empty ``DataFrame`` when no row reaches the open threshold;
        otherwise a frame with the columns in ``result_columns`` holding two
        rows (open legs only, the position is also reported through
        ``BacktestResults.handle_outstanding_position``) or four rows
        (open and close legs).
    """
    result_columns = [
        "time",
        "action",
        "symbol",
        "price",
        "disequilibrium",
        "scaled_disequilibrium",
        "pair",
    ]

    next_values = vecm_fit.predict(steps=len(testing_pair_df))
    colname_a, colname_b = pair.colnames()

    # Convert prediction to a DataFrame for readability
    predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])

    beta = vecm_fit.beta

    # Predicted columns get the "_pred" suffix; rows with any NaN are dropped.
    pair_result_df = pd.merge(
        testing_pair_df.reset_index(drop=True),
        predicted_df,
        left_index=True,
        right_index=True,
        suffixes=("", "_pred"),
    ).dropna()

    pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta

    pair_mu = pair.disequilibrium_mu_
    pair_std = pair.disequilibrium_std_

    pair_result_df["scaled_disequilibrium"] = abs(
        pair_result_df["disequilibrium"] - pair_mu
    ) / pair_std

    # Reset index to ensure proper indexing (labels become 0..n-1 again
    # after the dropna above, so label lookups below match row positions).
    pair_result_df = pair_result_df.reset_index()

    # Iterate through the testing dataset to find the first trading opportunity
    open_row_index = None
    initial_abs_term = None

    open_threshold = CONFIG["disequilibrium_open_trshld"]
    close_threshold = CONFIG["disequilibrium_close_trshld"]

    for row_idx in range(len(pair_result_df)):
        curr_disequilibrium = pair_result_df["scaled_disequilibrium"][row_idx]

        # Check if current row has sufficient disequilibrium (not near-zero)
        if curr_disequilibrium >= open_threshold:
            open_row_index = row_idx
            initial_abs_term = curr_disequilibrium
            break

    # If no row with sufficient disequilibrium found, skip this pair
    if open_row_index is None:
        print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
        return pd.DataFrame()

    # Look for close signal starting from the open position.  The boolean
    # series keeps the frame's labels, so the label found is directly usable
    # with ``.loc`` below.
    trading_signals_df = (
        pair_result_df["scaled_disequilibrium"][open_row_index:] < close_threshold
    )

    close_row_index = None
    for idx, value in trading_signals_df.items():
        if value:
            close_row_index = idx
            break

    open_row = pair_result_df.loc[open_row_index]
    open_tstamp = open_row["tstamp"]
    open_disequilibrium = open_row["disequilibrium"]
    open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
    open_px_a = open_row[f"{colname_a}"]
    open_px_b = open_row[f"{colname_b}"]

    abs_beta = abs(beta[1])
    pred_px_b = pair_result_df.loc[open_row_index][f"{colname_b}_pred"]
    pred_px_a = pair_result_df.loc[open_row_index][f"{colname_a}_pred"]

    # Leg direction is chosen from the predicted prices at the open row.
    if pred_px_b * abs_beta - pred_px_a > 0:
        open_side_a = "BUY"
        open_side_b = "SELL"
        close_side_a = "SELL"
        close_side_b = "BUY"
    else:
        open_side_b = "BUY"
        open_side_a = "SELL"
        close_side_b = "SELL"
        close_side_a = "BUY"

    # The two open legs are emitted whether or not a close signal exists.
    trd_signal_tuples = [
        (
            open_tstamp,
            open_side_a,
            pair.symbol_a_,
            open_px_a,
            open_disequilibrium,
            open_scaled_disequilibrium,
            pair,
        ),
        (
            open_tstamp,
            open_side_b,
            pair.symbol_b_,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
            pair,
        ),
    ]

    # If no close signal found, print position and unrealized PnL
    if close_row_index is None:
        last_row_index = len(pair_result_df) - 1

        # Use the new method from BacktestResult to handle outstanding positions
        BacktestResults.handle_outstanding_position(
            pair=pair,
            pair_result_df=pair_result_df,
            last_row_index=last_row_index,
            open_side_a=open_side_a,
            open_side_b=open_side_b,
            open_px_a=open_px_a,
            open_px_b=open_px_b,
            open_tstamp=open_tstamp,
            initial_abs_term=initial_abs_term,
            colname_a=colname_a,
            colname_b=colname_b,
        )
        # Only the open trades are returned (no close trades)
    else:
        # Close signal found - create complete trade
        close_row = pair_result_df.loc[close_row_index]
        close_tstamp = close_row["tstamp"]
        close_disequilibrium = close_row["disequilibrium"]
        close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
        close_px_a = close_row[f"{colname_a}"]
        close_px_b = close_row[f"{colname_b}"]

        print(f"{pair}: Close signal found at index {close_row_index}")

        trd_signal_tuples.extend(
            [
                (
                    close_tstamp,
                    close_side_a,
                    pair.symbol_a_,
                    close_px_a,
                    close_disequilibrium,
                    close_scaled_disequilibrium,
                    pair,
                ),
                (
                    close_tstamp,
                    close_side_b,
                    pair.symbol_b_,
                    close_px_b,
                    close_disequilibrium,
                    close_scaled_disequilibrium,
                    pair,
                ),
            ]
        )

    # Add tuples to data frame
    return pd.DataFrame(
        trd_signal_tuples,
        columns=result_columns,
    )


def run_single_pair(
    market_data: pd.DataFrame, price_column: str, pair: TradingPair
) -> Optional[pd.DataFrame]:
    """Fit the model for one pair and produce its trading signals.

    Args:
        market_data: Market data passed to ``get_datasets`` to obtain the
            training/testing split.
        price_column: Unused here; kept so existing callers keep working.
        pair: The trading pair to analyse.  Its training disequilibrium
            mean/std are set as a side effect.

    Returns:
        The frame produced by ``create_trading_signals`` (possibly empty), or
        ``None`` when the pair is skipped: too few training points, non-finite
        training data, a failed fit, a near-zero ``beta[1]`` or a failed
        prediction.  Each skip reason is printed.
    """
    training_pair_df, testing_pair_df = get_datasets(
        df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair
    )

    # Check if we have enough data points for a meaningful analysis
    # (minimum number of points for a reasonable VECM model)
    min_required_points = CONFIG["min_required_points"]
    if len(training_pair_df) < min_required_points:
        print(
            f"{pair}: Not enough data points for analysis. "
            f"Found {len(training_pair_df)}, need at least {min_required_points}"
        )
        return None

    # Check for non-finite values
    if not np.isfinite(training_pair_df).all().all():
        print(f"{pair}: Data contains non-finite values (NaN or inf)")
        return None

    # Fit the VECM
    try:
        vecm_fit = fit_VECM(training_pair_df, pair=pair)
    except Exception as e:
        print(f"{pair}: VECM fitting failed: {str(e)}")
        return None

    # Add safeguard against division by zero
    # (small threshold to avoid division by very small numbers)
    if abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]:
        print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
        return None

    diseqlbrm_series = training_pair_df[pair.colnames()] @ vecm_fit.beta
    diseqlbrm_series_mu: float = diseqlbrm_series.mean().iloc[0]
    diseqlbrm_series_std: float = diseqlbrm_series.std().iloc[0]
    pair.set_training_disequilibrium(diseqlbrm_series_mu, diseqlbrm_series_std)

    # Normalize the disequilibrium
    training_pair_df["scaled_disequilibrium"] = (
        diseqlbrm_series - diseqlbrm_series_mu
    ) / diseqlbrm_series_std

    try:
        pair_trades = create_trading_signals(
            vecm_fit=vecm_fit,
            testing_pair_df=testing_pair_df,
            pair=pair,
        )
    except Exception as e:
        print(f"{pair}: Prediction failed: {str(e)}")
        return None

    return pair_trades


def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> None:
    """Run the backtest for every unique instrument pair of one data file.

    Trades of all pairs are concatenated, indexed and sorted by ``time`` and
    handed to ``BacktestResults.collect_single_day_results``.  Pairs that were
    skipped (``run_single_pair`` returned ``None``) or produced no signals are
    ignored.

    Args:
        config: Configuration dict; ``config["instruments"]`` lists the symbols.
        market_data_df: Market data for the file being processed.
        price_column: Price column name used to build each ``TradingPair``.
    """

    def _create_pairs(config: Dict) -> List[TradingPair]:
        """Build one TradingPair per unordered pair of configured instruments."""
        instruments = config["instruments"]
        all_indexes = range(len(instruments))
        unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
        return [
            TradingPair(instruments[a_index], instruments[b_index], price_column)
            for a_index, b_index in unique_index_pairs
        ]

    pairs_trades = []
    for pair in _create_pairs(config):
        single_pair_trades = run_single_pair(
            market_data=market_data_df, price_column=price_column, pair=pair
        )
        # run_single_pair returns None for skipped pairs; calling len() on it
        # would raise TypeError and abort processing of the whole file.
        if single_pair_trades is None or len(single_pair_trades) == 0:
            continue
        pairs_trades.append(single_pair_trades)

    # Check if result_list has any data before concatenating
    if not pairs_trades:
        print("No trading signals found for any pairs")
        return None

    result = pd.concat(pairs_trades, ignore_index=True)
    result["time"] = pd.to_datetime(result["time"])
    result = result.set_index("time").sort_index()

    BacktestResults.collect_single_day_results(result)
    # BacktestResults.print_single_day_results()


if __name__ == "__main__":
    # Initialize a dictionary to store all trade results
    all_results: Dict[str, Dict[str, Any]] = {}

    # Process each data file
    price_column = CONFIG["price_column"]

    for datafile in CONFIG["datafiles"]:
        print(f"\n====== Processing {datafile} ======")

        # Clear the trades and reset unrealized PnL for the new file
        BacktestResults.clear_trades()

        # Process data for this file; a failure is reported and the loop
        # continues with the next file.
        try:
            market_data_df = load_market_data(
                f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG
            )

            market_data_df = transform_dataframe(
                df=market_data_df, price_column=price_column
            )
            run_pairs(
                config=CONFIG, market_data_df=market_data_df, price_column=price_column
            )

            # Store results with file name as key
            filename = datafile.split("/")[-1]
            all_results[filename] = {"trades": BacktestResults.trades.copy()}

            print(f"Successfully processed {filename}")

            # Print total unrealized PnL for this file
            print(
                f"\n====== TOTAL UNREALIZED PnL for {filename}: {BacktestResults.get_total_unrealized_pnl():.2f}% ======"
            )

        except Exception as e:
            print(f"Error processing {datafile}: {str(e)}")

    # BacktestResults.print_results_summary(all_results)
    BacktestResults.calculate_returns(all_results)

    # Print grand totals
    BacktestResults.print_grand_totals()
    BacktestResults.print_outstanding_positions()