From 2e32b26fad2835186b94992a8df0d2da8db20966 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Sun, 28 Dec 2025 19:30:00 +0000 Subject: [PATCH] dev progress --- .envrc | 1 + .vscode/.env | 2 +- .vscode/pairs_trading.code-workspace | 13 ++- .vscode/settings.__OLD__.json | 112 -------------------------- .vscode/settings.json | 4 - {bin => apps}/pairs_trader.py | 49 +++++++---- lib/client/cvtt_client.py | 25 ++++-- lib/pt_strategy/live/live_strategy.py | 60 +++++++++----- lib/pt_strategy/pt_market_data.py | 8 +- lib/pt_strategy/pt_model.py | 4 +- lib/pt_strategy/trading_pair.py | 17 +++- 11 files changed, 122 insertions(+), 173 deletions(-) create mode 100644 .envrc delete mode 100644 .vscode/settings.__OLD__.json rename {bin => apps}/pairs_trader.py (73%) diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..29e1689 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +source /home/oleg/.pyenv/python3.12-venv/bin/activate \ No newline at end of file diff --git a/.vscode/.env b/.vscode/.env index 4c0877a..b10da38 100644 --- a/.vscode/.env +++ b/.vscode/.env @@ -1 +1 @@ -PYTHONPATH=/home/oleg/develop +PYTHONPATH=/home/oleg/develop \ No newline at end of file diff --git a/.vscode/pairs_trading.code-workspace b/.vscode/pairs_trading.code-workspace index cd71e69..9e68e72 100644 --- a/.vscode/pairs_trading.code-workspace +++ b/.vscode/pairs_trading.code-workspace @@ -1,10 +1,7 @@ { - "folders": [ - { - "path": ".." - } - ], - "settings": { - "workbench.colorTheme": "Noctis Minimus" - } + "folders": [ + { + "path": ".." + } + ] } \ No newline at end of file diff --git a/.vscode/settings.__OLD__.json b/.vscode/settings.__OLD__.json deleted file mode 100644 index 2a330b4..0000000 --- a/.vscode/settings.__OLD__.json +++ /dev/null @@ -1,112 +0,0 @@ -{ - "PythonVersion": "3.12", - "[python]": { - "editor.defaultFormatter": "ms-python.black-formatter" - }, - // =========================================================== - "workbench.activityBar.orientation": "vertical", - // =========================================================== - - // "markdown.styles": [ - // "/home/oleg/develop/cvtt2/.vscode/light-theme.css" - // ], - "markdown.preview.background": "#ffffff", - "markdown.preview.textEditorTheme": "light", - "markdown-pdf.styles": [ - "/home/oleg/develop/cvtt2/.vscode/light-theme.css" - ], - "editor.detectIndentation": false, - // Configure editor settings to be overridden for [yaml] language. - "[yaml]": { - "editor.insertSpaces": true, - "editor.tabSize": 4, - }, - "pylint.args": [ - "--disable=missing-docstring" - , "--disable=invalid-name" - , "--disable=too-few-public-methods" - , "--disable=broad-exception-raised" - , "--disable=broad-exception-caught" - , "--disable=pointless-string-statement" - , "--disable=unused-argument" - , "--disable=line-too-long" - , "--disable=import-outside-toplevel" - , "--disable=fixme" - , "--disable=protected-access" - , "--disable=logging-fstring-interpolation" - ], - - // ===== TESTING CONFIGURATION ===== - "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true, - "python.testing.pytestArgs": [ - "-v", - "--tb=short", - "--disable-warnings" - ], - "python.testing.envVars": { - "PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.." - }, - "python.testing.cwd": "${workspaceFolder}", - "python.testing.autoTestDiscoverOnSaveEnabled": true, - "python.testing.pytestPath": "/home/oleg/.pyenv/python3.12-venv/bin/pytest", - "python.testing.promptToConfigure": false, - "python.testing.pytest.enabled": true, - - - // Python interpreter settings - "python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3.12", - - // Environment variables for Python execution - "python.envFile": "${workspaceFolder}/.vscode/.env", - "python.terminal.activateEnvironment": false, - "python.terminal.activateEnvInCurrentTerminal": false, - - // Global environment variables for VS Code Python extension - "terminal.integrated.env.linux": { - "PYTHONPATH": "/home/oleg/develop/:${env:PYTHONPATH}" - }, - - "pylint.enabled": true, - "github.copilot.enable": false, - "markdown.extension.print.theme": "dark", - "python.analysis.extraPaths": [ - "${workspaceFolder}/..", - "${workspaceFolder}/lib" - ], - - // Try enabling regular Python language server alongside CursorPyright - "python.languageServer": "None", - "python.analysis.diagnosticMode": "workspace", - "workbench.colorTheme": "Atom One Dark", - "cursorpyright.analysis.enable": false, - "cursorpyright.analysis.extraPaths": [ - "${workspaceFolder}/..", - "${workspaceFolder}/lib" - ], - - // Enable quick fixes for unused imports - "python.analysis.autoImportCompletions": true, - "python.analysis.fixAll": ["source.unusedImports"], - "python.analysis.typeCheckingMode": "basic", - - // Enable code actions for CursorPyright - "cursorpyright.analysis.autoImportCompletions": true, - "cursorpyright.analysis.typeCheckingMode": "off", - "cursorpyright.reportUnusedImport": "warning", - "cursorpyright.reportUnusedVariable": "warning", - "cursorpyright.analysis.diagnosticMode": "workspace", - - // Force enable code actions - "editor.lightBulb.enabled": true, - "editor.codeActionsOnSave": { - "source.organizeImports": "explicit", - "source.fixAll": "explicit", - "source.unusedImports": "explicit" - }, - - // Enable Python-specific code actions - "python.analysis.completeFunctionParens": true, - "python.analysis.addImport.exactMatchOnly": false, - "workbench.tree.indent": 24, -} diff --git a/.vscode/settings.json b/.vscode/settings.json index f57d945..e42933d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -15,9 +15,5 @@ ], "python.envFile": "${workspaceFolder}/.env", "python.testing.debugPort": 3000, - "files.associations": { - "*.py": "python" - }, "python.testing.promptToConfigure": false, - "workbench.colorTheme": "Dracula Theme Soft" } \ No newline at end of file diff --git a/bin/pairs_trader.py b/apps/pairs_trader.py similarity index 73% rename from bin/pairs_trader.py rename to apps/pairs_trader.py index 89bb4dc..1a48e55 100644 --- a/bin/pairs_trader.py +++ b/apps/pairs_trader.py @@ -1,15 +1,21 @@ from __future__ import annotations from functools import partial -from typing import Dict, List +from typing import Callable, Coroutine, Dict, List from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.app import App +from cvttpy_tools.config import Config from cvttpy_tools.base import NamedObject from cvttpy_tools.config import CvttAppConfig from cvttpy_tools.logger import Log +# --- +from cvttpy_trading.trading.instrument import ExchangeInstrument +from cvttpy_trading.trading.active_instruments import Instruments +from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate +# --- from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy -from pairs_trading.lib.pt_strategy.live.pricer_md_client import PtMktDataClient +# from pairs_trading.lib.pt_strategy.live.pricer_md_client import PtMktDataClient from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender # import sys @@ -17,13 +23,15 @@ from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSend # for path in sys.path: # print(path) +HistMdCbT = Callable[[List[MdTradesAggregate]], Coroutine] +UpdateMdCbT = Callable[[MdTradesAggregate], Coroutine] -class PairTradingRunner(NamedObject): +class PairsTrader(NamedObject): config_: CvttAppConfig instruments_: List[JsonDictT] live_strategy_: PtLiveStrategy - pricer_client_: PtMktDataClient + # pricer_client_: PtMktDataClient def __init__(self) -> None: self.instruments_ = [] @@ -55,22 +63,19 @@ class PairTradingRunner(NamedObject): if len(instr_parts) != 2: raise ValueError(f"Invalid pair format: {instr}") instrument_id = instr_parts[0] - exchange_config_name = instr_parts[1] + exch_acct = instr_parts[1] + exch_inst = Instruments.instance() self.instruments_.append({ - "exchange_config_name": exchange_config_name, + "exch_acct": exch_acct, "instrument_id": instrument_id }) assert len(self.instruments_) == 2, "Only two instruments are supported" Log.info(f"{self.fname()} Instruments: {self.instruments_}") - # # ------- CREATE TI (trading instructions) CLIENT ------- - # ti_config = self.config_.get_subconfig("ti_config", {}) - # self.ti_sender_ = TradingInstructionsSender(config=ti_config) - # Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") # ------- CREATE CVTT CLIENT ------- - ti_config = self.config_.get_subconfig("ti_config", {}) + ti_config = self.config_.get_subconfig("ti_config", Config(json_src={})) self.ti_sender_ = TradingInstructionsSender(config=ti_config) Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") @@ -80,18 +85,34 @@ class PairTradingRunner(NamedObject): self.live_strategy_ = PtLiveStrategy( config=strategy_config, instruments=self.instruments_, - ti_sender=self.ti_sender_ + pairs_trader=self ) Log.info(f"{self.fname()} Strategy created: {self.live_strategy_}") # # ------- CREATE PRICER CLIENT ------- + # URGENT # pricer_config = self.config_.get_subconfig("pricer_config", {}) # self.pricer_client_ = PtMktDataClient( # live_strategy=self.live_strategy_, # pricer_config=pricer_config # ) - # Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}") + # Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}") + + # ------- CREATE TRADER CLIENT ------- + # URGENT + # (send TradingInstructions) + # ti_config = self.config_.get_subconfig("ti_config", {}) + # self.ti_sender_ = TradingInstructionsSender(config=ti_config) + # Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") + + # # ------- CREATE REST SERVER ------- + # for dashboard communications + + async def subscribe_md(self) -> None: + pass + # URGENT implement PairsTrader.subscribe_md() + async def run(self) -> None: Log.info(f"{self.fname()} ...") pass @@ -99,5 +120,5 @@ class PairTradingRunner(NamedObject): if __name__ == "__main__": App() CvttAppConfig() - PairTradingRunner() + PairsTrader() App.instance().run() diff --git a/lib/client/cvtt_client.py b/lib/client/cvtt_client.py index f13cbe4..d14d805 100644 --- a/lib/client/cvtt_client.py +++ b/lib/client/cvtt_client.py @@ -11,6 +11,7 @@ from cvttpy_tools.logger import Log from cvttpy_tools.config import Config from cvttpy_tools.timer import Timer +from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar @@ -177,24 +178,32 @@ class MdSummaryCollector(NamedObject): return self.history_ = self.get_history() self.run_callbacks() + self.set_timer() + + def set_timer(self): + if self.timer_: + self.timer_.cancel() self.timer_ = Timer( - start_in_sec=self.interval_sec_, - is_periodic=True, - period_interval=self.interval_sec_, + start_in_sec=(self.next_load_time() - current_seconds()), func=self._load_new, ) + def next_load_time(self) -> NanosT: + curr_sec = int(current_seconds()) + return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2 + + async def _load_new(self) -> None: last: Optional[MdSummary] = self.get_last() if not last: Log.warning(f"{self.fname()}: did not get last update") - return - if not self.is_empty() and last.ts_ns_ <= self.history_[-1].ts_ns_: + elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].ts_ns_: Log.info(f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}") - return - self.history_.append(last) - self.run_callbacks() + else: + self.history_.append(last) + self.run_callbacks() + self.set_timer() def run_callbacks(self) -> None: [cb(self.history_) for cb in self.callbacks_] diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index cdad098..c63b4e6 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -7,17 +7,18 @@ from enum import Enum import pandas as pd # --- from cvttpy_tools.base import NamedObject +from cvttpy_tools.app import App from cvttpy_tools.logger import Log from cvttpy_tools.settings.cvtt_types import JsonDictT # --- from cvttpy_trading.trading.instrument import ExchangeInstrument +from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate # --- -from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender from pairs_trading.lib.pt_strategy.model_data_policy import ModelDataPolicy -from pairs_trading.lib.pt_strategy.pt_market_data import RealTimeMarketData from pairs_trading.lib.pt_strategy.pt_model import Prediction from pairs_trading.lib.pt_strategy.trading_pair import PairState, TradingPair - +from pairs_trading.apps.pairs_trader import PairsTrader +from pairs_trading.lib.pt_strategy.pt_market_data import RealTimeMarketData """ --config=pair.cfg --pair=PAIR-BTC-USDT:COINBASE_AT,PAIR-ETH-USDT:COINBASE_AT @@ -38,8 +39,10 @@ class PtLiveStrategy(NamedObject): config_: Dict[str, Any] trading_pair_: TradingPair model_data_policy_: ModelDataPolicy + pairs_trader_: PairsTrader + pt_mkt_data_: RealTimeMarketData - ti_sender_: TradingInstructionsSender + # ti_sender_: TradingInstructionsSender # for presentation: history of prediction values and trading signals predictions_: pd.DataFrame @@ -49,23 +52,29 @@ class PtLiveStrategy(NamedObject): self, config: Dict[str, Any], instruments: List[Dict[str, str]], - ti_sender: TradingInstructionsSender, + pairs_trader: PairsTrader, ): self.config_ = config self.trading_pair_ = TradingPair(config=config, instruments=instruments) self.predictions_ = pd.DataFrame() self.trading_signals_ = pd.DataFrame() - self.ti_sender_ = ti_sender + self.pairs_trader_ = pairs_trader import copy # modified config must be passed to PtMarketData config_copy = copy.deepcopy(config) config_copy["instruments"] = instruments - self.pt_mkt_data_ = RealTimeMarketData(config=config_copy) + self.config_ = config_copy + + App.instance().add_call(stage=App.Stage.Config, func=self._on_config(), can_run_now=True) + + async def _on_config(self) -> None: + await self.pairs_trader_.subscribe_md() + self.model_data_policy_ = ModelDataPolicy.create( - config, is_real_time=True, pair=self.trading_pair_ + self.config_, is_real_time=True, pair=self.trading_pair_ ) self.open_threshold_ = self.config_.get("dis-equilibrium_open_trshld", 0.0) assert self.open_threshold_ > 0, "open_threshold must be greater than 0" @@ -75,12 +84,12 @@ class PtLiveStrategy(NamedObject): def __repr__(self) -> str: return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, " - async def on_mkt_data_hist_snapshot(self, aggr: JsonDictT) -> None: - Log.info(f"on_mkt_data_hist_snapshot: {aggr}") - await self.pt_mkt_data_.on_mkt_data_hist_snapshot(snapshot=aggr) - pass + async def on_mkt_data_hist_snapshot(self, hist_aggr: List[MdTradesAggregate]) -> None: + # Log.info(f"on_mkt_data_hist_snapshot: {aggr}") + # await self.pt_mkt_data_.on_mkt_data_hist_snapshot(snapshot=aggr) + pass # URGENT PtiveStrategy.on_mkt_data_hist_snapshot() - async def on_mkt_data_update(self, aggr: JsonDictT) -> None: + async def on_mkt_data_update(self, aggr: MdTradesAggregate) -> None: market_data_df = await self.pt_mkt_data_.on_mkt_data_update(update=aggr) if market_data_df is not None: self.trading_pair_.market_data_ = market_data_df @@ -100,7 +109,6 @@ class PtLiveStrategy(NamedObject): if len(trading_instructions) > 0: await self._send_trading_instructions(trading_instructions) # trades = self._create_trades(prediction=prediction, last_row=market_data_df.iloc[-1]) - # URGENT implement this pass async def _send_trading_instructions( @@ -125,7 +133,7 @@ class PtLiveStrategy(NamedObject): elif pair.is_open(): if abs_scaled_disequilibrium <= self.close_threshold_: trd_instructions = self._create_close_trade_instructions( - pair, row=last_row, prediction=prediction + pair, row=last_row #, prediction=prediction ) elif pair.to_stop_close_conditions(predicted_row=last_row): trd_instructions = self._create_close_trade_instructions( @@ -142,15 +150,25 @@ class PtLiveStrategy(NamedObject): if scaled_disequilibrium > 0: side_a = "SELL" trd_inst_a = TradingInstruction( - type=TradingInstructionType.TARGET_POSITION, - exch_instr=pair.get_instrument_a(), - specifics={"side": "SELL", "strength": -1}, + type_=TradingInstructionType.TARGET_POSITION, + exch_instr_=pair.get_instrument_a(), + specifics_={"side": "SELL", "strength": -1}, ) side_b = "BUY" else: side_a = "BUY" side_b = "SELL" + colname_a, colname_b = pair.exec_prices_colnames() + px_a = row[f"{colname_a}"] + px_b = row[f"{colname_b}"] + + tstamp = row["tstamp"] + diseqlbrm = prediction.disequilibrium_ + scaled_disequilibrium = prediction.scaled_disequilibrium_ + + df = self._trades_df() + # save closing sides pair.user_data_["open_side_a"] = side_a # used in oustanding positions pair.user_data_["open_side_b"] = side_b @@ -184,7 +202,11 @@ class PtLiveStrategy(NamedObject): "signed_scaled_disequilibrium": scaled_disequilibrium, # "pair": pair, } - return df + ti: List[TradingInstruction] =self._create_trading_instructions( + prediction=prediction, last_row=row + ) + return ti + def _create_close_trade_instructions( self, pair: TradingPair, row: pd.Series #, prediction: Prediction diff --git a/lib/pt_strategy/pt_market_data.py b/lib/pt_strategy/pt_market_data.py index 84a8a9f..da13a2e 100644 --- a/lib/pt_strategy/pt_market_data.py +++ b/lib/pt_strategy/pt_market_data.py @@ -3,8 +3,12 @@ from __future__ import annotations from typing import Any, Dict, List, Optional import pandas as pd +# --- from cvttpy_tools.settings.cvtt_types import JsonDictT -from tools.data_loader import load_market_data +# --- +from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate +# --- +from pairs_trading.lib.tools.data_loader import load_market_data class PtMarketData(): @@ -199,7 +203,7 @@ class RealTimeMarketData(PtMarketData): pass - async def on_mkt_data_update(self, update: JsonDictT) -> Optional[pd.DataFrame]: + async def on_mkt_data_update(self, update: MdTradesAggregate) -> Optional[pd.DataFrame]: # URGENT # make sure update has both instruments # create DataFrame tmp1 from update diff --git a/lib/pt_strategy/pt_model.py b/lib/pt_strategy/pt_model.py index daf193d..6af809e 100644 --- a/lib/pt_strategy/pt_model.py +++ b/lib/pt_strategy/pt_model.py @@ -3,8 +3,8 @@ from __future__ import annotations from abc import ABC, abstractmethod from typing import Any, Dict, cast -from pt_strategy.prediction import Prediction - +from pairs_trading.lib.pt_strategy.prediction import Prediction +from pairs_trading.lib.pt_strategy.trading_pair import TradingPair class PairsTradingModel(ABC): diff --git a/lib/pt_strategy/trading_pair.py b/lib/pt_strategy/trading_pair.py index db24b9e..2c60801 100644 --- a/lib/pt_strategy/trading_pair.py +++ b/lib/pt_strategy/trading_pair.py @@ -6,8 +6,12 @@ from typing import Any, Dict, List import pandas as pd -from pt_strategy.model_data_policy import DataWindowParams -from pt_strategy.prediction import Prediction +# --- +from cvttpy_trading.trading.instrument import ExchangeInstrument +# --- +from pairs_trading.lib.pt_strategy.model_data_policy import DataWindowParams +from pairs_trading.lib.pt_strategy.prediction import Prediction +from pairs_trading.lib.pt_strategy.models import PairsTradingModel class PairState(Enum): @@ -44,6 +48,9 @@ class TradingPair: user_data_: Dict[str, Any] + exch_inst_a_: ExchangeInstrument + exch_inst_b_: ExchangeInstrument + def __init__( self, config: Dict[str, Any], @@ -190,7 +197,11 @@ class TradingPair: "last_value": last_px * shares, }) - + def get_instrument_a(self) -> ExchangeInstrument: + return self.exch_inst_a_ + def get_instrument_b(self) -> ExchangeInstrument: + return self.exch_inst_b_ + def run(self, market_data: pd.DataFrame, data_params: DataWindowParams) -> Prediction: # type: ignore[assignment] self.market_data_ = market_data[data_params.training_start_index:data_params.training_start_index + data_params.training_size] return self.model_.predict(pair=self)