pairs_trading/lib/pt_trading/static_fit.py
2025-07-17 00:19:49 +00:00

213 lines
7.2 KiB
Python

from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
from pt_trading.fit_method import PairsTradingFitMethod
# NOTE(review): the name reads as "nanoseconds per minute", but 1e9 is the
# number of nanoseconds in one second (one minute would be 6e10). Confirm the
# intended unit before relying on this constant; it is not referenced by the
# code visible in this file.
NanoPerMin = 1e9
class StaticFit(PairsTradingFitMethod):
    """Pairs-trading fit method that derives at most one round trip per pair.

    ``run_pair`` prepares the pair's datasets using the configured
    ``training_minutes``, calls ``pair.predict()`` once, and then scans
    ``pair.predicted_df_`` for a single open signal followed by a single
    close signal.
    """

    # Dtypes applied to every trades frame built by this class so that all
    # return paths share one schema. The keys are expected to match
    # ``self.TRADES_COLUMNS`` (not defined in this class; presumably inherited
    # from PairsTradingFitMethod -- TODO confirm).
    _TRADES_DTYPES = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }

    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:  # abstractmethod
        """Run prediction for ``pair`` and build its trade signals.

        Args:
            pair: Trading pair; its ``config_`` supplies ``training_minutes``
                and the thresholds used by ``create_trading_signals``.
            bt_result: Backtest result that is handed through to
                ``create_trading_signals``.

        Returns:
            The trades frame produced by ``create_trading_signals``, or
            ``None`` when ``pair.predict()`` raises.
        """
        config = pair.config_
        pair.get_datasets(training_minutes=config["training_minutes"])
        try:
            pair.predict()
        except Exception as e:
            # Best-effort: a pair whose prediction fails is reported and
            # skipped instead of aborting the caller.
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(
            pair=pair, config=config, result=bt_result
        )

    def create_trading_signals(
        self, pair: TradingPair, config: Dict, result: BacktestResult
    ) -> pd.DataFrame:
        """Build the open (and, if found, close) trades for ``pair``.

        The position is opened at the first row of ``pair.predicted_df_``
        whose ``scaled_disequilibrium`` is ``>=``
        ``config["dis-equilibrium_open_trshld"]`` and closed at the first row,
        searching from the open row onwards (inclusive), whose
        ``scaled_disequilibrium`` is ``<``
        ``config["dis-equilibrium_close_trshld"]``.

        Rows are addressed by position. The previous implementation mixed
        positional and label-based access, which is only equivalent when the
        frame's index labels are 0..n-1.

        Args:
            pair: Trading pair providing ``predicted_df_``, ``vecm_fit_.beta``,
                ``colnames()``, ``symbol_a_`` and ``symbol_b_``.
            config: Mapping holding the open/close threshold keys named above.
            result: Receives ``handle_outstanding_position(...)`` when an open
                signal exists but no close signal is found.

        Returns:
            A frame with ``self.TRADES_COLUMNS`` columns and the dtypes in
            ``_TRADES_DTYPES``: empty when there is no prediction frame or no
            open signal, two rows (open legs) when the position stays open,
            four rows (open and close legs) otherwise.
        """
        predicted_df = pair.predicted_df_
        if predicted_df is None:
            return self._empty_trades()

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]

        scaled = predicted_df["scaled_disequilibrium"].to_numpy()

        # First row whose scaled disequilibrium reaches the open threshold.
        open_pos = next(
            (pos for pos, value in enumerate(scaled) if value >= open_threshold),
            None,
        )
        if open_pos is None:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            # Same schema as the other empty return, so callers always get
            # the trades columns.
            return self._empty_trades()

        # First row at or after the open row that falls below the close
        # threshold; None means the position is still open at the end.
        close_pos = next(
            (
                pos
                for pos in range(open_pos, len(scaled))
                if scaled[pos] < close_threshold
            ),
            None,
        )

        # Only needed once an open row exists, so read after the guards.
        beta = pair.vecm_fit_.beta  # type: ignore
        colname_a, colname_b = pair.colnames()

        open_row = predicted_df.iloc[open_pos]
        open_px_a = open_row[f"{colname_a}"]
        open_px_b = open_row[f"{colname_b}"]
        open_tstamp = open_row["tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]

        # Trade direction is decided from the predicted prices at the open row.
        abs_beta = abs(beta[1])
        pred_px_b = open_row[f"{colname_b}_pred"]
        pred_px_a = open_row[f"{colname_a}_pred"]
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "SELL", "BUY"
        # Closing reverses each leg of the opening trade.
        close_side_a, close_side_b = open_side_b, open_side_a

        trd_signal_tuples = self._leg_rows(
            pair,
            open_tstamp,
            open_side_a,
            open_side_b,
            open_px_a,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
        )

        if close_pos is None:
            # No close signal: hand the outstanding position to the backtest
            # result and return only the open legs.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=float(open_px_a),
                open_px_b=float(open_px_b),
                open_tstamp=pd.Timestamp(open_tstamp),
            )
        else:
            # Close signal found - append the closing legs.
            close_row = predicted_df.iloc[close_pos]
            print(f"{pair}: Close signal found at index {close_pos}")
            trd_signal_tuples += self._leg_rows(
                pair,
                close_row["tstamp"],
                close_side_a,
                close_side_b,
                close_row[f"{colname_a}"],
                close_row[f"{colname_b}"],
                close_row["disequilibrium"],
                close_row["scaled_disequilibrium"],
            )

        # Explicit dtypes keep the schema consistent across all return paths.
        trades_df = pd.DataFrame(trd_signal_tuples, columns=self.TRADES_COLUMNS)
        return trades_df.astype(self._TRADES_DTYPES)

    def _empty_trades(self) -> pd.DataFrame:
        """Return an empty trades frame with the standard columns and dtypes."""
        return pd.DataFrame(columns=self.TRADES_COLUMNS).astype(self._TRADES_DTYPES)

    @staticmethod
    def _leg_rows(
        pair: TradingPair,
        tstamp,
        side_a: str,
        side_b: str,
        px_a,
        px_b,
        disequilibrium,
        scaled_disequilibrium,
    ) -> list:
        """Build the two per-symbol trade tuples (leg A, then leg B) for one event."""
        return [
            (
                tstamp,
                side_a,
                pair.symbol_a_,
                px_a,
                disequilibrium,
                scaled_disequilibrium,
                pair,
            ),
            (
                tstamp,
                side_b,
                pair.symbol_b_,
                px_b,
                disequilibrium,
                scaled_disequilibrium,
                pair,
            ),
        ]

    def reset(self) -> None:
        """No-op."""
        pass