from __future__ import annotations

from typing import Optional

import pandas as pd
import statsmodels.api as sm

from pt_strategy.pt_model import PairsTradingModel, Prediction
from pt_strategy.trading_pair import TradingPair


class OLSModel(PairsTradingModel):
    """Pairs-trading model that z-scores an OLS-hedged price spread.

    The first pair column is regressed on the second (with an intercept);
    the slope is used as the hedge ratio, and the resulting spread is
    standardized to zero mean / unit standard deviation.
    """

    # Fitted OLS results; assigned by ``_fit_zscore``.
    zscore_model_: Optional[sm.regression.linear_model.RegressionResultsWrapper]
    # NOTE(review): the two attributes below are declared but not assigned by
    # any code visible here -- confirm whether other modules rely on them.
    pair_predict_result_: Optional[pd.DataFrame]
    zscore_df_: Optional[pd.DataFrame]

    def predict(self, pair: TradingPair) -> Prediction:
        """Fit the z-score on the pair's market data and return the last value.

        Side effects: replaces ``self.training_df_`` with a copy of
        ``pair.market_data_`` extended by the ``"dis-equilibrium"`` and
        ``"scaled_dis-equilibrium"`` columns.

        Args:
            pair: Trading pair whose ``market_data_`` holds a ``"tstamp"``
                column and the price columns named by ``pair.colnames()``.

        Returns:
            A ``Prediction`` stamped with the last row's ``"tstamp"``; both
            disequilibrium fields carry the last z-score.
        """
        self.training_df_ = pair.market_data_.copy()

        zscore_df = self._fit_zscore(pair=pair)
        assert zscore_df is not None

        # The z-score serves as both the raw and the scaled disequilibrium.
        zscore = zscore_df[0]
        self.training_df_["dis-equilibrium"] = zscore
        self.training_df_["scaled_dis-equilibrium"] = zscore

        return Prediction(
            tstamp=pair.market_data_.iloc[-1]["tstamp"],
            disequilibrium=self.training_df_["dis-equilibrium"].iloc[-1],
            scaled_disequilibrium=self.training_df_[
                "scaled_dis-equilibrium"
            ].iloc[-1],
        )

    def _fit_zscore(self, pair: TradingPair) -> pd.DataFrame:
        """Fit the OLS hedge regression and return the standardized spread.

        Stores the fitted regression in ``self.zscore_model_``.

        Args:
            pair: Trading pair; ``pair.colnames()`` names the two price
                columns read from ``self.training_df_``.

        Returns:
            A single-column DataFrame (column label ``0``) holding
            ``(spread - spread.mean()) / spread.std()``.
        """
        assert self.training_df_ is not None

        # Select the pair's price columns once, then split by position.
        prices = self.training_df_[pair.colnames()]
        symbol_a_px_series = prices.iloc[:, 0]
        symbol_b_px_series = prices.iloc[:, 1]

        symbol_a_px_series, symbol_b_px_series = symbol_a_px_series.align(
            symbol_b_px_series, axis=0
        )

        # Regress A on [const, B]; params are read positionally, so index 1
        # is the slope on B, used here as the hedge ratio.
        X = sm.add_constant(symbol_b_px_series)
        self.zscore_model_ = sm.OLS(symbol_a_px_series, X).fit()
        assert self.zscore_model_ is not None
        hedge_ratio = self.zscore_model_.params.iloc[1]

        # The intercept is not subtracted explicitly: centering the spread
        # below removes any constant offset.
        spread = symbol_a_px_series - hedge_ratio * symbol_b_px_series
        return pd.DataFrame((spread - spread.mean()) / spread.std())


class VECMModel(PairsTradingModel):
    """Pairs-trading model based on a rank-1 VECM cointegration vector.

    The disequilibrium is the pair's prices projected onto the fitted
    ``beta``; it is scaled with the mean and standard deviation of that
    projection over the training data.
    """

    def predict(self, pair: TradingPair) -> Prediction:
        """Fit a VECM on the pair's market data and score a 1-step forecast.

        Side effects: replaces ``self.training_df_`` with a copy of
        ``pair.market_data_`` and, through ``_fit_VECM``, sets
        ``self.training_mu_`` / ``self.training_std_`` and adds the
        disequilibrium columns.

        Args:
            pair: Trading pair whose ``market_data_`` holds a ``"tstamp"``
                column and the price columns named by ``pair.colnames()``.

        Returns:
            A ``Prediction`` stamped with the last row's ``"tstamp"`` and
            carrying the forecast's raw and scaled disequilibrium.

        Raises:
            RuntimeError: If the fitted model exposes no ``beta``.
        """
        self.training_df_ = pair.market_data_.copy()
        assert self.training_df_ is not None

        vecm_fit = self._fit_VECM(pair=pair)
        assert vecm_fit is not None
        predicted_prices = vecm_fit.predict(steps=1)

        # Wrap the forecast in a DataFrame so columns can be picked by name.
        predicted_df = pd.DataFrame(
            predicted_prices, columns=pd.Index(pair.colnames()), dtype=float
        )

        # One forecast row times beta: take the single resulting cell.
        projection = predicted_df[pair.colnames()] @ vecm_fit.beta
        disequilibrium = projection.iloc[0, 0]
        scaled_disequilibrium = (
            disequilibrium - self.training_mu_
        ) / self.training_std_

        return Prediction(
            tstamp=pair.market_data_.iloc[-1]["tstamp"],
            disequilibrium=disequilibrium,
            scaled_disequilibrium=scaled_disequilibrium,
        )

    def _fit_VECM(self, pair: TradingPair) -> VECMResults:  # type: ignore
        """Fit a ``coint_rank=1`` VECM and record training disequilibrium.

        Sets ``self.training_mu_`` and ``self.training_std_`` and adds the
        ``"dis-equilibrium"`` and ``"scaled_dis-equilibrium"`` columns to
        ``self.training_df_``.

        Args:
            pair: Trading pair; ``pair.colnames()`` names the price columns
                read from ``self.training_df_``.

        Returns:
            The fitted VECM results object.

        Raises:
            RuntimeError: If the fitted model has no usable ``beta``.
        """
        from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults

        assert self.training_df_ is not None

        # The model is fitted on a positionally re-indexed copy of the prices.
        vecm_df = self.training_df_[pair.colnames()].reset_index(drop=True)
        vecm_model = VECM(vecm_df, coint_rank=1)
        vecm_fit = vecm_model.fit()
        assert vecm_fit is not None

        # Without beta nothing below can be computed, so fail explicitly
        # rather than letting the matrix product raise an unrelated error.
        if getattr(vecm_fit, "beta", None) is None:
            raise RuntimeError(
                f"{self}: VECM model failed to converge properly"
            )

        # Project the training prices onto beta once and reuse the result.
        # The product is a single-column frame (label 0); take it as a Series.
        diseq_series = (self.training_df_[pair.colnames()] @ vecm_fit.beta)[0]

        self.training_mu_ = float(diseq_series.mean())
        self.training_std_ = float(diseq_series.std())

        self.training_df_["dis-equilibrium"] = diseq_series
        # Normalize the dis-equilibrium with the training statistics.
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq_series - self.training_mu_
        ) / self.training_std_

        return vecm_fit