sliding fit fix

This commit is contained in:
Oleg Sheynin 2025-07-13 22:33:48 +00:00
parent 48f18f7b4f
commit b24285802a
6 changed files with 553 additions and 297 deletions

View File

@ -0,0 +1,26 @@
{
"security_type": "EQUITY",
"data_directory": "./data/equity",
"datafiles": [
"20250605.mktdata.ohlcv.db",
],
"db_table_name": "md_1min_bars",
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.fit_methods.SlidingFit",
# "fit_method_class": "pt_trading.fit_methods.StaticFit",
"exclude_instruments": ["CAN"]
}

View File

@ -9,6 +9,7 @@ from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9 NanoPerMin = 1e9
class PairsTradingFitMethod(ABC): class PairsTradingFitMethod(ABC):
TRADES_COLUMNS = [ TRADES_COLUMNS = [
"time", "time",
@ -19,17 +20,21 @@ class PairsTradingFitMethod(ABC):
"scaled_disequilibrium", "scaled_disequilibrium",
"pair", "pair",
] ]
@abstractmethod @abstractmethod
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: def run_pair(
... self, config: Dict, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]: ...
@abstractmethod @abstractmethod
def reset(self): def reset(self) -> None: ...
...
class StaticFit(PairsTradingFitMethod): class StaticFit(PairsTradingFitMethod):
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: # abstractmethod def run_pair(
self, config: Dict, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]: # abstractmethod
pair.get_datasets(training_minutes=config["training_minutes"]) pair.get_datasets(training_minutes=config["training_minutes"])
try: try:
is_cointegrated = pair.train_pair() is_cointegrated = pair.train_pair()
@ -46,11 +51,15 @@ class StaticFit(PairsTradingFitMethod):
print(f"{pair}: Prediction failed: {str(e)}") print(f"{pair}: Prediction failed: {str(e)}")
return None return None
pair_trades = self.create_trading_signals(pair=pair, config=config, result=bt_result) pair_trades = self.create_trading_signals(
pair=pair, config=config, result=bt_result
)
return pair_trades return pair_trades
def create_trading_signals(self, pair: TradingPair, config: Dict, result: BacktestResult) -> pd.DataFrame: def create_trading_signals(
self, pair: TradingPair, config: Dict, result: BacktestResult
) -> pd.DataFrame:
beta = pair.vecm_fit_.beta # type: ignore beta = pair.vecm_fit_.beta # type: ignore
colname_a, colname_b = pair.colnames() colname_a, colname_b = pair.colnames()
@ -201,43 +210,49 @@ class StaticFit(PairsTradingFitMethod):
trd_signal_tuples, trd_signal_tuples,
columns=self.TRADES_COLUMNS, # type: ignore columns=self.TRADES_COLUMNS, # type: ignore
) )
def reset(self) -> None: def reset(self) -> None:
pass pass
class PairState(Enum): class PairState(Enum):
INITIAL = 1 INITIAL = 1
OPEN = 2 OPEN = 2
CLOSED = 3 CLOSED = 3
class SlidingFit(PairsTradingFitMethod): class SlidingFit(PairsTradingFitMethod):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.curr_training_start_idx_ = 0 self.curr_training_start_idx_ = 0
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: def run_pair(
self, config: Dict, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....") print(f"***{pair}*** STARTING....")
pair.user_data_['state'] = PairState.INITIAL pair.user_data_["state"] = PairState.INITIAL
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS) # type: ignore pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS)
pair.user_data_["is_cointegrated"] = False pair.user_data_["is_cointegrated"] = False
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_open_trshld"]
training_minutes = config["training_minutes"] training_minutes = config["training_minutes"]
curr_predicted_row_idx = 0
while True: while True:
print(self.curr_training_start_idx_, end='\r') print(self.curr_training_start_idx_, end="\r")
pair.get_datasets( pair.get_datasets(
training_minutes=training_minutes, training_minutes=training_minutes,
training_start_index=self.curr_training_start_idx_, training_start_index=self.curr_training_start_idx_,
testing_size=1 testing_size=1,
) )
if len(pair.training_df_) < training_minutes: if len(pair.training_df_) < training_minutes:
print(f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job.") print(
f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job."
)
if pair.user_data_["state"] == PairState.OPEN: if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: {self.curr_training_start_idx_} Position is not closed.") print(
f"{pair}: {self.curr_training_start_idx_} Position is not closed."
)
# outstanding positions # outstanding positions
# last_row_index = self.curr_training_start_idx_ + training_minutes # last_row_index = self.curr_training_start_idx_ + training_minutes
@ -259,16 +274,22 @@ class SlidingFit(PairsTradingFitMethod):
raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e
if pair.user_data_["is_cointegrated"] != is_cointegrated: if pair.user_data_["is_cointegrated"] != is_cointegrated:
pair.user_data_["is_cointegrated"] = is_cointegrated pair.user_data_["is_cointegrated"] = is_cointegrated
if not is_cointegrated: if not is_cointegrated:
if pair.user_data_["state"] == PairState.OPEN: if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions...") print(
else: f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions..."
print(f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on") )
else: else:
print('*' * 80) print(
print(f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED") f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on"
print('*' * 80) )
else:
print("*" * 80)
print(
f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED"
)
print("*" * 80)
if not is_cointegrated: if not is_cointegrated:
self.curr_training_start_idx_ += 1 self.curr_training_start_idx_ += 1
continue continue
@ -278,34 +299,55 @@ class SlidingFit(PairsTradingFitMethod):
except Exception as e: except Exception as e:
raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e
if pair.user_data_["state"] == PairState.INITIAL: # break
open_trades = self._get_open_trades(pair, open_threshold=open_threshold)
if open_trades is not None:
pair.user_data_["trades"] = open_trades
pair.user_data_["state"] = PairState.OPEN
elif pair.user_data_["state"] == PairState.OPEN:
close_trades = self._get_close_trades(pair, close_threshold=close_threshold)
if close_trades is not None:
pair.user_data_["trades"] = pd.concat([pair.user_data_["trades"], close_trades], ignore_index=True)
pair.user_data_["state"] = PairState.CLOSED
break
self.curr_training_start_idx_ += 1 self.curr_training_start_idx_ += 1
curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}") print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
return pair.user_data_["trades"] return pair.get_trades()
def _get_open_trades(self, pair: TradingPair, open_threshold: float) -> Optional[pd.DataFrame]: def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
assert pair.predicted_df_ is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(pair.predicted_df_)):
pred_row = pair.predicted_df_.iloc[curr_predicted_row_idx]
if pair.user_data_["state"] in [PairState.INITIAL, PairState.CLOSED]:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = "OPEN"
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
elif pair.user_data_["state"] == PairState.OPEN:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = "CLOSE"
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSED
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames() colname_a, colname_b = pair.colnames()
assert pair.predicted_df_ is not None
predicted_df = pair.predicted_df_ predicted_df = pair.predicted_df_
# Check if we have any data to work with # Check if we have any data to work with
if len(predicted_df) == 0: if len(predicted_df) == 0:
return None return None
open_row = predicted_df.iloc[0] open_row = row
open_tstamp = open_row["tstamp"] open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"] open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"] open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
@ -316,6 +358,7 @@ class SlidingFit(PairsTradingFitMethod):
return None return None
# creating the trades # creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0: if open_disequilibrium > 0:
open_side_a = "SELL" open_side_a = "SELL"
open_side_b = "BUY" open_side_b = "BUY"
@ -338,7 +381,6 @@ class SlidingFit(PairsTradingFitMethod):
pair.user_data_["close_side_a"] = close_side_a pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b pair.user_data_["close_side_b"] = close_side_b
# create opening trades # create opening trades
trd_signal_tuples = [ trd_signal_tuples = [
( (
@ -365,14 +407,16 @@ class SlidingFit(PairsTradingFitMethod):
columns=self.TRADES_COLUMNS, # type: ignore columns=self.TRADES_COLUMNS, # type: ignore
) )
def _get_close_trades(self, pair: TradingPair, close_threshold: float) -> Optional[pd.DataFrame]: def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames() colname_a, colname_b = pair.colnames()
# Check if we have any data to work with assert pair.predicted_df_ is not None
if len(pair.predicted_df_) == 0: if len(pair.predicted_df_) == 0:
return None return None
close_row = pair.predicted_df_.iloc[0] close_row = row
close_tstamp = close_row["tstamp"] close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"] close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"] close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
@ -384,7 +428,6 @@ class SlidingFit(PairsTradingFitMethod):
if close_scaled_disequilibrium > close_threshold: if close_scaled_disequilibrium > close_threshold:
return None return None
trd_signal_tuples = [ trd_signal_tuples = [
( (
close_tstamp, close_tstamp,
@ -412,8 +455,5 @@ class SlidingFit(PairsTradingFitMethod):
columns=self.TRADES_COLUMNS, # type: ignore columns=self.TRADES_COLUMNS, # type: ignore
) )
def reset(self): def reset(self) -> None:
self.curr_training_start_idx_ = 0 self.curr_training_start_idx_ = 0

View File

@ -1,28 +1,30 @@
from typing import Any, Dict, List
import pandas as pd
import sqlite3
import os import os
from datetime import datetime, date import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from pt_trading.trading_pair import TradingPair
# Recommended replacement adapters and converters for Python 3.12+ # Recommended replacement adapters and converters for Python 3.12+
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes # From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
def adapt_date_iso(val): def adapt_date_iso(val: date) -> str:
"""Adapt datetime.date to ISO 8601 date.""" """Adapt datetime.date to ISO 8601 date."""
return val.isoformat() return val.isoformat()
def adapt_datetime_iso(val): def adapt_datetime_iso(val: datetime) -> str:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date.""" """Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat() return val.isoformat()
def convert_date(val): def convert_date(val: bytes) -> date:
"""Convert ISO 8601 date to datetime.date object.""" """Convert ISO 8601 date to datetime.date object."""
return datetime.fromisoformat(val.decode()).date() return datetime.fromisoformat(val.decode()).date()
def convert_datetime(val): def convert_datetime(val: bytes) -> datetime:
"""Convert ISO 8601 datetime to datetime.datetime object.""" """Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.fromisoformat(val.decode()) return datetime.fromisoformat(val.decode())
@ -172,7 +174,7 @@ def store_results_in_database(
if db_path.upper() == "NONE": if db_path.upper() == "NONE":
return return
def convert_timestamp(timestamp): def convert_timestamp(timestamp: Any) -> Optional[datetime]:
"""Convert pandas Timestamp to Python datetime object for SQLite compatibility.""" """Convert pandas Timestamp to Python datetime object for SQLite compatibility."""
if timestamp is None: if timestamp is None:
return None return None
@ -423,14 +425,14 @@ class BacktestResult:
def add_trade( def add_trade(
self, self,
pair_nm, pair_nm: str,
symbol, symbol: str,
action, action: str,
price, price: Any,
disequilibrium=None, disequilibrium: Optional[float] = None,
scaled_disequilibrium=None, scaled_disequilibrium: Optional[float] = None,
timestamp=None, timestamp: Optional[datetime] = None,
): ) -> None:
"""Add a trade to the results tracking.""" """Add a trade to the results tracking."""
pair_nm = str(pair_nm) pair_nm = str(pair_nm)
@ -442,11 +444,11 @@ class BacktestResult:
(action, price, disequilibrium, scaled_disequilibrium, timestamp) (action, price, disequilibrium, scaled_disequilibrium, timestamp)
) )
def add_outstanding_position(self, position: Dict[str, Any]): def add_outstanding_position(self, position: Dict[str, Any]) -> None:
"""Add an outstanding position to tracking.""" """Add an outstanding position to tracking."""
self.outstanding_positions.append(position) self.outstanding_positions.append(position)
def add_realized_pnl(self, realized_pnl: float): def add_realized_pnl(self, realized_pnl: float) -> None:
"""Add realized PnL to the total.""" """Add realized PnL to the total."""
self.total_realized_pnl += realized_pnl self.total_realized_pnl += realized_pnl
@ -462,14 +464,12 @@ class BacktestResult:
"""Get all trades.""" """Get all trades."""
return self.trades return self.trades
def clear_trades(self): def clear_trades(self) -> None:
"""Clear all trades (used when processing new files).""" """Clear all trades (used when processing new files)."""
self.trades.clear() self.trades.clear()
def collect_single_day_results(self, result): def collect_single_day_results(self, result: pd.DataFrame) -> None:
"""Collect and process single day trading results.""" """Collect and process single day trading results."""
if result is None:
return
print("\n -------------- Suggested Trades ") print("\n -------------- Suggested Trades ")
print(result) print(result)
@ -482,16 +482,16 @@ class BacktestResult:
scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None) scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None)
timestamp = getattr(row, "time", None) timestamp = getattr(row, "time", None)
self.add_trade( self.add_trade(
pair_nm=row.pair, pair_nm=str(row.pair),
action=action, action=str(action),
symbol=symbol, symbol=str(symbol),
price=price, price=float(str(price)),
disequilibrium=disequilibrium, disequilibrium=disequilibrium,
scaled_disequilibrium=scaled_disequilibrium, scaled_disequilibrium=scaled_disequilibrium,
timestamp=timestamp, timestamp=timestamp,
) )
def print_single_day_results(self): def print_single_day_results(self) -> None:
"""Print single day results summary.""" """Print single day results summary."""
for pair, symbols in self.trades.items(): for pair, symbols in self.trades.items():
print(f"\n--- {pair} ---") print(f"\n--- {pair} ---")
@ -501,7 +501,7 @@ class BacktestResult:
side, price = trade_data[:2] side, price = trade_data[:2]
print(f"{symbol} {side} at ${price}") print(f"{symbol} {side} at ${price}")
def print_results_summary(self, all_results): def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Print summary of all processed files.""" """Print summary of all processed files."""
print("\n====== Summary of All Processed Files ======") print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items(): for filename, data in all_results.items():
@ -512,7 +512,7 @@ class BacktestResult:
) )
print(f"{filename}: {trade_count} trades") print(f"{filename}: {trade_count} trades")
def calculate_returns(self, all_results: Dict): def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Calculate and print returns by day and pair.""" """Calculate and print returns by day and pair."""
print("\n====== Returns By Day and Pair ======") print("\n====== Returns By Day and Pair ======")
@ -527,80 +527,87 @@ class BacktestResult:
# Calculate individual symbol returns in the pair # Calculate individual symbol returns in the pair
for symbol, trades in symbols.items(): for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit if len(trades) == 0:
# Get entry and exit trades - handle both old and new tuple formats continue
if len(trades[0]) == 2: # Old format: (action, price)
entry_action, entry_price = trades[0] symbol_return = 0
exit_action, exit_price = trades[1] symbol_trades = []
open_disequilibrium = None
open_scaled_disequilibrium = None # Process all trades sequentially for this symbol
close_disequilibrium = None for i, trade in enumerate(trades):
close_scaled_disequilibrium = None # Handle both old and new tuple formats
if len(trade) == 2: # Old format: (action, price)
action, price = trade
disequilibrium = None
scaled_disequilibrium = None
timestamp = None
else: # New format: (action, price, disequilibrium, scaled_disequilibrium, timestamp) else: # New format: (action, price, disequilibrium, scaled_disequilibrium, timestamp)
entry_action, entry_price = trades[0][:2] action, price = trade[:2]
exit_action, exit_price = trades[1][:2] disequilibrium = trade[2] if len(trade) > 2 else None
open_disequilibrium = ( scaled_disequilibrium = trade[3] if len(trade) > 3 else None
trades[0][2] if len(trades[0]) > 2 else None timestamp = trade[4] if len(trade) > 4 else None
)
open_scaled_disequilibrium = ( symbol_trades.append((action, price, disequilibrium, scaled_disequilibrium, timestamp))
trades[0][3] if len(trades[0]) > 3 else None
) # Calculate returns for all trade combinations
close_disequilibrium = ( for i in range(len(symbol_trades) - 1):
trades[1][2] if len(trades[1]) > 2 else None trade1 = symbol_trades[i]
) trade2 = symbol_trades[i + 1]
close_scaled_disequilibrium = (
trades[1][3] if len(trades[1]) > 3 else None action1, price1, diseq1, scaled_diseq1, ts1 = trade1
) action2, price2, diseq2, scaled_diseq2, ts2 = trade2
# Calculate return based on action # Calculate return based on action combination
symbol_return = 0 trade_return = 0
if entry_action == "BUY" and exit_action == "SELL": if action1 == "BUY" and action2 == "SELL":
# Long position # Long position
symbol_return = ( trade_return = (price2 - price1) / price1 * 100
(exit_price - entry_price) / entry_price * 100 elif action1 == "SELL" and action2 == "BUY":
)
elif entry_action == "SELL" and exit_action == "BUY":
# Short position # Short position
symbol_return = ( trade_return = (price1 - price2) / price1 * 100
(entry_price - exit_price) / entry_price * 100
) symbol_return += trade_return
# Store trade details for reporting
pair_trades.append( pair_trades.append(
( (
symbol, symbol,
entry_action, action1,
entry_price, price1,
exit_action, action2,
exit_price, price2,
symbol_return, trade_return,
open_scaled_disequilibrium, scaled_diseq1,
close_scaled_disequilibrium, scaled_diseq2,
i + 1, # Trade sequence number
) )
) )
pair_return += symbol_return
pair_return += symbol_return
# Print pair returns with disequilibrium information # Print pair returns with disequilibrium information
if pair_trades: if pair_trades:
print(f" {pair}:") print(f" {pair}:")
for ( for (
symbol, symbol,
entry_action, action1,
entry_price, price1,
exit_action, action2,
exit_price, price2,
symbol_return, trade_return,
open_scaled_disequilibrium, scaled_diseq1,
close_scaled_disequilibrium, scaled_diseq2,
trade_num,
) in pair_trades: ) in pair_trades:
disequil_info = "" disequil_info = ""
if ( if (
open_scaled_disequilibrium is not None scaled_diseq1 is not None
and close_scaled_disequilibrium is not None and scaled_diseq2 is not None
): ):
disequil_info = f" | Open Dis-eq: {open_scaled_disequilibrium:.2f}, Close Dis-eq: {close_scaled_disequilibrium:.2f}" disequil_info = f" | Open Dis-eq: {scaled_diseq1:.2f}, Close Dis-eq: {scaled_diseq2:.2f}"
print( print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%{disequil_info}" f" {symbol} (Trade #{trade_num}): {action1} @ ${price1:.2f}, {action2} @ ${price2:.2f}, Return: {trade_return:.2f}%{disequil_info}"
) )
print(f" Pair Total Return: {pair_return:.2f}%") print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return day_return += pair_return
@ -610,7 +617,7 @@ class BacktestResult:
print(f" Day Total Return: {day_return:.2f}%") print(f" Day Total Return: {day_return:.2f}%")
self.add_realized_pnl(day_return) self.add_realized_pnl(day_return)
def print_outstanding_positions(self): def print_outstanding_positions(self) -> None:
"""Print all outstanding positions with share quantities and current values.""" """Print all outstanding positions with share quantities and current values."""
if not self.get_outstanding_positions(): if not self.get_outstanding_positions():
print("\n====== NO OUTSTANDING POSITIONS ======") print("\n====== NO OUTSTANDING POSITIONS ======")
@ -684,22 +691,22 @@ class BacktestResult:
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}") print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
def print_grand_totals(self): def print_grand_totals(self) -> None:
"""Print grand totals across all pairs.""" """Print grand totals across all pairs."""
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======") print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%") print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%")
def handle_outstanding_position( def handle_outstanding_position(
self, self,
pair, pair: TradingPair,
pair_result_df, pair_result_df: pd.DataFrame,
last_row_index, last_row_index: int,
open_side_a, open_side_a: str,
open_side_b, open_side_b: str,
open_px_a, open_px_a: float,
open_px_b, open_px_b: float,
open_tstamp, open_tstamp: datetime,
): ) -> Tuple[float, float, float]:
""" """
Handle calculation and tracking of outstanding positions when no close signal is found. Handle calculation and tracking of outstanding positions when no close signal is found.

View File

@ -1,4 +1,5 @@
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore import pandas as pd # type:ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore
@ -19,6 +20,8 @@ class TradingPair:
user_data_: Dict[str, Any] user_data_: Dict[str, Any]
predicted_df_: Optional[pd.DataFrame]
def __init__( def __init__(
self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
): ):
@ -31,7 +34,7 @@ class TradingPair:
self.user_data_ = {} self.user_data_ = {}
self.predicted_df_ = pd.DataFrame() self.predicted_df_ = None
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
# Select only the columns we need # Select only the columns we need
@ -127,9 +130,9 @@ class TradingPair:
df = self.training_df_[self.colnames()].reset_index(drop=True) df = self.training_df_[self.colnames()].reset_index(drop=True)
result = coint_johansen(df, det_order=0, k_ar_diff=1) result = coint_johansen(df, det_order=0, k_ar_diff=1)
print( # print(
f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}" # f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
) # )
is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1]) is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1])
return is_cointegrated return is_cointegrated
@ -146,21 +149,22 @@ class TradingPair:
pvalue = coint(series1, series2)[1] pvalue = coint(series1, series2)[1]
# Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration) # Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration)
is_cointegrated: bool = bool(pvalue < 0.05) is_cointegrated: bool = bool(pvalue < 0.05)
print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}") # print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
return is_cointegrated return is_cointegrated
def train_pair(self) -> bool: def check_cointegration(self) -> bool:
is_cointegrated_johansen = self.check_cointegration_johansen() is_cointegrated_johansen = self.check_cointegration_johansen()
is_cointegrated_engle_granger = self.check_cointegration_engle_granger() is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
if not is_cointegrated_johansen and not is_cointegrated_engle_granger: result = is_cointegrated_johansen or is_cointegrated_engle_granger
return False return result or True # TODO: remove this
pass
def train_pair(self) -> bool:
result = self.check_cointegration()
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80) # print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self.fit_VECM() self.fit_VECM()
assert self.training_df_ is not None and self.vecm_fit_ is not None assert self.training_df_ is not None and self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
print(diseq_series.shape) # print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean()) self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std()) self.training_std_ = float(diseq_series[0].std())
@ -172,7 +176,16 @@ class TradingPair:
diseq_series - self.training_mu_ diseq_series - self.training_mu_
) / self.training_std_ ) / self.training_std_
return True return result
def add_trades(self, trades: pd.DataFrame) -> None:
if self.user_data_["trades"] is None:
self.user_data_["trades"] = pd.DataFrame(trades)
else:
self.user_data_["trades"] = pd.concat([self.user_data_["trades"], pd.DataFrame(trades)], ignore_index=True)
def get_trades(self) -> pd.DataFrame:
return self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
def predict(self) -> pd.DataFrame: def predict(self) -> pd.DataFrame:
assert self.testing_df_ is not None assert self.testing_df_ is not None
@ -184,24 +197,6 @@ class TradingPair:
predicted_prices, columns=pd.Index(self.colnames()), dtype=float predicted_prices, columns=pd.Index(self.colnames()), dtype=float
) )
# self.predicted_df_ = pd.merge(
# self.testing_df_.reset_index(drop=True),
# pd.DataFrame(
# predicted_prices, columns=pd.Index(self.colnames()), dtype=float
# ),
# left_index=True,
# right_index=True,
# suffixes=("", "_pred"),
# ).dropna()
# self.predicted_df_["disequilibrium"] = (
# self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta
# )
# self.predicted_df_["scaled_disequilibrium"] = (
# abs(self.predicted_df_["disequilibrium"] - self.training_mu_)
# / self.training_std_
# )
predicted_df = pd.merge( predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True), self.testing_df_.reset_index(drop=True),
@ -222,17 +217,20 @@ class TradingPair:
/ self.training_std_ / self.training_std_
) )
print("*** PREDICTED DF") # print("*** PREDICTED DF")
print(predicted_df) # print(predicted_df)
print("*" * 80) # print("*" * 80)
print("*** SELF.PREDICTED_DF") # print("*** SELF.PREDICTED_DF")
print(self.predicted_df_) # print(self.predicted_df_)
print("*" * 80) # print("*" * 80)
predicted_df = predicted_df.reset_index(drop=True) predicted_df = predicted_df.reset_index(drop=True)
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True) if self.predicted_df_ is None:
self.predicted_df_ = predicted_df
else:
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True)
# Reset index to ensure proper indexing # Reset index to ensure proper indexing
self.predicted_df_ = self.predicted_df_.reset_index() self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
return self.predicted_df_ return self.predicted_df_
def __repr__(self) -> str: def __repr__(self) -> str:

File diff suppressed because one or more lines are too long

View File

@ -99,7 +99,7 @@ def run_backtest(
) )
if single_pair_trades is not None and len(single_pair_trades) > 0: if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades) pairs_trades.append(single_pair_trades)
print(f"pairs_trades: {pairs_trades}")
# Check if result_list has any data before concatenating # Check if result_list has any data before concatenating
if len(pairs_trades) == 0: if len(pairs_trades) == 0:
print("No trading signals found for any pairs") print("No trading signals found for any pairs")