pairs_trading/src/trading/trading_pair.py
Oleg Sheynin 352f7df269 progress
2025-07-07 22:00:57 +00:00

209 lines
7.3 KiB
Python

from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore
class TradingPair:
    """A pair of symbols analysed for cointegration and modelled with a VECM.

    The constructor reshapes long-format market data (one row per
    ``tstamp``/``symbol``) into a wide frame with one price column per symbol
    of the pair.  ``get_datasets`` slices that frame into training and testing
    windows, ``train_pair`` tests for cointegration and fits the VECM, and
    ``predict`` produces the forecast / dis-equilibrium frame.
    """

    # Wide frame: "tstamp" plus one "<price_column>_<symbol>" column per symbol.
    market_data_: pd.DataFrame
    symbol_a_: str
    symbol_b_: str
    # Name of the price column in the incoming long-format market data.
    price_column_: str
    # Mean / standard deviation of the training dis-equilibrium (train_pair).
    training_mu_: float
    training_std_: float
    # Row windows produced by get_datasets (rows containing NaN are dropped).
    training_df_: pd.DataFrame
    testing_df_: pd.DataFrame
    # Result frame produced by predict().
    predicted_df_: pd.DataFrame
    # Fitted model stored by fit_VECM().  Quoted so that defining the class
    # does not require the statsmodels name to be evaluated.
    vecm_fit_: "VECMResults"
    # Free-form storage; this class only initialises it to an empty dict.
    user_data_: Dict[str, Any]

    def __init__(
        self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
    ):
        """Build the wide market-data frame for the pair.

        Args:
            market_data: Long-format frame with at least the columns
                ``tstamp``, ``symbol`` and ``price_column``.
            symbol_a: First symbol of the pair.
            symbol_b: Second symbol of the pair.
            price_column: Name of the price column to use.

        Raises:
            KeyError: If a required input column is absent, or if either
                symbol has no rows in ``market_data``.
        """
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.price_column_ = price_column

        # Only the two symbols of the pair are pivoted: merging every symbol
        # in the data set is wasted work, and duplicate rows of an unrelated
        # symbol would otherwise multiply the rows of this pair.
        wide_df = self._transform_dataframe(market_data, symbols=[symbol_a, symbol_b])
        missing = [col for col in self.colnames() if col not in wide_df.columns]
        if missing:
            raise KeyError(f"{self}: no market data for column(s) {missing}")

        self.market_data_ = pd.DataFrame(wide_df[["tstamp"] + self.colnames()])
        self.user_data_ = {}

    def _require_attr(self, attr: str, producer: str) -> Any:
        """Return ``self.<attr>``, or raise a descriptive error if it is unset.

        Args:
            attr: Attribute name to read.
            producer: Name of the method that populates the attribute (used
                only in the error message).

        Raises:
            AttributeError: If the attribute is missing or ``None``.
        """
        value = getattr(self, attr, None)
        if value is None:
            raise AttributeError(
                f"{self}: '{attr}' is not set; call {producer}() first"
            )
        return value

    def _transform_dataframe(
        self, df: pd.DataFrame, symbols: Optional[List[str]] = None
    ) -> pd.DataFrame:
        """Pivot long-format market data into one price column per symbol.

        Args:
            df: Frame with the columns ``tstamp``, ``symbol`` and
                ``self.price_column_``.
            symbols: Symbols to create columns for.  ``None`` (the default)
                creates a column for every symbol found in ``df``.  Requested
                symbols that do not occur in ``df`` get no column.

        Returns:
            A frame with ``tstamp`` (unique timestamps of the whole input, in
            order of first appearance) and one ``<price_column>_<symbol>``
            column per selected symbol.  Timestamps at which a symbol has no
            row hold NaN; rows are deliberately not dropped here.
        """
        price_col = self.price_column_
        # Select only the columns we need
        df_selected: pd.DataFrame = pd.DataFrame(df[["tstamp", "symbol", price_col]])

        # Start with unique timestamps
        result_df: pd.DataFrame = (
            df_selected[["tstamp"]].drop_duplicates().reset_index(drop=True)
        )

        available = df_selected["symbol"].unique()
        if symbols is None:
            wanted = list(available)
        else:
            present = set(available)
            # dict.fromkeys de-duplicates while keeping the requested order.
            wanted = [sym for sym in dict.fromkeys(symbols) if sym in present]

        for symbol in wanted:
            # Rows of this symbol, price renamed to e.g. "close_COIN"
            symbol_prices = df_selected.loc[
                df_selected["symbol"] == symbol, ["tstamp", price_col]
            ].rename(columns={price_col: f"{price_col}_{symbol}"})
            # Left join keeps every timestamp, leaving NaN where data is missing
            result_df = pd.merge(result_df, symbol_prices, on="tstamp", how="left")

        # do not dropna() since irrelevant symbol would affect dataset
        return result_df.reset_index(drop=True)

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Slice ``market_data_`` into training and testing windows.

        The windows are positional row slices; rows containing NaN are
        dropped from each window and the index is reset.

        Args:
            training_minutes: Number of rows in the training window.
            training_start_index: Row position where the training window
                starts.
            testing_size: Number of rows in the testing window, which starts
                right after the training window.  ``None`` takes all
                remaining rows.
        """
        testing_start_index = training_start_index + training_minutes
        testing_end_index = (
            None if testing_size is None else testing_start_index + testing_size
        )

        self.training_df_ = (
            self.market_data_.iloc[training_start_index:testing_start_index, :]
            .copy()
            .dropna()
            .reset_index(drop=True)
        )
        self.testing_df_ = (
            self.market_data_.iloc[testing_start_index:testing_end_index, :]
            .copy()
            .dropna()
            .reset_index(drop=True)
        )

    def colnames(self) -> List[str]:
        """Return the two wide-frame price column names of the pair."""
        return [
            f"{self.price_column_}_{self.symbol_a_}",
            f"{self.price_column_}_{self.symbol_b_}",
        ]

    def fit_VECM(self) -> None:
        """Fit a rank-1 VECM on the training prices and store it in ``vecm_fit_``.

        Raises:
            AttributeError: If ``get_datasets`` has not been called.
        """
        training_df = self._require_attr("training_df_", "get_datasets")
        vecm_df = training_df[self.colnames()].reset_index(drop=True)
        vecm_fit = VECM(vecm_df, coint_rank=1).fit()

        # URGENT check beta and alpha
        # A fit without a cointegration vector is only reported, not rejected.
        if getattr(vecm_fit, "beta", None) is None:
            print(f"{self}: VECM model failed to converge properly")

        self.vecm_fit_ = vecm_fit

    def check_cointegration_johansen(self) -> bool:
        """Run the Johansen test on the training prices.

        Returns:
            True when the first trace statistic (``lr1[0]``) exceeds the
            critical value in ``cvt[0, 1]``.

        Raises:
            AttributeError: If ``get_datasets`` has not been called.
        """
        training_df = self._require_attr("training_df_", "get_datasets")
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        df = training_df[self.colnames()].reset_index(drop=True)
        result = coint_johansen(df, det_order=0, k_ar_diff=1)

        trace_stat = result.lr1[0]
        critical_value = result.cvt[0, 1]
        is_cointegrated = bool(trace_stat > critical_value)
        print(
            f"{self}: lr1={trace_stat} > cvt={critical_value}? {is_cointegrated}"
        )
        return is_cointegrated

    def check_cointegration_engle_granger(self) -> bool:
        """Run the Engle-Granger test on the training prices.

        Returns:
            True when the test p-value is below 0.05, i.e. the null
            hypothesis of no cointegration is rejected.

        Raises:
            AttributeError: If ``get_datasets`` has not been called.
        """
        training_df = self._require_attr("training_df_", "get_datasets")
        from statsmodels.tsa.stattools import coint

        col1, col2 = self.colnames()
        series1 = training_df[col1].reset_index(drop=True)
        series2 = training_df[col2].reset_index(drop=True)

        # Run Engle-Granger cointegration test
        pvalue = coint(series1, series2)[1]
        # Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration)
        is_cointegrated = bool(pvalue < 0.05)
        print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
        return is_cointegrated

    def train_pair(self) -> bool:
        """Test the pair for cointegration and, if it passes, fit the model.

        Both tests are always run (each prints its result).  The pair is
        accepted when at least one of them reports cointegration.  On
        success the VECM is fitted, ``training_mu_`` / ``training_std_`` are
        set from the training dis-equilibrium, and the columns
        ``dis-equilibrium`` and ``scaled_dis-equilibrium`` are added to
        ``training_df_``.

        Returns:
            False when neither test reports cointegration, True otherwise.

        Raises:
            AttributeError: If ``get_datasets`` has not been called.
        """
        is_cointegrated_johansen = self.check_cointegration_johansen()
        is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
        if not (is_cointegrated_johansen or is_cointegrated_engle_granger):
            return False

        self.fit_VECM()

        # Prices projected on the cointegration vector; the product has a
        # single column, taken here as a Series.
        diseq_series = (self.training_df_[self.colnames()] @ self.vecm_fit_.beta).iloc[
            :, 0
        ]
        self.training_mu_ = float(diseq_series.mean())
        self.training_std_ = float(diseq_series.std())

        self.training_df_["dis-equilibrium"] = diseq_series
        # Normalize the dis-equilibrium
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq_series - self.training_mu_
        ) / self.training_std_
        return True

    def predict(self) -> pd.DataFrame:
        """Forecast prices over the testing window and score the dis-equilibrium.

        The forecast columns are merged next to the testing rows with the
        suffix ``_pred``.  ``disequilibrium`` is computed from the
        un-suffixed (testing) price columns, and ``scaled_disequilibrium`` is
        its absolute distance from ``training_mu_`` in units of
        ``training_std_``.

        Returns:
            The result frame, also stored in ``predicted_df_``.  The final
            ``reset_index()`` keeps the previous index as an ``index`` column.

        Raises:
            AttributeError: If ``get_datasets`` or ``train_pair`` has not
                been called.
        """
        testing_df = self._require_attr("testing_df_", "get_datasets")
        vecm_fit = self._require_attr("vecm_fit_", "train_pair")

        predicted_prices = vecm_fit.predict(steps=len(testing_df))
        forecast_df = pd.DataFrame(
            predicted_prices, columns=pd.Index(self.colnames()), dtype=float
        )

        merged = pd.merge(
            testing_df.reset_index(drop=True),
            forecast_df,
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()

        merged["disequilibrium"] = (merged[self.colnames()] @ vecm_fit.beta).iloc[:, 0]
        merged["scaled_disequilibrium"] = (
            abs(merged["disequilibrium"] - self.training_mu_) / self.training_std_
        )

        # Reset index to ensure proper indexing
        self.predicted_df_ = merged.reset_index()
        return self.predicted_df_

    def __repr__(self) -> str:
        """Return ``"<symbol_a> & <symbol_b>"``."""
        return f"{self.symbol_a_} & {self.symbol_b_}"