From bd6cf1d4d0c97cd2a2741b6708bd2024784659a1 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Sun, 11 Jan 2026 18:17:05 +0000 Subject: [PATCH] progress. Initial untested version --- lib/pt_strategy/live/live_strategy.py | 43 +++++++------------ lib/pt_strategy/results.py | 23 ++++------- lib/pt_strategy/trading_pair.py | 48 +++++++--------------- lib/tools/filetools.py | 6 +-- research/backtest.py | 6 +-- research/notebooks/pair_trading_test.ipynb | 8 ++-- 6 files changed, 46 insertions(+), 88 deletions(-) diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index 78da9e3..1c4fdee 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -1,8 +1,6 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import Any, Dict, List, Optional, cast -from enum import Enum +from typing import Any, Dict, List, Optional import pandas as pd @@ -10,15 +8,14 @@ import pandas as pd from cvttpy_tools.base import NamedObject from cvttpy_tools.app import App from cvttpy_tools.config import Config -from cvttpy_tools.settings.cvtt_types import BookIdT, IntervalSecT -from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds +from cvttpy_tools.settings.cvtt_types import IntervalSecT +from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec from cvttpy_tools.logger import Log # --- from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate from cvttpy_trading.trading.trading_instructions import TradingInstructions -from cvttpy_trading.trading.accounting.cvtt_book import CvttBook from cvttpy_trading.trading.trading_instructions import TargetPositionSignal # --- @@ -29,23 +26,6 @@ from pairs_trading.apps.pairs_trader import PairsTrader from pairs_trading.lib.pt_strategy.pt_market_data import LiveMarketData -""" - --config=pair.cfg - --pair=PAIR-BTC-USDT:COINBASE_AT,PAIR-ETH-USDT:COINBASE_AT -""" - - -# class TradingInstructionType(Enum): -# TARGET_POSITION = "TARGET_POSITION" - - -# @dataclass -# class TradingInstruction(NamedObject): -# type_: TradingInstructionType -# exch_instr_: ExchangeInstrument -# specifics_: Dict[str, Any] - - class PtLiveStrategy(NamedObject): config_: Config instruments_: List[ExchangeInstrument] @@ -59,12 +39,9 @@ class PtLiveStrategy(NamedObject): model_data_policy_: ModelDataPolicy pairs_trader_: PairsTrader - # ti_sender_: TradingInstructionsSender - # for presentation: history of prediction values and trading signals predictions_df_: pd.DataFrame trading_signals_df_: pd.DataFrame - # book_: CvttBook def __init__( self, @@ -155,7 +132,15 @@ class PtLiveStrategy(NamedObject): await self._send_trading_instructions(trading_instructions) def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: - return False # URGENT _is_md_actual + LAG_THRESHOLD = 5 * NanoPerSec + + if len(hist_aggr) == 0: + Log.warning(f"{self.fname()} list of aggregates IS EMPTY") + return False + # MAYBE check market data length + if current_nanoseconds() - hist_aggr[-1].time_ns_ > LAG_THRESHOLD: + return False + return True def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame: """ @@ -259,8 +244,8 @@ class PtLiveStrategy(NamedObject): return trd_instructions - def _strength(self, scaled_disequilibrium) -> float: - # URGENT PtLiveStrategy._strength() + def _strength(self, scaled_disequilibrium: float) -> float: + # TODO PtLiveStrategy._strength() return 1.0 def _create_open_trade_instructions( diff --git a/lib/pt_strategy/results.py b/lib/pt_strategy/results.py index 5fecfa3..5b0c105 100644 --- a/lib/pt_strategy/results.py +++ b/lib/pt_strategy/results.py @@ -4,11 +4,13 @@ from datetime import date, datetime from typing import Any, Dict, List, Optional, Tuple import pandas as pd +# --- +from cvttpy_tools.config import Config +# --- from cvttpy_trading.trading.instrument import ExchangeInstrument - +# --- from pairs_trading.lib.pt_strategy.trading_pair import TradingPair - # Recommended replacement adapters and converters for Python 3.12+ # From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes def adapt_date_iso(val: date) -> str: @@ -20,12 +22,10 @@ def adapt_datetime_iso(val: datetime) -> str: """Adapt datetime.datetime to timezone-naive ISO 8601 date.""" return val.isoformat() - def convert_date(val: bytes) -> date: """Convert ISO 8601 date to datetime.date object.""" return datetime.fromisoformat(val.decode()).date() - def convert_datetime(val: bytes) -> datetime: """Convert ISO 8601 datetime to datetime.datetime object.""" return datetime.fromisoformat(val.decode()) @@ -120,7 +120,7 @@ def create_result_database(db_path: str) -> None: def store_config_in_database( db_path: str, config_file_path: str, - config: Dict, + config: Config, datafiles: List[Tuple[str, str]], instruments: List[ExchangeInstrument], ) -> None: @@ -137,7 +137,7 @@ def store_config_in_database( cursor = conn.cursor() # Convert config to JSON string - config_json = json.dumps(config, indent=2, default=str) + config_json = json.dumps(config.data(), indent=2, default=str) # Convert lists to comma-separated strings for storage datafiles_str = ", ".join([f"{datafile}" for _, datafile in datafiles]) @@ -206,9 +206,9 @@ class PairResearchResult: trades_: Dict[DayT, pd.DataFrame] outstanding_positions_: Dict[DayT, List[OutstandingPositionT]] symbol_roundtrip_trades_: Dict[str, List[Dict[str, Any]]] - + config_: Config - def __init__(self, config: Dict[str, Any]) -> None: + def __init__(self, config: Config) -> None: self.config_ = config self.trades_ = {} self.outstanding_positions_ = {} @@ -220,13 +220,6 @@ class PairResearchResult: self.trades_[day] = trades self.outstanding_positions_[day] = outstanding_positions - # def all_trades(self) -> List[TradeT]: - # """Get all trades across all days as a flat list.""" - # all_trades_list: List[TradeT] = [] - # for day_trades in self.trades_.values(): - # all_trades_list.extend(day_trades.to_dict(orient="records")) - # return all_trades_list - def outstanding_positions(self) -> List[OutstandingPositionT]: """Get all outstanding positions across all days as a flat list.""" res: List[Dict[str, Any]] = [] diff --git a/lib/pt_strategy/trading_pair.py b/lib/pt_strategy/trading_pair.py index 792d82f..a854616 100644 --- a/lib/pt_strategy/trading_pair.py +++ b/lib/pt_strategy/trading_pair.py @@ -27,22 +27,6 @@ class PairState(Enum): CLOSE_STOP_PROFIT = 6 -# def get_symbol(instrument: Dict[str, str]) -> str: -# if "symbol" in instrument: -# return instrument["symbol"] -# elif "instrument_id" in instrument: -# instrument_id = instrument["instrument_id"] -# instrument_pfx = instrument_id[: instrument_id.find("-") + 1] -# symbol = instrument_id[len(instrument_pfx) :] -# instrument["symbol"] = symbol -# instrument["instrument_id_pfx"] = instrument_pfx -# return symbol -# else: -# raise ValueError( -# f"Invalid instrument: {instrument}, missing symbol or instrument_id" -# ) - - class TradingPair(NamedObject, ABC): config_: Config model_: Any # "PairsTradingModel" @@ -67,13 +51,12 @@ class TradingPair(NamedObject, ABC): self.instruments_[0].user_data_["symbol"] = instruments[0].instrument_id().split("-", 1)[1] self.instruments_[1].user_data_["symbol"] = instruments[1].instrument_id().split("-", 1)[1] - def __repr__(self) -> str: - return ( - f"{self.__class__.__name__}:" - f" symbol_a={self.symbol_a()}," - f" symbol_b={self.symbol_b()}," - f" model={self.model_.__class__.__name__}" - ) + def run(self, market_data: pd.DataFrame, data_params: DataWindowParams) -> Prediction: # type: ignore[assignment] + self.market_data_ = market_data[ + data_params.training_start_index_ : data_params.training_start_index_ + + data_params.training_size_ + ] + return self.model_.predict(pair=self) def colnames(self) -> List[str]: return [ @@ -91,9 +74,15 @@ class TradingPair(NamedObject, ABC): def get_instrument_b(self) -> ExchangeInstrument: return self.instruments_[1] + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}:" + f" symbol_a={self.symbol_a()}," + f" symbol_b={self.symbol_b()}," + f" model={self.model_.__class__.__name__}" + ) - - class ResearchTradingPair(TradingPair): def __init__( @@ -109,8 +98,6 @@ class ResearchTradingPair(TradingPair): "state": PairState.INITIAL, } - # URGENT set exchange instruments for the pair - def is_closed(self) -> bool: return self.user_data_["state"] in [ PairState.CLOSE, @@ -228,13 +215,6 @@ class ResearchTradingPair(TradingPair): } ) - def run(self, market_data: pd.DataFrame, data_params: DataWindowParams) -> Prediction: # type: ignore[assignment] - self.market_data_ = market_data[ - data_params.training_start_index_ : data_params.training_start_index_ - + data_params.training_size_ - ] - return self.model_.predict(pair=self) - class LiveTradingPair(TradingPair): def __init__(self, config: Config, instruments: List[ExchangeInstrument]): diff --git a/lib/tools/filetools.py b/lib/tools/filetools.py index 950ede2..5b373bf 100644 --- a/lib/tools/filetools.py +++ b/lib/tools/filetools.py @@ -2,7 +2,7 @@ import os import glob from typing import Dict, List, Tuple # --- -from cvttpy_tools.config import CvttAppConfig +from cvttpy_tools.config import Config # --- from cvttpy_trading.trading.instrument import ExchangeInstrument @@ -10,13 +10,13 @@ DayT = str DataFileNameT = str def resolve_datafiles( - config: Dict, date_pattern: str, instruments: List[ExchangeInstrument] + config: Config, date_pattern: str, instruments: List[ExchangeInstrument] ) -> List[Tuple[DayT, DataFileNameT]]: resolved_files: List[Tuple[DayT, DataFileNameT]] = [] for exch_inst in instruments: pattern = date_pattern inst_type = exch_inst.user_data_.get("instrument_type", "?instrument_type?") - data_dir = config["market_data_loading"][inst_type]["data_directory"] + data_dir = config.get_value(f"market_data_loading/{inst_type}/data_directory") if "*" in pattern or "?" in pattern: # Handle wildcards if not os.path.isabs(pattern): diff --git a/research/backtest.py b/research/backtest.py index 3b2b523..e65b92b 100644 --- a/research/backtest.py +++ b/research/backtest.py @@ -58,7 +58,7 @@ class Runner(NamedObject): # Resolve data files (CLI takes priority over config) instruments: List[ExchangeInstrument] = self._get_instruments() datafiles = resolve_datafiles( - config=CvttAppConfig.instance().to_dict(), + config=CvttAppConfig.instance(), date_pattern=App.instance().get_argument("date_pattern"), instruments=instruments, ) @@ -77,7 +77,7 @@ class Runner(NamedObject): is_config_stored = False # Process each data file - results = PairResearchResult(config=CvttAppConfig.instance().to_dict()) + results = PairResearchResult(config=CvttAppConfig.instance()) for day in sorted(days): md_datafiles = [datafile for md_day, datafile in datafiles if md_day == day] if not all([os.path.exists(datafile) for datafile in md_datafiles]): @@ -89,7 +89,7 @@ class Runner(NamedObject): store_config_in_database( db_path=App.instance().get_argument("result_db"), config_file_path=App.instance().get_argument("config"), - config=CvttAppConfig.instance().to_dict(), + config=CvttAppConfig.instance(), datafiles=datafiles, instruments=instruments, ) diff --git a/research/notebooks/pair_trading_test.ipynb b/research/notebooks/pair_trading_test.ipynb index c268a7a..1fa1980 100644 --- a/research/notebooks/pair_trading_test.ipynb +++ b/research/notebooks/pair_trading_test.ipynb @@ -429,7 +429,7 @@ "✓ Successfully loaded configuration\n", " Open threshold: 1.75\n", " Close threshold: 1\n", - "[2026-01-11 13:31:46.741606] INFO Config.set_value(): NEW Config parameter [datafiles] is set to [['./data/crypto/20250910.mktdata.ohlcv.db']]\n", + "[2026-01-11 18:12:57.365998] INFO Config.set_value(): NEW Config parameter [datafiles] is set to [['./data/crypto/20250910.mktdata.ohlcv.db']]\n", "\n", "Data Configuration:\n", " Data File: 20250910.mktdata.ohlcv.db\n", @@ -447,7 +447,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "[2026-01-11 13:31:48.046862] INFO Config.set_value(): NEW Config parameter [instruments] is set to [[[instrument_id_=PAIR-ADA-USDT][base_asset_id_=][quote_asset_id_=USDT][price_tick_=0.0][quantity_precision_=0.0001][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][specifics_={}][no_loss_proc_queue_=EventProcessQueue:BNBSPOT_PAIR-ADA-USDT][book_top_proc_queue_=EventProcessQueue:TOP_BNBSPOT_PAIR-ADA-USDT][book_depth_proc_queue_=EventProcessQueue:DEPTH_BNBSPOT_PAIR-ADA-USDT][mkt_data_=ExchInstMarketData Object][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'ADA-USDT'}][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][price_tick_=0.0][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'ADA-USDT'}], [instrument_id_=PAIR-SOL-USDT][base_asset_id_=][quote_asset_id_=USDT][price_tick_=0.0][quantity_precision_=0.0001][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][specifics_={}][no_loss_proc_queue_=EventProcessQueue:BNBSPOT_PAIR-SOL-USDT][book_top_proc_queue_=EventProcessQueue:TOP_BNBSPOT_PAIR-SOL-USDT][book_depth_proc_queue_=EventProcessQueue:DEPTH_BNBSPOT_PAIR-SOL-USDT][mkt_data_=ExchInstMarketData Object][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'SOL-USDT'}][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][price_tick_=0.0][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'SOL-USDT'}]]]\n", + "[2026-01-11 18:12:58.573312] INFO Config.set_value(): NEW Config parameter [instruments] is set to [[[instrument_id_=PAIR-ADA-USDT][base_asset_id_=][quote_asset_id_=USDT][price_tick_=0.0][quantity_precision_=0.0001][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][specifics_={}][no_loss_proc_queue_=EventProcessQueue:BNBSPOT_PAIR-ADA-USDT][book_top_proc_queue_=EventProcessQueue:TOP_BNBSPOT_PAIR-ADA-USDT][book_depth_proc_queue_=EventProcessQueue:DEPTH_BNBSPOT_PAIR-ADA-USDT][mkt_data_=ExchInstMarketData Object][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'ADA-USDT'}][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][price_tick_=0.0][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'ADA-USDT'}], [instrument_id_=PAIR-SOL-USDT][base_asset_id_=][quote_asset_id_=USDT][price_tick_=0.0][quantity_precision_=0.0001][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][specifics_={}][no_loss_proc_queue_=EventProcessQueue:BNBSPOT_PAIR-SOL-USDT][book_top_proc_queue_=EventProcessQueue:TOP_BNBSPOT_PAIR-SOL-USDT][book_depth_proc_queue_=EventProcessQueue:DEPTH_BNBSPOT_PAIR-SOL-USDT][mkt_data_=ExchInstMarketData Object][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'SOL-USDT'}][exchange_id_=BNBSPOT][md_symbol_=][trade_symbol_=][contract_code_=][contract_size_=1.0][price_tick_=0.0][user_data_={'instrument_type': 'CRYPTO', 'symbol': 'SOL-USDT'}]]]\n", "OPEN_TRADES: 2025-09-10 13:50:00 scaled_disequilibrium=np.float64(1.9292245624262738)\n", "OPEN TRADES:\n", " time action symbol side price disequilibrium scaled_disequilibrium signed_scaled_disequilibrium status\n", @@ -6361,9 +6361,9 @@ }, "text/html": [ "
\n", - "