pairs_trading/src/pt_backtest.py
2025-05-27 15:26:02 -04:00

733 lines
25 KiB
Python

import datetime
import sys
import json
from typing import Any, Dict, List, Tuple, Optional
import pandas as pd
import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize
# ------------------------ Configuration ------------------------
# Default configuration
CRYPTO_CONFIG: Dict = {
# --- Data retrieval
"data_directory": "./data/crypto",
"datafiles": [
"20250519.mktdata.ohlcv.db",
],
"db_table_name": "bnbspot_ohlcv_1min",
# ----- Instruments
"exchange_id": "BNBSPOT",
"instrument_id_pfx": "PAIR-",
"instruments": [
"BTC-USDT",
"ETH-USDT",
"LTC-USDT",
],
"trading_hours": {
"begin_session": "00:00:00",
"end_session": "23:59:00",
"timezone": "UTC"
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"equilibrium_threshold_open": 5.0,
"equilibrium_threshold_close": 1.0,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0, # USD
}
# ========================== EQUITIES
EQT_CONFIG: Dict = {
# --- Data retrieval
"data_directory": "./data/equity",
"datafiles": [
"20250508.alpaca_sim_md.db",
# "20250509.alpaca_sim_md.db",
# "20250512.alpaca_sim_md.db",
# "20250513.alpaca_sim_md.db",
# "20250514.alpaca_sim_md.db",
# "20250515.alpaca_sim_md.db",
# "20250516.alpaca_sim_md.db",
# "20250519.alpaca_sim_md.db",
# "20250520.alpaca_sim_md.db"
],
"db_table_name": "md_1min_bars",
# ----- Instruments
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"instruments": [
"COIN",
"GBTC",
"HOOD",
"MSTR",
"PYPL",
],
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"equilibrium_threshold_open": 5.0,
"equilibrium_threshold_close": 1.0,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0,
}
# ==========================================================================
CONFIG = EQT_CONFIG
TRADES = {}
TOTAL_UNREALIZED_PNL = 0.0 # Global variable to track total unrealized PnL
TOTAL_REALIZED_PNL = 0.0 # Global variable to track total realized PnL
OUTSTANDING_POSITIONS = [] # Global list to track outstanding positions with share quantities
class TradingPair:
symbol_a_: str
symbol_b_: str
price_column_: str
def __init__(self, symbol_a: str, symbol_b: str, price_column: str):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.price_column_ = price_column
def colnames(self) -> List[str]:
return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"]
def __repr__(self) ->str:
return f"{self.symbol_a_} & {self.symbol_b_}"
def convert_time_to_UTC(value: str, timezone: str):
from zoneinfo import ZoneInfo
from datetime import datetime
# Parse it to naive datetime object
local_dt = datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
zinfo = ZoneInfo(timezone)
result = local_dt.replace(tzinfo=zinfo)
result = result.astimezone(ZoneInfo('UTC'))
result = result.strftime('%Y-%m-%d %H:%M:%S')
return result
pass
def load_market_data(datafile: str, config: Dict) -> pd.DataFrame:
from tools.data_loader import load_sqlite_to_dataframe
instrument_ids = ["\"" + config["instrument_id_pfx"] + instrument + "\"" for instrument in config["instruments"]]
exchange_id = config["exchange_id"]
query = "select tstamp"
query += ", tstamp_ns as time_ns"
query += ", substr(instrument_id, 7) as symbol"
query += ", open"
query += ", high"
query += ", low"
query += ", close"
query += ", volume"
query += ", num_trades"
query += ", vwap"
query += f" from {config['db_table_name']}"
query += f" where exchange_id ='{exchange_id}'"
query += f" and instrument_id in ({','.join(instrument_ids)})"
df = load_sqlite_to_dataframe(db_path=datafile, query=query)
# Trading Hours
date_str = df["tstamp"][0][0:10]
trading_hours = CONFIG['trading_hours']
start_time = f"{date_str} {trading_hours['begin_session']}"
end_time = f"{date_str} {trading_hours['end_session']}"
start_time = convert_time_to_UTC(start_time, trading_hours["timezone"])
end_time = convert_time_to_UTC(end_time, trading_hours["timezone"])
# Perform boolean selection
df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)]
df["tstamp"] = pd.to_datetime(df["tstamp"])
return df
def transform_dataframe(df: pd.DataFrame, price_column: str):
# Select only the columns we need
df_selected = df[["tstamp", "symbol", price_column]]
# Start with unique timestamps
result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
# For each unique symbol, add a corresponding close price column
for symbol in df_selected["symbol"].unique():
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
# Create column name like "close-COIN"
new_price_column = f"{price_column}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[price_column]
})
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset
return result_df
def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]:
# Training dataset
colname_a, colname_b = pair.colnames()
df = df[["tstamp", colname_a, colname_b]]
df = df.dropna()
training_df = df.iloc[:training_minutes - 1, :].copy()
training_df.reset_index(drop=True).dropna().reset_index(drop=True)
# Testing dataset
testing_df = df.iloc[training_minutes:, :].copy()
testing_df.reset_index(drop=True).dropna().reset_index(drop=True)
return (training_df, testing_df)
def fit_VECM(training_pair_df, pair: TradingPair):
vecm_model = VECM(training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1)
vecm_fit = vecm_model.fit()
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{pair}: VECM model failed to converge properly")
return vecm_fit
def create_trading_signals(vecm_fit, testing_pair_df, pair: TradingPair) -> pd.DataFrame:
result_columns = [
"time",
"action",
"symbol",
"price",
"equilibrium",
"pair",
]
next_values = vecm_fit.predict(steps=len(testing_pair_df))
colname_a, colname_b = pair.colnames()
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])
beta = vecm_fit.beta
predicted_df["equilibrium_term"] = (
beta[0] * predicted_df[colname_a]
+ beta[1] * predicted_df[colname_b]
)
pair_result_df = pd.merge(
testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred')
).dropna()
pair_result_df["equilibrium"] = (
beta[0] * pair_result_df[colname_a]
+ beta[1] * pair_result_df[colname_b]
)
pair_result_df["abs_equilibrium"] = np.abs(pair_result_df["equilibrium"])
pair_result_df["scaled_equilibrium"] = pair_result_df["abs_equilibrium"] / (abs(beta[0]) * pair_result_df[colname_a] + abs(beta[1]) * pair_result_df[colname_b])
# Reset index to ensure proper indexing
pair_result_df = pair_result_df.reset_index()
# Iterate through the testing dataset to find the first trading opportunity
open_row_index = None
initial_abs_term = None
for row_idx in range(len(pair_result_df)):
current_abs_term = pair_result_df["abs_equilibrium"][row_idx]
# Check if current row has sufficient equilibrium (not near-zero)
if current_abs_term >= CONFIG["equilibrium_threshold_open"]:
open_row_index = row_idx
initial_abs_term = current_abs_term
break
# If no row with sufficient equilibrium found, skip this pair
if open_row_index is None:
print(f"{pair}: Insufficient divergence in testing dataset. Skipping.")
return pd.DataFrame()
# Look for close signal starting from the open position
trading_signals_df = (
pair_result_df["abs_equilibrium"][open_row_index:]
# < initial_abs_term / CONFIG["equilibrium_threshold_close"]
< CONFIG["equilibrium_threshold_close"]
)
# Adjust indices to account for the offset from open_row_index
close_row_index = None
for idx, value in trading_signals_df.items():
if value:
close_row_index = idx
break
open_row = pair_result_df.loc[open_row_index]
open_tstamp = open_row["tstamp"]
open_eqlbrm = open_row["equilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
abs_beta = abs(beta[1])
pred_px_b = pair_result_df.loc[open_row_index][f"{colname_b}_pred"]
pred_px_a = pair_result_df.loc[open_row_index][f"{colname_a}_pred"]
if pred_px_b * abs_beta - pred_px_a > 0:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
else:
open_side_b = "BUY"
open_side_a = "SELL"
close_side_b = "SELL"
close_side_a = "BUY"
# If no close signal found, print position and unrealized PnL
if close_row_index is None:
global TOTAL_UNREALIZED_PNL, OUTSTANDING_POSITIONS
last_row_index = len(pair_result_df) - 1
last_row = pair_result_df.loc[last_row_index]
last_tstamp = last_row["tstamp"]
last_px_a = last_row[f"{colname_a}"]
last_px_b = last_row[f"{colname_b}"]
# Calculate share quantities based on $1000 funding per pair
# Split $1000 equally between the two positions ($500 each)
funding_per_position = CONFIG["funding_per_pair"] / 2
shares_a = funding_per_position / open_px_a
shares_b = funding_per_position / open_px_b
# Calculate unrealized PnL for each position
if open_side_a == "BUY":
unrealized_pnl_a = (last_px_a - open_px_a) / open_px_a * 100
unrealized_dollar_a = shares_a * (last_px_a - open_px_a)
else: # SELL
unrealized_pnl_a = (open_px_a - last_px_a) / open_px_a * 100
unrealized_dollar_a = shares_a * (open_px_a - last_px_a)
if open_side_b == "BUY":
unrealized_pnl_b = (last_px_b - open_px_b) / open_px_b * 100
unrealized_dollar_b = shares_b * (last_px_b - open_px_b)
else: # SELL
unrealized_pnl_b = (open_px_b - last_px_b) / open_px_b * 100
unrealized_dollar_b = shares_b * (open_px_b - last_px_b)
total_unrealized_pnl = unrealized_pnl_a + unrealized_pnl_b
total_unrealized_dollar = unrealized_dollar_a + unrealized_dollar_b
# Add to global total
TOTAL_UNREALIZED_PNL += total_unrealized_pnl
# Store outstanding positions
OUTSTANDING_POSITIONS.append({
'pair': str(pair),
'symbol_a': pair.symbol_a_,
'symbol_b': pair.symbol_b_,
'side_a': open_side_a,
'side_b': open_side_b,
'shares_a': shares_a,
'shares_b': shares_b,
'open_px_a': open_px_a,
'open_px_b': open_px_b,
'current_px_a': last_px_a,
'current_px_b': last_px_b,
'unrealized_dollar_a': unrealized_dollar_a,
'unrealized_dollar_b': unrealized_dollar_b,
'total_unrealized_dollar': total_unrealized_dollar,
'open_time': open_tstamp,
'last_time': last_tstamp,
'initial_abs_term': initial_abs_term,
'current_abs_term': pair_result_df.loc[last_row_index, "abs_equilibrium"],
'closing_threshold': initial_abs_term / CONFIG["equilibrium_threshold_close"],
'equilibrium_ratio': pair_result_df.loc[last_row_index, "abs_equilibrium"] / (initial_abs_term / CONFIG["equilibrium_threshold_close"])
})
print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session")
print(f" Open: {open_tstamp} | Last: {last_tstamp}")
print(f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Unrealized: ${unrealized_dollar_a:.2f} ({unrealized_pnl_a:.2f}%)")
print(f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Unrealized: ${unrealized_dollar_b:.2f} ({unrealized_pnl_b:.2f}%)")
print(f" Total Unrealized: ${total_unrealized_dollar:.2f} ({total_unrealized_pnl:.2f}%)")
# Return only open trades (no close trades)
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_eqlbrm,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_eqlbrm,
pair,
),
]
else:
# Close signal found - create complete trade
close_row = pair_result_df.loc[close_row_index]
close_tstamp = close_row["tstamp"]
close_eqlbrm = close_row["equilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
print(f"{pair}: Close signal found at index {close_row_index}")
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_eqlbrm,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_eqlbrm,
pair,
),
(
close_tstamp,
close_side_a,
pair.symbol_a_,
close_px_a,
close_eqlbrm,
pair,
),
(
close_tstamp,
close_side_b,
pair.symbol_b_,
close_px_b,
close_eqlbrm,
pair,
),
]
# Add tuples to data frame
return pd.DataFrame(
trd_signal_tuples,
columns=result_columns,
)
def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: TradingPair) -> Optional[pd.DataFrame]:
colname_a = f"{price_column}_{pair.symbol_a_}"
colname_b = f"{price_column}_{pair.symbol_b_}"
training_pair_df, testing_pair_df = get_datasets(df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair)
# Check if we have enough data points for a meaningful analysis
min_required_points = CONFIG[
"min_required_points"
] # Minimum number of points for a reasonable VECM model
if len(training_pair_df) < min_required_points:
print(
f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
)
return None
# Check for non-finite values
if not np.isfinite(training_pair_df).all().all():
print(f"{pair}: Data contains non-finite values (NaN or inf)")
return None
# Fit the VECM
try:
vecm_fit = fit_VECM(training_pair_df, pair=pair)
except Exception as e:
print(f"{pair}: VECM fitting failed: {str(e)}")
return None
# Add safeguard against division by zero
if (
abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
return None
try:
pair_trades = create_trading_signals(
vecm_fit=vecm_fit,
testing_pair_df=testing_pair_df,
pair=pair,
)
except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}")
return None
return pair_trades
def add_trade(pair_nm, symbol, action, price):
pair_nm = str(pair_nm)
if pair_nm not in TRADES:
TRADES[pair_nm] = {symbol: []}
if symbol not in TRADES[pair_nm]:
TRADES[pair_nm][symbol] = []
TRADES[pair_nm][symbol].append((action, price))
def collect_single_day_results(result):
if result is None:
return
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
action = row.action
symbol = row.symbol
price = row.price
add_trade(pair_nm=row.pair, action=action, symbol=symbol, price=price)
def print_single_day_results(result):
for pair, symbols in TRADES.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for side, price in trades:
print(f"{symbol} {side} at ${price}")
def print_results_suummary(all_results):
# Summary of all processed files
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(all_results: Dict):
global TOTAL_REALIZED_PNL
print("\n====== Returns By Day and Pair ======")
for filename, data in all_results.items():
day_return = 0
print(f"\n--- {filename} ---")
# Process each pair
for pair, symbols in data["trades"].items():
pair_return = 0
pair_trades = []
# Calculate individual symbol returns in the pair
for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit
# Get entry and exit trades
entry_action, entry_price = trades[0]
exit_action, exit_price = trades[1]
# Calculate return based on action
symbol_return = 0
if entry_action == "BUY" and exit_action == "SELL":
# Long position
symbol_return = (exit_price - entry_price) / entry_price * 100
elif entry_action == "SELL" and exit_action == "BUY":
# Short position
symbol_return = (entry_price - exit_price) / entry_price * 100
pair_trades.append(
(
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
)
)
pair_return += symbol_return
# Print pair returns
if pair_trades:
print(f" {pair}:")
for (
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
) in pair_trades:
print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
)
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return and add to global realized PnL
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
TOTAL_REALIZED_PNL += day_return
def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None:
result_df = transform_dataframe(df=summaries_df, price_column=price_column)
stock_price_columns = [
column
for column in result_df.columns
if column.startswith(f"{price_column}_")
]
# Find the starting indices for A and B
all_indexes = range(len(stock_price_columns))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs_trades = []
for a_index, b_index in unique_index_pairs:
# Get the actual variable names
colname_a = stock_price_columns[a_index]
colname_b = stock_price_columns[b_index]
symbol_a = colname_a[len(f"{price_column}-") :]
symbol_b = colname_b[len(f"{price_column}-") :]
pair = TradingPair(symbol_a, symbol_b, price_column)
single_pair_trades = run_single_pair(market_data=result_df, price_column=price_column, pair=pair)
if len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return None
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
collect_single_day_results(result)
# print_single_day_results(result)
def print_outstanding_positions():
"""Print all outstanding positions with share quantities and unrealized PnL"""
if not OUTSTANDING_POSITIONS:
print("\n====== NO OUTSTANDING POSITIONS ======")
return
print(f"\n====== OUTSTANDING POSITIONS ======")
print(f"{'Pair':<15} {'Symbol':<6} {'Side':<4} {'Shares':<10} {'Open $':<8} {'Current $':<10} {'Unrealized $':<12} {'%':<8} {'Close Eq':<10}")
print("-" * 105)
total_unrealized_dollar = 0.0
for pos in OUTSTANDING_POSITIONS:
# Print position A
print(f"{pos['pair']:<15} {pos['symbol_a']:<6} {pos['side_a']:<4} {pos['shares_a']:<10.2f} {pos['open_px_a']:<8.2f} {pos['current_px_a']:<10.2f} {pos['unrealized_dollar_a']:<12.2f} {pos['unrealized_dollar_a']/500*100:<8.2f} {'':<10}")
# Print position B
print(f"{'':<15} {pos['symbol_b']:<6} {pos['side_b']:<4} {pos['shares_b']:<10.2f} {pos['open_px_b']:<8.2f} {pos['current_px_b']:<10.2f} {pos['unrealized_dollar_b']:<12.2f} {pos['unrealized_dollar_b']/500*100:<8.2f} {'':<10}")
# Print pair totals with equilibrium info
equilibrium_status = "CLOSE" if pos['current_abs_term'] < pos['closing_threshold'] else f"{pos['equilibrium_ratio']:.2f}x"
print(f"{'':<15} {'PAIR':<6} {'TOT':<4} {'':<10} {'':<8} {'':<10} {pos['total_unrealized_dollar']:<12.2f} {pos['total_unrealized_dollar']/1000*100:<8.2f} {equilibrium_status:<10}")
# Print equilibrium details
print(f"{'':<15} {'EQ':<6} {'INFO':<4} {'':<10} {'':<8} {'':<10} {'Curr:':<6}{pos['current_abs_term']:<6.4f} {'Thresh:':<7}{pos['closing_threshold']:<6.4f} {'':<10}")
print("-" * 105)
total_unrealized_dollar += pos['total_unrealized_dollar']
print(f"{'TOTAL OUTSTANDING':<80} ${total_unrealized_dollar:<12.2f}")
if __name__ == "__main__":
# Initialize a dictionary to store all trade results
all_results = {}
# Initialize global PnL tracking variables
TOTAL_REALIZED_PNL = 0.0
TOTAL_UNREALIZED_PNL = 0.0
OUTSTANDING_POSITIONS = []
# Process each data file
price_column = CONFIG["price_column"]
for datafile in CONFIG["datafiles"]:
print(f"\n====== Processing {datafile} ======")
# Clear the TRADES global dictionary and reset unrealized PnL for the new file
TRADES.clear()
TOTAL_UNREALIZED_PNL = 0.0
TOTAL_REALIZED_PNL = 0.0
# Process data for this file
try:
run_pairs(
summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}', config=CONFIG),
price_column=price_column
)
# Store results with file name as key
filename = datafile.split("/")[-1]
all_results[filename] = {"trades": TRADES.copy()}
print(f"Successfully processed {filename}")
# Print total unrealized PnL for this file
if TOTAL_UNREALIZED_PNL != 0:
print(f"\n====== TOTAL UNREALIZED PnL for {filename}: {TOTAL_UNREALIZED_PNL:.2f}% ======")
else:
print(f"\n====== No unrealized positions for {filename} ======")
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")
# print_results_suummary(all_results)
calculate_returns(all_results)
# Print grand totals
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {TOTAL_REALIZED_PNL:.2f}%")
print(f"Total Unrealized PnL: {TOTAL_UNREALIZED_PNL:.2f}%")
print(f"Combined Total PnL: {TOTAL_REALIZED_PNL + TOTAL_UNREALIZED_PNL:.2f}%")
print_outstanding_positions()