From 9d57d8b255d3781f60bd996fc33bf6e1ff297003 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 29 May 2025 16:21:49 -0400 Subject: [PATCH] progress --- src/pt_backtest.py | 217 +++------------------------------------------ src/strategies.py | 213 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 225 insertions(+), 205 deletions(-) create mode 100644 src/strategies.py diff --git a/src/pt_backtest.py b/src/pt_backtest.py index 1d33610..d1c08d7 100644 --- a/src/pt_backtest.py +++ b/src/pt_backtest.py @@ -10,6 +10,7 @@ import numpy as np from statsmodels.tsa.vector_ar.vecm import VECM from backtest_configs import CRYPTO_CONFIG +from strategies import StaticFitStrategy from tools.data_loader import load_market_data from tools.trading_pair import TradingPair from results import BacktestResult @@ -23,203 +24,7 @@ CONFIG = CRYPTO_CONFIG # CONFIG = EQT_CONFIG -trades_columns = [ - "time", - "action", - "symbol", - "price", - "disequilibrium", - "scaled_disequilibrium", - "pair", -] - -BacktestResults = BacktestResult(config=CONFIG) - -class PairTradingStrategy(ABC): - @abstractmethod - def create_trading_signals(pair: TradingPair, config: Dict) -> pd.DataFrame: - ... - @abstractmethod - def run_pair(pair: TradingPair) -> Optional[pd.DataFrame]: - ... - - -def run_pair(pair: TradingPair) -> Optional[pd.DataFrame]: - pair.get_datasets(training_minutes=CONFIG["training_minutes"]) - try: - is_cointegrated = pair.train_pair() - if not is_cointegrated: - print(f"{pair} IS NOT COINTEGRATED") - return None - except Exception as e: - print(f"{pair}: Training failed: {str(e)}") - return None - - try: - pair.predict() - except Exception as e: - print(f"{pair}: Prediction failed: {str(e)}") - return None - - pair_trades = create_trading_signals(pair=pair, config=CONFIG) - - return pair_trades - - -def create_trading_signals(pair: TradingPair, config: Dict) -> pd.DataFrame: - beta = pair.vecm_fit_.beta - colname_a, colname_b = pair.colnames() - - predicted_df = pair.predicted_df_ - - open_threshold = config["dis-equilibrium_open_trshld"] - close_threshold = config["dis-equilibrium_close_trshld"] - - # Iterate through the testing dataset to find the first trading opportunity - open_row_index = None - for row_idx in range(len(predicted_df)): - curr_disequilibrium = predicted_df["scaled_disequilibrium"][row_idx] - - # Check if current row has sufficient disequilibrium (not near-zero) - if curr_disequilibrium >= open_threshold: - open_row_index = row_idx - break - - # If no row with sufficient disequilibrium found, skip this pair - if open_row_index is None: - print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.") - return pd.DataFrame() - - # Look for close signal starting from the open position - trading_signals_df = ( - predicted_df["scaled_disequilibrium"][open_row_index:] < close_threshold - ) - - # Adjust indices to account for the offset from open_row_index - close_row_index = None - for idx, value in trading_signals_df.items(): - if value: - close_row_index = idx - break - - open_row = predicted_df.loc[open_row_index] - open_tstamp = open_row["tstamp"] - open_disequilibrium = open_row["disequilibrium"] - open_scaled_disequilibrium = open_row["scaled_disequilibrium"] - open_px_a = open_row[f"{colname_a}"] - open_px_b = open_row[f"{colname_b}"] - - abs_beta = abs(beta[1]) - pred_px_b = predicted_df.loc[open_row_index][f"{colname_b}_pred"] - pred_px_a = predicted_df.loc[open_row_index][f"{colname_a}_pred"] - - if pred_px_b * abs_beta - pred_px_a > 0: - open_side_a = "BUY" - open_side_b = "SELL" - close_side_a = "SELL" - close_side_b = "BUY" - else: - open_side_b = "BUY" - open_side_a = "SELL" - close_side_b = "SELL" - close_side_a = "BUY" - - # If no close signal found, print position and unrealized PnL - if close_row_index is None: - - last_row_index = len(predicted_df) - 1 - - # Use the new method from BacktestResult to handle outstanding positions - BacktestResults.handle_outstanding_position( - pair=pair, - pair_result_df=predicted_df, - last_row_index=last_row_index, - open_side_a=open_side_a, - open_side_b=open_side_b, - open_px_a=open_px_a, - open_px_b=open_px_b, - open_tstamp=open_tstamp, - ) - - # Return only open trades (no close trades) - trd_signal_tuples = [ - ( - open_tstamp, - open_side_a, - pair.symbol_a_, - open_px_a, - open_disequilibrium, - open_scaled_disequilibrium, - pair, - ), - ( - open_tstamp, - open_side_b, - pair.symbol_b_, - open_px_b, - open_disequilibrium, - open_scaled_disequilibrium, - pair, - ), - ] - else: - # Close signal found - create complete trade - close_row = predicted_df.loc[close_row_index] - close_tstamp = close_row["tstamp"] - close_disequilibrium = close_row["disequilibrium"] - close_scaled_disequilibrium = close_row["scaled_disequilibrium"] - close_px_a = close_row[f"{colname_a}"] - close_px_b = close_row[f"{colname_b}"] - - print(f"{pair}: Close signal found at index {close_row_index}") - - trd_signal_tuples = [ - ( - open_tstamp, - open_side_a, - pair.symbol_a_, - open_px_a, - open_disequilibrium, - open_scaled_disequilibrium, - pair, - ), - ( - open_tstamp, - open_side_b, - pair.symbol_b_, - open_px_b, - open_disequilibrium, - open_scaled_disequilibrium, - pair, - ), - ( - close_tstamp, - close_side_a, - pair.symbol_a_, - close_px_a, - close_disequilibrium, - close_scaled_disequilibrium, - pair, - ), - ( - close_tstamp, - close_side_b, - pair.symbol_b_, - close_px_b, - close_disequilibrium, - close_scaled_disequilibrium, - pair, - ), - ] - - # Add tuples to data frame - return pd.DataFrame( - trd_signal_tuples, - columns=trades_columns, - ) - - -def run_all_pairs(config: Dict, datafile: str, price_column: str) -> None: +def run_all_pairs(config: Dict, datafile: str, price_column: str, bt_result: BacktestResult) -> None: def _create_pairs(config: Dict) -> List[TradingPair]: nonlocal datafile @@ -242,8 +47,9 @@ def run_all_pairs(config: Dict, datafile: str, price_column: str) -> None: pairs_trades = [] + strategy = StaticFitStrategy() for pair in _create_pairs(config): - single_pair_trades = run_pair(pair=pair) + single_pair_trades = strategy.run_pair(pair=pair, bt_result=bt_result) if single_pair_trades is not None and len(single_pair_trades) > 0: pairs_trades.append(single_pair_trades) # Check if result_list has any data before concatenating @@ -255,13 +61,14 @@ def run_all_pairs(config: Dict, datafile: str, price_column: str) -> None: result["time"] = pd.to_datetime(result["time"]) result = result.set_index("time").sort_index() - BacktestResults.collect_single_day_results(result) + bt_result.collect_single_day_results(result) # BacktestResults.print_single_day_results() def main() -> None: # Initialize a dictionary to store all trade results all_results: Dict[str, Dict[str, Any]] = {} + bt_results = BacktestResult(config=CONFIG) # Initialize global PnL tracking variables @@ -271,17 +78,17 @@ def main() -> None: print(f"\n====== Processing {datafile} ======") # Clear the TRADES global dictionary and reset unrealized PnL for the new file - BacktestResults.clear_trades() + bt_results.clear_trades() # Process data for this file try: run_all_pairs( - config=CONFIG, datafile=datafile, price_column=price_column + config=CONFIG, datafile=datafile, price_column=price_column, bt_result=bt_results ) # Store results with file name as key filename = datafile.split("/")[-1] - all_results[filename] = {"trades": BacktestResults.trades.copy()} + all_results[filename] = {"trades": bt_results.trades.copy()} print(f"Successfully processed {filename}") @@ -291,10 +98,10 @@ def main() -> None: print(f"Error processing {datafile}: {str(e)}") # BacktestResults.print_results_summary(all_results) - BacktestResults.calculate_returns(all_results) + bt_results.calculate_returns(all_results) # Print grand totals - BacktestResults.print_grand_totals() - BacktestResults.print_outstanding_positions() + bt_results.print_grand_totals() + bt_results.print_outstanding_positions() if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/strategies.py b/src/strategies.py new file mode 100644 index 0000000..42f6907 --- /dev/null +++ b/src/strategies.py @@ -0,0 +1,213 @@ +from abc import ABC, abstractmethod +import sys + +from typing import Dict, Optional + +import pandas as pd + +# ============= statsmodels =================== + +from backtest_configs import CRYPTO_CONFIG +from tools.trading_pair import TradingPair +from results import BacktestResult + +NanoPerMin = 1e9 +UNSET_FLOAT: float = sys.float_info.max +UNSET_INT: int = sys.maxsize + + +CONFIG = CRYPTO_CONFIG +# CONFIG = EQT_CONFIG + + + + +class PairsTradingStrategy(ABC): + TRADES_COLUMNS = [ + "time", + "action", + "symbol", + "price", + "disequilibrium", + "scaled_disequilibrium", + "pair", + ] + @abstractmethod + def run_pair(self, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: + ... + +class StaticFitStrategy(PairsTradingStrategy): + + def run_pair(self, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: # abstractmethod + pair.get_datasets(training_minutes=CONFIG["training_minutes"]) + try: + is_cointegrated = pair.train_pair() + if not is_cointegrated: + print(f"{pair} IS NOT COINTEGRATED") + return None + except Exception as e: + print(f"{pair}: Training failed: {str(e)}") + return None + + try: + pair.predict() + except Exception as e: + print(f"{pair}: Prediction failed: {str(e)}") + return None + + pair_trades = self.create_trading_signals(pair=pair, config=CONFIG, result=bt_result) + + return pair_trades + + def create_trading_signals(self, pair: TradingPair, config: Dict, result: BacktestResult) -> pd.DataFrame: + beta = pair.vecm_fit_.beta + colname_a, colname_b = pair.colnames() + + predicted_df = pair.predicted_df_ + + open_threshold = config["dis-equilibrium_open_trshld"] + close_threshold = config["dis-equilibrium_close_trshld"] + + # Iterate through the testing dataset to find the first trading opportunity + open_row_index = None + for row_idx in range(len(predicted_df)): + curr_disequilibrium = predicted_df["scaled_disequilibrium"][row_idx] + + # Check if current row has sufficient disequilibrium (not near-zero) + if curr_disequilibrium >= open_threshold: + open_row_index = row_idx + break + + # If no row with sufficient disequilibrium found, skip this pair + if open_row_index is None: + print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.") + return pd.DataFrame() + + # Look for close signal starting from the open position + trading_signals_df = ( + predicted_df["scaled_disequilibrium"][open_row_index:] < close_threshold + ) + + # Adjust indices to account for the offset from open_row_index + close_row_index = None + for idx, value in trading_signals_df.items(): + if value: + close_row_index = idx + break + + open_row = predicted_df.loc[open_row_index] + open_tstamp = open_row["tstamp"] + open_disequilibrium = open_row["disequilibrium"] + open_scaled_disequilibrium = open_row["scaled_disequilibrium"] + open_px_a = open_row[f"{colname_a}"] + open_px_b = open_row[f"{colname_b}"] + + abs_beta = abs(beta[1]) + pred_px_b = predicted_df.loc[open_row_index][f"{colname_b}_pred"] + pred_px_a = predicted_df.loc[open_row_index][f"{colname_a}_pred"] + + if pred_px_b * abs_beta - pred_px_a > 0: + open_side_a = "BUY" + open_side_b = "SELL" + close_side_a = "SELL" + close_side_b = "BUY" + else: + open_side_b = "BUY" + open_side_a = "SELL" + close_side_b = "SELL" + close_side_a = "BUY" + + # If no close signal found, print position and unrealized PnL + if close_row_index is None: + + last_row_index = len(predicted_df) - 1 + + # Use the new method from BacktestResult to handle outstanding positions + result.handle_outstanding_position( + pair=pair, + pair_result_df=predicted_df, + last_row_index=last_row_index, + open_side_a=open_side_a, + open_side_b=open_side_b, + open_px_a=open_px_a, + open_px_b=open_px_b, + open_tstamp=open_tstamp, + ) + + # Return only open trades (no close trades) + trd_signal_tuples = [ + ( + open_tstamp, + open_side_a, + pair.symbol_a_, + open_px_a, + open_disequilibrium, + open_scaled_disequilibrium, + pair, + ), + ( + open_tstamp, + open_side_b, + pair.symbol_b_, + open_px_b, + open_disequilibrium, + open_scaled_disequilibrium, + pair, + ), + ] + else: + # Close signal found - create complete trade + close_row = predicted_df.loc[close_row_index] + close_tstamp = close_row["tstamp"] + close_disequilibrium = close_row["disequilibrium"] + close_scaled_disequilibrium = close_row["scaled_disequilibrium"] + close_px_a = close_row[f"{colname_a}"] + close_px_b = close_row[f"{colname_b}"] + + print(f"{pair}: Close signal found at index {close_row_index}") + + trd_signal_tuples = [ + ( + open_tstamp, + open_side_a, + pair.symbol_a_, + open_px_a, + open_disequilibrium, + open_scaled_disequilibrium, + pair, + ), + ( + open_tstamp, + open_side_b, + pair.symbol_b_, + open_px_b, + open_disequilibrium, + open_scaled_disequilibrium, + pair, + ), + ( + close_tstamp, + close_side_a, + pair.symbol_a_, + close_px_a, + close_disequilibrium, + close_scaled_disequilibrium, + pair, + ), + ( + close_tstamp, + close_side_b, + pair.symbol_b_, + close_px_b, + close_disequilibrium, + close_scaled_disequilibrium, + pair, + ), + ] + + # Add tuples to data frame + return pd.DataFrame( + trd_signal_tuples, + columns=self.TRADES_COLUMNS, + ) +