From 8d58439f440c6ce640629bbba5d13c9fcecde5b6 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Wed, 25 Jun 2025 22:36:04 +0000 Subject: [PATCH] progress --- src/notebooks/pt_sliding.ipynb | 127 +++++++++++++++++++------------- src/tools/trading_pair.py | 128 +++++++++++++++++++++------------ 2 files changed, 159 insertions(+), 96 deletions(-) diff --git a/src/notebooks/pt_sliding.ipynb b/src/notebooks/pt_sliding.ipynb index e47ba62..89326f1 100644 --- a/src/notebooks/pt_sliding.ipynb +++ b/src/notebooks/pt_sliding.ipynb @@ -15,7 +15,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### \ud83c\udfaf Key Features:\n", + "### 🎯 Key Features:\n", "\n", "1. **Interactive Configuration**: \n", " - Easy switching between CRYPTO and EQUITY configurations\n", @@ -43,7 +43,7 @@ " - Re-run capabilities for different configurations\n", " - Support for both StaticFitStrategy and SlidingFitStrategy\n", "\n", - "### \ud83d\ude80 How to Use:\n", + "### 🚀 How to Use:\n", "\n", "1. **Start Jupyter**:\n", " ```bash\n", @@ -74,7 +74,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -133,7 +133,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -185,16 +185,18 @@ "\n", "# Load the specified configuration\n", "print(f\"\\nLoading {CONFIG_FILE} configuration using HJSON...\")\n", - "CONFIG = load_config_from_file(CONFIG_FILE)\n", + "test_config = load_config_from_file(CONFIG_FILE)\n", + "assert test_config is not None\n", + "BT_TEST_CONFIG = test_config\n", "\n", - "if CONFIG:\n", - " print(f\"\u2713 Successfully loaded {CONFIG['security_type']} configuration\")\n", - " print(f\" Data directory: {CONFIG['data_directory']}\")\n", - " print(f\" Database table: {CONFIG['db_table_name']}\")\n", - " print(f\" Exchange: {CONFIG['exchange_id']}\")\n", - " print(f\" Training window: {CONFIG['training_minutes']} minutes\")\n", - " print(f\" Open threshold: {CONFIG['dis-equilibrium_open_trshld']}\")\n", - " print(f\" Close threshold: {CONFIG['dis-equilibrium_close_trshld']}\")\n", + "if BT_TEST_CONFIG:\n", + " print(f\"✓ Successfully loaded {BT_TEST_CONFIG['security_type']} configuration\")\n", + " print(f\" Data directory: {BT_TEST_CONFIG['data_directory']}\")\n", + " print(f\" Database table: {BT_TEST_CONFIG['db_table_name']}\")\n", + " print(f\" Exchange: {BT_TEST_CONFIG['exchange_id']}\")\n", + " print(f\" Training window: {BT_TEST_CONFIG['training_minutes']} minutes\")\n", + " print(f\" Open threshold: {BT_TEST_CONFIG['dis-equilibrium_open_trshld']}\")\n", + " print(f\" Close threshold: {BT_TEST_CONFIG['dis-equilibrium_close_trshld']}\")\n", " \n", " # Automatically construct data file name based on date and config type\n", " # if CONFIG['security_type'] == \"CRYPTO\":\n", @@ -205,25 +207,25 @@ " # DATA_FILE = f\"{TRADING_DATE}.mktdata.db\" # Default fallback\n", "\n", " # Update CONFIG with the specific data file and instruments\n", - " CONFIG[\"datafiles\"] = [DATA_FILE]\n", - " CONFIG[\"instruments\"] = [SYMBOL_A, SYMBOL_B]\n", + " BT_TEST_CONFIG[\"datafiles\"] = [DATA_FILE]\n", + " BT_TEST_CONFIG[\"instruments\"] = [SYMBOL_A, SYMBOL_B]\n", " \n", " print(f\"\\nData Configuration:\")\n", " print(f\" Data File: {DATA_FILE}\")\n", - " print(f\" Security Type: {CONFIG['security_type']}\")\n", + " print(f\" Security Type: {BT_TEST_CONFIG['security_type']}\")\n", " \n", " # Verify data file exists\n", " import os\n", - " data_file_path = f\"{CONFIG['data_directory']}/{DATA_FILE}\"\n", + " data_file_path = f\"{BT_TEST_CONFIG['data_directory']}/{DATA_FILE}\"\n", " if os.path.exists(data_file_path):\n", - " print(f\" \u2713 Data file found: {data_file_path}\")\n", + " print(f\" ✓ Data file found: {data_file_path}\")\n", " else:\n", - " print(f\" \u26a0 Data file not found: {data_file_path}\")\n", + " print(f\" ⚠ Data file not found: {data_file_path}\")\n", " print(f\" Please check if the date and file exist in the data directory\")\n", " \n", " # List available files in the data directory\n", " try:\n", - " data_dir = CONFIG['data_directory']\n", + " data_dir = BT_TEST_CONFIG['data_directory']\n", " if os.path.exists(data_dir):\n", " available_files = [f for f in os.listdir(data_dir) if f.endswith('.db')]\n", " print(f\" Available files in {data_dir}:\")\n", @@ -234,7 +236,7 @@ " except Exception as e:\n", " print(f\" Could not list files in data directory: {e}\")\n", "else:\n", - " print(\"\u26a0 Failed to load configuration. Please check the configuration file.\")\n", + " print(\"⚠ Failed to load configuration. Please check the configuration file.\")\n", " print(\"Available configuration files:\")\n", " config_dir = \"../../configuration\"\n", " if os.path.exists(config_dir):\n", @@ -289,10 +291,10 @@ "outputs": [], "source": [ "# Load market data\n", - "datafile_path = f\"{CONFIG['data_directory']}/{DATA_FILE}\"\n", + "datafile_path = f\"{BT_TEST_CONFIG['data_directory']}/{DATA_FILE}\"\n", "print(f\"Loading data from: {datafile_path}\")\n", "\n", - "market_data_df = load_market_data(datafile_path, config=CONFIG)\n", + "market_data_df = load_market_data(datafile_path, config=BT_TEST_CONFIG)\n", "\n", "print(f\"Loaded {len(market_data_df)} rows of market data\")\n", "print(f\"Symbols in data: {market_data_df['symbol'].unique()}\")\n", @@ -303,7 +305,7 @@ " market_data=market_data_df,\n", " symbol_a=SYMBOL_A,\n", " symbol_b=SYMBOL_B,\n", - " price_column=CONFIG[\"price_column\"]\n", + " price_column=BT_TEST_CONFIG[\"price_column\"]\n", ")\n", "\n", "print(f\"\\nCreated trading pair: {pair}\")\n", @@ -311,7 +313,7 @@ "print(f\"Column names: {pair.colnames()}\")\n", "\n", "# Calculate maximum possible iterations for sliding window\n", - "training_minutes = CONFIG[\"training_minutes\"]\n", + "training_minutes = BT_TEST_CONFIG[\"training_minutes\"]\n", "max_iterations = len(pair.market_data_) - training_minutes\n", "print(f\"\\nSliding window analysis:\")\n", "print(f\" Training window size: {training_minutes} minutes\")\n", @@ -354,13 +356,13 @@ "\n", "# Initialize the strategy state\n", "pair.user_data_['state'] = PairState.INITIAL\n", - "pair.user_data_[\"trades\"] = pd.DataFrame(columns=STRATEGY.TRADES_COLUMNS)\n", + "pair.user_data_[\"trades\"] = pd.DataFrame(columns=pd.Index(STRATEGY.TRADES_COLUMNS, dtype=str))\n", "pair.user_data_[\"is_cointegrated\"] = False\n", "\n", - "bt_result = BacktestResult(config=CONFIG)\n", - "training_minutes = CONFIG[\"training_minutes\"]\n", - "open_threshold = CONFIG[\"dis-equilibrium_open_trshld\"]\n", - "close_threshold = CONFIG[\"dis-equilibrium_close_trshld\"]\n", + "bt_result = BacktestResult(config=BT_TEST_CONFIG)\n", + "training_minutes = BT_TEST_CONFIG[\"training_minutes\"]\n", + "open_threshold = BT_TEST_CONFIG[\"dis-equilibrium_open_trshld\"]\n", + "close_threshold = BT_TEST_CONFIG[\"dis-equilibrium_close_trshld\"]\n", "\n", "# Limit iterations for demonstration (change this to max_iterations for full run)\n", "max_demo_iterations = min(200, max_iterations) # Process first 200 minutes\n", @@ -586,15 +588,21 @@ "source": [ "# Detailed analysis of how training windows evolve\n", "print(\"TRAINING WINDOW EVOLUTION ANALYSIS\")\n", - "print(\"=\"*50)\n", + "print(\"=\" * 50)\n", "\n", "# Analyze cointegration stability\n", "if len(cointegration_history) > 1:\n", " # Find cointegration change points\n", " change_points = []\n", " for i in range(1, len(cointegration_history)):\n", - " if cointegration_history[i] != cointegration_history[i-1]:\n", - " change_points.append((i, cointegration_history[i], valid_timestamps[i] if i < len(valid_timestamps) else None))\n", + " if cointegration_history[i] != cointegration_history[i - 1]:\n", + " change_points.append(\n", + " (\n", + " i,\n", + " cointegration_history[i],\n", + " valid_timestamps[i] if i < len(valid_timestamps) else None,\n", + " )\n", + " )\n", "\n", " print(f\"\\nCointegration Change Points:\")\n", " if change_points:\n", @@ -608,25 +616,34 @@ "\n", "# Analyze beta stability when cointegrated\n", "if valid_betas and len(valid_betas) > 10:\n", - " beta_df = pd.DataFrame(valid_betas, columns=[f'Beta_{i}' for i in range(len(valid_betas[0]))])\n", - " beta_df['timestamp'] = beta_timestamps\n", + " beta_df = pd.DataFrame(\n", + " valid_betas,\n", + " columns=pd.Index([f\"Beta_{i}\" for i in range(len(valid_betas[0]))], dtype=str),\n", + " )\n", + " beta_df[\"timestamp\"] = beta_timestamps\n", "\n", " print(f\"\\nBeta Coefficient Analysis:\")\n", " print(f\" Number of valid beta estimates: {len(valid_betas)}\")\n", " print(f\" Beta statistics:\")\n", " for col in beta_df.columns[:-1]: # Exclude timestamp\n", - " print(f\" {col}: Mean={beta_df[col].mean():.4f}, Std={beta_df[col].std():.4f}\")\n", + " print(\n", + " f\" {col}: Mean={beta_df[col].mean():.4f}, Std={beta_df[col].std():.4f}\"\n", + " )\n", "\n", " # Check for beta regime changes\n", " beta_changes = []\n", " threshold = 0.1 # 10% change threshold\n", " for i in range(1, len(valid_betas)):\n", - " if np.any(np.abs(np.array(valid_betas[i]) - np.array(valid_betas[i-1])) > threshold):\n", + " if np.any(\n", + " np.abs(np.array(valid_betas[i]) - np.array(valid_betas[i - 1])) > threshold\n", + " ):\n", " beta_changes.append(i)\n", "\n", " print(f\" Significant beta changes (>{threshold*100}%): {len(beta_changes)}\")\n", " if beta_changes:\n", - " print(f\" Change frequency: {len(beta_changes)/len(valid_betas)*100:.1f}% of cointegrated periods\")\n", + " print(\n", + " f\" Change frequency: {len(beta_changes)/len(valid_betas)*100:.1f}% of cointegrated periods\"\n", + " )\n", "\n", "# Analyze dis-equilibrium characteristics\n", "if valid_scaled_diseq:\n", @@ -640,20 +657,28 @@ "\n", " # Threshold analysis\n", " open_breaches = sum(1 for x in valid_scaled_diseq if abs(x) >= open_threshold)\n", - " close_opportunities = sum(1 for x in valid_scaled_diseq if abs(x) <= close_threshold)\n", + " close_opportunities = sum(\n", + " 1 for x in valid_scaled_diseq if abs(x) <= close_threshold\n", + " )\n", "\n", - " print(f\" Open threshold breaches: {open_breaches} ({open_breaches/len(valid_scaled_diseq)*100:.1f}%)\")\n", - " print(f\" Close opportunities: {close_opportunities} ({close_opportunities/len(valid_scaled_diseq)*100:.1f}%)\")\n", + " print(\n", + " f\" Open threshold breaches: {open_breaches} ({open_breaches/len(valid_scaled_diseq)*100:.1f}%)\"\n", + " )\n", + " print(\n", + " f\" Close opportunities: {close_opportunities} ({close_opportunities/len(valid_scaled_diseq)*100:.1f}%)\"\n", + " )\n", "\n", " # Mean reversion analysis\n", " zero_crossings = 0\n", " for i in range(1, len(valid_scaled_diseq)):\n", - " if (valid_scaled_diseq[i-1] * valid_scaled_diseq[i]) < 0: # Sign change\n", + " if (valid_scaled_diseq[i - 1] * valid_scaled_diseq[i]) < 0: # Sign change\n", " zero_crossings += 1\n", "\n", " print(f\" Zero crossings (mean reversion events): {zero_crossings}\")\n", " if zero_crossings > 0:\n", - " print(f\" Average time between mean reversions: {len(valid_scaled_diseq)/zero_crossings:.1f} minutes\")" + " print(\n", + " f\" Average time between mean reversions: {len(valid_scaled_diseq)/zero_crossings:.1f} minutes\"\n", + " )" ] }, { @@ -686,13 +711,13 @@ " market_data=market_data_df,\n", " symbol_a=SYMBOL_A,\n", " symbol_b=SYMBOL_B,\n", - " price_column=CONFIG[\"price_column\"]\n", + " price_column=BT_TEST_CONFIG[\"price_column\"]\n", " )\n", "\n", - " bt_result_full = BacktestResult(config=CONFIG)\n", + " bt_result_full = BacktestResult(config=BT_TEST_CONFIG)\n", "\n", " # Run strategy\n", - " pair_trades = STRATEGY.run_pair(config=CONFIG, pair=pair_full, bt_result=bt_result_full)\n", + " pair_trades = STRATEGY.run_pair(config=BT_TEST_CONFIG, pair=pair_full, bt_result=bt_result_full)\n", "\n", " if pair_trades is not None and len(pair_trades) > 0:\n", " print(f\"\\nGenerated {len(pair_trades)} trading signals:\")\n", @@ -741,9 +766,9 @@ "print(\"=\"*40)\n", "\n", "print(f\"Current parameters:\")\n", - "print(f\" Training window: {CONFIG['training_minutes']} minutes\")\n", - "print(f\" Open threshold: {CONFIG['dis-equilibrium_open_trshld']}\")\n", - "print(f\" Close threshold: {CONFIG['dis-equilibrium_close_trshld']}\")\n", + "print(f\" Training window: {BT_TEST_CONFIG['training_minutes']} minutes\")\n", + "print(f\" Open threshold: {BT_TEST_CONFIG['dis-equilibrium_open_trshld']}\")\n", + "print(f\" Close threshold: {BT_TEST_CONFIG['dis-equilibrium_close_trshld']}\")\n", "\n", "# Recommendations based on observed data\n", "if valid_scaled_diseq:\n", @@ -825,9 +850,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.12.9" } }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} diff --git a/src/tools/trading_pair.py b/src/tools/trading_pair.py index 7a20505..e09c2ad 100644 --- a/src/tools/trading_pair.py +++ b/src/tools/trading_pair.py @@ -1,7 +1,7 @@ - from typing import Any, Dict, List, Optional -import pandas as pd #type:ignore -from statsmodels.tsa.vector_ar.vecm import VECM #type:ignore +import pandas as pd # type:ignore +from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore + class TradingPair: market_data_: pd.DataFrame @@ -9,79 +9,106 @@ class TradingPair: symbol_b_: str price_column_: str - training_mu_: Optional[float] - training_std_: Optional[float] + training_mu_: float + training_std_: float - training_df_: Optional[pd.DataFrame] - testing_df_: Optional[pd.DataFrame] + training_df_: pd.DataFrame + testing_df_: pd.DataFrame - vecm_fit_: Optional[VECM] + vecm_fit_: VECMResults user_data_: Dict[str, Any] - def __init__(self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str): + def __init__( + self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str + ): self.symbol_a_ = symbol_a self.symbol_b_ = symbol_b self.price_column_ = price_column - self.market_data_ = self._transform_dataframe(market_data)[["tstamp"] + self.colnames()] - - self.training_mu_ = None - self.training_std_ = None - self.training_df_ = None - self.testing_df_ = None - self.vecm_fit_ = None + self.market_data_ = pd.DataFrame( + self._transform_dataframe(market_data)[["tstamp"] + self.colnames()] + ) + self.user_data_ = {} - def _transform_dataframe(self, df: pd.DataFrame): + def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: # Select only the columns we need - df_selected = df[["tstamp", "symbol", self.price_column_]] + df_selected: pd.DataFrame = pd.DataFrame( + df[["tstamp", "symbol", self.price_column_]] + ) # Start with unique timestamps - result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) + result_df: pd.DataFrame = ( + pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) + ) # For each unique symbol, add a corresponding close price column symbols = df_selected["symbol"].unique() for symbol in symbols: # Filter rows for this symbol - df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index( + drop=True + ) # Create column name like "close-COIN" new_price_column = f"{self.price_column_}_{symbol}" # Create temporary dataframe with timestamp and price - temp_df = pd.DataFrame({ - "tstamp": df_symbol["tstamp"], - new_price_column: df_symbol[self.price_column_] - }) + temp_df = pd.DataFrame( + { + "tstamp": df_symbol["tstamp"], + new_price_column: df_symbol[self.price_column_], + } + ) # Join with our result dataframe result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") - result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset + result_df = result_df.reset_index( + drop=True + ) # do not dropna() since irrelevant symbol would affect dataset return result_df - def get_datasets(self, training_minutes: int, training_start_index: int = 0, testing_size: Optional[int] = None) -> None: + + def get_datasets( + self, + training_minutes: int, + training_start_index: int = 0, + testing_size: Optional[int] = None, + ) -> None: testing_start_index = training_start_index + training_minutes - self.training_df_ = self.market_data_.iloc[training_start_index:testing_start_index, :].copy() + self.training_df_ = self.market_data_.iloc[ + training_start_index:testing_start_index, : + ].copy() + assert self.training_df_ is not None self.training_df_ = self.training_df_.dropna().reset_index(drop=True) testing_start_index = training_start_index + training_minutes if testing_size is None: self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy() else: - self.testing_df_ = self.market_data_.iloc[testing_start_index:testing_start_index + testing_size, :].copy() + self.testing_df_ = self.market_data_.iloc[ + testing_start_index : testing_start_index + testing_size, : + ].copy() + assert self.testing_df_ is not None self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True) def colnames(self) -> List[str]: - return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"] + return [ + f"{self.price_column_}_{self.symbol_a_}", + f"{self.price_column_}_{self.symbol_b_}", + ] def fit_VECM(self): + assert self.training_df_ is not None vecm_df = self.training_df_[self.colnames()].reset_index(drop=True) vecm_model = VECM(vecm_df, coint_rank=1) vecm_fit = vecm_model.fit() + assert vecm_fit is not None + # URGENT check beta and alpha # Check if the model converged properly @@ -94,18 +121,23 @@ class TradingPair: pass def check_cointegration_johansen(self): + assert self.training_df_ is not None from statsmodels.tsa.vector_ar.vecm import coint_johansen + df = self.training_df_[self.colnames()].reset_index(drop=True) result = coint_johansen(df, det_order=0, k_ar_diff=1) - print(f"{self}: lr1={result.lr1[0]} cvt={result.cvt[0, 1]}.") + print( + f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}" + ) is_cointegrated = result.lr1[0] > result.cvt[0, 1] return is_cointegrated - - def check_cointegration(self): + def check_cointegration_engle_granger(self): from statsmodels.tsa.stattools import coint + col1, col2 = self.colnames() + assert self.training_df_ is not None series1 = self.training_df_[col1].reset_index(drop=True) series2 = self.training_df_[col2].reset_index(drop=True) @@ -116,10 +148,10 @@ class TradingPair: print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}") return is_cointegrated - def train_pair(self) -> bool: - is_cointegrated = self.check_cointegration() - if not is_cointegrated: + is_cointegrated_johansen = self.check_cointegration_johansen() + is_cointegrated_engle_granger = self.check_cointegration_engle_granger() + if not is_cointegrated_johansen and not is_cointegrated_engle_granger: return False pass @@ -127,10 +159,13 @@ class TradingPair: self.fit_VECM() assert self.training_df_ is not None and self.vecm_fit_ is not None diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta - self.training_mu_ = diseq_series.mean().iloc[0] - self.training_std_ = diseq_series.std().iloc[0] + print(diseq_series.shape) + self.training_mu_ = float(diseq_series[0].mean()) + self.training_std_ = float(diseq_series[0].std()) - self.training_df_["dis-equilibrium"] = self.training_df_[self.colnames()] @ self.vecm_fit_.beta + self.training_df_["dis-equilibrium"] = ( + self.training_df_[self.colnames()] @ self.vecm_fit_.beta + ) # Normalize the dis-equilibrium self.training_df_["scaled_dis-equilibrium"] = ( diseq_series - self.training_mu_ @@ -138,7 +173,7 @@ class TradingPair: return True - def predict(self) -> None: + def predict(self) -> pd.DataFrame: assert self.testing_df_ is not None assert self.vecm_fit_ is not None predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_)) @@ -148,23 +183,26 @@ class TradingPair: self.predicted_df_ = pd.merge( self.testing_df_.reset_index(drop=True), - pd.DataFrame(predicted_prices, columns=self.colnames()), + pd.DataFrame( + predicted_prices, columns=pd.Index(self.colnames()), dtype=float + ), left_index=True, right_index=True, suffixes=("", "_pred"), ).dropna() - self.predicted_df_["disequilibrium"] = self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta + self.predicted_df_["disequilibrium"] = ( + self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta + ) self.predicted_df_["scaled_disequilibrium"] = ( - abs(self.predicted_df_["disequilibrium"] - self.training_mu_) / self.training_std_ + abs(self.predicted_df_["disequilibrium"] - self.training_mu_) + / self.training_std_ ) # Reset index to ensure proper indexing self.predicted_df_ = self.predicted_df_.reset_index() return self.predicted_df_ - - def __repr__(self) ->str: + def __repr__(self) -> str: return f"{self.symbol_a_} & {self.symbol_b_}" -