diff --git a/src/pt_backtest.py b/src/pt_backtest.py index 2e11a90..c1167e7 100644 --- a/src/pt_backtest.py +++ b/src/pt_backtest.py @@ -8,7 +8,7 @@ import numpy as np # ============= statsmodels =================== from statsmodels.tsa.vector_ar.vecm import VECM -from tools.data_loader import get_datasets, load_market_data, transform_dataframe +from tools.data_loader import load_market_data, transform_dataframe from tools.trading_pair import TradingPair from results import BacktestResult @@ -88,8 +88,8 @@ EQT_CONFIG: Dict = { "price_column": "close", "min_required_points": 30, "zero_threshold": 1e-10, - "disequilibrium_open_trshld": 5.0, - "disequilibrium_close_trshld": 1.0, + "disequilibrium_open_trshld": 2.0, + "disequilibrium_close_trshld": 0.5, "training_minutes": 120, # ----- Validation "funding_per_pair": 2000.0, @@ -104,23 +104,7 @@ CONFIG = EQT_CONFIG BacktestResults = BacktestResult(config=CONFIG) -def fit_VECM(training_pair_df, pair: TradingPair): - vecm_model = VECM( - training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1 - ) - vecm_fit = vecm_model.fit() - - # Check if the model converged properly - if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None: - print(f"{pair}: VECM model failed to converge properly") - - return vecm_fit - - - -def create_trading_signals( - vecm_fit, testing_pair_df, pair: TradingPair -) -> pd.DataFrame: +def create_trading_signals(pair: TradingPair) -> pd.DataFrame: result_columns = [ "time", "action", @@ -131,13 +115,14 @@ def create_trading_signals( "pair", ] - next_values = vecm_fit.predict(steps=len(testing_pair_df)) + testing_pair_df = pair.testing_df_ + next_values = pair.vecm_fit_.predict(steps=len(testing_pair_df)) colname_a, colname_b = pair.colnames() # Convert prediction to a DataFrame for readability predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b]) - beta = vecm_fit.beta + beta = pair.vecm_fit_.beta pair_result_df = pd.merge( testing_pair_df.reset_index(drop=True), @@ -149,12 +134,9 @@ def create_trading_signals( pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta - pair_mu = pair.disequilibrium_mu_ - pair_std = pair.disequilibrium_std_ - pair_result_df["scaled_disequilibrium"] = abs( - pair_result_df["disequilibrium"] - pair_mu - ) / pair_std + pair_result_df["disequilibrium"] - pair.training_mu_ + ) / pair.training_std_ # Reset index to ensure proper indexing @@ -311,54 +293,19 @@ def create_trading_signals( def run_single_pair( - market_data: pd.DataFrame, price_column: str, pair: TradingPair + pair: TradingPair, market_data: pd.DataFrame, price_column: str ) -> Optional[pd.DataFrame]: - training_pair_df, testing_pair_df = get_datasets( - df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair + pair.get_datasets( + market_data=market_data, training_minutes=CONFIG["training_minutes"] ) - - # Check if we have enough data points for a meaningful analysis - min_required_points = CONFIG[ - "min_required_points" - ] # Minimum number of points for a reasonable VECM model - if len(training_pair_df) < min_required_points: - print( - f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}" - ) - return None - - # Check for non-finite values - if not np.isfinite(training_pair_df).all().all(): - print(f"{pair}: Data contains non-finite values (NaN or inf)") - return None - - # Fit the VECM try: - vecm_fit = fit_VECM(training_pair_df, pair=pair) + pair.train_pair() except Exception as e: - print(f"{pair}: VECM fitting failed: {str(e)}") + print(f"{pair}: Training failed: {str(e)}") return None - # Add safeguard against division by zero - if ( - abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"] - ): # Small threshold to avoid division by very small numbers - print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}") - return None - diseqlbrm_series = training_pair_df[pair.colnames()] @ vecm_fit.beta - diseqlbrm_series_mu: float = diseqlbrm_series.mean().iloc[0] - diseqlbrm_series_std: float = diseqlbrm_series.std().iloc[0] - pair.set_training_disequilibrium(diseqlbrm_series_mu, diseqlbrm_series_std) - - # Normalize the disequilibrium - training_pair_df["scaled_disequilibrium"] = ( - diseqlbrm_series - diseqlbrm_series_mu - ) / diseqlbrm_series_std - try: pair_trades = create_trading_signals( - vecm_fit=vecm_fit, - testing_pair_df=testing_pair_df, pair=pair, ) except Exception as e: @@ -385,18 +332,10 @@ def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> pairs_trades = [] for pair in _create_pairs(config): - # Get the actual variable names - # colname_a = stock_price_columns[a_index] - # colname_b = stock_price_columns[b_index] - - # symbol_a = colname_a[len(f"{price_column}-") :] - # symbol_b = colname_b[len(f"{price_column}-") :] - # pair = TradingPair(symbol_a, symbol_b, price_column) - single_pair_trades = run_single_pair( market_data=market_data_df, price_column=price_column, pair=pair ) - if len(single_pair_trades) > 0: + if single_pair_trades is not None and len(single_pair_trades) > 0: pairs_trades.append(single_pair_trades) # Check if result_list has any data before concatenating if len(pairs_trades) == 0: @@ -441,10 +380,7 @@ if __name__ == "__main__": print(f"Successfully processed {filename}") - # Print total unrealized PnL for this file - print( - f"\n====== TOTAL UNREALIZED PnL for {filename}: {BacktestResults.get_total_unrealized_pnl():.2f}% ======" - ) + # No longer printing unrealized PnL since we removed that functionality except Exception as e: print(f"Error processing {datafile}: {str(e)}") diff --git a/src/results.py b/src/results.py index e5623ea..7398eb7 100644 --- a/src/results.py +++ b/src/results.py @@ -10,7 +10,6 @@ class BacktestResult: def __init__(self, config: Dict[str, Any]): self.config = config self.trades: Dict[str, Dict[str, Any]] = {} - self.total_unrealized_pnl = 0.0 self.total_realized_pnl = 0.0 self.outstanding_positions: List[Dict[str, Any]] = [] @@ -24,10 +23,6 @@ class BacktestResult: self.trades[pair_nm][symbol] = [] self.trades[pair_nm][symbol].append((action, price)) - def add_unrealized_pnl(self, unrealized_pnl: float): - """Add unrealized PnL to the total.""" - self.total_unrealized_pnl += unrealized_pnl - def add_outstanding_position(self, position: Dict[str, Any]): """Add an outstanding position to tracking.""" self.outstanding_positions.append(position) @@ -36,10 +31,6 @@ class BacktestResult: """Add realized PnL to the total.""" self.total_realized_pnl += realized_pnl - def get_total_unrealized_pnl(self) -> float: - """Get total unrealized PnL.""" - return self.total_unrealized_pnl - def get_total_realized_pnl(self) -> float: """Get total realized PnL.""" return self.total_realized_pnl @@ -155,7 +146,7 @@ class BacktestResult: self.add_realized_pnl(day_return) def print_outstanding_positions(self): - """Print all outstanding positions with share quantities and unrealized PnL.""" + """Print all outstanding positions with share quantities and current values.""" if not self.get_outstanding_positions(): print("\n====== NO OUTSTANDING POSITIONS ======") return @@ -168,23 +159,36 @@ class BacktestResult: f" {'Shares':<10}" f" {'Open $':<8}" f" {'Current $':<10}" - f" {'Unrealized $':<12}" - f" {'%':<8}" - f" {'Close Eq':<10}" + f" {'Value $':<12}" + f" {'Disequilibrium':<15}" ) - print("-" * 105) + print("-" * 100) - total_unrealized_dollar = 0.0 + total_value = 0.0 for pos in self.get_outstanding_positions(): # Print position A print( - f"{pos['pair']:<15} {pos['symbol_a']:<6} {pos['side_a']:<4} {pos['shares_a']:<10.2f} {pos['open_px_a']:<8.2f} {pos['current_px_a']:<10.2f} {pos['unrealized_dollar_a']:<12.2f} {pos['unrealized_dollar_a']/500*100:<8.2f} {'':<10}" + f"{pos['pair']:<15}" + f" {pos['symbol_a']:<10}" + f" {pos['side_a']:<4}" + f" {pos['shares_a']:<10.2f}" + f" {pos['open_px_a']:<8.2f}" + f" {pos['current_px_a']:<10.2f}" + f" {pos['current_value_a']:<12.2f}" + f" {'':<15}" ) # Print position B print( - f"{'':<15} {pos['symbol_b']:<6} {pos['side_b']:<4} {pos['shares_b']:<10.2f} {pos['open_px_b']:<8.2f} {pos['current_px_b']:<10.2f} {pos['unrealized_dollar_b']:<12.2f} {pos['unrealized_dollar_b']/500*100:<8.2f} {'':<10}" + f"{'':<15}" + f" {pos['symbol_b']:<10}" + f" {pos['side_b']:<4}" + f" {pos['shares_b']:<10.2f}" + f" {pos['open_px_b']:<8.2f}" + f" {pos['current_px_b']:<10.2f}" + f" {pos['current_value_b']:<12.2f}" + f" {'':<15}" ) # Print pair totals with disequilibrium info @@ -194,27 +198,48 @@ class BacktestResult: else f"{pos['disequilibrium_ratio']:.2f}x" ) print( - f"{'':<15} {'PAIR':<6} {'TOT':<4} {'':<10} {'':<8} {'':<10} {pos['total_unrealized_dollar']:<12.2f} {pos['total_unrealized_dollar']/1000*100:<8.2f} {disequilibrium_status:<10}" + f"{'':<15}" + f" {'PAIR TOTAL':<10}" + f" {'':<4}" + f" {'':<10}" + f" {'':<8}" + f" {'':<10}" + f" {pos['total_current_value']:<12.2f}" + f" {disequilibrium_status:<15}" ) # Print disequilibrium details print( - f"{'':<15} {'EQ':<6} {'INFO':<4} {'':<10} {'':<8} {'':<10} {'Curr:':<6}{pos['current_abs_term']:<6.4f} {'Thresh:':<7}{pos['closing_threshold']:<6.4f} {'':<10}" + f"{'':<15}" + f" {'DISEQUIL':<10}" + f" {'':<4}" + f" {'':<10}" + f" {'':<8}" + f" {'':<10}" + f" Raw: {pos['current_disequilibrium']:<6.4f}" + f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}" ) - print("-" * 105) - total_unrealized_dollar += pos["total_unrealized_dollar"] + print( + f"{'':<15}" + f" {'THRESHOLD':<10}" + f" {'':<4}" + f" {'':<10}" + f" {'':<8}" + f" {'':<10}" + f" Close: {pos['closing_threshold']:<6.4f}" + f" Ratio: {pos['disequilibrium_ratio']:<6.2f}" + ) + print("-" * 100) - print(f"{'TOTAL OUTSTANDING':<80} ${total_unrealized_dollar:<12.2f}") + total_value += pos["total_current_value"] + + print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}") def print_grand_totals(self): """Print grand totals across all pairs.""" print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======") print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%") - print(f"Total Unrealized PnL: {self.get_total_unrealized_pnl():.2f}%") - print( - f"Combined Total PnL: {self.get_total_realized_pnl() + self.get_total_unrealized_pnl():.2f}%" - ) def handle_outstanding_position(self, pair, pair_result_df, last_row_index, open_side_a, open_side_b, open_px_a, open_px_b, @@ -231,9 +256,6 @@ class BacktestResult: open_tstamp: Opening timestamp initial_abs_term: Initial absolute disequilibrium term colname_a, colname_b: Column names for the price data - - Returns: - tuple: (unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b) """ last_row = pair_result_df.loc[last_row_index] last_tstamp = last_row["tstamp"] @@ -246,26 +268,14 @@ class BacktestResult: shares_a = funding_per_position / open_px_a shares_b = funding_per_position / open_px_b - # Calculate unrealized PnL for each position - if open_side_a == "BUY": - unrealized_pnl_a = (last_px_a - open_px_a) / open_px_a * 100 - unrealized_dollar_a = shares_a * (last_px_a - open_px_a) - else: # SELL - unrealized_pnl_a = (open_px_a - last_px_a) / open_px_a * 100 - unrealized_dollar_a = shares_a * (open_px_a - last_px_a) + # Calculate current position values (shares * current price) + current_value_a = shares_a * last_px_a + current_value_b = shares_b * last_px_b + total_current_value = current_value_a + current_value_b - if open_side_b == "BUY": - unrealized_pnl_b = (last_px_b - open_px_b) / open_px_b * 100 - unrealized_dollar_b = shares_b * (last_px_b - open_px_b) - else: # SELL - unrealized_pnl_b = (open_px_b - last_px_b) / open_px_b * 100 - unrealized_dollar_b = shares_b * (open_px_b - last_px_b) - - total_unrealized_pnl = unrealized_pnl_a + unrealized_pnl_b - total_unrealized_dollar = unrealized_dollar_a + unrealized_dollar_b - - # Add to global total - self.add_unrealized_pnl(total_unrealized_pnl) + # Get disequilibrium information + current_disequilibrium = last_row["disequilibrium"] + current_scaled_disequilibrium = last_row["scaled_disequilibrium"] # Store outstanding positions self.add_outstanding_position( @@ -281,35 +291,26 @@ class BacktestResult: "open_px_b": open_px_b, "current_px_a": last_px_a, "current_px_b": last_px_b, - "unrealized_dollar_a": unrealized_dollar_a, - "unrealized_dollar_b": unrealized_dollar_b, - "total_unrealized_dollar": total_unrealized_dollar, + "current_value_a": current_value_a, + "current_value_b": current_value_b, + "total_current_value": total_current_value, "open_time": open_tstamp, "last_time": last_tstamp, "initial_abs_term": initial_abs_term, - "current_abs_term": pair_result_df.loc[ - last_row_index, "scaled_disequilibrium" - ], - "closing_threshold": initial_abs_term - / self.config["disequilibrium_close_trshld"], - "disequilibrium_ratio": pair_result_df.loc[ - last_row_index, "scaled_disequilibrium" - ] - / (initial_abs_term / self.config["disequilibrium_close_trshld"]), + "current_abs_term": current_scaled_disequilibrium, + "current_disequilibrium": current_disequilibrium, + "current_scaled_disequilibrium": current_scaled_disequilibrium, + "closing_threshold": initial_abs_term / self.config["disequilibrium_close_trshld"], + "disequilibrium_ratio": current_scaled_disequilibrium / (initial_abs_term / self.config["disequilibrium_close_trshld"]), } ) # Print position details print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session") print(f" Open: {open_tstamp} | Last: {last_tstamp}") - print( - f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Unrealized: ${unrealized_dollar_a:.2f} ({unrealized_pnl_a:.2f}%)" - ) - print( - f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Unrealized: ${unrealized_dollar_b:.2f} ({unrealized_pnl_b:.2f}%)" - ) - print( - f" Total Unrealized: ${total_unrealized_dollar:.2f} ({total_unrealized_pnl:.2f}%)" - ) + print(f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Value: ${current_value_a:.2f}") + print(f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Value: ${current_value_b:.2f}") + print(f" Total Value: ${total_current_value:.2f}") + print(f" Disequilibrium: {current_disequilibrium:.4f} | Scaled: {current_scaled_disequilibrium:.4f}") - return unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b \ No newline at end of file + return current_value_a, current_value_b, total_current_value \ No newline at end of file diff --git a/src/tools/data_loader.py b/src/tools/data_loader.py index 144154b..da89693 100644 --- a/src/tools/data_loader.py +++ b/src/tools/data_loader.py @@ -118,20 +118,20 @@ def transform_dataframe(df: pd.DataFrame, price_column: str): return result_df -def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]: - # Training dataset - colname_a, colname_b = pair.colnames() - df = df[["tstamp", colname_a, colname_b]] - df = df.dropna() +# def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]: +# # Training dataset +# colname_a, colname_b = pair.colnames() +# df = df[["tstamp", colname_a, colname_b]] +# df = df.dropna() - training_df = df.iloc[:training_minutes - 1, :].copy() - training_df.reset_index(drop=True).dropna().reset_index(drop=True) +# training_df = df.iloc[:training_minutes - 1, :].copy() +# training_df.reset_index(drop=True).dropna().reset_index(drop=True) - # Testing dataset - testing_df = df.iloc[training_minutes:, :].copy() - testing_df.reset_index(drop=True).dropna().reset_index(drop=True) +# # Testing dataset +# testing_df = df.iloc[training_minutes:, :].copy() +# testing_df.reset_index(drop=True).dropna().reset_index(drop=True) - return (training_df, testing_df) +# return (training_df, testing_df) if __name__ == "__main__": diff --git a/src/tools/trading_pair.py b/src/tools/trading_pair.py index 2818123..80b3b7a 100644 --- a/src/tools/trading_pair.py +++ b/src/tools/trading_pair.py @@ -1,36 +1,78 @@ from typing import List, Optional - +import pandas as pd +from statsmodels.tsa.vector_ar.vecm import VECM class TradingPair: symbol_a_: str symbol_b_: str price_column_: str - disequilibrium_mu_: Optional[float] - disequilibrium_std_: Optional[float] + training_mu_: Optional[float] + training_std_: Optional[float] + + original_df_: Optional[pd.DataFrame] + training_df_: Optional[pd.DataFrame] + testing_df_: Optional[pd.DataFrame] + + vecm_fit_: Optional[VECM] def __init__(self, symbol_a: str, symbol_b: str, price_column: str): self.symbol_a_ = symbol_a self.symbol_b_ = symbol_b self.price_column_ = price_column - self.disequilibrium_mu_ = None - self.disequilibrium_std_ = None + self.training_mu_ = None + self.training_std_ = None + self.original_df_ = None + self.training_df_ = None + self.testing_df_ = None + self.vecm_fit_ = None + + def get_datasets(self, market_data: pd.DataFrame, training_minutes: int) -> None: + self.original_df_ = market_data[["tstamp"] + self.colnames()] + self.training_df_ = market_data.iloc[:training_minutes - 1, :].copy() + self.training_df_ = self.training_df_.dropna().reset_index(drop=True) + + self.testing_df_ = market_data.iloc[training_minutes:, :].copy() + self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True) def colnames(self) -> List[str]: return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"] - def set_training_disequilibrium(self, disequilibrium_mu: float, disequilibrium_std: float): - self.disequilibrium_mu_ = disequilibrium_mu - self.disequilibrium_std_ = disequilibrium_std + def fit_VECM(self): + vecm_df = self.training_df_[self.colnames()].reset_index(drop=True) + vecm_model = VECM(vecm_df, coint_rank=1) + vecm_fit = vecm_model.fit() - def mu(self) -> float: - assert self.disequilibrium_mu_ is not None - return self.disequilibrium_mu_ + # URGENT check beta and alpha - def std(self) -> float: - assert self.disequilibrium_std_ is not None - return self.disequilibrium_std_ + # Check if the model converged properly + if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None: + print(f"{self}: VECM model failed to converge properly") + + self.vecm_fit_ = vecm_fit + + + def train_pair(self): + self.fit_VECM() + diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta + self.training_mu_ = diseq_series.mean().iloc[0] + self.training_std_ = diseq_series.std().iloc[0] + + self.training_df_["disequilibrium"] = self.training_df_[self.colnames()] @ self.vecm_fit_.beta + # Normalize the disequilibrium + self.training_df_["scaled_disequilibrium"] = ( + diseq_series - self.training_mu_ + ) / self.training_std_ + + + # def mu(self) -> float: + # assert self.training_mu_ is not None + # return self.training_mu_ + + # def std(self) -> float: + # assert self.training_std_ is not None + # return self.training_std_ def __repr__(self) ->str: return f"{self.symbol_a_} & {self.symbol_b_}"