from __future__ import annotations

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional

import pandas as pd  # type:ignore


class PairState(Enum):
    """State tags for a trading pair.

    Within this file only ``CLOSE_STOP_LOSS`` and ``CLOSE_STOP_PROFIT`` are
    written (to ``user_data_["stop_close_state"]`` by
    ``TradingPair.to_stop_close_conditions``); the remaining members are
    presumably consumed by code outside this module.
    """

    INITIAL = 1
    OPEN = 2
    CLOSE = 3
    CLOSE_POSITION = 4
    CLOSE_STOP_LOSS = 5
    CLOSE_STOP_PROFIT = 6


class CointegrationData:
    """Cointegration statistics for one pair over its current training window.

    Runs the Johansen and the Engle-Granger tests on the two price columns
    returned by ``pair.colnames()`` and stores the resulting statistics as
    plain Python ``float`` / ``bool`` values.
    """

    # Engle-Granger p-values strictly below this mark the pair as cointegrated.
    EG_PVALUE_THRESHOLD = 0.05

    tstamp_: pd.Timestamp
    pair_: str
    eg_pvalue_: float
    johansen_lr1_: float
    johansen_cvt_: float
    eg_is_cointegrated_: bool
    johansen_is_cointegrated_: bool

    def __init__(self, pair: TradingPair):
        """Compute both cointegration tests on ``pair.training_df_``.

        Args:
            pair: Trading pair whose ``training_df_`` has already been
                populated (e.g. by ``TradingPair.get_datasets``).
        """
        training_df = pair.training_df_
        assert training_df is not None

        # statsmodels is imported lazily so that importing this module does
        # not require it.
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        col1, col2 = pair.colnames()
        prices = training_df[[col1, col2]].reset_index(drop=True)

        # Johansen test: first trace statistic against the critical value in
        # column 1 of ``cvt`` (presumably the 95% level - see statsmodels docs).
        johansen = coint_johansen(prices, det_order=0, k_ar_diff=1)
        self.johansen_lr1_ = float(johansen.lr1[0])
        self.johansen_cvt_ = float(johansen.cvt[0, 1])
        self.johansen_is_cointegrated_ = bool(
            self.johansen_lr1_ > self.johansen_cvt_
        )

        # Engle-Granger test: element 1 of the result is the p-value.
        from statsmodels.tsa.stattools import coint  # type: ignore

        self.eg_pvalue_ = float(coint(prices[col1], prices[col2])[1])
        self.eg_is_cointegrated_ = bool(self.eg_pvalue_ < self.EG_PVALUE_THRESHOLD)

        self.tstamp_ = self._latest_tstamp(training_df)
        self.pair_ = pair.name()

    @staticmethod
    def _latest_tstamp(training_df: pd.DataFrame) -> Any:
        """Return the timestamp of the last training row.

        ``TradingPair.get_datasets`` resets the frame's index, so the index
        label is only a row position; the actual time lives in the ``tstamp``
        column. The index label is used only when that column is absent.
        """
        if "tstamp" in training_df.columns:
            return training_df["tstamp"].iloc[-1]
        return training_df.index[-1]

    def to_dict(self) -> Dict[str, Any]:
        """Return the stored statistics as a flat dictionary."""
        return {
            "tstamp": self.tstamp_,
            "pair": self.pair_,
            "eg_pvalue": self.eg_pvalue_,
            "johansen_lr1": self.johansen_lr1_,
            "johansen_cvt": self.johansen_cvt_,
            "eg_is_cointegrated": self.eg_is_cointegrated_,
            "johansen_is_cointegrated": self.johansen_is_cointegrated_,
        }

    def __repr__(self) -> str:
        # ``to_dict`` lists the fields in the order they are displayed.
        fields = ", ".join(f"{key}={value}" for key, value in self.to_dict().items())
        return f"CointegrationData({fields})"


