from __future__ import annotations from typing import Optional import pandas as pd import statsmodels.api as sm from pt_strategy.pt_model import PairsTradingModel, Prediction from pt_strategy.trading_pair import TradingPair class ZScoreOLSModel(PairsTradingModel): zscore_model_: Optional[sm.regression.linear_model.RegressionResultsWrapper] pair_predict_result_: Optional[pd.DataFrame] zscore_df_: Optional[pd.DataFrame] def _fit_zscore(self, pair: TradingPair) -> pd.DataFrame: assert self.training_df_ is not None symbol_a_px_series = self.training_df_[pair.colnames()].iloc[:, 0].astype(float) symbol_b_px_series = self.training_df_[pair.colnames()].iloc[:, 1].astype(float) symbol_a_px_series, symbol_b_px_series = symbol_a_px_series.align( symbol_b_px_series, axis=0 ) X = sm.add_constant(symbol_b_px_series) self.zscore_model_ = sm.OLS(symbol_a_px_series, X).fit() assert self.zscore_model_ is not None hedge_ratio = self.zscore_model_.params.iloc[1] spread = symbol_a_px_series - hedge_ratio * symbol_b_px_series return pd.DataFrame((spread - spread.mean()) / spread.std()) def predict(self, pair: TradingPair) -> Prediction: self.training_df_ = pair.market_data_.copy() zscore_df = self._fit_zscore(pair=pair) assert zscore_df is not None # zscore is both disequilibrium and scaled_disequilibrium self.training_df_["dis-equilibrium"] = zscore_df[0] self.training_df_["scaled_dis-equilibrium"] = zscore_df[0] assert zscore_df is not None return Prediction( tstamp_=pair.market_data_.index[-1], disequilibrium_=self.training_df_["dis-equilibrium"].iloc[-1], scaled_disequilibrium_=self.training_df_["scaled_dis-equilibrium"].iloc[-1], pair_=pair, )