420 lines
14 KiB
Python
420 lines
14 KiB
Python
from abc import ABC, abstractmethod
|
|
from enum import Enum
|
|
from typing import Dict, Optional, cast
|
|
|
|
import pandas as pd # type: ignore[import]
|
|
|
|
from pt_trading.results import BacktestResult
|
|
from pt_trading.trading_pair import TradingPair
|
|
|
|
# NOTE(review): 1e9 is the number of nanoseconds in one *second*; one minute
# is 60e9 ns. This constant is not referenced in the code shown here — confirm
# what its users expect before relying on (or changing) the value.
NanoPerMin = 1e9
|
|
|
|
class PairsTradingFitMethod(ABC):
    """Interface for a strategy that fits a trading pair and produces trades.

    Concrete strategies report their trades as a ``pd.DataFrame`` laid out
    according to ``TRADES_COLUMNS``.
    """

    # Column order shared by every trades DataFrame the subclasses build.
    TRADES_COLUMNS = [
        "time",
        "action",
        "symbol",
        "price",
        "disequilibrium",
        "scaled_disequilibrium",
        "pair",
    ]

    @abstractmethod
    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
        """Run the strategy on ``pair``; return its trades, or ``None``."""

    @abstractmethod
    def reset(self):
        """Reset any internal state the strategy keeps between runs."""
|
|
|
|
class StaticFit(PairsTradingFitMethod):
    """Train a pair once and trade at most one round trip on its test window.

    The pair is fitted with ``training_minutes`` of training data (see
    ``TradingPair.get_datasets``). The predictions are then scanned for the
    first row whose scaled disequilibrium reaches the open threshold and,
    from that row on, for the first row that falls below the close threshold.
    """

    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
        """Train and predict ``pair`` once, then derive its trades.

        Args:
            config: Strategy configuration. ``"training_minutes"`` is read
                here; the dis-equilibrium thresholds are read by
                ``create_trading_signals``.
            pair: The pair to trade.
            bt_result: Receives the position if it is never closed.

        Returns:
            The trades DataFrame (possibly empty), or ``None`` when the pair is
            not cointegrated or training/prediction raised.
        """
        pair.get_datasets(training_minutes=config["training_minutes"])

        # Best effort: a failing pair is reported and skipped, not fatal.
        try:
            is_cointegrated = pair.train_pair()
        except Exception as e:
            print(f"{pair}: Training failed: {str(e)}")
            return None
        if not is_cointegrated:
            print(f"{pair} IS NOT COINTEGRATED")
            return None

        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(pair=pair, config=config, result=bt_result)

    def create_trading_signals(self, pair: TradingPair, config: Dict, result: BacktestResult) -> pd.DataFrame:
        """Turn ``pair.predicted_df_`` into opening and, if found, closing trades.

        Reads ``"dis-equilibrium_open_trshld"`` and
        ``"dis-equilibrium_close_trshld"`` from ``config``. The trade direction
        is BUY A / SELL B when the predicted B price times ``|beta[1]|``
        exceeds the predicted A price at the open row, and the reverse
        otherwise. If no close row exists, the open position is handed to
        ``result.handle_outstanding_position`` and only the two opening trades
        are returned.

        Returns:
            A DataFrame with ``TRADES_COLUMNS`` (two or four rows), or an empty
            DataFrame when no row reaches the open threshold.
        """
        beta = pair.vecm_fit_.beta  # type: ignore
        colname_a, colname_b = pair.colnames()
        predicted_df = pair.predicted_df_

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]

        # Everything below is positional (iloc / argmax), so the result does
        # not depend on predicted_df carrying a default 0..n-1 index.
        scaled = predicted_df["scaled_disequilibrium"]

        # The first row with sufficient disequilibrium opens the position.
        open_mask = (scaled >= open_threshold).to_numpy()
        if not open_mask.any():
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            return pd.DataFrame()
        open_pos = int(open_mask.argmax())

        # The first row at or after the open row below the close threshold closes it.
        close_mask = (scaled.iloc[open_pos:] < close_threshold).to_numpy()
        close_pos = open_pos + int(close_mask.argmax()) if close_mask.any() else None

        open_row = predicted_df.iloc[open_pos]
        open_tstamp = open_row["tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
        open_px_a = open_row[colname_a]
        open_px_b = open_row[colname_b]

        abs_beta = abs(beta[1])
        pred_px_a = open_row[f"{colname_a}_pred"]
        pred_px_b = open_row[f"{colname_b}_pred"]
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
            close_side_a, close_side_b = "SELL", "BUY"
        else:
            open_side_a, open_side_b = "SELL", "BUY"
            close_side_a, close_side_b = "BUY", "SELL"

        def trade(tstamp, side, symbol, px, diseq, scaled_diseq):
            # One row laid out as TRADES_COLUMNS.
            return (tstamp, side, symbol, px, diseq, scaled_diseq, pair)

        trd_signal_tuples = [
            trade(open_tstamp, open_side_a, pair.symbol_a_, open_px_a,
                  open_disequilibrium, open_scaled_disequilibrium),
            trade(open_tstamp, open_side_b, pair.symbol_b_, open_px_b,
                  open_disequilibrium, open_scaled_disequilibrium),
        ]

        if close_pos is None:
            # No close signal: the position stays open and is reported against
            # the last row of the testing data.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=open_px_a,
                open_px_b=open_px_b,
                open_tstamp=open_tstamp,
            )
        else:
            close_row = predicted_df.iloc[close_pos]
            close_tstamp = close_row["tstamp"]
            close_disequilibrium = close_row["disequilibrium"]
            close_scaled_disequilibrium = close_row["scaled_disequilibrium"]

            print(f"{pair}: Close signal found at index {close_pos}")

            trd_signal_tuples += [
                trade(close_tstamp, close_side_a, pair.symbol_a_, close_row[colname_a],
                      close_disequilibrium, close_scaled_disequilibrium),
                trade(close_tstamp, close_side_b, pair.symbol_b_, close_row[colname_b],
                      close_disequilibrium, close_scaled_disequilibrium),
            ]

        return pd.DataFrame(
            trd_signal_tuples,
            columns=self.TRADES_COLUMNS,  # type: ignore
        )

    def reset(self) -> None:
        """No-op: a static fit keeps no state between runs."""
        pass
