This commit is contained in:
Oleg Sheynin 2025-07-21 05:15:33 +00:00
parent 28386cdf12
commit b87b40a6ed
11 changed files with 253 additions and 510 deletions

View File

@ -14,7 +14,7 @@
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
"fit_method_class": "pt_trading.vecm_rolling_fit.VECMRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {

View File

@ -16,7 +16,7 @@
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
"fit_method_class": "pt_trading.vecm_rolling_fit.VECMRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {

View File

@ -2,7 +2,7 @@ from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
@ -28,4 +28,8 @@ class PairsTradingFitMethod(ABC):
@abstractmethod
def reset(self) -> None: ...
@abstractmethod
def create_trading_pair(
self, config: Dict, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
) -> TradingPair: ...

View File

@ -1,15 +1,16 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import CointegrationData, TradingPair, PairState
from pt_trading.trading_pair import PairState, TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class SlidingFit(PairsTradingFitMethod):
class RollingFit(PairsTradingFitMethod):
def __init__(self) -> None:
super().__init__()
@ -52,17 +53,11 @@ class SlidingFit(PairsTradingFitMethod):
)
break
try:
# ================================ TRAINING ================================
pair.train_pair()
except Exception as e:
raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e
try:
# ================================ PREDICTION ================================
pair.predict()
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e
raise RuntimeError(f"{pair}: TrainingPrediction failed: {str(e)}") from e
# break
@ -72,20 +67,21 @@ class SlidingFit(PairsTradingFitMethod):
curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
if pair.predicted_df_ is None:
print(f"{pair.market_data_.iloc[0]['tstamp']} {pair}: No predicted data")
return
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(pair.predicted_df_)):
pred_row = pair.predicted_df_.iloc[curr_predicted_row_idx]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [PairState.INITIAL, PairState.CLOSE, PairState.CLOSE_POSITION]:
@ -141,10 +137,10 @@ class SlidingFit(PairsTradingFitMethod):
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if pair.predicted_df_ is not None:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=pair.predicted_df_,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
@ -158,13 +154,6 @@ class SlidingFit(PairsTradingFitMethod):
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
assert pair.predicted_df_ is not None
predicted_df = pair.predicted_df_
# Check if we have any data to work with
if len(predicted_df) == 0:
return None
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
@ -238,10 +227,6 @@ class SlidingFit(PairsTradingFitMethod):
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
assert pair.predicted_df_ is not None
if len(pair.predicted_df_) == 0:
return None
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
@ -289,114 +274,6 @@ class SlidingFit(PairsTradingFitMethod):
"pair": "object"
})
# def _get_stop_close_trades(
# self, pair: TradingPair, row: pd.Series, close_threshold: float
# ) -> Optional[pd.DataFrame]:
# colname_a, colname_b = pair.colnames()
# assert pair.predicted_df_ is not None
# if len(pair.predicted_df_) == 0:
# return None
# stop_close_row = row
# stop_close_tstamp = stop_close_row["tstamp"]
# stop_close_disequilibrium = stop_close_row["disequilibrium"]
# stop_close_scaled_disequilibrium = stop_close_row["scaled_disequilibrium"]
# stop_close_px_a = stop_close_row[f"{colname_a}"]
# stop_close_px_b = stop_close_row[f"{colname_b}"]
# stop_close_side_a = pair.user_data_["close_side_a"]
# stop_close_side_b = pair.user_data_["close_side_b"]
# trd_signal_tuples = [
# (
# stop_close_tstamp,
# stop_close_side_a,
# pair.symbol_a_,
# stop_close_px_a,
# stop_close_disequilibrium,
# stop_close_scaled_disequilibrium,
# pair,
# ),
# (
# stop_close_tstamp,
# stop_close_side_b,
# pair.symbol_b_,
# stop_close_px_b,
# stop_close_disequilibrium,
# stop_close_scaled_disequilibrium,
# pair,
# ),
# ]
# df = pd.DataFrame(
# trd_signal_tuples,
# columns=self.TRADES_COLUMNS,
# )
# # Ensure consistent dtypes
# return df.astype({
# "time": "datetime64[ns]",
# "action": "string",
# "symbol": "string",
# "price": "float64",
# "disequilibrium": "float64",
# "scaled_disequilibrium": "float64",
# "pair": "object"
# })
# def _get_close_position_trades(
# self, pair: TradingPair, row: pd.Series, close_threshold: float
# ) -> Optional[pd.DataFrame]:
# colname_a, colname_b = pair.colnames()
# assert pair.predicted_df_ is not None
# if len(pair.predicted_df_) == 0:
# return None
# close_position_row = row
# close_position_tstamp = close_position_row["tstamp"]
# close_position_disequilibrium = close_position_row["disequilibrium"]
# close_position_scaled_disequilibrium = close_position_row["scaled_disequilibrium"]
# close_position_px_a = close_position_row[f"{colname_a}"]
# close_position_px_b = close_position_row[f"{colname_b}"]
# close_position_side_a = pair.user_data_["close_side_a"]
# close_position_side_b = pair.user_data_["close_side_b"]
# trd_signal_tuples = [
# (
# close_position_tstamp,
# close_position_side_a,
# pair.symbol_a_,
# close_position_px_a,
# close_position_disequilibrium,
# close_position_scaled_disequilibrium,
# pair,
# ),
# (
# close_position_tstamp,
# close_position_side_b,
# pair.symbol_b_,
# close_position_px_b,
# close_position_disequilibrium,
# close_position_scaled_disequilibrium,
# pair,
# ),
# ]
# # Add tuples to data frame with explicit dtypes to avoid concatenation warnings
# df = pd.DataFrame(
# trd_signal_tuples,
# columns=self.TRADES_COLUMNS,
# )
# # Ensure consistent dtypes
# return df.astype({
# "time": "datetime64[ns]",
# "action": "string",
# "symbol": "string",
# "price": "float64",
# "disequilibrium": "float64",
# "scaled_disequilibrium": "float64",
# "pair": "object"
# })
def reset(self) -> None:
curr_training_start_idx = 0

