pairs_trading/lib/pt_strategy/model_data_policy.py

from __future__ import annotations

import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional, cast

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from pt_strategy.trading_pair import TradingPair


@dataclass
class DataWindowParams:
    """Start index and length of the training window used for one model fit."""

    training_size: int
    training_start_index: int


class ModelDataPolicy(ABC):
    """Base class for policies that decide which slice of market data to train on."""

    config_: Dict[str, Any]
    current_data_params_: DataWindowParams
    count_: int
    is_real_time_: bool

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        self.config_ = config
        if kwargs.get("is_real_time", False):
            training_size = 120
        else:
            training_size = config.get("training_size", 120)
        self.current_data_params_ = DataWindowParams(
            training_size=training_size,
            training_start_index=0,
        )
        self.count_ = 0
        self.is_real_time_ = kwargs.get("is_real_time", False)

    @abstractmethod
    def advance(self, mkt_data_df: Optional[pd.DataFrame] = None) -> DataWindowParams:
        # Subclasses call super().advance() to update the step counter before
        # adjusting the window parameters.
        self.count_ += 1
        print(self.count_, end="\r")
        return self.current_data_params_

    @staticmethod
    def create(config: Dict[str, Any], *args: Any, **kwargs: Any) -> ModelDataPolicy:
        import importlib

        model_data_policy_class_name = config.get("model_data_policy_class", None)
        assert model_data_policy_class_name is not None
        module_name, class_name = model_data_policy_class_name.rsplit(".", 1)
        module = importlib.import_module(module_name)
        model_training_data_policy_object = getattr(module, class_name)(
            *args, config=config, **kwargs
        )
        return cast(ModelDataPolicy, model_training_data_policy_object)
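

# Note on the factory above: ModelDataPolicy.create() resolves the concrete
# policy from the "model_data_policy_class" config entry, which must be a fully
# qualified "module.ClassName" string (e.g. one of the policy classes defined
# below).  Remaining positional and keyword arguments (such as is_real_time,
# pair, and mkt_data) are forwarded unchanged to the selected class's
# constructor.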


class RollingWindowDataPolicy(ModelDataPolicy):
    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)
        self.count_ = 1

    def advance(self, mkt_data_df: Optional[pd.DataFrame] = None) -> DataWindowParams:
        super().advance(mkt_data_df)
        if self.is_real_time_:
            self.current_data_params_.training_start_index = (
                -self.current_data_params_.training_size
            )
        else:
            self.current_data_params_.training_start_index += 1
        return self.current_data_params_
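

# The rolling policy above keeps the window length fixed at training_size: in
# backtests the window start slides forward by one bar per advance() call,
# while in real-time mode the start index is set to -training_size so the
# window always covers the most recent training_size rows.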


class OptimizedWndDataPolicy(ModelDataPolicy, ABC):
    mkt_data_df_: pd.DataFrame
    pair_: TradingPair
    min_training_size_: int
    max_training_size_: int
    end_index_: int
    prices_a_: np.ndarray
    prices_b_: np.ndarray

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)
        assert kwargs.get("pair") is not None, "pair must be provided"
        assert (
            "min_training_size" in config and "max_training_size" in config
        ), "min_training_size and max_training_size must be provided"
        self.min_training_size_ = cast(int, config.get("min_training_size"))
        self.max_training_size_ = cast(int, config.get("max_training_size"))
        from pt_strategy.trading_pair import TradingPair

        self.pair_ = cast(TradingPair, kwargs.get("pair"))
        if "mkt_data" in kwargs:
            self.mkt_data_df_ = cast(pd.DataFrame, kwargs.get("mkt_data"))
            col_a, col_b = self.pair_.colnames()
            self.prices_a_ = np.array(self.mkt_data_df_[col_a])
            self.prices_b_ = np.array(self.mkt_data_df_[col_b])
        assert self.min_training_size_ < self.max_training_size_

    def advance(self, mkt_data_df: Optional[pd.DataFrame] = None) -> DataWindowParams:
        super().advance(mkt_data_df)
        if mkt_data_df is not None:
            self.mkt_data_df_ = mkt_data_df
        if self.is_real_time_:
            self.end_index_ = len(self.mkt_data_df_) - 1
        else:
            self.end_index_ = (
                self.current_data_params_.training_start_index + self.max_training_size_
            )
            if self.end_index_ > len(self.mkt_data_df_) - 1:
                self.end_index_ = len(self.mkt_data_df_) - 1
            self.current_data_params_.training_start_index = (
                self.end_index_ - self.max_training_size_
            )
            if self.current_data_params_.training_start_index < 0:
                self.current_data_params_.training_start_index = 0
        col_a, col_b = self.pair_.colnames()
        self.prices_a_ = np.array(self.mkt_data_df_[col_a])
        self.prices_b_ = np.array(self.mkt_data_df_[col_b])
        self.current_data_params_ = self.optimize_window_size()
        return self.current_data_params_

    @abstractmethod
    def optimize_window_size(self) -> DataWindowParams:
        ...
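

# The concrete policies below share the same pattern: advance() pins end_index_
# to the most recent usable bar, and optimize_window_size() then scans window
# lengths from min_training_size up to max_training_size (all ending at
# end_index_), keeping the window with the strongest cointegration evidence for
# the pair.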


class EGOptimizedWndDataPolicy(OptimizedWndDataPolicy):
    """Select the training window via the Engle-Granger cointegration test.

    *** VERY SLOW ***
    """

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        from statsmodels.tsa.stattools import coint  # type: ignore

        # Run the Engle-Granger cointegration test over candidate window sizes
        # and keep the window with the lowest p-value.
        last_pvalue = 1.0
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            if self.end_index_ - trn_size < 0:
                break
            start_index = self.end_index_ - trn_size
            series_a = self.prices_a_[start_index : self.end_index_]
            series_b = self.prices_b_[start_index : self.end_index_]
            eg_pvalue = float(coint(series_a, series_b)[1])
            if eg_pvalue < last_pvalue:
                last_pvalue = eg_pvalue
                result.training_size = trn_size
                result.training_start_index = start_index
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={self.current_data_params_.training_size}, {last_pvalue=}"
        # )
        return result
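

# The next policy is effectively a hand-rolled Engle-Granger two-step: regress
# price A on price B with OLS, run an Augmented Dickey-Fuller test on the
# regression residuals, and keep the window with the lowest residual p-value.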


class ADFOptimizedWndDataPolicy(OptimizedWndDataPolicy):
    """Select the training window via an Augmented Dickey-Fuller test on OLS residuals."""

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        from statsmodels.regression.linear_model import OLS
        from statsmodels.tools.tools import add_constant
        from statsmodels.tsa.stattools import adfuller

        last_pvalue = 1.0
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            if self.end_index_ - trn_size < 0:
                break
            start_index = self.end_index_ - trn_size
            y = self.prices_a_[start_index : self.end_index_]
            x = self.prices_b_[start_index : self.end_index_]
            # Add a constant to x for the intercept term.
            x_with_const = add_constant(x)
            # OLS regression: y = a + b*x + e
            model = OLS(y, x_with_const).fit()
            residuals = y - model.predict(x_with_const)
            # ADF test on the regression residuals.
            try:
                adf_result = adfuller(residuals, maxlag=1, regression="c")
                adf_pvalue = float(adf_result[1])
            except Exception:
                # Edge cases (e.g., constant or too-short residual series) count
                # as "no cointegration" for this window.
                adf_pvalue = 1.0
            if adf_pvalue < last_pvalue:
                last_pvalue = adf_pvalue
                result.training_size = trn_size
                result.training_start_index = start_index
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_},"
        #     f" best_trn_size={self.current_data_params_.training_size},"
        #     f" {last_pvalue=}"
        # )
        return result
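

# The Johansen policy below ranks candidate windows by the trace statistic for
# the null of rank 0 (no cointegration) and keeps the window with the largest
# statistic; unlike the p-value based policies above, larger is better here,
# and windows where the test raises are skipped.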


class JohansenOptdWndDataPolicy(OptimizedWndDataPolicy):
    """Select the training window via the Johansen cointegration test."""

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        best_stat = -np.inf
        best_trn_size = 0
        best_start_index = -1
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            if self.end_index_ - trn_size < 0:
                break
            start_index = self.end_index_ - trn_size
            series_a = self.prices_a_[start_index : self.end_index_]
            series_b = self.prices_b_[start_index : self.end_index_]
            try:
                # Combine the two series into a 2-column matrix for the Johansen test.
                data = np.column_stack([series_a, series_b])
                # Johansen test: det_order=0 (constant, no trend), k_ar_diff=1 (one lag).
                res = coint_johansen(data, det_order=0, k_ar_diff=1)
                # Trace statistic for the null of rank 0 (vs. rank >= 1).
                trace_stat = res.lr1[0]
                critical_value = res.cvt[0, 1]  # 5% critical value (currently unused)
                if trace_stat > best_stat:
                    best_stat = trace_stat
                    best_trn_size = trn_size
                    best_start_index = start_index
            except Exception:
                continue
        if best_trn_size > 0:
            result.training_size = best_trn_size
            result.training_start_index = best_start_index
        else:
            print("*** WARNING: No valid cointegration window found.")
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={best_trn_size}, trace_stat={best_stat}"
        # )
        return result
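

if __name__ == "__main__":
    # Minimal usage sketch: it exercises only RollingWindowDataPolicy, which
    # needs neither a TradingPair nor market data.  The config key mirrors the
    # classes above; training_size=60 is an arbitrary illustration value.
    demo_config: Dict[str, Any] = {"training_size": 60}
    policy = RollingWindowDataPolicy(demo_config, is_real_time=False)
    for _ in range(3):
        params = policy.advance()
    print(f"after 3 steps: start={params.training_start_index}, size={params.training_size}")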