from __future__ import annotations

import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, cast

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from pt_strategy.trading_pair import TradingPair


@dataclass
class DataWindowParams:
    """Size and start index of the training window currently in use."""

    training_size: int
    training_start_index: int


class ModelDataPolicy(ABC):
    """Base class for policies that decide how the training window evolves."""

    config_: Dict[str, Any]
    current_data_params_: DataWindowParams
    count_: int

    def __init__(self, config: Dict[str, Any]):
        self.config_ = config
        self.current_data_params_ = DataWindowParams(
            training_size=config.get("training_size", 120),
            training_start_index=0,
        )
        self.count_ = 0

    @abstractmethod
    def advance(self) -> DataWindowParams:
        # Subclasses call super().advance() for the shared bookkeeping below.
        self.count_ += 1
        print(self.count_, end="\r")  # lightweight progress indicator
        return self.current_data_params_

    @staticmethod
    def create(config: Dict[str, Any], *args: Any, **kwargs: Any) -> ModelDataPolicy:
        """Instantiate the policy class named by config["model_data_policy_class"]."""
        import importlib

        model_data_policy_class_name = config.get("model_data_policy_class", None)
        assert model_data_policy_class_name is not None
        module_name, class_name = model_data_policy_class_name.rsplit(".", 1)
        module = importlib.import_module(module_name)
        model_training_data_policy_object = getattr(module, class_name)(
            config=config, *args, **kwargs
        )
        return cast(ModelDataPolicy, model_training_data_policy_object)


class RollingWindowDataPolicy(ModelDataPolicy):
    """Fixed-size window that slides forward by one observation per step."""

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config)
        self.count_ = 1

    def advance(self) -> DataWindowParams:
        super().advance()
        self.current_data_params_.training_start_index += 1
        return self.current_data_params_


class ExpandingWindowDataPolicy(ModelDataPolicy):
    """Window anchored at the start that grows by one observation per step."""

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config)

    def advance(self) -> DataWindowParams:
        super().advance()
        self.current_data_params_.training_size += 1
        return self.current_data_params_


class OptimizedWndDataPolicy(ModelDataPolicy, ABC):
    """Base class for policies that re-optimize the window size at every step."""

    mkt_data_df_: pd.DataFrame
    pair_: TradingPair
    min_training_size_: int
    max_training_size_: int
    end_index_: int
    prices_a_: np.ndarray
    prices_b_: np.ndarray

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config)
        assert (
            kwargs.get("mkt_data") is not None and kwargs.get("pair") is not None
        ), "mkt_data and/or pair must be provided"
        assert (
            "min_training_size" in config and "max_training_size" in config
        ), "min_training_size and max_training_size must be provided"
        self.min_training_size_ = cast(int, config.get("min_training_size"))
        self.max_training_size_ = cast(int, config.get("max_training_size"))
        assert self.min_training_size_ < self.max_training_size_

        from pt_strategy.trading_pair import TradingPair

        self.mkt_data_df_ = cast(pd.DataFrame, kwargs.get("mkt_data"))
        self.pair_ = cast(TradingPair, kwargs.get("pair"))
        self.end_index_ = (
            self.current_data_params_.training_start_index + self.max_training_size_
        )
        col_a, col_b = self.pair_.colnames()
        self.prices_a_ = np.array(self.mkt_data_df_[col_a])
        self.prices_b_ = np.array(self.mkt_data_df_[col_b])

    def advance(self) -> DataWindowParams:
        super().advance()
        self.current_data_params_ = self.optimize_window_size()
        self.end_index_ += 1
        return self.current_data_params_

    @abstractmethod
    def optimize_window_size(self) -> DataWindowParams:
        ...
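# Illustrative sketch (not part of the original module): a minimal demonstration of
# how the two basic policies evolve DataWindowParams. The "training_size" config key
# is the one read by ModelDataPolicy.__init__; everything else here is example data.
def _demo_basic_window_policies() -> None:
    config: Dict[str, Any] = {"training_size": 5}
    rolling = RollingWindowDataPolicy(config)
    expanding = ExpandingWindowDataPolicy(config)
    for _ in range(3):
        # Rolling keeps size=5 and shifts the start forward;
        # expanding keeps start=0 and grows the size.
        print("rolling:  ", rolling.advance())
        print("expanding:", expanding.advance())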
class EGOptimizedWndDataPolicy(OptimizedWndDataPolicy):
    '''
    Window optimization via the Engle-Granger cointegration test.

    *** VERY SLOW ***
    '''

    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        # Run the Engle-Granger cointegration test for every candidate window size
        # and keep the window with the smallest p-value.
        from statsmodels.tsa.stattools import coint  # type: ignore

        last_pvalue = 1.0
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            start_index = self.end_index_ - trn_size
            series_a = self.prices_a_[start_index : self.end_index_]
            series_b = self.prices_b_[start_index : self.end_index_]
            eg_pvalue = float(coint(series_a, series_b)[1])
            if eg_pvalue < last_pvalue:
                last_pvalue = eg_pvalue
                result.training_size = trn_size
                result.training_start_index = start_index
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={self.current_data_params_.training_size}, {last_pvalue=}"
        # )
        return result


class ADFOptimizedWndDataPolicy(OptimizedWndDataPolicy):
    # Window optimization via an Augmented Dickey-Fuller test on OLS residuals
    # (the second stage of the Engle-Granger procedure).
    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        from statsmodels.regression.linear_model import OLS
        from statsmodels.tools.tools import add_constant
        from statsmodels.tsa.stattools import adfuller

        last_pvalue = 1.0
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            start_index = self.end_index_ - trn_size
            y = self.prices_a_[start_index : self.end_index_]
            x = self.prices_b_[start_index : self.end_index_]
            # Add constant to x for the intercept
            x_with_const = add_constant(x)
            # OLS regression: y = a + b*x + e
            model = OLS(y, x_with_const).fit()
            residuals = y - model.predict(x_with_const)
            # ADF test on the residuals
            try:
                adf_result = adfuller(residuals, maxlag=1, regression="c")
                adf_pvalue = float(adf_result[1])
            except Exception:
                # Handle edge cases (e.g., a constant or near-constant residual series)
                adf_pvalue = 1.0
            if adf_pvalue < last_pvalue:
                last_pvalue = adf_pvalue
                result.training_size = trn_size
                result.training_start_index = start_index
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_},"
        #     f" best_trn_size={self.current_data_params_.training_size},"
        #     f" {last_pvalue=}"
        # )
        return result


class JohansenOptdWndDataPolicy(OptimizedWndDataPolicy):
    # Window optimization via the Johansen cointegration test (trace statistic).
    def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any):
        super().__init__(config, *args, **kwargs)

    def optimize_window_size(self) -> DataWindowParams:
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        best_stat = -np.inf
        best_trn_size = 0
        best_start_index = -1
        result = copy.copy(self.current_data_params_)
        for trn_size in range(self.min_training_size_, self.max_training_size_):
            start_index = self.end_index_ - trn_size
            series_a = self.prices_a_[start_index : self.end_index_]
            series_b = self.prices_b_[start_index : self.end_index_]
            # Combine into a 2D matrix for the Johansen test
            try:
                data = np.column_stack([series_a, series_b])
                # Johansen test: det_order=0 (no deterministic trend), k_ar_diff=1 (lag)
                res = coint_johansen(data, det_order=0, k_ar_diff=1)
                # Trace statistic for the null of rank 0 (i.e. rank=0 vs rank>=1)
                trace_stat = res.lr1[0]
                critical_value = res.cvt[0, 1]  # 5% critical value (currently unused)
                if trace_stat > best_stat:
                    best_stat = trace_stat
                    best_trn_size = trn_size
                    best_start_index = start_index
            except Exception:
                continue
        if best_trn_size > 0:
            result.training_size = best_trn_size
            result.training_start_index = best_start_index
        else:
            print("*** WARNING: No valid cointegration window found.")
        # print(
        #     f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={best_trn_size}, trace_stat={best_stat}"
        # )
        return result
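# Illustrative usage sketch (assumptions noted here; not part of the original module):
# one way to drive an optimized policy on synthetic data. _PairStub is a hypothetical
# stand-in for pt_strategy.trading_pair.TradingPair, whose constructor is not defined
# in this file; it only provides the colnames() method these policies actually call.
# Running it assumes statsmodels and the pt_strategy package (imported inside
# OptimizedWndDataPolicy.__init__) are importable, and the module path used in
# "model_data_policy_class" below is a placeholder.
class _PairStub:
    def colnames(self) -> tuple[str, str]:
        return "A", "B"


if __name__ == "__main__":
    rng = np.random.default_rng(42)
    # Two cointegrated-looking price series: B tracks A plus stationary noise.
    a = np.cumsum(rng.normal(size=300)) + 100.0
    b = 0.8 * a + rng.normal(scale=0.5, size=300)
    mkt_data = pd.DataFrame({"A": a, "B": b})

    config: Dict[str, Any] = {
        "model_data_policy_class": "model_data_policy.ADFOptimizedWndDataPolicy",  # assumed module path
        "training_size": 60,
        "min_training_size": 30,
        "max_training_size": 60,
    }
    policy = ADFOptimizedWndDataPolicy(config, mkt_data=mkt_data, pair=_PairStub())
    # Equivalent via the factory, if the module path above is correct:
    # policy = ModelDataPolicy.create(config, mkt_data=mkt_data, pair=_PairStub())
    print(policy.advance())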