From 8e6ac39674540eb64a8949ac68531854093403f7 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 31 Jul 2025 18:53:42 +0000 Subject: [PATCH] added window size optimization classes --- configuration/ols-opt.cfg | 48 ++++ configuration/ols.cfg | 10 +- configuration/{vecm-exp.cfg => vecm-opt.cfg} | 11 +- configuration/vecm.cfg | 8 +- lib/pt_strategy/model_data_policy.py | 219 +++++++++++++++++-- lib/pt_strategy/results.py | 7 +- lib/pt_strategy/trading_pair.py | 12 +- lib/pt_strategy/trading_strategy.py | 30 +-- 8 files changed, 283 insertions(+), 62 deletions(-) create mode 100644 configuration/ols-opt.cfg rename configuration/{vecm-exp.cfg => vecm-opt.cfg} (77%) diff --git a/configuration/ols-opt.cfg b/configuration/ols-opt.cfg new file mode 100644 index 0000000..567f80a --- /dev/null +++ b/configuration/ols-opt.cfg @@ -0,0 +1,48 @@ +{ + "market_data_loading": { + "CRYPTO": { + "data_directory": "./data/crypto", + "db_table_name": "md_1min_bars", + "instrument_id_pfx": "PAIR-", + }, + "EQUITY": { + "data_directory": "./data/equity", + "db_table_name": "md_1min_bars", + "instrument_id_pfx": "STOCK-", + } + }, + + # ====== Funding ====== + "funding_per_pair": 2000.0, + # ====== Trading Parameters ====== + "stat_model_price": "close", + "execution_price": { + "column": "vwap", + "shift": 1, + }, + "dis-equilibrium_open_trshld": 1.75, + "dis-equilibrium_close_trshld": 0.9, + "model_class": "pt_strategy.models.OLSModel", + + # "training_size": 120, + # "model_data_policy_class": "pt_strategy.model_data_policy.RollingWindowDataPolicy", + # "model_data_policy_class": "pt_strategy.model_data_policy.ADFOptimizedWndDataPolicy", + "model_data_policy_class": "pt_strategy.model_data_policy.JohansenOptdWndDataPolicy", + "min_training_size": 60, + "max_training_size": 150, + + # ====== Stop Conditions ====== + "stop_close_conditions": { + "profit": 2.0, + "loss": -0.5 + } + + # ====== End of Session Closeout ====== + "close_outstanding_positions": true, + # 
"close_outstanding_positions": false, + "trading_hours": { + "timezone": "America/New_York", + "begin_session": "7:30:00", + "end_session": "18:30:00", + } +} \ No newline at end of file diff --git a/configuration/ols.cfg b/configuration/ols.cfg index 0958bc9..ceaa4b8 100644 --- a/configuration/ols.cfg +++ b/configuration/ols.cfg @@ -20,11 +20,15 @@ "column": "vwap", "shift": 1, }, - "dis-equilibrium_open_trshld": 2.0, - "dis-equilibrium_close_trshld": 0.5, - "training_size": 120, + "dis-equilibrium_open_trshld": 1.75, + "dis-equilibrium_close_trshld": 0.9, "model_class": "pt_strategy.models.OLSModel", + + "training_size": 120, "model_data_policy_class": "pt_strategy.model_data_policy.RollingWindowDataPolicy", + # "model_data_policy_class": "pt_strategy.model_data_policy.JohansenOptdWndDataPolicy", + # "min_training_size": 60, + # "max_training_size": 150, # ====== Stop Conditions ====== "stop_close_conditions": { diff --git a/configuration/vecm-exp.cfg b/configuration/vecm-opt.cfg similarity index 77% rename from configuration/vecm-exp.cfg rename to configuration/vecm-opt.cfg index d6031cd..8c3f4e5 100644 --- a/configuration/vecm-exp.cfg +++ b/configuration/vecm-opt.cfg @@ -21,11 +21,16 @@ "column": "vwap", "shift": 1, }, - "dis-equilibrium_open_trshld": 2.0, + "dis-equilibrium_open_trshld": 1.75, "dis-equilibrium_close_trshld": 1.0, - "training_size": 120, + "model_class": "pt_strategy.models.VECMModel", - "model_data_policy_class": "pt_strategy.model_data_policy.ExpandingWindowDataPolicy", + + # "training_size": 120, + # "model_data_policy_class": "pt_strategy.model_data_policy.RollingWindowDataPolicy", + "model_data_policy_class": "pt_strategy.model_data_policy.ADFOptimizedWndDataPolicy", + "min_training_size": 60, + "max_training_size": 150, # ====== Stop Conditions ====== "stop_close_conditions": { diff --git a/configuration/vecm.cfg b/configuration/vecm.cfg index 16562d1..71fb9c6 100644 --- a/configuration/vecm.cfg +++ b/configuration/vecm.cfg @@ -21,11 
+21,15 @@ "column": "vwap", "shift": 1, }, - "dis-equilibrium_open_trshld": 2.0, + "dis-equilibrium_open_trshld": 1.75, "dis-equilibrium_close_trshld": 1.0, - "training_size": 120, "model_class": "pt_strategy.models.VECMModel", + + "training_size": 120, "model_data_policy_class": "pt_strategy.model_data_policy.RollingWindowDataPolicy", + # "model_data_policy_class": "pt_strategy.model_data_policy.ADFOptimizedWndDataPolicy", + # "min_training_size": 60, + # "max_training_size": 150, # ====== Stop Conditions ====== "stop_close_conditions": { diff --git a/lib/pt_strategy/model_data_policy.py b/lib/pt_strategy/model_data_policy.py index 6907526..fd0f39b 100644 --- a/lib/pt_strategy/model_data_policy.py +++ b/lib/pt_strategy/model_data_policy.py @@ -1,64 +1,235 @@ from __future__ import annotations +import copy from abc import ABC, abstractmethod from dataclasses import dataclass -from enum import Enum -from typing import Any, Dict, Optional, cast, Generator, List +from typing import Any, Dict, cast + +import numpy as np +import pandas as pd @dataclass -class DataParams: +class DataWindowParams: training_size: int training_start_index: int - + + class ModelDataPolicy(ABC): config_: Dict[str, Any] - current_data_params_: DataParams - count_:int + current_data_params_: DataWindowParams + count_: int - def __init__(self, config: Dict[str, Any]): self.config_ = config - self.current_data_params_ = DataParams( + self.current_data_params_ = DataWindowParams( training_size=config.get("training_size", 120), training_start_index=0, ) self.count_ = 0 - + @abstractmethod - def advance(self) -> DataParams: + def advance(self) -> DataWindowParams: self.count_ += 1 - print(self.count_, end='\r') - + print(self.count_, end="\r") + return self.current_data_params_ + @staticmethod - def create(config: Dict[str, Any]) -> ModelDataPolicy: + def create(config: Dict[str, Any], *args: Any, **kwargs: Any) -> ModelDataPolicy: import importlib - + model_data_policy_class_name = 
config.get("model_data_policy_class", None) assert model_data_policy_class_name is not None module_name, class_name = model_data_policy_class_name.rsplit(".", 1) module = importlib.import_module(module_name) - model_training_data_policy_object = getattr(module, class_name)(config=config) + model_training_data_policy_object = getattr(module, class_name)( + config=config, *args, **kwargs + ) return cast(ModelDataPolicy, model_training_data_policy_object) + class RollingWindowDataPolicy(ModelDataPolicy): - def __init__(self, config: Dict[str, Any]): + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): super().__init__(config) self.count_ = 1 - - def advance(self) -> DataParams: + + def advance(self) -> DataWindowParams: super().advance() self.current_data_params_.training_start_index += 1 return self.current_data_params_ - - + + class ExpandingWindowDataPolicy(ModelDataPolicy): - def __init__(self, config: Dict[str, Any]): + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): super().__init__(config) - - def advance(self) -> DataParams: + + def advance(self) -> DataWindowParams: super().advance() self.current_data_params_.training_size += 1 return self.current_data_params_ - + +class OptimizedWndDataPolicy(ModelDataPolicy, ABC): + mkt_data_df_: pd.DataFrame + pair_: TradingPair # type: ignore + min_training_size_: int + max_training_size_: int + end_index_: int + prices_a_: np.ndarray + prices_b_: np.ndarray + + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): + super().__init__(config) + assert ( + kwargs.get("mkt_data") is not None and kwargs.get("pair") is not None + ), "mkt_data and/or pair must be provided" + assert ( + "min_training_size" in config and "max_training_size" in config + ), "min_training_size and max_training_size must be provided" + self.min_training_size_ = cast(int, config.get("min_training_size")) + self.max_training_size_ = cast(int, config.get("max_training_size")) + assert 
self.min_training_size_ < self.max_training_size_ + + from pt_strategy.trading_pair import TradingPair + + self.mkt_data_df_ = cast(pd.DataFrame, kwargs.get("mkt_data")) + self.pair_ = cast(TradingPair, kwargs.get("pair")) + + self.end_index_ = ( + self.current_data_params_.training_start_index + self.max_training_size_ + ) + col_a, col_b = self.pair_.colnames() + self.prices_a_ = np.array(self.mkt_data_df_[col_a]) + self.prices_b_ = np.array(self.mkt_data_df_[col_b]) + + + def advance(self) -> DataWindowParams: + super().advance() + self.current_data_params_ = self.optimize_window_size() + self.end_index_ += 1 + return self.current_data_params_ + + @abstractmethod + def optimize_window_size(self) -> DataWindowParams: + ... + +class EGOptimizedWndDataPolicy(OptimizedWndDataPolicy): + ''' + # Engle-Granger cointegration test + *** VERY SLOW *** + ''' + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): + super().__init__(config, *args, **kwargs) + + def optimize_window_size(self) -> DataWindowParams: + # Run Engle-Granger cointegration test + last_pvalue = 1.0 + result = copy.copy(self.current_data_params_) + for trn_size in range(self.min_training_size_, self.max_training_size_): + from statsmodels.tsa.stattools import coint # type: ignore + + start_index = self.end_index_ - trn_size + series_a = self.prices_a_[start_index : self.end_index_] + series_b = self.prices_b_[start_index : self.end_index_] + eg_pvalue = float(coint(series_a, series_b)[1]) + if eg_pvalue < last_pvalue: + last_pvalue = eg_pvalue + result.training_size = trn_size + result.training_start_index = start_index + + # print( + # f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={self.current_data_params_.training_size}, {last_pvalue=}" + # ) + return result + +class ADFOptimizedWndDataPolicy(OptimizedWndDataPolicy): + # Augmented Dickey-Fuller test + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): + super().__init__(config, *args, **kwargs) + + 
def optimize_window_size(self) -> DataWindowParams: + from statsmodels.regression.linear_model import OLS + from statsmodels.tools.tools import add_constant + from statsmodels.tsa.stattools import adfuller + + last_pvalue = 1.0 + result = copy.copy(self.current_data_params_) + for trn_size in range(self.min_training_size_, self.max_training_size_): + start_index = self.end_index_ - trn_size + y = self.prices_a_[start_index : self.end_index_] + x = self.prices_b_[start_index : self.end_index_] + + # Add constant to x for intercept + x_with_const = add_constant(x) + + # OLS regression: y = a + b*x + e + model = OLS(y, x_with_const).fit() + residuals = y - model.predict(x_with_const) + + # ADF test on residuals + try: + adf_result = adfuller(residuals, maxlag=1, regression="c") + adf_pvalue = float(adf_result[1]) + except Exception as e: + # Handle edge cases with exception (e.g., constant series, etc.) + adf_pvalue = 1.0 + + if adf_pvalue < last_pvalue: + last_pvalue = adf_pvalue + result.training_size = trn_size + result.training_start_index = start_index + + # print( + # f"*** DEBUG *** end_index={self.end_index_}," + # f" best_trn_size={self.current_data_params_.training_size}," + # f" {last_pvalue=}" + # ) + return result + +class JohansenOptdWndDataPolicy(OptimizedWndDataPolicy): + # Johansen test + def __init__(self, config: Dict[str, Any], *args: Any, **kwargs: Any): + super().__init__(config, *args, **kwargs) + + def optimize_window_size(self) -> DataWindowParams: + from statsmodels.tsa.vector_ar.vecm import coint_johansen + import numpy as np + + best_stat = -np.inf + best_trn_size = 0 + best_start_index = -1 + + result = copy.copy(self.current_data_params_) + for trn_size in range(self.min_training_size_, self.max_training_size_): + start_index = self.end_index_ - trn_size + series_a = self.prices_a_[start_index:self.end_index_] + series_b = self.prices_b_[start_index:self.end_index_] + + # Combine into 2D matrix for Johansen test + try: + data = 
np.column_stack([series_a, series_b]) + + # Johansen test: det_order=0 (no deterministic trend), k_ar_diff=1 (lag) + res = coint_johansen(data, det_order=0, k_ar_diff=1) + + # Trace statistic for cointegration rank 1 + trace_stat = res.lr1[0] # test stat for rank=0 vs >=1 + critical_value = res.cvt[0, 1] # 5% critical value + + if trace_stat > best_stat: + best_stat = trace_stat + best_trn_size = trn_size + best_start_index = start_index + except Exception: + continue + + if best_trn_size > 0: + result.training_size = best_trn_size + result.training_start_index = best_start_index + else: + print("*** WARNING: No valid cointegration window found.") + + # print( + # f"*** DEBUG *** end_index={self.end_index_}, best_trn_size={best_trn_size}, trace_stat={best_stat}" + # ) + return result \ No newline at end of file diff --git a/lib/pt_strategy/results.py b/lib/pt_strategy/results.py index 77aada9..9e58042 100644 --- a/lib/pt_strategy/results.py +++ b/lib/pt_strategy/results.py @@ -439,9 +439,12 @@ class PairResearchResult: summary = self.get_return_summary() print(f"\n====== PAIR RESEARCH GRAND TOTALS ======") + print('---') print(f"Total Return: {summary['total_return']:+.2f}%") + print('---') print(f"Total Days Traded: {summary['total_days']}") - print(f"Total Pair Trades: {summary['total_pairs']}") + print(f"Total Open-Close Actions: {summary['total_pairs']}") + print(f"Total Trades: 4 * {summary['total_pairs']} = {4 * summary['total_pairs']}") if summary['total_days'] > 0: print(f"Average Daily Return: {summary['average_daily_return']:+.2f}%") @@ -469,8 +472,8 @@ class PairResearchResult: self.calculate_returns() self.print_returns_by_day() self.print_outstanding_positions() - self.print_grand_totals() self._print_additional_metrics() + self.print_grand_totals() def _print_additional_metrics(self) -> None: """Print additional performance metrics.""" diff --git a/lib/pt_strategy/trading_pair.py b/lib/pt_strategy/trading_pair.py index 0ab13a9..89da29b 100644 --- 
a/lib/pt_strategy/trading_pair.py +++ b/lib/pt_strategy/trading_pair.py @@ -3,11 +3,10 @@ from __future__ import annotations from abc import ABC, abstractmethod from datetime import datetime from enum import Enum -from typing import Any, Dict, Optional, Type, cast, Generator, List +from typing import Any, Dict, Generator, List, Optional, Type, cast import pandas as pd - -from pt_strategy.model_data_policy import DataParams +from pt_strategy.model_data_policy import DataWindowParams class PairState(Enum): @@ -26,7 +25,6 @@ class TradingPair: stat_model_price_: str model_: PairsTradingModel # type: ignore[assignment] - model_tdp_: ModelDataPolicy # type: ignore[assignment] user_data_: Dict[str, Any] @@ -44,7 +42,6 @@ class TradingPair: self.symbol_a_ = instruments[0]["symbol"] self.symbol_b_ = instruments[1]["symbol"] self.model_ = PairsTradingModel.create(config) - self.model_tdp_ = ModelDataPolicy.create(config) self.stat_model_price_ = config["stat_model_price"] self.user_data_ = { "state": PairState.INITIAL, @@ -159,12 +156,9 @@ class TradingPair: }) - def run(self, market_data: pd.DataFrame, data_params: DataParams) -> Prediction: # type: ignore[assignment] + def run(self, market_data: pd.DataFrame, data_params: DataWindowParams) -> Prediction: # type: ignore[assignment] self.market_data_ = market_data[data_params.training_start_index:data_params.training_start_index + data_params.training_size] return self.model_.predict(pair=self) - while self.model_tdp_.has_next_training_data(): - training_data = self.model_tdp_.get_next_training_data() - diff --git a/lib/pt_strategy/trading_strategy.py b/lib/pt_strategy/trading_strategy.py index 0119cc4..fa0a8e8 100644 --- a/lib/pt_strategy/trading_strategy.py +++ b/lib/pt_strategy/trading_strategy.py @@ -1,22 +1,12 @@ from __future__ import annotations -import os -from abc import ABC, abstractmethod -from enum import Enum -from typing import Any, Dict, Generator, List, Optional, Type, cast +from typing import Any, Dict, 
List, Optional import pandas as pd from pt_strategy.model_data_policy import ModelDataPolicy from pt_strategy.pt_market_data import PtMarketData from pt_strategy.pt_model import Prediction -from pt_strategy.results import ( - PairResearchResult, - create_result_database, - store_config_in_database, -) from pt_strategy.trading_pair import PairState, TradingPair -from tools.filetools import resolve_datafiles -from tools.instruments import get_instruments class PtResearchStrategy: @@ -41,7 +31,6 @@ class PtResearchStrategy: self.config_ = config self.trades_ = [] self.trading_pair_ = TradingPair(config=config, instruments=instruments) - self.model_data_policy_ = ModelDataPolicy.create(config) self.predictions_ = pd.DataFrame() import copy @@ -54,6 +43,9 @@ class PtResearchStrategy: config=config_copy, md_class=ResearchMarketData ) self.pt_mkt_data_.load() + self.model_data_policy_ = ModelDataPolicy.create( + config, mkt_data=self.pt_mkt_data_.market_data_df_, pair=self.trading_pair_ + ) def outstanding_positions(self) -> List[Dict[str, Any]]: return list(self.trading_pair_.user_data_.get("outstanding_positions", [])) @@ -67,9 +59,7 @@ class PtResearchStrategy: while self.pt_mkt_data_.has_next(): market_data_series = self.pt_mkt_data_.get_next() new_row = pd.DataFrame([market_data_series]) - market_data_df = pd.concat( - [market_data_df, new_row], ignore_index=True - ) + market_data_df = pd.concat([market_data_df, new_row], ignore_index=True) if idx >= training_minutes: break idx += 1 @@ -85,7 +75,9 @@ class PtResearchStrategy: prediction = self.trading_pair_.run( market_data_df, self.model_data_policy_.advance() ) - self.predictions_ = pd.concat([self.predictions_, prediction.to_df()], ignore_index=True) + self.predictions_ = pd.concat( + [self.predictions_, prediction.to_df()], ignore_index=True + ) assert prediction is not None trades = self._create_trades( @@ -223,13 +215,13 @@ class PtResearchStrategy: side_b = "SELL" # save closing sides - 
pair.user_data_["open_side_a"] = side_a # used in oustanding positions + pair.user_data_["open_side_a"] = side_a # used in outstanding positions pair.user_data_["open_side_b"] = side_b pair.user_data_["open_px_a"] = px_a pair.user_data_["open_px_b"] = px_b pair.user_data_["open_tstamp"] = tstamp - + pair.user_data_["close_side_a"] = side_b # used for closing trades + pair.user_data_["close_side_a"] = side_b # used for closing trades pair.user_data_["close_side_b"] = side_a # create opening trades