From 50674bd3b82853bd0d54efcf7e088c461c05cf93 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 29 May 2025 15:47:56 -0400 Subject: [PATCH] progress --- src/pt_backtest.py | 173 +++++++++++++++++--------------------- src/results.py | 25 +----- src/tools/data_loader.py | 26 ------ src/tools/trading_pair.py | 72 ++++++++++++++-- 4 files changed, 144 insertions(+), 152 deletions(-) diff --git a/src/pt_backtest.py b/src/pt_backtest.py index a2257bc..1d33610 100644 --- a/src/pt_backtest.py +++ b/src/pt_backtest.py @@ -1,3 +1,4 @@ +from abc import ABC, abstractmethod import sys from typing import Any, Dict, List, Optional @@ -9,7 +10,7 @@ import numpy as np from statsmodels.tsa.vector_ar.vecm import VECM from backtest_configs import CRYPTO_CONFIG -from tools.data_loader import load_market_data, transform_dataframe +from tools.data_loader import load_market_data from tools.trading_pair import TradingPair from results import BacktestResult @@ -18,65 +19,70 @@ UNSET_FLOAT: float = sys.float_info.max UNSET_INT: int = sys.maxsize -# # ========================================================================== - CONFIG = CRYPTO_CONFIG # CONFIG = EQT_CONFIG + +trades_columns = [ + "time", + "action", + "symbol", + "price", + "disequilibrium", + "scaled_disequilibrium", + "pair", +] + BacktestResults = BacktestResult(config=CONFIG) +class PairTradingStrategy(ABC): + @abstractmethod + def create_trading_signals(pair: TradingPair, config: Dict) -> pd.DataFrame: + ... + @abstractmethod + def run_pair(pair: TradingPair) -> Optional[pd.DataFrame]: + ... -def create_trading_signals(pair: TradingPair) -> pd.DataFrame: - result_columns = [ - "time", - "action", - "symbol", - "price", - "disequilibrium", - "scaled_disequilibrium", - "pair", - ] - testing_pair_df = pair.testing_df_ - next_values = pair.vecm_fit_.predict(steps=len(testing_pair_df)) +def run_pair(pair: TradingPair) -> Optional[pd.DataFrame]: + pair.get_datasets(training_minutes=CONFIG["training_minutes"]) + try: + is_cointegrated = pair.train_pair() + if not is_cointegrated: + print(f"{pair} IS NOT COINTEGRATED") + return None + except Exception as e: + print(f"{pair}: Training failed: {str(e)}") + return None + + try: + pair.predict() + except Exception as e: + print(f"{pair}: Prediction failed: {str(e)}") + return None + + pair_trades = create_trading_signals(pair=pair, config=CONFIG) + + return pair_trades + + +def create_trading_signals(pair: TradingPair, config: Dict) -> pd.DataFrame: + beta = pair.vecm_fit_.beta colname_a, colname_b = pair.colnames() - # Convert prediction to a DataFrame for readability - predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b]) + predicted_df = pair.predicted_df_ - beta = pair.vecm_fit_.beta - - pair_result_df = pd.merge( - testing_pair_df.reset_index(drop=True), - predicted_df, - left_index=True, - right_index=True, - suffixes=("", "_pred"), - ).dropna() - - pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta - - pair_result_df["scaled_disequilibrium"] = abs( - pair_result_df["disequilibrium"] - pair.training_mu_ - ) / pair.training_std_ - - - # Reset index to ensure proper indexing - pair_result_df = pair_result_df.reset_index() + open_threshold = config["dis-equilibrium_open_trshld"] + close_threshold = config["dis-equilibrium_close_trshld"] # Iterate through the testing dataset to find the first trading opportunity open_row_index = None - initial_abs_term = None - - open_threshold = CONFIG["dis-equilibrium_open_trshld"] - close_threshold = CONFIG["dis-equilibrium_close_trshld"] - for row_idx in range(len(pair_result_df)): - curr_disequilibrium = pair_result_df["scaled_disequilibrium"][row_idx] + for row_idx in range(len(predicted_df)): + curr_disequilibrium = predicted_df["scaled_disequilibrium"][row_idx] # Check if current row has sufficient disequilibrium (not near-zero) if curr_disequilibrium >= open_threshold: open_row_index = row_idx - initial_abs_term = curr_disequilibrium break # If no row with sufficient disequilibrium found, skip this pair @@ -85,7 +91,9 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: return pd.DataFrame() # Look for close signal starting from the open position - trading_signals_df = (pair_result_df["scaled_disequilibrium"][open_row_index:] < close_threshold) + trading_signals_df = ( + predicted_df["scaled_disequilibrium"][open_row_index:] < close_threshold + ) # Adjust indices to account for the offset from open_row_index close_row_index = None @@ -94,7 +102,7 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: close_row_index = idx break - open_row = pair_result_df.loc[open_row_index] + open_row = predicted_df.loc[open_row_index] open_tstamp = open_row["tstamp"] open_disequilibrium = open_row["disequilibrium"] open_scaled_disequilibrium = open_row["scaled_disequilibrium"] @@ -102,8 +110,8 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: open_px_b = open_row[f"{colname_b}"] abs_beta = abs(beta[1]) - pred_px_b = pair_result_df.loc[open_row_index][f"{colname_b}_pred"] - pred_px_a = pair_result_df.loc[open_row_index][f"{colname_a}_pred"] + pred_px_b = predicted_df.loc[open_row_index][f"{colname_b}_pred"] + pred_px_a = predicted_df.loc[open_row_index][f"{colname_a}_pred"] if pred_px_b * abs_beta - pred_px_a > 0: open_side_a = "BUY" @@ -119,21 +127,18 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: # If no close signal found, print position and unrealized PnL if close_row_index is None: - last_row_index = len(pair_result_df) - 1 + last_row_index = len(predicted_df) - 1 # Use the new method from BacktestResult to handle outstanding positions BacktestResults.handle_outstanding_position( pair=pair, - pair_result_df=pair_result_df, + pair_result_df=predicted_df, last_row_index=last_row_index, open_side_a=open_side_a, open_side_b=open_side_b, open_px_a=open_px_a, open_px_b=open_px_b, open_tstamp=open_tstamp, - initial_abs_term=initial_abs_term, - colname_a=colname_a, - colname_b=colname_b ) # Return only open trades (no close trades) @@ -159,7 +164,7 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: ] else: # Close signal found - create complete trade - close_row = pair_result_df.loc[close_row_index] + close_row = predicted_df.loc[close_row_index] close_tstamp = close_row["tstamp"] close_disequilibrium = close_row["disequilibrium"] close_scaled_disequilibrium = close_row["scaled_disequilibrium"] @@ -210,56 +215,35 @@ def create_trading_signals(pair: TradingPair) -> pd.DataFrame: # Add tuples to data frame return pd.DataFrame( trd_signal_tuples, - columns=result_columns, + columns=trades_columns, ) -def run_single_pair( - pair: TradingPair, market_data: pd.DataFrame, price_column: str -) -> Optional[pd.DataFrame]: - pair.get_datasets( - market_data=market_data, training_minutes=CONFIG["training_minutes"] - ) - try: - is_cointegrated = pair.train_pair() - if not is_cointegrated: - print(f"{pair} IS NOT COINTEGRATED") - return None - except Exception as e: - print(f"{pair}: Training failed: {str(e)}") - return None - - try: - pair_trades = create_trading_signals( - pair=pair, - ) - except Exception as e: - print(f"{pair}: Prediction failed: {str(e)}") - return None - - return pair_trades - - -def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> None: +def run_all_pairs(config: Dict, datafile: str, price_column: str) -> None: def _create_pairs(config: Dict) -> List[TradingPair]: + nonlocal datafile instruments = config["instruments"] all_indexes = range(len(instruments)) unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j] pairs = [] + market_data_df = load_market_data( + f'{config["data_directory"]}/{datafile}', config=CONFIG + ) for a_index, b_index in unique_index_pairs: - symbol_a = instruments[a_index] - symbol_b = instruments[b_index] - pair = TradingPair(symbol_a, symbol_b, price_column) + pair = TradingPair( + market_data=market_data_df, + symbol_a=instruments[a_index], + symbol_b=instruments[b_index], + price_column=price_column, + ) pairs.append(pair) return pairs - + pairs_trades = [] for pair in _create_pairs(config): - single_pair_trades = run_single_pair( - market_data=market_data_df, price_column=price_column, pair=pair - ) + single_pair_trades = run_pair(pair=pair) if single_pair_trades is not None and len(single_pair_trades) > 0: pairs_trades.append(single_pair_trades) # Check if result_list has any data before concatenating @@ -275,7 +259,7 @@ def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> # BacktestResults.print_single_day_results() -if __name__ == "__main__": +def main() -> None: # Initialize a dictionary to store all trade results all_results: Dict[str, Dict[str, Any]] = {} @@ -291,13 +275,9 @@ if __name__ == "__main__": # Process data for this file try: - market_data_df = load_market_data( - f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG + run_all_pairs( + config=CONFIG, datafile=datafile, price_column=price_column ) - market_data_df = transform_dataframe( - df=market_data_df, price_column=price_column - ) - run_pairs(config=CONFIG, market_data_df=market_data_df, price_column=price_column) # Store results with file name as key filename = datafile.split("/")[-1] @@ -315,3 +295,6 @@ if __name__ == "__main__": # Print grand totals BacktestResults.print_grand_totals() BacktestResults.print_outstanding_positions() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/results.py b/src/results.py index 5d55450..231abea 100644 --- a/src/results.py +++ b/src/results.py @@ -188,15 +188,9 @@ class BacktestResult: f" {pos['open_px_b']:<8.2f}" f" {pos['current_px_b']:<10.2f}" f" {pos['current_value_b']:<12.2f}" - f" {'':<15}" ) # Print pair totals with disequilibrium info - disequilibrium_status = ( - "CLOSE" - if pos["current_abs_term"] < pos["closing_threshold"] - else f"{pos['disequilibrium_ratio']:.2f}x" - ) print( f"{'':<15}" f" {'PAIR TOTAL':<10}" @@ -205,7 +199,6 @@ class BacktestResult: f" {'':<8}" f" {'':<10}" f" {pos['total_current_value']:<12.2f}" - f" {disequilibrium_status:<15}" ) # Print disequilibrium details @@ -220,16 +213,6 @@ class BacktestResult: f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}" ) - print( - f"{'':<15}" - f" {'THRESHOLD':<10}" - f" {'':<4}" - f" {'':<10}" - f" {'':<8}" - f" {'':<10}" - f" Close: {pos['closing_threshold']:<6.4f}" - f" Ratio: {pos['disequilibrium_ratio']:<6.2f}" - ) print("-" * 100) total_value += pos["total_current_value"] @@ -243,7 +226,7 @@ class BacktestResult: def handle_outstanding_position(self, pair, pair_result_df, last_row_index, open_side_a, open_side_b, open_px_a, open_px_b, - open_tstamp, initial_abs_term, colname_a, colname_b): + open_tstamp): """ Handle calculation and tracking of outstanding positions when no close signal is found. @@ -254,11 +237,10 @@ class BacktestResult: open_side_a, open_side_b: Trading sides for symbols A and B open_px_a, open_px_b: Opening prices for symbols A and B open_tstamp: Opening timestamp - initial_abs_term: Initial absolute disequilibrium term - colname_a, colname_b: Column names for the price data """ last_row = pair_result_df.loc[last_row_index] last_tstamp = last_row["tstamp"] + colname_a, colname_b = pair.colnames() last_px_a = last_row[colname_a] last_px_b = last_row[colname_b] @@ -296,12 +278,9 @@ class BacktestResult: "total_current_value": total_current_value, "open_time": open_tstamp, "last_time": last_tstamp, - "initial_abs_term": initial_abs_term, "current_abs_term": current_scaled_disequilibrium, "current_disequilibrium": current_disequilibrium, "current_scaled_disequilibrium": current_scaled_disequilibrium, - "closing_threshold": initial_abs_term / self.config["dis-equilibrium_close_trshld"], - "disequilibrium_ratio": current_scaled_disequilibrium / (initial_abs_term / self.config["dis-equilibrium_close_trshld"]), } ) diff --git a/src/tools/data_loader.py b/src/tools/data_loader.py index b40bcb2..c9609e1 100644 --- a/src/tools/data_loader.py +++ b/src/tools/data_loader.py @@ -91,32 +91,6 @@ def load_market_data(datafile: str, config: Dict) -> pd.DataFrame: return df -def transform_dataframe(df: pd.DataFrame, price_column: str): - # Select only the columns we need - df_selected = df[["tstamp", "symbol", price_column]] - - # Start with unique timestamps - result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) - - # For each unique symbol, add a corresponding close price column - for symbol in df_selected["symbol"].unique(): - # Filter rows for this symbol - df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) - - # Create column name like "close-COIN" - new_price_column = f"{price_column}_{symbol}" - - # Create temporary dataframe with timestamp and price - temp_df = pd.DataFrame({ - "tstamp": df_symbol["tstamp"], - new_price_column: df_symbol[price_column] - }) - - # Join with our result dataframe - result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") - result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset - - return result_df # if __name__ == "__main__": diff --git a/src/tools/trading_pair.py b/src/tools/trading_pair.py index 2c749e2..e7cba3d 100644 --- a/src/tools/trading_pair.py +++ b/src/tools/trading_pair.py @@ -4,6 +4,7 @@ import pandas as pd from statsmodels.tsa.vector_ar.vecm import VECM class TradingPair: + market_data_: pd.DataFrame symbol_a_: str symbol_b_: str price_column_: str @@ -11,29 +12,59 @@ class TradingPair: training_mu_: Optional[float] training_std_: Optional[float] - original_df_: Optional[pd.DataFrame] training_df_: Optional[pd.DataFrame] testing_df_: Optional[pd.DataFrame] vecm_fit_: Optional[VECM] - def __init__(self, symbol_a: str, symbol_b: str, price_column: str): + def __init__(self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str): self.symbol_a_ = symbol_a self.symbol_b_ = symbol_b self.price_column_ = price_column + self.market_data_ = self._transform_dataframe(market_data)[["tstamp"] + self.colnames()] + + self.training_mu_ = None self.training_std_ = None - self.original_df_ = None self.training_df_ = None self.testing_df_ = None self.vecm_fit_ = None - def get_datasets(self, market_data: pd.DataFrame, training_minutes: int) -> None: - self.original_df_ = market_data[["tstamp"] + self.colnames()] - self.training_df_ = market_data.iloc[:training_minutes - 1, :].copy() + def _transform_dataframe(self, df: pd.DataFrame): + # Select only the columns we need + df_selected = df[["tstamp", "symbol", self.price_column_]] + + # Start with unique timestamps + result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) + + # For each unique symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + new_price_column = f"{self.price_column_}_{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + new_price_column: df_symbol[self.price_column_] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset + + return result_df + def get_datasets(self, training_minutes: int, training_start_index: int = 0, testing_size: Optional[int] = None) -> None: + self.training_df_ = self.market_data_.iloc[training_start_index:training_minutes - 1, :].copy() self.training_df_ = self.training_df_.dropna().reset_index(drop=True) - self.testing_df_ = market_data.iloc[training_minutes:, :].copy() + testing_start_index = training_start_index + training_minutes + if testing_size is None: + self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy() + else: + self.testing_df_ = self.market_data_.iloc[testing_start_index:testing_start_index + testing_size, :].copy() self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True) def colnames(self) -> List[str]: @@ -70,7 +101,7 @@ class TradingPair: return False pass - print(f"*****\n**************** {self} IS COINTEGRATED ****************\n*****") + print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80) self.fit_VECM() diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta self.training_mu_ = diseq_series.mean().iloc[0] @@ -84,6 +115,31 @@ class TradingPair: return True + def predict(self) -> None: + predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_)) + + # Convert prediction to a DataFrame for readability + # predicted_df = + + self.predicted_df_ = pd.merge( + self.testing_df_.reset_index(drop=True), + pd.DataFrame(predicted_prices, columns=self.colnames()), + left_index=True, + right_index=True, + suffixes=("", "_pred"), + ).dropna() + + self.predicted_df_["disequilibrium"] = self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta + + self.predicted_df_["scaled_disequilibrium"] = ( + abs(self.predicted_df_["disequilibrium"] - self.training_mu_) / self.training_std_ + ) + + # Reset index to ensure proper indexing + self.predicted_df_ = self.predicted_df_.reset_index() + return self.predicted_df_ + + def __repr__(self) ->str: return f"{self.symbol_a_} & {self.symbol_b_}"