from abc import ABC, abstractmethod
import sys
from typing import Dict, Optional

import pandas as pd

from tools.trading_pair import TradingPair
from results import BacktestResult

# NOTE(review): 1e9 is the number of nanoseconds in one *second*; a minute is
# 60e9 ns. The constant is not used in this module -- confirm the intended
# value with its external users before relying on the name.
NanoPerMin = 1e9


class PairsTradingStrategy(ABC):
    """Base class for strategies that turn a trained pair into trade signals.

    Subclasses implement :meth:`run_pair`, which returns a DataFrame whose
    columns are :attr:`TRADES_COLUMNS` (or ``None`` when the pair is skipped).
    """

    # Column order of every trade DataFrame produced by the strategies.
    TRADES_COLUMNS = [
        "time",
        "action",
        "symbol",
        "price",
        "disequilibrium",
        "scaled_disequilibrium",
        "pair",
    ]

    @abstractmethod
    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Backtest ``pair`` and return its trades.

        Args:
            config: Strategy configuration; the concrete strategies read
                ``"training_minutes"``, ``"dis-equilibrium_open_trshld"`` and
                ``"dis-equilibrium_close_trshld"``.
            pair: The pair to backtest.
            bt_result: Receives positions still open when the data runs out.

        Returns:
            A DataFrame of trades (possibly empty), or ``None`` if the pair
            could not be trained or predicted.
        """
        ...

    @staticmethod
    def _train_and_predict(pair: TradingPair) -> bool:
        """Train ``pair`` and run its prediction; log and return False on any failure."""
        try:
            is_cointegrated = pair.train_pair()
        except Exception as e:
            print(f"{pair}: Training failed: {str(e)}")
            return False
        if not is_cointegrated:
            print(f"{pair} IS NOT COINTEGRATED")
            return False
        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return False
        return True

    @staticmethod
    def _trade_sides(pair: TradingPair, row: pd.Series) -> tuple:
        """Return ``(open_side_a, open_side_b, close_side_a, close_side_b)`` for a position opened at ``row``.

        Leg A is bought (and leg B sold) when the predicted B price scaled by
        ``|beta[1]|`` exceeds the predicted A price; otherwise the sides flip.
        Closing sides are always the opposite of the opening sides.
        """
        colname_a, colname_b = pair.colnames()
        abs_beta = abs(pair.vecm_fit_.beta[1])
        if row[f"{colname_b}_pred"] * abs_beta - row[f"{colname_a}_pred"] > 0:
            return "BUY", "SELL", "SELL", "BUY"
        return "SELL", "BUY", "BUY", "SELL"

    @staticmethod
    def _leg_tuples(pair: TradingPair, row: pd.Series, side_a: str, side_b: str) -> list:
        """Build the two trade tuples (leg A, then leg B) executed at ``row``, in TRADES_COLUMNS order."""
        colname_a, colname_b = pair.colnames()
        tstamp = row["tstamp"]
        disequilibrium = row["disequilibrium"]
        scaled_disequilibrium = row["scaled_disequilibrium"]
        return [
            (tstamp, side_a, pair.symbol_a_, row[colname_a],
             disequilibrium, scaled_disequilibrium, pair),
            (tstamp, side_b, pair.symbol_b_, row[colname_b],
             disequilibrium, scaled_disequilibrium, pair),
        ]


class StaticFitStrategy(PairsTradingStrategy):
    """Fit once on the training window, then look for one round trip in the test window."""

    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Train and predict ``pair`` once, then derive trades from the whole test window.

        Returns ``None`` when training/prediction fails or the pair is not
        cointegrated; otherwise the result of :meth:`create_trading_signals`.
        """
        pair.get_datasets(training_minutes=config["training_minutes"])
        if not self._train_and_predict(pair):
            return None
        return self.create_trading_signals(pair=pair, config=config, result=bt_result)

    def create_trading_signals(
        self, pair: TradingPair, config: Dict, result: BacktestResult
    ) -> pd.DataFrame:
        """Find at most one open/close round trip in ``pair.predicted_df_``.

        The position opens at the first row whose scaled dis-equilibrium is
        ``>=`` the open threshold. It closes at the first row from there on
        (inclusive) whose scaled dis-equilibrium is ``<`` the close threshold.

        Returns:
            An empty ``pd.DataFrame()`` if no row reaches the open threshold.
            Otherwise a DataFrame with :attr:`TRADES_COLUMNS` holding the two
            opening legs, plus the two closing legs if a close row exists. When
            no close row exists, the open position is handed to
            ``result.handle_outstanding_position`` instead.
        """
        predicted_df = pair.predicted_df_
        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]

        # Work purely with row positions so the logic does not depend on
        # predicted_df's index labels (label and position were mixed before).
        scaled = predicted_df["scaled_disequilibrium"].reset_index(drop=True)

        open_hits = scaled.index[scaled >= open_threshold]
        if len(open_hits) == 0:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            return pd.DataFrame()
        open_pos = int(open_hits[0])

        # The close search starts at the opening row itself (inclusive).
        after_open = scaled.iloc[open_pos:]
        close_hits = after_open.index[after_open < close_threshold]

        open_row = predicted_df.iloc[open_pos]
        open_side_a, open_side_b, close_side_a, close_side_b = self._trade_sides(pair, open_row)
        trd_signal_tuples = self._leg_tuples(pair, open_row, open_side_a, open_side_b)

        if len(close_hits) == 0:
            # No close signal: report the still-open position (unrealized PnL
            # is handled by BacktestResult) and return only the opening legs.
            colname_a, colname_b = pair.colnames()
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=open_row[colname_a],
                open_px_b=open_row[colname_b],
                open_tstamp=open_row["tstamp"],
            )
        else:
            close_pos = int(close_hits[0])
            print(f"{pair}: Close signal found at index {close_pos}")
            close_row = predicted_df.iloc[close_pos]
            trd_signal_tuples += self._leg_tuples(pair, close_row, close_side_a, close_side_b)

        return pd.DataFrame(trd_signal_tuples, columns=self.TRADES_COLUMNS)


