2025-08-21 00:46:28 +00:00

105 lines
3.8 KiB
Python

from __future__ import annotations
from typing import Optional
import pandas as pd
import statsmodels.api as sm
from pt_strategy.pt_model import PairsTradingModel, Prediction
from pt_strategy.trading_pair import TradingPair
class OLSModel(PairsTradingModel):
    """Pairs-trading model scoring a pair by the z-score of OLS residuals.

    The first column named by ``pair.colnames()`` is regressed on the second
    one plus an intercept. The standardized residual of that regression is
    used as both the dis-equilibrium and the scaled dis-equilibrium.
    """

    # Fitted OLS results from the most recent ``_fit_zscore`` call.
    model_: Optional[sm.regression.linear_model.RegressionResultsWrapper]
    # Declared here but not assigned by any method visible in this class.
    pair_predict_result_: Optional[pd.DataFrame]
    zscore_df_: Optional[pd.DataFrame]

    def predict(self, pair: TradingPair) -> Prediction:
        """Fit the regression on the pair's market data and score the last row.

        Side effects: replaces ``self.training_df_`` with a copy of
        ``pair.market_data_`` extended by the ``"dis-equilibrium"`` and
        ``"scaled_dis-equilibrium"`` columns, and updates ``self.model_``.

        Args:
            pair: Trading pair providing ``market_data_`` (with a ``"tstamp"``
                column) and ``colnames()``.

        Returns:
            A ``Prediction`` built from the last row of the training frame.
        """
        self.training_df_ = pair.market_data_.copy()

        # Select the single z-score column by position so the lookup does not
        # depend on the implicit column label produced by ``pd.DataFrame``.
        zscore = self._fit_zscore(pair=pair).iloc[:, 0]

        # zscore is both disequilibrium and scaled_disequilibrium
        self.training_df_["dis-equilibrium"] = zscore
        self.training_df_["scaled_dis-equilibrium"] = zscore

        return Prediction(
            # Read the column first: materializing the whole last row would
            # coerce all of its values to one common dtype.
            tstamp=pair.market_data_["tstamp"].iloc[-1],
            disequilibrium=self.training_df_["dis-equilibrium"].iloc[-1],
            scaled_disequilibrium=self.training_df_["scaled_dis-equilibrium"].iloc[-1],
        )

    def _fit_zscore(self, pair: TradingPair) -> pd.DataFrame:
        """Fit ``a ~ const + b`` by OLS and return the standardized residuals.

        Reads ``self.training_df_`` (must have been set, e.g. by ``predict``)
        and stores the fitted results in ``self.model_``.

        Args:
            pair: Trading pair whose ``colnames()`` select the two price
                columns; the first is the dependent variable, the second the
                regressor.

        Returns:
            A single-column DataFrame holding ``(resid - mean) / std``.
        """
        assert self.training_df_ is not None

        prices = self.training_df_[pair.colnames()]
        px_a, px_b = prices.iloc[:, 0].align(prices.iloc[:, 1], axis=0)

        exog = sm.add_constant(px_b)
        self.model_ = sm.OLS(px_a, exog).fit()

        # The residuals are the spread; computing
        #   px_a - (alpha + beta * px_b)  with  alpha, beta = self.model_.params
        # by hand would give identical results.
        spread = self.model_.resid

        # NOTE: if the residuals have zero variance this division produces
        # NaN/inf values rather than raising.
        return pd.DataFrame((spread - spread.mean()) / spread.std())
class VECMModel(PairsTradingModel):
    """Pairs-trading model based on a rank-1 VECM fit of the two price series.

    The dis-equilibrium is the price vector projected onto the fitted
    cointegration vector (``beta``); the scaled dis-equilibrium standardizes
    it with the mean and standard deviation observed on the training data.
    """

    # Mean / standard deviation of the training dis-equilibrium, set by
    # ``_fit_VECM`` and consumed by ``predict``.
    training_mu_: float
    training_std_: float

    def predict(self, pair: TradingPair) -> Prediction:
        """Fit the VECM on the pair's market data and score a 1-step forecast.

        Side effects: replaces ``self.training_df_`` with a copy of
        ``pair.market_data_`` (extended by ``_fit_VECM``) and updates
        ``self.training_mu_`` / ``self.training_std_``.

        Args:
            pair: Trading pair providing ``market_data_`` (with a ``"tstamp"``
                column) and ``colnames()``.

        Returns:
            A ``Prediction`` whose dis-equilibrium is the one-step-ahead
            predicted prices projected onto ``beta``.

        Raises:
            RuntimeError: If the fitted model exposes no ``beta``.
        """
        self.training_df_ = pair.market_data_.copy()

        vecm_fit = self._fit_VECM(pair=pair)
        predicted_prices = vecm_fit.predict(steps=1)

        # Convert prediction to a DataFrame for readability
        predicted_df = pd.DataFrame(
            predicted_prices, columns=pd.Index(pair.colnames()), dtype=float
        )

        # One forecast row times one cointegration vector -> a 1x1 frame.
        disequilibrium = (predicted_df @ vecm_fit.beta).iloc[0, 0]
        scaled_disequilibrium = (
            disequilibrium - self.training_mu_
        ) / self.training_std_

        return Prediction(
            # Read the column first: materializing the whole last row would
            # coerce all of its values to one common dtype.
            tstamp=pair.market_data_["tstamp"].iloc[-1],
            disequilibrium=disequilibrium,
            scaled_disequilibrium=scaled_disequilibrium,
        )

    def _fit_VECM(self, pair: TradingPair) -> VECMResults:  # type: ignore
        """Fit a ``coint_rank=1`` VECM and record training dis-equilibrium stats.

        Reads ``self.training_df_``; adds the ``"dis-equilibrium"`` and
        ``"scaled_dis-equilibrium"`` columns to it and sets
        ``self.training_mu_`` / ``self.training_std_``.

        Args:
            pair: Trading pair whose ``colnames()`` select the price columns.

        Returns:
            The fitted statsmodels VECM results object.

        Raises:
            RuntimeError: If the fitted model exposes no ``beta``.
        """
        from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults

        prices = self.training_df_[pair.colnames()]

        # The model is fitted on a positionally indexed copy of the prices.
        vecm_fit = VECM(prices.reset_index(drop=True), coint_rank=1).fit()

        # Check if the model converged properly. Fail loudly here: every
        # statement below needs ``beta`` and would otherwise crash with an
        # unrelated error.
        beta = getattr(vecm_fit, "beta", None)
        if beta is None:
            raise RuntimeError(f"{self}: VECM model failed to converge properly")

        # ``prices @ beta`` is a one-column frame labelled 0; keep it as a
        # Series that retains the training frame's own index.
        diseq = (prices @ beta)[0]

        self.training_mu_ = float(diseq.mean())
        self.training_std_ = float(diseq.std())

        self.training_df_["dis-equilibrium"] = diseq
        # Normalize the dis-equilibrium
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq - self.training_mu_
        ) / self.training_std_

        return vecm_fit