|
|
|
|
class PairState(Enum):
    """Position lifecycle of a pair during a sliding-window run."""

    INITIAL = 1  # no position opened yet
    OPEN = 2  # opening trades placed; waiting for a close signal
    CLOSED = 3  # closing trades placed
|
|
|
|
class SlidingFit(PairsTradingFitMethod):
    """Re-train a pair on a window that slides forward one row per step.

    At each step the pair is trained on ``training_minutes`` rows starting at
    ``curr_training_start_idx_`` and predicts a single testing row. A small
    ``PairState`` machine opens at most one position (INITIAL -> OPEN) and
    closes it (OPEN -> CLOSED), after which the run ends.
    """

    def __init__(self) -> None:
        super().__init__()
        # First row of the current training window; advanced once per step.
        self.curr_training_start_idx_ = 0

    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
        """Slide the training window across the data and trade ``pair``.

        Per-run bookkeeping lives in ``pair.user_data_``: ``"state"``,
        ``"trades"`` and ``"is_cointegrated"``, plus the open/close sides,
        prices and timestamp stored by ``_get_open_trades``.

        Args:
            config: Reads ``"training_minutes"``,
                ``"dis-equilibrium_open_trshld"`` and
                ``"dis-equilibrium_close_trshld"``.
            pair: The pair to trade.
            bt_result: Receives the position if it is still open when the
                data runs out.

        Returns:
            The trades DataFrame accumulated for ``pair`` (possibly empty).

        Raises:
            RuntimeError: If training or prediction raises.
        """
        print(f"***{pair}*** STARTING....")

        pair.user_data_['state'] = PairState.INITIAL
        pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS)  # type: ignore
        pair.user_data_["is_cointegrated"] = False

        open_threshold = config["dis-equilibrium_open_trshld"]
        # Must read the *close* key; reading the open key here would close
        # positions against the open threshold.
        close_threshold = config["dis-equilibrium_close_trshld"]

        training_minutes = config["training_minutes"]
        # NOTE(review): curr_training_start_idx_ is not reset here; callers
        # presumably call reset() between pairs — confirm.
        while True:
            print(self.curr_training_start_idx_, end='\r')
            pair.get_datasets(
                training_minutes=training_minutes,
                training_start_index=self.curr_training_start_idx_,
                testing_size=1,
            )

            if len(pair.training_df_) < training_minutes:
                print(f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job.")
                if pair.user_data_["state"] == PairState.OPEN:
                    print(f"{pair}: {self.curr_training_start_idx_} Position is not closed.")
                    # NOTE(review): assumes predicted_df_ still holds the most
                    # recent predict() output (a single row, hence index 0),
                    # which may be stale if cointegration was lost — confirm.
                    bt_result.handle_outstanding_position(
                        pair=pair,
                        pair_result_df=pair.predicted_df_,
                        last_row_index=0,
                        open_side_a=pair.user_data_["open_side_a"],
                        open_side_b=pair.user_data_["open_side_b"],
                        open_px_a=pair.user_data_["open_px_a"],
                        open_px_b=pair.user_data_["open_px_b"],
                        open_tstamp=pair.user_data_["open_tstamp"],
                    )
                break

            try:
                is_cointegrated = pair.train_pair()
            except Exception as e:
                raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e

            # Log only when the cointegration status flips.
            if pair.user_data_["is_cointegrated"] != is_cointegrated:
                pair.user_data_["is_cointegrated"] = is_cointegrated
                if not is_cointegrated:
                    if pair.user_data_["state"] == PairState.OPEN:
                        print(f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions...")
                    else:
                        print(f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on")
                else:
                    print('*' * 80)
                    print(f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED")
                    print('*' * 80)
            if not is_cointegrated:
                self.curr_training_start_idx_ += 1
                continue

            try:
                pair.predict()
            except Exception as e:
                raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e

            state = pair.user_data_["state"]
            if state == PairState.INITIAL:
                open_trades = self._get_open_trades(pair, open_threshold=open_threshold)
                if open_trades is not None:
                    pair.user_data_["trades"] = open_trades
                    pair.user_data_["state"] = PairState.OPEN
            elif state == PairState.OPEN:
                close_trades = self._get_close_trades(pair, close_threshold=close_threshold)
                if close_trades is not None:
                    pair.user_data_["trades"] = pd.concat([pair.user_data_["trades"], close_trades], ignore_index=True)
                    pair.user_data_["state"] = PairState.CLOSED
                    break

            self.curr_training_start_idx_ += 1

        print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
        return pair.user_data_["trades"]

    def _get_open_trades(self, pair: TradingPair, open_threshold: float) -> Optional[pd.DataFrame]:
        """Return the two opening trades for the current testing row, or ``None``.

        A position opens when the first row of ``pair.predicted_df_`` has a
        scaled disequilibrium of at least ``open_threshold``. A positive
        disequilibrium sells A and buys B; otherwise the sides are reversed.
        On success, the open sides, prices and timestamp and the matching close
        sides are stored in ``pair.user_data_``.
        """
        colname_a, colname_b = pair.colnames()
        predicted_df = pair.predicted_df_
        if len(predicted_df) == 0:
            return None

        open_row = predicted_df.iloc[0]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
        # Written as ``not (>=)`` so a NaN disequilibrium does not open a
        # position (``NaN < threshold`` is False and would slip through).
        if not (open_scaled_disequilibrium >= open_threshold):
            return None

        open_tstamp = open_row["tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_px_a = open_row[colname_a]
        open_px_b = open_row[colname_b]

        if open_disequilibrium > 0:
            open_side_a, open_side_b = "SELL", "BUY"
            close_side_a, close_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "BUY", "SELL"
            close_side_a, close_side_b = "SELL", "BUY"

        # Remembered for _get_close_trades and outstanding-position handling.
        pair.user_data_["open_side_a"] = open_side_a
        pair.user_data_["open_side_b"] = open_side_b
        pair.user_data_["open_px_a"] = open_px_a
        pair.user_data_["open_px_b"] = open_px_b
        pair.user_data_["open_tstamp"] = open_tstamp
        pair.user_data_["close_side_a"] = close_side_a
        pair.user_data_["close_side_b"] = close_side_b

        trd_signal_tuples = [
            (open_tstamp, open_side_a, pair.symbol_a_, open_px_a,
             open_disequilibrium, open_scaled_disequilibrium, pair),
            (open_tstamp, open_side_b, pair.symbol_b_, open_px_b,
             open_disequilibrium, open_scaled_disequilibrium, pair),
        ]
        return pd.DataFrame(
            trd_signal_tuples,
            columns=self.TRADES_COLUMNS,  # type: ignore
        )

    def _get_close_trades(self, pair: TradingPair, close_threshold: float) -> Optional[pd.DataFrame]:
        """Return the two closing trades for the current testing row, or ``None``.

        A position closes when the first row of ``pair.predicted_df_`` has a
        scaled disequilibrium at or below ``close_threshold``. The sides come
        from ``pair.user_data_["close_side_a"/"close_side_b"]``, which
        ``_get_open_trades`` sets.
        """
        colname_a, colname_b = pair.colnames()
        if len(pair.predicted_df_) == 0:
            return None

        close_row = pair.predicted_df_.iloc[0]
        close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
        # Written as ``not (<=)`` so a NaN disequilibrium does not close the
        # position (``NaN > threshold`` is False and would slip through).
        if not (close_scaled_disequilibrium <= close_threshold):
            return None

        close_tstamp = close_row["tstamp"]
        close_disequilibrium = close_row["disequilibrium"]
        close_side_a = pair.user_data_["close_side_a"]
        close_side_b = pair.user_data_["close_side_b"]

        trd_signal_tuples = [
            (close_tstamp, close_side_a, pair.symbol_a_, close_row[colname_a],
             close_disequilibrium, close_scaled_disequilibrium, pair),
            (close_tstamp, close_side_b, pair.symbol_b_, close_row[colname_b],
             close_disequilibrium, close_scaled_disequilibrium, pair),
        ]
        return pd.DataFrame(
            trd_signal_tuples,
            columns=self.TRADES_COLUMNS,  # type: ignore
        )

    def reset(self) -> None:
        """Rewind the sliding training window to the first row."""
        self.curr_training_start_idx_ = 0
|
|
|
|
|
|
|