View File

@ -1,10 +1,11 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
class PairState(Enum):
INITIAL = 1
@ -40,7 +41,7 @@ class CointegrationData:
self.johansen_is_cointegrated_ = self.johansen_lr1_ > self.johansen_cvt_
# Run Engle-Granger cointegration test
from statsmodels.tsa.stattools import coint #type: ignore
from statsmodels.tsa.stattools import coint # type: ignore
col1, col2 = pair.colnames()
assert training_df is not None
@ -68,7 +69,7 @@ class CointegrationData:
return f"CointegrationData(tstamp={self.tstamp_}, pair={self.pair_}, eg_pvalue={self.eg_pvalue_}, johansen_lr1={self.johansen_lr1_}, johansen_cvt={self.johansen_cvt_}, eg_is_cointegrated={self.eg_is_cointegrated_}, johansen_is_cointegrated={self.johansen_is_cointegrated_})"
class TradingPair:
class TradingPair(ABC):
market_data_: pd.DataFrame
symbol_a_: str
symbol_b_: str
@ -80,11 +81,9 @@ class TradingPair:
training_df_: pd.DataFrame
testing_df_: pd.DataFrame
vecm_fit_: VECMResults
user_data_: Dict[str, Any]
predicted_df_: Optional[pd.DataFrame]
# predicted_df_: Optional[pd.DataFrame]
def __init__(
self, config: Dict[str, Any], market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
@ -193,42 +192,6 @@ class TradingPair:
f"{self.price_column_}_{self.symbol_b_}",
]
def fit_VECM(self) -> None:
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
assert vecm_fit is not None
# URGENT check beta and alpha
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
self.vecm_fit_ = vecm_fit
# print(f"{self}: beta={self.vecm_fit_.beta} alpha={self.vecm_fit_.alpha}" )
# print(f"{self}: {self.vecm_fit_.summary()}")
pass
def train_pair(self) -> None:
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self.fit_VECM()
assert self.training_df_ is not None and self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
# print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std())
self.training_df_["dis-equilibrium"] = (
self.training_df_[self.colnames()] @ self.vecm_fit_.beta
)
# Normalize the dis-equilibrium
self.training_df_["scaled_dis-equilibrium"] = (
diseq_series - self.training_mu_
) / self.training_std_
def add_trades(self, trades: pd.DataFrame) -> None:
if self.user_data_["trades"] is None or len(self.user_data_["trades"]) == 0:
# If trades is empty or None, just assign the new trades directly
@ -267,45 +230,6 @@ class TradingPair:
def get_trades(self) -> pd.DataFrame:
return self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
def predict(self) -> pd.DataFrame:
assert self.testing_df_ is not None
assert self.vecm_fit_ is not None
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
)
predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True),
pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
),
left_index=True,
right_index=True,
suffixes=("", "_pred"),
).dropna()
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
predicted_df["scaled_disequilibrium"] = (
abs(predicted_df["disequilibrium"] - self.training_mu_)
/ self.training_std_
)
predicted_df = predicted_df.reset_index(drop=True)
if self.predicted_df_ is None:
self.predicted_df_ = predicted_df
else:
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True)
# Reset index to ensure proper indexing
self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
return self.predicted_df_
def cointegration_check(self) -> Optional[pd.DataFrame]:
print(f"***{self}*** STARTING....")
config = self.config_
@ -394,3 +318,10 @@ class TradingPair:
def name(self) -> str:
return f"{self.symbol_a_} & {self.symbol_b_}"
# return f"{self.symbol_a_} & {self.symbol_b_}"
@abstractmethod
def predict(self) -> pd.DataFrame: ...
# @abstractmethod
# def predicted_df(self) -> Optional[pd.DataFrame]: ...