class SlidingFitStrategy(PairsTradingStrategy):
    """Re-fit on a training window that slides forward one row per step and trade the next row."""

    def __init__(self):
        super().__init__()
        # Start index of the current training window within the pair's data.
        self.curr_training_start_idx_ = 0

    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Slide the training window over the pair's data, opening/closing at most one position at a time.

        At every step the pair is re-trained on ``training_minutes`` rows
        starting at ``curr_training_start_idx_`` and predicted for one test
        row. A closed position opens when that row's scaled dis-equilibrium is
        ``>=`` the open threshold. An open position closes when it drops
        ``<`` the close threshold. The loop ends when fewer than
        ``training_minutes`` training rows remain.

        Returns:
            A DataFrame with :attr:`TRADES_COLUMNS` (possibly empty), or
            ``None`` if training/prediction fails at any step.
        """
        pair.user_data_["is_position_open"] = False
        training_minutes = config["training_minutes"]
        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]

        # Restart from the beginning of the data for every pair; otherwise a
        # reused strategy instance would resume where the previous pair ended.
        self.curr_training_start_idx_ = 0

        trd_signal_tuples = []
        open_state = None  # (open_row, open_side_a, open_side_b, close_side_a, close_side_b)
        last_predicted_df = None

        while True:
            pair.get_datasets(
                training_minutes=training_minutes,
                training_start_index=self.curr_training_start_idx_,
                testing_size=1,
            )
            if len(pair.training_df_) < training_minutes:
                print(f"{pair}: Not enough training data. Completing the job.")
                break

            # NOTE(review): a failure at any step abandons the whole pair,
            # matching the original early-return behavior, even if a position
            # is open -- confirm that is intended.
            if not self._train_and_predict(pair):
                return None

            predicted_df = pair.predicted_df_
            # Advance before evaluating so every path through the loop moves
            # the window forward (the original loop never advanced it).
            self.curr_training_start_idx_ += 1
            if predicted_df.empty:
                continue
            last_predicted_df = predicted_df

            # testing_size=1: presumably a single predicted row -- TODO confirm.
            row = predicted_df.iloc[-1]
            scaled = row["scaled_disequilibrium"]

            if not pair.user_data_["is_position_open"]:
                if scaled >= open_threshold:
                    sides = self._trade_sides(pair, row)
                    open_state = (row, *sides)
                    trd_signal_tuples += self._leg_tuples(pair, row, sides[0], sides[1])
                    pair.user_data_["is_position_open"] = True
            elif scaled < close_threshold:
                _, _, _, close_side_a, close_side_b = open_state
                trd_signal_tuples += self._leg_tuples(pair, row, close_side_a, close_side_b)
                pair.user_data_["is_position_open"] = False
                open_state = None

        if pair.user_data_["is_position_open"] and last_predicted_df is not None:
            # Data ran out with a position still open: report it the same way
            # StaticFitStrategy does.
            # NOTE(review): pair_result_df is the last step's prediction frame
            # only -- verify handle_outstanding_position accepts that.
            open_row, open_side_a, open_side_b, _, _ = open_state
            colname_a, colname_b = pair.colnames()
            bt_result.handle_outstanding_position(
                pair=pair,
                pair_result_df=last_predicted_df,
                last_row_index=len(last_predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=open_row[colname_a],
                open_px_b=open_row[colname_b],
                open_tstamp=open_row["tstamp"],
            )

        return pd.DataFrame(trd_signal_tuples, columns=self.TRADES_COLUMNS)