from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast

import pandas as pd  # type: ignore[import]

from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair

# NOTE(review): 1e9 is the number of nanoseconds per *second*; nanoseconds per
# minute would be 60e9. The value is left unchanged because code outside this
# file may rely on it -- confirm the intended meaning before using the name.
NanoPerMin = 1e9


class PairsTradingFitMethod(ABC):
    """Interface for strategies that fit a ``TradingPair`` and emit its trades.

    Subclasses implement ``run_pair`` (fit/predict one pair and return its
    trade rows, or ``None``) and ``reset`` (clear per-run state). The private
    helpers build trade frames with a fixed column order and fixed dtypes.
    """

    # Column order of every trades DataFrame produced by the fit methods.
    TRADES_COLUMNS = [
        "time",
        "action",
        "symbol",
        "price",
        "disequilibrium",
        "scaled_disequilibrium",
        "pair",
    ]

    # Explicit per-column dtypes so empty and non-empty trade frames share one
    # schema and concatenate without dtype warnings.
    TRADES_DTYPES = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }

    # Side that unwinds a leg opened with the key side.
    _OPPOSITE_SIDE = {"BUY": "SELL", "SELL": "BUY"}

    @abstractmethod
    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Fit ``pair`` according to ``config`` and return its trades, or ``None``."""

    @abstractmethod
    def reset(self) -> None:
        """Clear any state carried between ``run_pair`` calls."""

    @classmethod
    def _trades_frame(cls, rows: Optional[list] = None) -> pd.DataFrame:
        """Return a trades DataFrame with ``TRADES_COLUMNS`` and ``TRADES_DTYPES``.

        ``rows`` is a list of tuples in ``TRADES_COLUMNS`` order; ``None`` or an
        empty list yields an empty frame that still carries the typed schema.
        """
        df = pd.DataFrame(rows or [], columns=cls.TRADES_COLUMNS)
        return df.astype(cls.TRADES_DTYPES)

    @staticmethod
    def _leg_rows(
        pair: TradingPair,
        tstamp: object,
        side_a: str,
        side_b: str,
        px_a: object,
        px_b: object,
        disequilibrium: object,
        scaled_disequilibrium: object,
    ) -> list:
        """Return the two trade-row tuples (leg A, then leg B) for one event."""
        return [
            (
                tstamp,
                side_a,
                pair.symbol_a_,
                px_a,
                disequilibrium,
                scaled_disequilibrium,
                pair,
            ),
            (
                tstamp,
                side_b,
                pair.symbol_b_,
                px_b,
                disequilibrium,
                scaled_disequilibrium,
                pair,
            ),
        ]


class StaticFit(PairsTradingFitMethod):
    """Fit a pair once on its training window and trade its testing window."""

    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Train, predict and generate trades for ``pair``.

        Returns ``None`` (after printing a message) when the pair is not
        cointegrated or when training or prediction raises; otherwise the
        trades produced by ``create_trading_signals``.
        """
        pair.get_datasets(training_minutes=config["training_minutes"])
        try:
            is_cointegrated = pair.train_pair()
            if not is_cointegrated:
                print(f"{pair} IS NOT COINTEGRATED")
                return None
        except Exception as e:
            # Best-effort: a failing pair is reported and skipped, not fatal.
            print(f"{pair}: Training failed: {str(e)}")
            return None

        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(pair=pair, config=config, result=bt_result)

    def create_trading_signals(
        self, pair: TradingPair, config: Dict, result: BacktestResult
    ) -> pd.DataFrame:
        """Build the open (and, if reached, close) trades for a predicted pair.

        The position opens at the first testing row whose
        ``scaled_disequilibrium`` is ``>=`` the open threshold, and closes at
        the first row from there on (inclusive) whose ``scaled_disequilibrium``
        is ``<`` the close threshold. Without a close row, the open position is
        passed to ``result.handle_outstanding_position`` and only the two open
        trades are returned. An empty, typed frame is returned when there is no
        prediction or no row reaches the open threshold.

        Rows are addressed by position, so ``predicted_df_`` does not need a
        0-based ``RangeIndex``.
        """
        beta = pair.vecm_fit_.beta  # type: ignore
        colname_a, colname_b = pair.colnames()
        predicted_df = pair.predicted_df_
        if predicted_df is None:
            return self._trades_frame()

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]
        scaled = predicted_df["scaled_disequilibrium"].to_numpy()

        # Position of the first row with enough disequilibrium to open.
        open_pos = next(
            (pos for pos, value in enumerate(scaled) if value >= open_threshold),
            None,
        )
        if open_pos is None:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            return self._trades_frame()

        # Position of the first row, starting at the open row itself, where the
        # disequilibrium has reverted below the close threshold.
        close_pos = next(
            (
                pos
                for pos in range(open_pos, len(scaled))
                if scaled[pos] < close_threshold
            ),
            None,
        )

        # Column-first positional lookups keep each column's own dtype.
        open_tstamp = predicted_df["tstamp"].iloc[open_pos]
        open_px_a = predicted_df[colname_a].iloc[open_pos]
        open_px_b = predicted_df[colname_b].iloc[open_pos]
        open_disequilibrium = predicted_df["disequilibrium"].iloc[open_pos]
        open_scaled_disequilibrium = predicted_df["scaled_disequilibrium"].iloc[
            open_pos
        ]

        # Leg directions come from the predicted prices at the open row.
        abs_beta = abs(beta[1])
        pred_px_a = predicted_df[f"{colname_a}_pred"].iloc[open_pos]
        pred_px_b = predicted_df[f"{colname_b}_pred"].iloc[open_pos]
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "SELL", "BUY"

        trades = self._leg_rows(
            pair,
            open_tstamp,
            open_side_a,
            open_side_b,
            open_px_a,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
        )

        if close_pos is None:
            # No reversion inside the testing window: report the position as
            # still open at the last row and return only the opening trades.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=float(open_px_a),
                open_px_b=float(open_px_b),
                open_tstamp=pd.Timestamp(open_tstamp),
            )
        else:
            print(f"{pair}: Close signal found at index {predicted_df.index[close_pos]}")
            trades += self._leg_rows(
                pair,
                predicted_df["tstamp"].iloc[close_pos],
                self._OPPOSITE_SIDE[open_side_a],
                self._OPPOSITE_SIDE[open_side_b],
                predicted_df[colname_a].iloc[close_pos],
                predicted_df[colname_b].iloc[close_pos],
                predicted_df["disequilibrium"].iloc[close_pos],
                predicted_df["scaled_disequilibrium"].iloc[close_pos],
            )

        return self._trades_frame(trades)

    def reset(self) -> None:
        """StaticFit keeps no state between pairs; nothing to reset."""
        pass