class TradingPair(ABC):
    """Abstract base class for a pair of symbols traded against each other.

    Holds a wide per-timestamp price table for the two symbols, the current
    training/testing windows cut from it, and a free-form ``user_data_``
    dictionary used for trades and stop-close state. Subclasses must
    implement ``predict``.
    """

    market_data_: pd.DataFrame
    symbol_a_: str
    symbol_b_: str
    stat_model_price_: str
    # training_mu_ / training_std_ are declared but never assigned in this
    # class; presumably set by subclasses.
    training_mu_: float
    training_std_: float
    training_df_: pd.DataFrame
    testing_df_: pd.DataFrame
    user_data_: Dict[str, Any]

    # predicted_df_: Optional[pd.DataFrame]

    def __init__(
        self,
        config: Dict[str, Any],
        market_data: pd.DataFrame,
        symbol_a: str,
        symbol_b: str,
    ):
        """Store the configuration and build the pair's market-data table.

        Args:
            config: Settings dictionary; ``"stat_model_price"`` is required
                and names the price column of ``market_data`` to use.
            market_data: Long-format frame with ``tstamp``, ``symbol`` and
                the configured price column.
            symbol_a: First symbol of the pair.
            symbol_b: Second symbol of the pair.
        """
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.stat_model_price_ = config["stat_model_price"]
        self.user_data_ = {}
        self.predicted_df_ = None
        self.config_ = config
        self._set_market_data(market_data)

    def _set_market_data(self, market_data: pd.DataFrame) -> None:
        """Build ``market_data_``: one row per timestamp, sorted by time."""
        wide = self._transform_dataframe(market_data)
        frame = pd.DataFrame(wide[["tstamp"] + self.colnames()])
        frame = frame.dropna().reset_index(drop=True)
        frame["tstamp"] = pd.to_datetime(frame["tstamp"])
        # Reset the index after sorting so that index labels (used by
        # get_begin_index / get_end_index) agree with row positions (used by
        # the iloc slicing in get_datasets).
        self.market_data_ = frame.sort_values("tstamp").reset_index(drop=True)
        self._set_execution_price_data()

    def _set_execution_price_data(self) -> None:
        """Add the ``exec_price_<symbol>`` columns to ``market_data_``.

        Without an ``"execution_price"`` config entry the execution price is
        the stat-model price itself. Otherwise it is the stat-model price
        shifted ``shift`` rows toward the past (``shift(-shift)``), and rows
        left without a value are dropped.
        """
        column_pairs = list(zip(self.exec_prices_colnames(), self.colnames()))

        if "execution_price" not in self.config_:
            for exec_col, price_col in column_pairs:
                self.market_data_[exec_col] = self.market_data_[price_col]
            return

        # NOTE(review): the configured "column" is read (so a missing key
        # still raises KeyError) but is not used - the shifted stat-model
        # price is used instead. Confirm whether that is intended.
        execution_price_column = self.config_["execution_price"]["column"]  # noqa: F841
        execution_price_shift = self.config_["execution_price"]["shift"]
        for exec_col, price_col in column_pairs:
            self.market_data_[exec_col] = self.market_data_[price_col].shift(
                -execution_price_shift
            )
        self.market_data_ = self.market_data_.dropna().reset_index(drop=True)

    def _session_time(self, key: str) -> Any:
        """Parse ``config_["trading_hours"][key]`` into a time of day.

        ``.time()`` yields a naive ``datetime.time``, so the timezone only
        takes part in the localization step, not in later comparisons.
        """
        trading_hours = self.config_["trading_hours"]
        assert "timezone" in trading_hours
        assert key in trading_hours
        return (
            pd.to_datetime(trading_hours[key])
            .tz_localize(trading_hours["timezone"])
            .time()
        )

    def get_begin_index(self) -> int:
        """Return the smallest index whose time of day is at/after session begin.

        Returns 0 when no ``"trading_hours"`` are configured.
        """
        if "trading_hours" not in self.config_:
            return 0
        start_time = self._session_time("begin_session")
        mask = self.market_data_["tstamp"].dt.time >= start_time
        return int(self.market_data_.index[mask].min())

    def get_end_index(self) -> int:
        """Return the largest index whose time of day is at/before session end.

        Returns 0 when no ``"trading_hours"`` are configured.
        """
        if "trading_hours" not in self.config_:
            # NOTE(review): 0 as an *end* index looks suspicious (last row
            # expected?); kept as-is because callers are not visible here.
            return 0
        end_time = self._session_time("end_session")
        mask = self.market_data_["tstamp"].dt.time <= end_time
        return int(self.market_data_.index[mask].max())

    def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot long-format market data into one price column per symbol.

        Args:
            df: Frame with ``tstamp``, ``symbol`` and the stat-model price
                column.

        Returns:
            Frame with ``tstamp`` plus one ``<price>_<symbol>`` column for
            every symbol found in ``df``; rows containing any NaN are dropped.
        """
        # Select only the columns we need
        df_selected: pd.DataFrame = pd.DataFrame(
            df[["tstamp", "symbol", self.stat_model_price_]]
        )

        # Start with unique timestamps
        result_df: pd.DataFrame = (
            pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
        )

        # For each unique symbol, left-join its price column onto the timestamps
        for symbol in df_selected["symbol"].unique():
            df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(
                drop=True
            )
            # Column name like "close_COIN"
            new_price_column = f"{self.stat_model_price_}_{symbol}"
            temp_df = pd.DataFrame(
                {
                    "tstamp": df_symbol["tstamp"],
                    new_price_column: df_symbol[self.stat_model_price_],
                }
            )
            result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")

        result_df = result_df.reset_index(drop=True)
        # NOTE(review): the original comment here said "do not dropna() since
        # irrelevant symbol would affect dataset", yet the result is returned
        # through dropna(), which does drop rows where *any* symbol is
        # missing. Behaviour kept unchanged; confirm the intent.
        return result_df.dropna()

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Cut ``training_df_`` and ``testing_df_`` out of ``market_data_``.

        Args:
            training_minutes: Number of rows in the training window.
            training_start_index: Row position where the training window
                starts.
            testing_size: Number of rows in the testing window, which starts
                right after the training window; ``None`` takes every
                remaining row.
        """
        testing_start_index = training_start_index + training_minutes

        # Keep every column: only the rows are limited by the window size.
        self.training_df_ = self.market_data_.iloc[
            training_start_index:testing_start_index, :
        ].copy()
        assert self.training_df_ is not None
        self.training_df_ = self.training_df_.dropna().reset_index(drop=True)

        testing_stop_index = (
            None if testing_size is None else testing_start_index + testing_size
        )
        self.testing_df_ = self.market_data_.iloc[
            testing_start_index:testing_stop_index, :
        ].copy()
        assert self.testing_df_ is not None
        self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)

    def colnames(self) -> List[str]:
        """Return the two stat-model price column names (symbol A, symbol B)."""
        return [
            f"{self.stat_model_price_}_{self.symbol_a_}",
            f"{self.stat_model_price_}_{self.symbol_b_}",
        ]

    def exec_prices_colnames(self) -> List[str]:
        """Return the two execution-price column names (symbol A, symbol B)."""
        return [
            f"exec_price_{self.symbol_a_}",
            f"exec_price_{self.symbol_b_}",
        ]

    def add_trades(self, trades: pd.DataFrame) -> None:
        """Append ``trades`` to the trades stored in ``user_data_["trades"]``.

        Columns present in the stored trades but missing from ``trades`` are
        filled with defaults on a copy, so the caller's frame is not modified.

        Args:
            trades: New trades to record.
        """
        existing_trades = self.user_data_.get("trades")
        if existing_trades is None or len(existing_trades) == 0:
            # Nothing stored yet (or key absent): the new trades become the store.
            self.user_data_["trades"] = trades.copy()
            return

        missing_columns = [
            col for col in existing_trades.columns if col not in trades.columns
        ]
        if missing_columns:
            column_defaults: Dict[str, Any] = {
                "action": "",
                "symbol": "",
                "price": 0.0,
                "disequilibrium": 0.0,
                "scaled_disequilibrium": 0.0,
            }
            trades = trades.copy()
            for col in missing_columns:
                if col == "time":
                    trades[col] = pd.Timestamp.now()
                else:
                    # Unknown columns (including "pair") default to None.
                    trades[col] = column_defaults.get(col)

        self.user_data_["trades"] = pd.concat(
            [existing_trades, trades], ignore_index=True
        )

    def get_trades(self) -> pd.DataFrame:
        """Return the stored trades, or an empty frame if none were added."""
        return (
            self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
        )

    def cointegration_check(self) -> Optional[pd.DataFrame]:
        """Run the cointegration tests over a rolling training window.

        The window of ``config_["training_minutes"]`` rows advances one row
        at a time until fewer rows than that remain.

        Returns:
            One row of ``CointegrationData.to_dict()`` per window position.
        """
        print(f"***{self}*** STARTING....")
        config = self.config_

        curr_training_start_idx = 0

        COINTEGRATION_DATA_COLUMNS = {
            "tstamp": "datetime64[ns]",
            "pair": "string",
            "eg_pvalue": "float64",
            "johansen_lr1": "float64",
            "johansen_cvt": "float64",
            "eg_is_cointegrated": "bool",
            "johansen_is_cointegrated": "bool",
        }
        # Only the column names are applied; the dtypes above are not enforced.
        result: pd.DataFrame = pd.DataFrame(
            columns=[col for col in COINTEGRATION_DATA_COLUMNS.keys()]
        )  # .astype(COINTEGRATION_DATA_COLUMNS)

        training_minutes = config["training_minutes"]
        while True:
            print(curr_training_start_idx, end="\r")
            self.get_datasets(
                training_minutes=training_minutes,
                training_start_index=curr_training_start_idx,
                testing_size=1,
            )

            if len(self.training_df_) < training_minutes:
                print(
                    f"{self}: current offset={curr_training_start_idx}"
                    f" * Training data length={len(self.training_df_)} < {training_minutes}"
                    " * Not enough training data. Completing the job."
                )
                break
            new_row = pd.Series(CointegrationData(self).to_dict())
            result.loc[len(result)] = new_row
            curr_training_start_idx += 1
        return result

    def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
        """Tell whether the open position hit a configured stop threshold.

        Compares the current return with the ``"profit"`` and ``"loss"``
        entries of ``config_["stop_close_conditions"]`` and, on a hit, records
        the matching ``PairState`` in ``user_data_["stop_close_state"]``.

        Args:
            predicted_row: Row holding the current stat-model prices of both
                symbols.

        Returns:
            True if a stop-profit or stop-loss threshold was reached.
        """
        stop_conditions = self.config_.get("stop_close_conditions")
        if stop_conditions is None:
            return False
        if "profit" not in stop_conditions and "loss" not in stop_conditions:
            return False

        # Computed once up front so that a loss-only configuration works too.
        current_return = self._current_return(predicted_row)

        if "profit" in stop_conditions and current_return >= stop_conditions["profit"]:
            print(f"STOP PROFIT: {current_return}")
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
            return True
        if "loss" in stop_conditions and current_return <= stop_conditions["loss"]:
            print(f"STOP LOSS: {current_return}")
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
            return True
        return False

    def on_open_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the open trades, discarding any close trades."""
        self.user_data_.pop("close_trades", None)
        self.user_data_["open_trades"] = trades

    def on_close_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the close trades, discarding any open trades."""
        # pop() rather than del: closing with nothing recorded as open is a no-op.
        self.user_data_.pop("open_trades", None)
        self.user_data_["close_trades"] = trades

    def _current_return(self, predicted_row: pd.Series) -> float:
        """Return the summed percentage return of both legs of the open position.

        Returns 0.0 when no open trades are recorded.
        """
        open_trades = self.user_data_.get("open_trades") if (
            "open_trades" in self.user_data_
        ) else None
        if "open_trades" not in self.user_data_ or len(open_trades) == 0:
            return 0.0

        def _single_instrument_return(symbol: str) -> float:
            # Uses the first open trade found for the symbol.
            instrument_open_trades = open_trades[open_trades["symbol"] == symbol]
            instrument_open_price = instrument_open_trades["price"].iloc[0]

            # A SELL leg gains when the price falls.
            sign = -1 if instrument_open_trades["side"].iloc[0] == "SELL" else 1
            instrument_price = predicted_row[f"{self.stat_model_price_}_{symbol}"]
            instrument_return = (
                sign * (instrument_price - instrument_open_price) / instrument_open_price
            )
            return float(instrument_return) * 100.0

        instrument_a_return = _single_instrument_return(self.symbol_a_)
        instrument_b_return = _single_instrument_return(self.symbol_b_)
        return instrument_a_return + instrument_b_return

    def __repr__(self) -> str:
        return self.name()

    def name(self) -> str:
        """Return the display name ``"<symbol_a> & <symbol_b>"``."""
        return f"{self.symbol_a_} & {self.symbol_b_}"

    @abstractmethod
    def predict(self) -> pd.DataFrame:
        """Produce the pair's predictions; implemented by subclasses."""
        ...

    # @abstractmethod
    # def predicted_df(self) -> Optional[pd.DataFrame]: ...