"""VECM-based trading pair model and its rolling-window fitter."""

from typing import Any, Dict, Optional, cast

import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults

# NOTE(review): 1e9 is the number of nanoseconds in one *second*; one minute
# would be 6e10. The constant is not used in this module -- confirm the
# intended unit against its importers before relying on the name.
NanoPerMin = 1e9


class VECMTradingPair(TradingPair):
    """Trading pair whose dis-equilibrium is derived from a rank-1 VECM fit.

    The model is fitted on ``training_df_`` and then applied to
    ``testing_df_``; both frames, as well as ``colnames()``, are expected to
    be provided by the ``TradingPair`` base class (not visible in this file).
    """

    # Result of the most recent VECM fit; ``None`` until ``_fit_VECM`` runs.
    vecm_fit_: Optional[VECMResults]
    # Accumulated output of every ``predict`` call; ``None`` before the first.
    pair_predict_result_: Optional[pd.DataFrame]

    def __init__(
        self,
        config: Dict[str, Any],
        market_data: pd.DataFrame,
        symbol_a: str,
        symbol_b: str,
        price_column: str,
    ):
        """Forward all arguments to ``TradingPair`` and clear the VECM state."""
        super().__init__(config, market_data, symbol_a, symbol_b, price_column)
        self.vecm_fit_ = None
        self.pair_predict_result_ = None

    def _train_pair(self) -> None:
        """Fit the VECM and derive the training dis-equilibrium statistics.

        Sets ``training_mu_`` and ``training_std_`` and adds the
        ``"dis-equilibrium"`` and ``"scaled_dis-equilibrium"`` columns to
        ``training_df_``.
        """
        self._fit_VECM()
        assert self.vecm_fit_ is not None

        # Project the training prices onto the cointegration vector once and
        # reuse the result for both the statistics and the stored column.
        diseq = self.training_df_[self.colnames()] @ self.vecm_fit_.beta

        # The product is a one-column frame whose column is labelled 0.
        self.training_mu_ = float(diseq[0].mean())
        self.training_std_ = float(diseq[0].std())

        self.training_df_["dis-equilibrium"] = diseq
        # Normalize the dis-equilibrium with the training mean and std.
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq - self.training_mu_
        ) / self.training_std_

    def _fit_VECM(self) -> None:
        """Fit a VECM with ``coint_rank=1`` on the training prices.

        The fitted result is stored in ``vecm_fit_``. A missing ``beta`` is
        only reported on stdout; the result is stored regardless.
        """
        assert self.training_df_ is not None
        # The model is fitted on a positional (0..n-1) index.
        train_prices = self.training_df_[self.colnames()].reset_index(drop=True)
        fit_result = VECM(train_prices, coint_rank=1).fit()
        assert fit_result is not None

        # TODO: URGENT check beta and alpha

        # Check if the model converged properly
        if not hasattr(fit_result, "beta") or fit_result.beta is None:
            print(f"{self}: VECM model failed to converge properly")

        self.vecm_fit_ = fit_result

    def predict(self) -> pd.DataFrame:
        """Train on the current window and score the testing window.

        Re-fits the model, forecasts ``len(testing_df_)`` steps, and joins the
        forecasts (suffix ``"_pred"``) onto the testing rows by position. The
        ``"disequilibrium"`` column is computed from the un-suffixed (testing)
        price columns, and ``"scaled_disequilibrium"`` is its absolute
        deviation from the training mean in units of the training std.

        Returns:
            The accumulated predictions of all calls made so far on this
            instance, with a fresh 0..n-1 index.
        """
        self._train_pair()

        assert self.testing_df_ is not None
        assert self.vecm_fit_ is not None
        forecast = self.vecm_fit_.predict(steps=len(self.testing_df_))

        # Convert prediction to a DataFrame so it can be joined positionally.
        forecast_df = pd.DataFrame(
            forecast, columns=pd.Index(self.colnames()), dtype=float
        )
        predicted_df = pd.merge(
            self.testing_df_.reset_index(drop=True),
            forecast_df,
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()

        predicted_df["disequilibrium"] = (
            predicted_df[self.colnames()] @ self.vecm_fit_.beta
        )
        # Unlike the training column, the testing value is an absolute
        # deviation (note the abs()).
        predicted_df["scaled_disequilibrium"] = (
            abs(predicted_df["disequilibrium"] - self.training_mu_)
            / self.training_std_
        )

        # dropna() may have left gaps in the index; renumber before storing.
        predicted_df = predicted_df.reset_index(drop=True)
        if self.pair_predict_result_ is None:
            self.pair_predict_result_ = predicted_df
        else:
            # ignore_index=True already yields a contiguous 0..n-1 index, so
            # no further reset is needed.
            self.pair_predict_result_ = pd.concat(
                [self.pair_predict_result_, predicted_df], ignore_index=True
            )
        return self.pair_predict_result_


class VECMRollingFit(RollingFit):
    """Rolling-window fitter that builds ``VECMTradingPair`` instances."""

    def __init__(self) -> None:
        """Initialize the underlying ``RollingFit``."""
        super().__init__()

    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Delegate unchanged to ``RollingFit.run_pair``."""
        return super().run_pair(pair, bt_result)

    def create_trading_pair(
        self,
        config: Dict,
        market_data: pd.DataFrame,
        symbol_a: str,
        symbol_b: str,
        price_column: str,
    ) -> TradingPair:
        """Return a new ``VECMTradingPair`` built from the given arguments."""
        return VECMTradingPair(
            config=config,
            market_data=market_data,
            symbol_a=symbol_a,
            symbol_b=symbol_b,
            price_column=price_column,
        )