# pairs_trading/lib/pt_trading/trading_pair.py

from typing import Any, Dict, List, Optional

import pandas as pd  # type: ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults  # type: ignore


class TradingPair:
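    """A two-symbol trading pair modeled with a Vector Error Correction Model (VECM).

    The raw long-format market data is pivoted into one price column per symbol,
    split into training and testing windows, and a rank-1 VECM is fitted on the
    training window. The resulting disequilibrium series (raw and scaled by the
    training mean and standard deviation) is stored on the training and
    prediction frames.
    """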
market_data_: pd.DataFrame
symbol_a_: str
symbol_b_: str
price_column_: str
training_mu_: float
training_std_: float
training_df_: pd.DataFrame
testing_df_: pd.DataFrame
vecm_fit_: VECMResults
user_data_: Dict[str, Any]
predicted_df_: Optional[pd.DataFrame]
def __init__(
self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
):
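        """Keep only the timestamp column and this pair's price columns from the long-format market data."""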
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.price_column_ = price_column
self.market_data_ = pd.DataFrame(
self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
)
self.user_data_ = {}
self.predicted_df_ = None
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
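        """Pivot long-format rows (tstamp, symbol, price) into one row per timestamp
        with a "<price_column>_<symbol>" column for each symbol present in df.
        """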
# Select only the columns we need
df_selected: pd.DataFrame = pd.DataFrame(
df[["tstamp", "symbol", self.price_column_]]
)
# Start with unique timestamps
result_df: pd.DataFrame = (
pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
)
# For each unique symbol, add a corresponding close price column
symbols = df_selected["symbol"].unique()
for symbol in symbols:
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(
drop=True
)
            # Create a column name like "close_COIN"
new_price_column = f"{self.price_column_}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame(
{
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[self.price_column_],
}
)
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
        result_df = result_df.reset_index(drop=True)
        # dropna() only on this pair's own price columns: a gap in an unrelated
        # symbol's column must not drop otherwise valid rows for the pair.
        return result_df.dropna(subset=self.colnames())
def get_datasets(
self,
training_minutes: int,
training_start_index: int = 0,
testing_size: Optional[int] = None,
) -> None:
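        """Split market_data_ into training_df_ (training_minutes rows starting at
        training_start_index) and testing_df_ (the next testing_size rows, or all
        remaining rows when testing_size is None).
        """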
        testing_start_index = training_start_index + training_minutes
        # Training window: rows [training_start_index, testing_start_index), all columns.
        self.training_df_ = self.market_data_.iloc[
            training_start_index:testing_start_index, :
        ].copy()
        assert self.training_df_ is not None
        self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
if testing_size is None:
self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy()
else:
self.testing_df_ = self.market_data_.iloc[
testing_start_index : testing_start_index + testing_size, :
].copy()
assert self.testing_df_ is not None
self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)
def colnames(self) -> List[str]:
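        """Return the pair's two price column names, e.g. ["close_AAA", "close_BBB"]."""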
return [
f"{self.price_column_}_{self.symbol_a_}",
f"{self.price_column_}_{self.symbol_b_}",
]
def fit_VECM(self) -> None:
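        """Fit a rank-1 VECM to the training prices and store the result in vecm_fit_."""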
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
assert vecm_fit is not None
# URGENT check beta and alpha
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
        self.vecm_fit_ = vecm_fit
        # print(f"{self}: beta={self.vecm_fit_.beta} alpha={self.vecm_fit_.alpha}" )
        # print(f"{self}: {self.vecm_fit_.summary()}")
def check_cointegration_johansen(self) -> bool:
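        """Johansen trace test: the pair is cointegrated if the rank-0 trace statistic
        exceeds its 5% critical value (lr1[0] > cvt[0, 1]).
        """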
assert self.training_df_ is not None
from statsmodels.tsa.vector_ar.vecm import coint_johansen
df = self.training_df_[self.colnames()].reset_index(drop=True)
result = coint_johansen(df, det_order=0, k_ar_diff=1)
# print(
# f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
# )
is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1])
return is_cointegrated
def check_cointegration_engle_granger(self) -> bool:
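        """Engle-Granger test: the pair is cointegrated if the p-value for the null of
        no cointegration is below 0.05.
        """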
from statsmodels.tsa.stattools import coint
col1, col2 = self.colnames()
assert self.training_df_ is not None
series1 = self.training_df_[col1].reset_index(drop=True)
series2 = self.training_df_[col2].reset_index(drop=True)
# Run Engle-Granger cointegration test
pvalue = coint(series1, series2)[1]
# Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration)
is_cointegrated: bool = bool(pvalue < 0.05)
# print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
return is_cointegrated
def check_cointegration(self) -> bool:
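        """Accept the pair if either the Johansen or the Engle-Granger test passes
        (currently overridden to always return True, see the TODO below).
        """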
is_cointegrated_johansen = self.check_cointegration_johansen()
is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
result = is_cointegrated_johansen or is_cointegrated_engle_granger
        return result or True  # TODO: remove this override; it forces every pair to be treated as cointegrated
def train_pair(self) -> bool:
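        """Check cointegration, fit the VECM, and compute the training-window
        disequilibrium series together with its mean and standard deviation.
        """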
result = self.check_cointegration()
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self.fit_VECM()
assert self.training_df_ is not None and self.vecm_fit_ is not None
        # Disequilibrium is the linear combination of the two prices given by the
        # cointegrating vector beta (column 0, since coint_rank=1).
        diseq_series = (self.training_df_[self.colnames()] @ self.vecm_fit_.beta)[0]
        self.training_mu_ = float(diseq_series.mean())
        self.training_std_ = float(diseq_series.std())
        self.training_df_["dis-equilibrium"] = diseq_series
        # Normalize the dis-equilibrium (z-score against the training window)
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq_series - self.training_mu_
        ) / self.training_std_
return result
def add_trades(self, trades: pd.DataFrame) -> None:
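        """Append a trades DataFrame to user_data_["trades"], filling any columns
        missing from the new trades with default values.
        """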
if self.user_data_["trades"] is None or len(self.user_data_["trades"]) == 0:
# If trades is empty or None, just assign the new trades directly
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns and dtypes before concatenation
existing_trades = self.user_data_["trades"]
# If existing trades is empty, just assign the new trades
if len(existing_trades) == 0:
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns
if set(existing_trades.columns) != set(trades.columns):
# Add missing columns to trades with appropriate default values
for col in existing_trades.columns:
if col not in trades.columns:
if col == "time":
trades[col] = pd.Timestamp.now()
elif col in ["action", "symbol"]:
trades[col] = ""
elif col in ["price", "disequilibrium", "scaled_disequilibrium"]:
trades[col] = 0.0
elif col == "pair":
trades[col] = None
else:
trades[col] = None
                # Append the new trades to the existing ones with a continuous index
self.user_data_["trades"] = pd.concat(
[existing_trades, trades],
ignore_index=True,
copy=False
)
def get_trades(self) -> pd.DataFrame:
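        """Return all recorded trades, or an empty DataFrame if none have been added."""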
return self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
def predict(self) -> pd.DataFrame:
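        """Forecast len(testing_df_) steps with the fitted VECM, merge the forecast
        with the testing rows, compute the disequilibrium of the testing prices and
        its absolute z-score against the training statistics, and accumulate the
        result in predicted_df_.
        """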
assert self.testing_df_ is not None
assert self.vecm_fit_ is not None
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
        # Align the forecast with the testing rows; forecast columns get the
        # "_pred" suffix while the testing window keeps the actual prices.
        predicted_df = pd.merge(
            self.testing_df_.reset_index(drop=True),
            pd.DataFrame(
                predicted_prices, columns=pd.Index(self.colnames()), dtype=float
            ),
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
predicted_df["scaled_disequilibrium"] = (
abs(predicted_df["disequilibrium"] - self.training_mu_)
/ self.training_std_
)
# print("*** PREDICTED DF")
# print(predicted_df)
# print("*" * 80)
# print("*** SELF.PREDICTED_DF")
# print(self.predicted_df_)
# print("*" * 80)
predicted_df = predicted_df.reset_index(drop=True)
if self.predicted_df_ is None:
self.predicted_df_ = predicted_df
else:
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True)
# Reset index to ensure proper indexing
self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
return self.predicted_df_
def __repr__(self) -> str:
return f"{self.symbol_a_} & {self.symbol_b_}"