This commit is contained in:
Oleg Sheynin 2025-05-29 01:39:31 -04:00
parent 0ceb2f2eba
commit 06884d72b7
4 changed files with 154 additions and 175 deletions

View File

@ -8,7 +8,7 @@ import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
from tools.data_loader import get_datasets, load_market_data, transform_dataframe
from tools.data_loader import load_market_data, transform_dataframe
from tools.trading_pair import TradingPair
from results import BacktestResult
@ -88,8 +88,8 @@ EQT_CONFIG: Dict = {
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"disequilibrium_open_trshld": 5.0,
"disequilibrium_close_trshld": 1.0,
"disequilibrium_open_trshld": 2.0,
"disequilibrium_close_trshld": 0.5,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0,
@ -104,23 +104,7 @@ CONFIG = EQT_CONFIG
BacktestResults = BacktestResult(config=CONFIG)
def fit_VECM(training_pair_df, pair: TradingPair):
vecm_model = VECM(
training_pair_df[pair.colnames()].reset_index(drop=True), coint_rank=1
)
vecm_fit = vecm_model.fit()
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{pair}: VECM model failed to converge properly")
return vecm_fit
def create_trading_signals(
vecm_fit, testing_pair_df, pair: TradingPair
) -> pd.DataFrame:
def create_trading_signals(pair: TradingPair) -> pd.DataFrame:
result_columns = [
"time",
"action",
@ -131,13 +115,14 @@ def create_trading_signals(
"pair",
]
next_values = vecm_fit.predict(steps=len(testing_pair_df))
testing_pair_df = pair.testing_df_
next_values = pair.vecm_fit_.predict(steps=len(testing_pair_df))
colname_a, colname_b = pair.colnames()
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])
beta = vecm_fit.beta
beta = pair.vecm_fit_.beta
pair_result_df = pd.merge(
testing_pair_df.reset_index(drop=True),
@ -149,12 +134,9 @@ def create_trading_signals(
pair_result_df["disequilibrium"] = pair_result_df[pair.colnames()] @ beta
pair_mu = pair.disequilibrium_mu_
pair_std = pair.disequilibrium_std_
pair_result_df["scaled_disequilibrium"] = abs(
pair_result_df["disequilibrium"] - pair_mu
) / pair_std
pair_result_df["disequilibrium"] - pair.training_mu_
) / pair.training_std_
# Reset index to ensure proper indexing
@ -311,54 +293,19 @@ def create_trading_signals(
def run_single_pair(
market_data: pd.DataFrame, price_column: str, pair: TradingPair
pair: TradingPair, market_data: pd.DataFrame, price_column: str
) -> Optional[pd.DataFrame]:
training_pair_df, testing_pair_df = get_datasets(
df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair
pair.get_datasets(
market_data=market_data, training_minutes=CONFIG["training_minutes"]
)
# Check if we have enough data points for a meaningful analysis
min_required_points = CONFIG[
"min_required_points"
] # Minimum number of points for a reasonable VECM model
if len(training_pair_df) < min_required_points:
print(
f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
)
return None
# Check for non-finite values
if not np.isfinite(training_pair_df).all().all():
print(f"{pair}: Data contains non-finite values (NaN or inf)")
return None
# Fit the VECM
try:
vecm_fit = fit_VECM(training_pair_df, pair=pair)
pair.train_pair()
except Exception as e:
print(f"{pair}: VECM fitting failed: {str(e)}")
print(f"{pair}: Training failed: {str(e)}")
return None
# Add safeguard against division by zero
if (
abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
return None
diseqlbrm_series = training_pair_df[pair.colnames()] @ vecm_fit.beta
diseqlbrm_series_mu: float = diseqlbrm_series.mean().iloc[0]
diseqlbrm_series_std: float = diseqlbrm_series.std().iloc[0]
pair.set_training_disequilibrium(diseqlbrm_series_mu, diseqlbrm_series_std)
# Normalize the disequilibrium
training_pair_df["scaled_disequilibrium"] = (
diseqlbrm_series - diseqlbrm_series_mu
) / diseqlbrm_series_std
try:
pair_trades = create_trading_signals(
vecm_fit=vecm_fit,
testing_pair_df=testing_pair_df,
pair=pair,
)
except Exception as e:
@ -385,18 +332,10 @@ def run_pairs(config: Dict, market_data_df: pd.DataFrame, price_column: str) ->
pairs_trades = []
for pair in _create_pairs(config):
# Get the actual variable names
# colname_a = stock_price_columns[a_index]
# colname_b = stock_price_columns[b_index]
# symbol_a = colname_a[len(f"{price_column}-") :]
# symbol_b = colname_b[len(f"{price_column}-") :]
# pair = TradingPair(symbol_a, symbol_b, price_column)
single_pair_trades = run_single_pair(
market_data=market_data_df, price_column=price_column, pair=pair
)
if len(single_pair_trades) > 0:
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
@ -441,10 +380,7 @@ if __name__ == "__main__":
print(f"Successfully processed {filename}")
# Print total unrealized PnL for this file
print(
f"\n====== TOTAL UNREALIZED PnL for {filename}: {BacktestResults.get_total_unrealized_pnl():.2f}% ======"
)
# No longer printing unrealized PnL since we removed that functionality
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")

View File

@ -10,7 +10,6 @@ class BacktestResult:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.trades: Dict[str, Dict[str, Any]] = {}
self.total_unrealized_pnl = 0.0
self.total_realized_pnl = 0.0
self.outstanding_positions: List[Dict[str, Any]] = []
@ -24,10 +23,6 @@ class BacktestResult:
self.trades[pair_nm][symbol] = []
self.trades[pair_nm][symbol].append((action, price))
def add_unrealized_pnl(self, unrealized_pnl: float):
"""Add unrealized PnL to the total."""
self.total_unrealized_pnl += unrealized_pnl
def add_outstanding_position(self, position: Dict[str, Any]):
"""Add an outstanding position to tracking."""
self.outstanding_positions.append(position)
@ -36,10 +31,6 @@ class BacktestResult:
"""Add realized PnL to the total."""
self.total_realized_pnl += realized_pnl
def get_total_unrealized_pnl(self) -> float:
"""Get total unrealized PnL."""
return self.total_unrealized_pnl
def get_total_realized_pnl(self) -> float:
"""Get total realized PnL."""
return self.total_realized_pnl
@ -155,7 +146,7 @@ class BacktestResult:
self.add_realized_pnl(day_return)
def print_outstanding_positions(self):
"""Print all outstanding positions with share quantities and unrealized PnL."""
"""Print all outstanding positions with share quantities and current values."""
if not self.get_outstanding_positions():
print("\n====== NO OUTSTANDING POSITIONS ======")
return
@ -168,23 +159,36 @@ class BacktestResult:
f" {'Shares':<10}"
f" {'Open $':<8}"
f" {'Current $':<10}"
f" {'Unrealized $':<12}"
f" {'%':<8}"
f" {'Close Eq':<10}"
f" {'Value $':<12}"
f" {'Disequilibrium':<15}"
)
print("-" * 105)
print("-" * 100)
total_unrealized_dollar = 0.0
total_value = 0.0
for pos in self.get_outstanding_positions():
# Print position A
print(
f"{pos['pair']:<15} {pos['symbol_a']:<6} {pos['side_a']:<4} {pos['shares_a']:<10.2f} {pos['open_px_a']:<8.2f} {pos['current_px_a']:<10.2f} {pos['unrealized_dollar_a']:<12.2f} {pos['unrealized_dollar_a']/500*100:<8.2f} {'':<10}"
f"{pos['pair']:<15}"
f" {pos['symbol_a']:<10}"
f" {pos['side_a']:<4}"
f" {pos['shares_a']:<10.2f}"
f" {pos['open_px_a']:<8.2f}"
f" {pos['current_px_a']:<10.2f}"
f" {pos['current_value_a']:<12.2f}"
f" {'':<15}"
)
# Print position B
print(
f"{'':<15} {pos['symbol_b']:<6} {pos['side_b']:<4} {pos['shares_b']:<10.2f} {pos['open_px_b']:<8.2f} {pos['current_px_b']:<10.2f} {pos['unrealized_dollar_b']:<12.2f} {pos['unrealized_dollar_b']/500*100:<8.2f} {'':<10}"
f"{'':<15}"
f" {pos['symbol_b']:<10}"
f" {pos['side_b']:<4}"
f" {pos['shares_b']:<10.2f}"
f" {pos['open_px_b']:<8.2f}"
f" {pos['current_px_b']:<10.2f}"
f" {pos['current_value_b']:<12.2f}"
f" {'':<15}"
)
# Print pair totals with disequilibrium info
@ -194,27 +198,48 @@ class BacktestResult:
else f"{pos['disequilibrium_ratio']:.2f}x"
)
print(
f"{'':<15} {'PAIR':<6} {'TOT':<4} {'':<10} {'':<8} {'':<10} {pos['total_unrealized_dollar']:<12.2f} {pos['total_unrealized_dollar']/1000*100:<8.2f} {disequilibrium_status:<10}"
f"{'':<15}"
f" {'PAIR TOTAL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" {pos['total_current_value']:<12.2f}"
f" {disequilibrium_status:<15}"
)
# Print disequilibrium details
print(
f"{'':<15} {'EQ':<6} {'INFO':<4} {'':<10} {'':<8} {'':<10} {'Curr:':<6}{pos['current_abs_term']:<6.4f} {'Thresh:':<7}{pos['closing_threshold']:<6.4f} {'':<10}"
f"{'':<15}"
f" {'DISEQUIL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" Raw: {pos['current_disequilibrium']:<6.4f}"
f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}"
)
print("-" * 105)
total_unrealized_dollar += pos["total_unrealized_dollar"]
print(
f"{'':<15}"
f" {'THRESHOLD':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" Close: {pos['closing_threshold']:<6.4f}"
f" Ratio: {pos['disequilibrium_ratio']:<6.2f}"
)
print("-" * 100)
print(f"{'TOTAL OUTSTANDING':<80} ${total_unrealized_dollar:<12.2f}")
total_value += pos["total_current_value"]
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
def print_grand_totals(self):
"""Print grand totals across all pairs."""
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%")
print(f"Total Unrealized PnL: {self.get_total_unrealized_pnl():.2f}%")
print(
f"Combined Total PnL: {self.get_total_realized_pnl() + self.get_total_unrealized_pnl():.2f}%"
)
def handle_outstanding_position(self, pair, pair_result_df, last_row_index,
open_side_a, open_side_b, open_px_a, open_px_b,
@ -231,9 +256,6 @@ class BacktestResult:
open_tstamp: Opening timestamp
initial_abs_term: Initial absolute disequilibrium term
colname_a, colname_b: Column names for the price data
Returns:
tuple: (unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b)
"""
last_row = pair_result_df.loc[last_row_index]
last_tstamp = last_row["tstamp"]
@ -246,26 +268,14 @@ class BacktestResult:
shares_a = funding_per_position / open_px_a
shares_b = funding_per_position / open_px_b
# Calculate unrealized PnL for each position
if open_side_a == "BUY":
unrealized_pnl_a = (last_px_a - open_px_a) / open_px_a * 100
unrealized_dollar_a = shares_a * (last_px_a - open_px_a)
else: # SELL
unrealized_pnl_a = (open_px_a - last_px_a) / open_px_a * 100
unrealized_dollar_a = shares_a * (open_px_a - last_px_a)
# Calculate current position values (shares * current price)
current_value_a = shares_a * last_px_a
current_value_b = shares_b * last_px_b
total_current_value = current_value_a + current_value_b
if open_side_b == "BUY":
unrealized_pnl_b = (last_px_b - open_px_b) / open_px_b * 100
unrealized_dollar_b = shares_b * (last_px_b - open_px_b)
else: # SELL
unrealized_pnl_b = (open_px_b - last_px_b) / open_px_b * 100
unrealized_dollar_b = shares_b * (open_px_b - last_px_b)
total_unrealized_pnl = unrealized_pnl_a + unrealized_pnl_b
total_unrealized_dollar = unrealized_dollar_a + unrealized_dollar_b
# Add to global total
self.add_unrealized_pnl(total_unrealized_pnl)
# Get disequilibrium information
current_disequilibrium = last_row["disequilibrium"]
current_scaled_disequilibrium = last_row["scaled_disequilibrium"]
# Store outstanding positions
self.add_outstanding_position(
@ -281,35 +291,26 @@ class BacktestResult:
"open_px_b": open_px_b,
"current_px_a": last_px_a,
"current_px_b": last_px_b,
"unrealized_dollar_a": unrealized_dollar_a,
"unrealized_dollar_b": unrealized_dollar_b,
"total_unrealized_dollar": total_unrealized_dollar,
"current_value_a": current_value_a,
"current_value_b": current_value_b,
"total_current_value": total_current_value,
"open_time": open_tstamp,
"last_time": last_tstamp,
"initial_abs_term": initial_abs_term,
"current_abs_term": pair_result_df.loc[
last_row_index, "scaled_disequilibrium"
],
"closing_threshold": initial_abs_term
/ self.config["disequilibrium_close_trshld"],
"disequilibrium_ratio": pair_result_df.loc[
last_row_index, "scaled_disequilibrium"
]
/ (initial_abs_term / self.config["disequilibrium_close_trshld"]),
"current_abs_term": current_scaled_disequilibrium,
"current_disequilibrium": current_disequilibrium,
"current_scaled_disequilibrium": current_scaled_disequilibrium,
"closing_threshold": initial_abs_term / self.config["disequilibrium_close_trshld"],
"disequilibrium_ratio": current_scaled_disequilibrium / (initial_abs_term / self.config["disequilibrium_close_trshld"]),
}
)
# Print position details
print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session")
print(f" Open: {open_tstamp} | Last: {last_tstamp}")
print(
f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Unrealized: ${unrealized_dollar_a:.2f} ({unrealized_pnl_a:.2f}%)"
)
print(
f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Unrealized: ${unrealized_dollar_b:.2f} ({unrealized_pnl_b:.2f}%)"
)
print(
f" Total Unrealized: ${total_unrealized_dollar:.2f} ({total_unrealized_pnl:.2f}%)"
)
print(f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Value: ${current_value_a:.2f}")
print(f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Value: ${current_value_b:.2f}")
print(f" Total Value: ${total_current_value:.2f}")
print(f" Disequilibrium: {current_disequilibrium:.4f} | Scaled: {current_scaled_disequilibrium:.4f}")
return unrealized_pnl_a, unrealized_pnl_b, unrealized_dollar_a, unrealized_dollar_b
return current_value_a, current_value_b, total_current_value

View File

@ -118,20 +118,20 @@ def transform_dataframe(df: pd.DataFrame, price_column: str):
return result_df
def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]:
# Training dataset
colname_a, colname_b = pair.colnames()
df = df[["tstamp", colname_a, colname_b]]
df = df.dropna()
# def get_datasets(df: pd.DataFrame, training_minutes: int, pair: TradingPair) -> Tuple[pd.DataFrame, pd.DataFrame]:
# # Training dataset
# colname_a, colname_b = pair.colnames()
# df = df[["tstamp", colname_a, colname_b]]
# df = df.dropna()
training_df = df.iloc[:training_minutes - 1, :].copy()
training_df.reset_index(drop=True).dropna().reset_index(drop=True)
# training_df = df.iloc[:training_minutes - 1, :].copy()
# training_df.reset_index(drop=True).dropna().reset_index(drop=True)
# Testing dataset
testing_df = df.iloc[training_minutes:, :].copy()
testing_df.reset_index(drop=True).dropna().reset_index(drop=True)
# # Testing dataset
# testing_df = df.iloc[training_minutes:, :].copy()
# testing_df.reset_index(drop=True).dropna().reset_index(drop=True)
return (training_df, testing_df)
# return (training_df, testing_df)
if __name__ == "__main__":

View File

@ -1,36 +1,78 @@
from typing import List, Optional
import pandas as pd
from statsmodels.tsa.vector_ar.vecm import VECM
class TradingPair:
symbol_a_: str
symbol_b_: str
price_column_: str
disequilibrium_mu_: Optional[float]
disequilibrium_std_: Optional[float]
training_mu_: Optional[float]
training_std_: Optional[float]
original_df_: Optional[pd.DataFrame]
training_df_: Optional[pd.DataFrame]
testing_df_: Optional[pd.DataFrame]
vecm_fit_: Optional[VECM]
def __init__(self, symbol_a: str, symbol_b: str, price_column: str):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.price_column_ = price_column
self.disequilibrium_mu_ = None
self.disequilibrium_std_ = None
self.training_mu_ = None
self.training_std_ = None
self.original_df_ = None
self.training_df_ = None
self.testing_df_ = None
self.vecm_fit_ = None
def get_datasets(self, market_data: pd.DataFrame, training_minutes: int) -> None:
self.original_df_ = market_data[["tstamp"] + self.colnames()]
self.training_df_ = market_data.iloc[:training_minutes - 1, :].copy()
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
self.testing_df_ = market_data.iloc[training_minutes:, :].copy()
self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)
def colnames(self) -> List[str]:
return [f"{self.price_column_}_{self.symbol_a_}", f"{self.price_column_}_{self.symbol_b_}"]
def set_training_disequilibrium(self, disequilibrium_mu: float, disequilibrium_std: float):
self.disequilibrium_mu_ = disequilibrium_mu
self.disequilibrium_std_ = disequilibrium_std
def fit_VECM(self):
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
def mu(self) -> float:
assert self.disequilibrium_mu_ is not None
return self.disequilibrium_mu_
# URGENT check beta and alpha
def std(self) -> float:
assert self.disequilibrium_std_ is not None
return self.disequilibrium_std_
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
self.vecm_fit_ = vecm_fit
def train_pair(self):
self.fit_VECM()
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
self.training_mu_ = diseq_series.mean().iloc[0]
self.training_std_ = diseq_series.std().iloc[0]
self.training_df_["disequilibrium"] = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
# Normalize the disequilibrium
self.training_df_["scaled_disequilibrium"] = (
diseq_series - self.training_mu_
) / self.training_std_
# def mu(self) -> float:
# assert self.training_mu_ is not None
# return self.training_mu_
# def std(self) -> float:
# assert self.training_std_ is not None
# return self.training_std_
def __repr__(self) ->str:
return f"{self.symbol_a_} & {self.symbol_b_}"