pairs_trading/lib/pt_trading/rolling_window_fit.py
Oleg Sheynin 31eb9f800c bug fix
2025-07-22 17:25:16 +00:00

293 lines
11 KiB
Python

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import PairState, TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class RollingFit(PairsTradingFitMethod):
'''
N O T E:
=========
- This class remains to be abstract
- The following methods are to be implemented in the subclass:
- create_trading_pair()
=========
'''
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....")
config = pair.config_
curr_training_start_idx = pair.get_begin_index()
end_index = pair.get_end_index()
pair.user_data_["state"] = PairState.INITIAL
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype({
"time": "datetime64[ns]",
"symbol": "string",
"side": "string",
"action": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object"
})
training_minutes = config["training_minutes"]
curr_predicted_row_idx = 0
while True:
print(curr_training_start_idx, end="\r")
pair.get_datasets(
training_minutes=training_minutes,
training_start_index=curr_training_start_idx,
testing_size=1,
)
if len(pair.training_df_) < training_minutes:
print(
f"{pair}: current offset={curr_training_start_idx}"
f" * Training data length={len(pair.training_df_)} < {training_minutes}"
" * Not enough training data. Completing the job."
)
break
try:
# ================================ PREDICTION ================================
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(f"{pair}: TrainingPrediction failed: {str(e)}") from e
# break
curr_training_start_idx += 1
if curr_training_start_idx > end_index:
break
curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [PairState.INITIAL, PairState.CLOSE, PairState.CLOSE_POSITION]:
if scaled_disequilibrium >= open_threshold:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN:
if scaled_disequilibrium <= close_threshold:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = PairState.CLOSE.name
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_["stop_close_state"].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions
if pair.user_data_["state"] == PairState.OPEN:
print(
f"{pair}: *** Position is NOT CLOSED. ***"
)
# outstanding positions
if config["close_outstanding_positions"]:
close_position_trades = self._get_close_trades(
pair=pair,
row=pred_row,
close_threshold=close_threshold
)
if close_position_trades is not None:
close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
# creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
pair.symbol_a_,
open_side_a,
"OPEN",
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
(
open_tstamp,
pair.symbol_b_,
open_side_b,
"OPEN",
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
]
# Create DataFrame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype({
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object"
})
def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
trd_signal_tuples = [
(
close_tstamp,
pair.symbol_a_,
close_side_a,
"CLOSE",
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
(
close_tstamp,
pair.symbol_b_,
close_side_b,
"CLOSE",
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype({
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object"
})
def reset(self) -> None:
curr_training_start_idx = 0