pairs_trading/src/pt_backtest.py
2025-05-29 02:47:38 -04:00

318 lines
9.9 KiB
Python

import sys
from typing import Any, Dict, List, Optional
import pandas as pd
import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
from backtest_configs import CRYPTO_CONFIG
from tools.data_loader import load_market_data, transform_dataframe
from tools.trading_pair import TradingPair
from results import BacktestResult
NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize
# # ==========================================================================
CONFIG = CRYPTO_CONFIG
# CONFIG = EQT_CONFIG
BacktestResults = BacktestResult(config=CONFIG)
def create_trading_signals(pair: TradingPair) -> pd.DataFrame:
result_columns = [
"time",
"action",
"symbol",
"price",
"disequilibrium",
"scaled_disequilibrium",
"pair",
]
testing_pair_df = pair.testing_df_
next_values = pair.vecm_fit_.predict(steps=len(testing_pair_df))
colname_a, colname_b = pair.colnames()
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])
beta = pair.vecm_fit_.beta
pair_result_df = pd.merge(
testing_pair_df.reset_index(drop=True),
predicted_df,
left_index=True,
right_index=True,
suffixes=("", "_pred"),
).dropna()
pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta
pair_result_df["scaled_disequilibrium"] = abs(
pair_result_df["disequilibrium"] - pair.training_mu_
) / pair.training_std_
# Reset index to ensure proper indexing
pair_result_df = pair_result_df.reset_index()
# Iterate through the testing dataset to find the first trading opportunity
open_row_index = None
initial_abs_term = None
open_threshold = CONFIG["dis-equilibrium_open_trshld"]
close_threshold = CONFIG["dis-equilibrium_close_trshld"]
for row_idx in range(len(pair_result_df)):
curr_disequilibrium = pair_result_df["scaled_disequilibrium"][row_idx]
# Check if current row has sufficient disequilibrium (not near-zero)
if curr_disequilibrium >= open_threshold:
open_row_index = row_idx
initial_abs_term = curr_disequilibrium
break
# If no row with sufficient disequilibrium found, skip this pair
if open_row_index is None:
print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
return pd.DataFrame()
# Look for close signal starting from the open position
trading_signals_df = (pair_result_df["scaled_disequilibrium"][open_row_index:] < close_threshold)
# Adjust indices to account for the offset from open_row_index
close_row_index = None
for idx, value in trading_signals_df.items():
if value:
close_row_index = idx
break
open_row = pair_result_df.loc[open_row_index]
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
abs_beta = abs(beta[1])
pred_px_b = pair_result_df.loc[open_row_index][f"{colname_b}_pred"]
pred_px_a = pair_result_df.loc[open_row_index][f"{colname_a}_pred"]
if pred_px_b * abs_beta - pred_px_a > 0:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
else:
open_side_b = "BUY"
open_side_a = "SELL"
close_side_b = "SELL"
close_side_a = "BUY"
# If no close signal found, print position and unrealized PnL
if close_row_index is None:
last_row_index = len(pair_result_df) - 1
# Use the new method from BacktestResult to handle outstanding positions
BacktestResults.handle_outstanding_position(
pair=pair,
pair_result_df=pair_result_df,
last_row_index=last_row_index,
open_side_a=open_side_a,
open_side_b=open_side_b,
open_px_a=open_px_a,
open_px_b=open_px_b,
open_tstamp=open_tstamp,
initial_abs_term=initial_abs_term,
colname_a=colname_a,
colname_b=colname_b
)
# Return only open trades (no close trades)
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
]
else:
# Close signal found - create complete trade
close_row = pair_result_df.loc[close_row_index]
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
print(f"{pair}: Close signal found at index {close_row_index}")
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
(
close_tstamp,
close_side_a,
pair.symbol_a_,
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
(
close_tstamp,
close_side_b,
pair.symbol_b_,
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame
return pd.DataFrame(
trd_signal_tuples,
columns=result_columns,
)
def run_single_pair(
pair: TradingPair, market_data: pd.DataFrame, price_column: str
) -> Optional[pd.DataFrame]:
pair.get_datasets(
market_data=market_data, training_minutes=CONFIG["training_minutes"]
)
try:
is_cointegrated = pair.train_pair()
if not is_cointegrated:
print(f"{pair} IS NOT COINTEGRATED")
return None
except Exception as e:
print(f"{pair}: Training failed: {str(e)}")
return None
try:
pair_trades = create_trading_signals(
pair=pair,
)
except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}")
return None
return pair_trades
def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) -> None:
def _create_pairs(config: Dict) -> List[TradingPair]:
instruments = config["instruments"]
all_indexes = range(len(instruments))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs = []
for a_index, b_index in unique_index_pairs:
symbol_a = instruments[a_index]
symbol_b = instruments[b_index]
pair = TradingPair(symbol_a, symbol_b, price_column)
pairs.append(pair)
return pairs
pairs_trades = []
for pair in _create_pairs(config):
single_pair_trades = run_single_pair(
market_data=market_data_df, price_column=price_column, pair=pair
)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return None
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
BacktestResults.collect_single_day_results(result)
# BacktestResults.print_single_day_results()
if __name__ == "__main__":
# Initialize a dictionary to store all trade results
all_results: Dict[str, Dict[str, Any]] = {}
# Initialize global PnL tracking variables
# Process each data file
price_column = CONFIG["price_column"]
for datafile in CONFIG["datafiles"]:
print(f"\n====== Processing {datafile} ======")
# Clear the TRADES global dictionary and reset unrealized PnL for the new file
BacktestResults.clear_trades()
# Process data for this file
try:
market_data_df = load_market_data(
f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG
)
market_data_df = transform_dataframe(
df=market_data_df, price_column=price_column
)
run_pairs(config=CONFIG, market_data_df=market_data_df, price_column=price_column)
# Store results with file name as key
filename = datafile.split("/")[-1]
all_results[filename] = {"trades": BacktestResults.trades.copy()}
print(f"Successfully processed {filename}")
# No longer printing unrealized PnL since we removed that functionality
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")
# BacktestResults.print_results_summary(all_results)
BacktestResults.calculate_returns(all_results)
# Print grand totals
BacktestResults.print_grand_totals()
BacktestResults.print_outstanding_positions()