View File

@ -0,0 +1,111 @@
from typing import Any, Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class VECMTradingPair(TradingPair):
vecm_fit_: Optional[VECMResults]
pair_predict_result_: Optional[pd.DataFrame]
def __init__(self, config: Dict[str, Any], market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str):
super().__init__(config, market_data, symbol_a, symbol_b, price_column)
self.vecm_fit_ = None
self.pair_predict_result_ = None
def _train_pair(self) -> None:
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self._fit_VECM()
assert self.training_df_ is not None and self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
# print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std())
self.training_df_["dis-equilibrium"] = (
self.training_df_[self.colnames()] @ self.vecm_fit_.beta
)
# Normalize the dis-equilibrium
self.training_df_["scaled_dis-equilibrium"] = (
diseq_series - self.training_mu_
) / self.training_std_
def _fit_VECM(self) -> None:
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
assert vecm_fit is not None
# URGENT check beta and alpha
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
self.vecm_fit_ = vecm_fit
pass
def predict(self) -> pd.DataFrame:
self._train_pair()
assert self.testing_df_ is not None
assert self.vecm_fit_ is not None
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
)
predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True),
pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
),
left_index=True,
right_index=True,
suffixes=("", "_pred"),
).dropna()
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
predicted_df["scaled_disequilibrium"] = (
abs(predicted_df["disequilibrium"] - self.training_mu_)
/ self.training_std_
)
predicted_df = predicted_df.reset_index(drop=True)
if self.pair_predict_result_ is None:
self.pair_predict_result_ = predicted_df
else:
self.pair_predict_result_ = pd.concat([self.pair_predict_result_, predicted_df], ignore_index=True)
# Reset index to ensure proper indexing
self.pair_predict_result_ = self.pair_predict_result_.reset_index(drop=True)
return self.pair_predict_result_
class VECMRollingFit(RollingFit):
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
return super().run_pair(pair, bt_result)
def create_trading_pair(
self, config: Dict, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
) -> TradingPair:
return VECMTradingPair(
config=config,
market_data=market_data,
symbol_a=symbol_a,
symbol_b=symbol_b,
price_column=price_column
)

File diff suppressed because one or more lines are too long

View File

@ -72,7 +72,7 @@ def run_backtest(
bt_result: BacktestResult = BacktestResult(config=config)
pairs_trades = []
for pair in create_pairs(datafile, price_column, config, instruments):
for pair in create_pairs(datafile=datafile, fit_method=fit_method, price_column=price_column, config=config, instruments=instruments):
single_pair_trades = fit_method.run_pair(
pair=pair, bt_result=bt_result
)

View File

@ -2,6 +2,8 @@ import glob
import os
from typing import Dict, List, Optional
from pt_trading.fit_method import PairsTradingFitMethod
def resolve_datafiles(config: Dict, cli_datafiles: Optional[str] = None) -> List[str]:
@ -42,7 +44,7 @@ def resolve_datafiles(config: Dict, cli_datafiles: Optional[str] = None) -> List
return sorted(list(set(resolved_files))) # Remove duplicates and sort
def create_pairs(datafile: str, price_column: str, config: Dict, instruments: List[str]) -> List:
def create_pairs(datafile: str, fit_method: PairsTradingFitMethod, price_column: str, config: Dict, instruments: List[str]) -> List:
from tools.data_loader import load_market_data
from pt_trading.trading_pair import TradingPair
all_indexes = range(len(instruments))
@ -57,7 +59,7 @@ def create_pairs(datafile: str, price_column: str, config: Dict, instruments: Li
for a_index, b_index in unique_index_pairs:
from research.pt_backtest import TradingPair
pair = TradingPair(
pair = fit_method.create_trading_pair(
config=config_copy,
market_data=market_data_df,
symbol_a=instruments[a_index],

View File

@ -47,7 +47,7 @@ def run_strategy(
market_data_df = load_market_data(datafile, config=config_copy)
for a_index, b_index in unique_index_pairs:
pair = TradingPair(
pair = fit_method.create_trading_pair(
market_data=market_data_df,
symbol_a=instruments[a_index],
symbol_b=instruments[b_index],