from typing import Any, Dict, List, Optional

import pandas as pd  # type: ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults  # type: ignore


class TradingPair:
    """Cointegration-based pairs-trading helper for two symbols.

    Pivots long-format market data (one row per ``tstamp``/``symbol``) into a
    wide frame with one price column per symbol, tests the pair for
    cointegration (Johansen and Engle-Granger), fits a VECM on a training
    window, and predicts prices plus a scaled dis-equilibrium signal on a
    testing window.
    """

    # Wide-format data: "tstamp" plus "<price_column>_<symbol>" per symbol.
    market_data_: pd.DataFrame
    symbol_a_: str
    symbol_b_: str
    # Name of the price column in the *input* long-format frame (e.g. "close").
    price_column_: str
    # Mean / std of the training dis-equilibrium series; set by train_pair().
    training_mu_: float
    training_std_: float
    training_df_: pd.DataFrame
    testing_df_: pd.DataFrame
    vecm_fit_: VECMResults
    # Free-form per-pair scratch storage for callers.
    user_data_: Dict[str, Any]

    def __init__(
        self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
    ):
        """Build the wide-format frame restricted to this pair's two symbols.

        Args:
            market_data: Long-format frame with at least the columns
                ``tstamp``, ``symbol`` and ``price_column``.
            symbol_a: First symbol of the pair.
            symbol_b: Second symbol of the pair.
            price_column: Name of the price column to pivot on.
        """
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.price_column_ = price_column
        # Keep only timestamp + this pair's two price columns.
        self.market_data_ = pd.DataFrame(
            self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
        )
        self.user_data_ = {}
        # Accumulates prediction results across successive predict() calls.
        self.predicted_df_ = pd.DataFrame()

    def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot long-format rows into one price column per symbol.

        Returns a frame with a ``tstamp`` column plus one
        ``"<price_column>_<symbol>"`` column per unique symbol in *df*.
        Missing (tstamp, symbol) combinations become NaN; we deliberately do
        NOT dropna() here, otherwise a symbol irrelevant to this pair could
        shrink the dataset.
        """
        # Select only the columns we need.
        df_selected: pd.DataFrame = pd.DataFrame(
            df[["tstamp", "symbol", self.price_column_]]
        )
        # Start with the unique timestamps as the spine of the result.
        result_df: pd.DataFrame = (
            pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
        )
        # For each unique symbol, left-join its price series on tstamp.
        symbols = df_selected["symbol"].unique()
        for symbol in symbols:
            df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(
                drop=True
            )
            # Column name like "close_COIN".
            new_price_column = f"{self.price_column_}_{symbol}"
            temp_df = pd.DataFrame(
                {
                    "tstamp": df_symbol["tstamp"],
                    new_price_column: df_symbol[self.price_column_],
                }
            )
            result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
        result_df = result_df.reset_index(
            drop=True
        )  # do not dropna() since irrelevant symbol would affect dataset
        return result_df

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Slice market_data_ into consecutive training and testing windows.

        Args:
            training_minutes: Number of rows in the training window.
            training_start_index: Row offset where training starts.
            testing_size: Number of testing rows; None means "to the end".

        Sets ``training_df_`` and ``testing_df_`` (NaN rows dropped, indices
        reset).
        """
        testing_start_index = training_start_index + training_minutes
        # BUG FIX: the column slice used to be `:training_minutes`, which
        # silently dropped price columns whenever training_minutes was smaller
        # than the number of columns; we always want ALL columns.
        self.training_df_ = self.market_data_.iloc[
            training_start_index:testing_start_index, :
        ].copy()
        self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
        if testing_size is None:
            self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy()
        else:
            self.testing_df_ = self.market_data_.iloc[
                testing_start_index : testing_start_index + testing_size, :
            ].copy()
        self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)

    def colnames(self) -> List[str]:
        """Return the pair's two wide-format price column names."""
        return [
            f"{self.price_column_}_{self.symbol_a_}",
            f"{self.price_column_}_{self.symbol_b_}",
        ]

    def fit_VECM(self) -> None:
        """Fit a rank-1 VECM on the training price columns into vecm_fit_."""
        assert self.training_df_ is not None
        vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
        vecm_model = VECM(vecm_df, coint_rank=1)
        vecm_fit = vecm_model.fit()
        assert vecm_fit is not None
        # Warn (but do not fail) if the fit lacks a usable beta — downstream
        # code dereferences vecm_fit_.beta and would crash otherwise.
        if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
            print(f"{self}: VECM model failed to converge properly")
        self.vecm_fit_ = vecm_fit

    def check_cointegration_johansen(self) -> bool:
        """Johansen trace test at the 95% level on the training window.

        Returns True when the rank-0 trace statistic exceeds its 95%
        critical value, i.e. at least one cointegrating relation exists.
        """
        assert self.training_df_ is not None
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        df = self.training_df_[self.colnames()].reset_index(drop=True)
        result = coint_johansen(df, det_order=0, k_ar_diff=1)
        print(
            f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? "
            f"{result.lr1[0] > result.cvt[0, 1]}"
        )
        is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1])
        return is_cointegrated

    def check_cointegration_engle_granger(self) -> bool:
        """Engle-Granger two-step test on the training window.

        Returns True when the p-value is below 0.05 (reject the null of no
        cointegration).
        """
        from statsmodels.tsa.stattools import coint

        col1, col2 = self.colnames()
        assert self.training_df_ is not None
        series1 = self.training_df_[col1].reset_index(drop=True)
        series2 = self.training_df_[col2].reset_index(drop=True)
        # Run Engle-Granger cointegration test.
        pvalue = coint(series1, series2)[1]
        # Cointegrated if p-value < 0.05 (reject null of no cointegration).
        is_cointegrated: bool = bool(pvalue < 0.05)
        print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
        return is_cointegrated

    def train_pair(self) -> bool:
        """Test for cointegration and, if found, fit the VECM.

        Returns False when BOTH cointegration tests fail; otherwise fits the
        VECM, records the training dis-equilibrium mean/std, and adds
        "dis-equilibrium" and "scaled_dis-equilibrium" columns to
        ``training_df_``.
        """
        is_cointegrated_johansen = self.check_cointegration_johansen()
        is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
        # Accept the pair when EITHER test indicates cointegration.
        if not is_cointegrated_johansen and not is_cointegrated_engle_granger:
            return False
        self.fit_VECM()
        assert self.training_df_ is not None and self.vecm_fit_ is not None
        # In-sample dis-equilibrium: prices projected on the cointegrating
        # vector beta (rank 1, so the result has a single column 0).
        diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
        self.training_mu_ = float(diseq_series[0].mean())
        self.training_std_ = float(diseq_series[0].std())
        # Reuse diseq_series instead of recomputing the projection.
        self.training_df_["dis-equilibrium"] = diseq_series[0]
        # Normalize the dis-equilibrium (z-score against training stats).
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq_series - self.training_mu_
        ) / self.training_std_
        return True

    def predict(self) -> pd.DataFrame:
        """Forecast the testing window and append to ``predicted_df_``.

        Predicts ``len(testing_df_)`` steps ahead with the fitted VECM, joins
        the forecasts onto the observed testing rows (suffix "_pred"),
        computes the forecast dis-equilibrium and its absolute z-score, and
        accumulates the result into ``predicted_df_``.

        Returns:
            The accumulated ``predicted_df_``.
        """
        assert self.testing_df_ is not None
        assert self.vecm_fit_ is not None
        predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
        # Join observed testing rows with the forecasts, row-for-row.
        predicted_df = pd.merge(
            self.testing_df_.reset_index(drop=True),
            pd.DataFrame(
                predicted_prices, columns=pd.Index(self.colnames()), dtype=float
            ),
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()
        predicted_df["disequilibrium"] = (
            predicted_df[self.colnames()] @ self.vecm_fit_.beta
        )
        # Absolute z-score of the dis-equilibrium against training stats.
        predicted_df["scaled_disequilibrium"] = (
            abs(predicted_df["disequilibrium"] - self.training_mu_)
            / self.training_std_
        )
        predicted_df = predicted_df.reset_index(drop=True)
        self.predicted_df_ = pd.concat(
            [self.predicted_df_, predicted_df], ignore_index=True
        )
        # BUG FIX: reset_index() without drop=True inserted a spurious
        # "index" column on every call; drop the old index instead.
        self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
        return self.predicted_df_

    def __repr__(self) -> str:
        return f"{self.symbol_a_} & {self.symbol_b_}"