From 8b115cee75fa213f7a00b18e222d7b23e0e718c4 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Mon, 22 Dec 2025 23:58:41 +0000 Subject: [PATCH] renewed development --- .vscode/settings.json | 3 - lib/client/cvtt_client.py | 55 ++++++++++++---- lib/pt_strategy/live/live_strategy.py | 19 ++++-- ...er_md_client.py => pricer_md_client.py.md} | 12 ++-- pyproject.toml | 66 ------------------- pyrightconfig.json | 25 ------- 6 files changed, 62 insertions(+), 118 deletions(-) rename lib/pt_strategy/live/{pricer_md_client.py => pricer_md_client.py.md} (89%) delete mode 100644 pyproject.toml delete mode 100644 pyrightconfig.json diff --git a/.vscode/settings.json b/.vscode/settings.json index 8b71a6a..f57d945 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -15,9 +15,6 @@ ], "python.envFile": "${workspaceFolder}/.env", "python.testing.debugPort": 3000, - "python.linting.enabled": true, - "python.linting.pylintEnabled": false, - "python.linting.mypyEnabled": true, "files.associations": { "*.py": "python" }, diff --git a/lib/client/cvtt_client.py b/lib/client/cvtt_client.py index 8815f68..7d01568 100644 --- a/lib/client/cvtt_client.py +++ b/lib/client/cvtt_client.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Dict, Any, List, Optional +import asyncio +from typing import Callable, Dict, Any, List, Optional import time import requests @@ -63,7 +64,6 @@ class RESTSender(NamedObject): f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore ) from excpt - class MdSummary(HistMdBar): def __init__( self, @@ -105,6 +105,7 @@ class MdSummary(HistMdBar): ) return res +MdSummaryCallbackT = Callable[[List[MdSummary]], None] class MdSummaryCollector(NamedObject): sender_: RESTSender @@ -114,8 +115,9 @@ class MdSummaryCollector(NamedObject): history_depth_sec_: int history_: List[MdSummary] + callbacks_: List[MdSummaryCallbackT] timer_: Optional[Timer] - + def __init__( self, sender: RESTSender, @@ -130,8 +132,12 @@ class MdSummaryCollector(NamedObject): self.interval_sec_ = interval_sec self.history_depth_sec_ = history_depth_sec - self.history_depth_sec_ = [] + self.history_ = [] + self.callbacks_ = [] self.timer_ = None + + def add_callback(self, cb: MdSummaryCallbackT) -> None: + self.callbacks_.append(cb) def rqst_data(self) -> Dict[str, Any]: return { @@ -145,22 +151,32 @@ class MdSummaryCollector(NamedObject): response: requests.Response = self.sender_.send_post( endpoint="md_summary", post_body=self.rqst_data() ) + if response.status_code not in (200, 201): + Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}") + return [] return MdSummary.from_REST_response(response=response) def get_last(self) -> Optional[MdSummary]: rqst_data = self.rqst_data() - rqst_data["history_depth_sec"] = self.interval_sec_ + rqst_data["history_depth_sec"] = self.interval_sec_ * 2 response: requests.Response = self.sender_.send_post( endpoint="md_summary", post_body=rqst_data ) + if response.status_code not in (200, 201): + Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}") + return None res = MdSummary.from_REST_response(response=response) return None if len(res) == 0 else res[-1] + def is_empty(self) -> bool: + return len(self.history_) == 0 + async def start(self) -> None: if self.timer_: Log.error(f"{self.fname()}: Timer is already started") return self.history_ = self.get_history() + self.run_callbacks() self.timer_ = Timer( start_in_sec=self.interval_sec_, is_periodic=True, @@ -169,15 +185,19 @@ class MdSummaryCollector(NamedObject): ) async def _load_new(self) -> None: + last: Optional[MdSummary] = self.get_last() if not last: - # URGENT logging + Log.warning(f"{self.fname()}: did not get last update") return - if last.ts_ns_ <= self.history_[-1].ts_ns_: - # URGENT logging + if not self.is_empty() and last.ts_ns_ <= self.history_[-1].ts_ns_: + Log.info(f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}") return self.history_.append(last) - # URGENT implement notification + self.run_callbacks() + + def run_callbacks(self) -> None: + [cb(self.history_) for cb in self.callbacks_] def stop(self) -> None: if self.timer_: @@ -196,7 +216,8 @@ class CvttRESTClient(NamedObject): if __name__ == "__main__": - config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"}) + # config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"}) + config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"}) cvtt_client = CvttRESTClient(config) @@ -208,6 +229,16 @@ if __name__ == "__main__": history_depth_sec=24 * 3600, ) - hist = mdsc.get_history() - last = mdsc.get_last() + def _calback(history: List[MdSummary]) -> None: + Log.info(f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}") + + mdsc.add_callback(_calback) + + async def __run() -> None: + Log.info("Starting...") + await mdsc.start() + while True: + await asyncio.sleep(5) + + asyncio.run(__run()) pass diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index a1ee8f0..cdad098 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -12,11 +12,11 @@ from cvttpy_tools.settings.cvtt_types import JsonDictT # --- from cvttpy_trading.trading.instrument import ExchangeInstrument # --- -from pt_strategy.live.ti_sender import TradingInstructionsSender -from pt_strategy.model_data_policy import ModelDataPolicy -from pt_strategy.pt_market_data import RealTimeMarketData -from pt_strategy.pt_model import Prediction -from pt_strategy.trading_pair import PairState, TradingPair +from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender +from pairs_trading.lib.pt_strategy.model_data_policy import ModelDataPolicy +from pairs_trading.lib.pt_strategy.pt_market_data import RealTimeMarketData +from pairs_trading.lib.pt_strategy.pt_model import Prediction +from pairs_trading.lib.pt_strategy.trading_pair import PairState, TradingPair """ --config=pair.cfg @@ -104,9 +104,9 @@ class PtLiveStrategy(NamedObject): pass async def _send_trading_instructions( - self, trading_instructions: pd.DataFrame + self, trading_instructions: List[TradingInstruction] ) -> None: - pass + pass # URGENT implement _send_trading_instructions def _create_trading_instructions( self, prediction: Prediction, last_row: pd.Series @@ -186,6 +186,11 @@ class PtLiveStrategy(NamedObject): } return df + def _create_close_trade_instructions( + self, pair: TradingPair, row: pd.Series #, prediction: Prediction + ) -> List[TradingInstruction]: + return [] # URGENT implement _create_close_trade_instructions + def _handle_outstanding_positions(self) -> Optional[pd.DataFrame]: trades = None pair = self.trading_pair_ diff --git a/lib/pt_strategy/live/pricer_md_client.py b/lib/pt_strategy/live/pricer_md_client.py.md similarity index 89% rename from lib/pt_strategy/live/pricer_md_client.py rename to lib/pt_strategy/live/pricer_md_client.py.md index 83a97fe..c60d37b 100644 --- a/lib/pt_strategy/live/pricer_md_client.py +++ b/lib/pt_strategy/live/pricer_md_client.py.md @@ -1,18 +1,19 @@ +```python from __future__ import annotations from functools import partial from typing import Dict, List -from cvtt_client.mkt_data import (CvttPricerWebSockClient, - CvttPricesSubscription, MessageTypeT, - SubscriptionIdT) +# from cvtt_client.mkt_data import (CvttPricerWebSockClient, +# CvttPricesSubscription, MessageTypeT, +# SubscriptionIdT) from cvttpy_tools.app import App from cvttpy_tools.base import NamedObject from cvttpy_tools.config import Config from cvttpy_tools.logger import Log from cvttpy_tools.settings.cvtt_types import JsonDictT -from pt_strategy.live.live_strategy import PtLiveStrategy -from pt_strategy.trading_pair import TradingPair +from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy +from pairs_trading.lib.pt_strategy.trading_pair import TradingPair """ --config=pair.cfg @@ -83,3 +84,4 @@ class PtMktDataClient(NamedObject): await self._subscribe() await self.pricer_client_.run() +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index dfec06d..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,66 +0,0 @@ -[build-system] -requires = ["setuptools>=45", "wheel"] -build-backend = "setuptools.build_meta" - -[project] -name = "pairs-trading" -version = "0.1.0" -description = "Pairs Trading Backtesting Framework" -requires-python = ">=3.8" - -[tool.black] -line-length = 88 -target-version = ['py38'] -include = '\.pyi?$' -extend-exclude = ''' -/( - # directories - \.eggs - | \.git - | \.hg - | \.mypy_cache - | \.tox - | \.venv - | build - | dist -)/ -''' - -[tool.flake8] -max-line-length = 88 -extend-ignore = ["E203", "W503"] -exclude = [ - ".git", - "__pycache__", - "build", - "dist", - ".venv", - ".mypy_cache", - ".tox" -] - -[tool.mypy] -python_version = "3.8" -warn_return_any = true -warn_unused_configs = true -disallow_untyped_defs = true -disallow_incomplete_defs = true -check_untyped_defs = true -disallow_untyped_decorators = true -no_implicit_optional = true -warn_redundant_casts = true -warn_unused_ignores = true -warn_no_return = true -warn_unreachable = true -strict_equality = true - -[[tool.mypy.overrides]] -module = [ - "numpy.*", - "pandas.*", - "matplotlib.*", - "seaborn.*", - "scipy.*", - "sklearn.*" -] -ignore_missing_imports = true \ No newline at end of file diff --git a/pyrightconfig.json b/pyrightconfig.json deleted file mode 100644 index c2d10fd..0000000 --- a/pyrightconfig.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "include": [ - "lib" - ], - "exclude": [ - "**/node_modules", - "**/__pycache__", - "**/.*", - "results", - "data" - ], - "ignore": [], - "defineConstant": {}, - "typeCheckingMode": "basic", - "useLibraryCodeForTypes": true, - "autoImportCompletions": true, - "autoSearchPaths": true, - "extraPaths": [ - "lib", - ".." - ], - "stubPath": "./typings", - "venvPath": ".", - "venv": "python3.12-venv" -} \ No newline at end of file