class PairState(Enum):
    """Position state of a pair while ``SlidingFit`` walks its predictions."""

    INITIAL = 1
    OPEN = 2
    CLOSED = 3


class SlidingFit(PairsTradingFitMethod):
    """Re-fit a pair on a training window that slides forward one row per step."""

    def __init__(self) -> None:
        super().__init__()
        # Start offset of the current training window. run_pair advances it and
        # never rewinds it; call reset() before reusing the instance.
        self.curr_training_start_idx_ = 0

    def run_pair(
        self, config: Dict, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Slide the training window over ``pair`` and return its trades.

        At each offset the pair is re-trained; when it is cointegrated, one
        testing row is predicted. The walk stops once fewer than
        ``config["training_minutes"]`` training rows remain, after which
        trades are derived from ``pair.predicted_df_``.

        Raises:
            RuntimeError: if training or prediction fails at any offset.
        """
        print(f"***{pair}*** STARTING....")
        pair.user_data_["state"] = PairState.INITIAL
        # Typed empty frame so later concatenation of trades keeps the schema.
        pair.user_data_["trades"] = self._trades_frame()
        pair.user_data_["is_cointegrated"] = False

        training_minutes = config["training_minutes"]
        while True:
            print(self.curr_training_start_idx_, end="\r")
            pair.get_datasets(
                training_minutes=training_minutes,
                training_start_index=self.curr_training_start_idx_,
                testing_size=1,
            )
            if len(pair.training_df_) < training_minutes:
                print(
                    f"{pair}: current offset={self.curr_training_start_idx_}"
                    f" * Training data length={len(pair.training_df_)} < {training_minutes}"
                    " * Not enough training data. Completing the job."
                )
                # A still-open position is reported in _create_trading_signals.
                break

            try:
                is_cointegrated = pair.train_pair()
            except Exception as e:
                raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e

            # Only announce transitions in cointegration status, not every step.
            if pair.user_data_["is_cointegrated"] != is_cointegrated:
                pair.user_data_["is_cointegrated"] = is_cointegrated
                if not is_cointegrated:
                    if pair.user_data_["state"] == PairState.OPEN:
                        print(
                            f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions..."
                        )
                    else:
                        print(
                            f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on"
                        )
                else:
                    print("*" * 80)
                    print(
                        f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED"
                    )
                    print("*" * 80)

            if is_cointegrated:
                try:
                    pair.predict()
                except Exception as e:
                    raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e

            self.curr_training_start_idx_ += 1

        self._create_trading_signals(pair, config, bt_result)
        print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
        return pair.get_trades()

    def _create_trading_signals(
        self, pair: TradingPair, config: Dict, bt_result: BacktestResult
    ) -> None:
        """Walk ``pair.predicted_df_`` through the open/close state machine.

        From INITIAL or CLOSED a row may open a position; from OPEN a row may
        close it. Trades are appended with ``pair.add_trades``. A position that
        is still open at the end is reported to
        ``bt_result.handle_outstanding_position`` at the last predicted row.
        """
        predicted_df = pair.predicted_df_
        if predicted_df is None:
            print(f"{pair.market_data_.iloc[0]['tstamp']} {pair}: No predicted data")
            return

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]

        for curr_predicted_row_idx in range(len(predicted_df)):
            pred_row = predicted_df.iloc[curr_predicted_row_idx]
            state = pair.user_data_["state"]
            if state in (PairState.INITIAL, PairState.CLOSED):
                open_trades = self._get_open_trades(
                    pair, row=pred_row, open_threshold=open_threshold
                )
                if open_trades is not None:
                    open_trades["status"] = "OPEN"
                    print(f"OPEN TRADES:\n{open_trades}")
                    pair.add_trades(open_trades)
                    pair.user_data_["state"] = PairState.OPEN
            elif state == PairState.OPEN:
                close_trades = self._get_close_trades(
                    pair, row=pred_row, close_threshold=close_threshold
                )
                if close_trades is not None:
                    close_trades["status"] = "CLOSE"
                    print(f"CLOSE TRADES:\n{close_trades}")
                    pair.add_trades(close_trades)
                    pair.user_data_["state"] = PairState.CLOSED

        if pair.user_data_["state"] == PairState.OPEN:
            print(f"{pair}: *** Position is NOT CLOSED. ***")
            # Value the outstanding position at the last predicted row (the
            # previous hard-coded 0 pointed at the first row, before the open).
            bt_result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=pair.user_data_["open_side_a"],
                open_side_b=pair.user_data_["open_side_b"],
                open_px_a=pair.user_data_["open_px_a"],
                open_px_b=pair.user_data_["open_px_b"],
                open_tstamp=pair.user_data_["open_tstamp"],
            )

    def _get_open_trades(
        self, pair: TradingPair, row: pd.Series, open_threshold: float
    ) -> Optional[pd.DataFrame]:
        """Return the two opening trades for ``row``, or ``None`` if it does not open.

        A position opens when ``row["scaled_disequilibrium"] >= open_threshold``.
        Legs are sided by the sign of ``row["disequilibrium"]`` (positive: sell
        A / buy B). The open sides, prices, timestamp and the matching closing
        sides are stored in ``pair.user_data_`` for the later close.
        """
        colname_a, colname_b = pair.colnames()
        assert pair.predicted_df_ is not None
        if len(pair.predicted_df_) == 0:
            return None

        open_tstamp = row["tstamp"]
        open_disequilibrium = row["disequilibrium"]
        open_scaled_disequilibrium = row["scaled_disequilibrium"]
        open_px_a = row[colname_a]
        open_px_b = row[colname_b]

        if open_scaled_disequilibrium < open_threshold:
            return None

        # Single quotes inside the f-string keep this valid before Python 3.12.
        print(f"OPEN_TRADES: {row['tstamp']} {open_scaled_disequilibrium=}")

        if open_disequilibrium > 0:
            open_side_a, open_side_b = "SELL", "BUY"
        else:
            open_side_a, open_side_b = "BUY", "SELL"

        # Remember the open legs for the close and for outstanding-position
        # reporting in _create_trading_signals.
        pair.user_data_["open_side_a"] = open_side_a
        pair.user_data_["open_side_b"] = open_side_b
        pair.user_data_["open_px_a"] = open_px_a
        pair.user_data_["open_px_b"] = open_px_b
        pair.user_data_["open_tstamp"] = open_tstamp
        pair.user_data_["close_side_a"] = self._OPPOSITE_SIDE[open_side_a]
        pair.user_data_["close_side_b"] = self._OPPOSITE_SIDE[open_side_b]

        return self._trades_frame(
            self._leg_rows(
                pair,
                open_tstamp,
                open_side_a,
                open_side_b,
                open_px_a,
                open_px_b,
                open_disequilibrium,
                open_scaled_disequilibrium,
            )
        )

    def _get_close_trades(
        self, pair: TradingPair, row: pd.Series, close_threshold: float
    ) -> Optional[pd.DataFrame]:
        """Return the two closing trades for ``row``, or ``None`` if it does not close.

        The position closes when ``row["scaled_disequilibrium"] <=
        close_threshold`` (note: StaticFit requires strictly ``<``). The
        closing sides are read from ``pair.user_data_``, where
        ``_get_open_trades`` stored them.
        """
        colname_a, colname_b = pair.colnames()
        assert pair.predicted_df_ is not None
        if len(pair.predicted_df_) == 0:
            return None

        close_tstamp = row["tstamp"]
        close_disequilibrium = row["disequilibrium"]
        close_scaled_disequilibrium = row["scaled_disequilibrium"]
        close_px_a = row[colname_a]
        close_px_b = row[colname_b]
        close_side_a = pair.user_data_["close_side_a"]
        close_side_b = pair.user_data_["close_side_b"]

        if close_scaled_disequilibrium > close_threshold:
            return None

        return self._trades_frame(
            self._leg_rows(
                pair,
                close_tstamp,
                close_side_a,
                close_side_b,
                close_px_a,
                close_px_b,
                close_disequilibrium,
                close_scaled_disequilibrium,
            )
        )

    def reset(self) -> None:
        """Rewind the sliding training window to the start of the data."""
        self.curr_training_start_idx_ = 0