From 842eb3ec623b56ea8302fbe8b0f697e1f4490f4a Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 1 Jan 2026 01:36:31 +0000 Subject: [PATCH] dev progress --- apps/pairs_trader.py | 52 +++++-------- lib/live/mkt_data_client.py | 108 +++++++++++++++++++------- lib/live/ti_sender.py | 56 +++++++++++++ lib/pt_strategy/live/live_strategy.py | 2 - lib/pt_strategy/live/ti_sender.py | 86 -------------------- 5 files changed, 155 insertions(+), 149 deletions(-) create mode 100644 lib/live/ti_sender.py delete mode 100644 lib/pt_strategy/live/ti_sender.py diff --git a/apps/pairs_trader.py b/apps/pairs_trader.py index 45d8fbb..c2fcd07 100644 --- a/apps/pairs_trader.py +++ b/apps/pairs_trader.py @@ -1,23 +1,21 @@ from __future__ import annotations -from functools import partial -from typing import Callable, Coroutine, Dict, List +from typing import Callable, Coroutine, List -from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.app import App from cvttpy_tools.config import Config from cvttpy_tools.base import NamedObject from cvttpy_tools.config import CvttAppConfig from cvttpy_tools.logger import Log +from cvttpy_tools.settings.cvtt_types import BookIdT # --- from cvttpy_trading.trading.instrument import ExchangeInstrument -from cvttpy_trading.trading.active_instruments import Instruments from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate from cvttpy_trading.trading.exchange_config import ExchangeAccounts # --- from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy -from pairs_trading.lib.live.mkt_data_client import CvttRestMktDataClient, MdSummary -from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender +from pairs_trading.lib.live.mkt_data_client import CvttRestMktDataClient +from pairs_trading.lib.live.ti_sender import TradingInstructionsSender # import sys # print("PYTHONPATH directories:") @@ -25,17 +23,7 @@ from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSend # print(path) ''' -Config -======= -{ - "cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456", - "ti_config": { - TODO - }, - "strategy_config": { - TODO - } -} +config http://cloud16.cvtt.vpn/apps/pairs_trading ''' HistMdCbT = Callable[[List[MdTradesAggregate]], Coroutine] @@ -44,6 +32,7 @@ UpdateMdCbT = Callable[[MdTradesAggregate], Coroutine] class PairsTrader(NamedObject): config_: CvttAppConfig instruments_: List[ExchangeInstrument] + book_id_: BookIdT live_strategy_: PtLiveStrategy pricer_client_: CvttRestMktDataClient @@ -62,11 +51,18 @@ class PairsTrader(NamedObject): ), ) + App.instance().add_cmdline_arg( + "--book_id", + type=str, + required=True, + help="Book ID" + ) App.instance().add_call(App.Stage.Config, self._on_config()) App.instance().add_call(App.Stage.Run, self.run()) async def _on_config(self) -> None: self.config_ = CvttAppConfig.instance() + self.book_id_ = App.instance().get_argument(name="book_id") # ------- PARSE INSTRUMENTS ------- instr_str = App.instance().get_argument("pair", "") @@ -90,13 +86,6 @@ class PairsTrader(NamedObject): Log.info(f"{self.fname()} Instruments: {self.instruments_[0].details_short()} <==> {self.instruments_[1].details_short()}") - - # ------- CREATE CVTT CLIENT ------- - ti_config = self.config_.get_subconfig("ti_config", Config(json_src={})) - self.ti_sender_ = TradingInstructionsSender(config=ti_config) - Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") - - # ------- CREATE STRATEGY ------- strategy_config = self.config_.get_subconfig("strategy_config", Config({})) self.live_strategy_ = PtLiveStrategy( @@ -108,13 +97,11 @@ class PairsTrader(NamedObject): # # ------- CREATE PRICER CLIENT ------- self.pricer_client_ = CvttRestMktDataClient(config=self.config_) + Log.info(f"{self.fname()} MD client created: {self.pricer_client_}") # ------- CREATE TRADER CLIENT ------- - # URGENT CREATE TRADER CLIENT - # (send TradingInstructions) - # ti_config = self.config_.get_subconfig("ti_config", {}) - # self.ti_sender_ = TradingInstructionsSender(config=ti_config) - # Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") + self.ti_sender_ = TradingInstructionsSender(config=self.config_, pairs_trader=self) + Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") # # ------- CREATE REST SERVER ------- @@ -133,10 +120,9 @@ class PairsTrader(NamedObject): callback=self._on_md_summary ) - async def _on_md_summary(self, history: List[MdSummary]) -> None: - # depth = len(history) - # if depth < 2: - pass # URGENT + async def _on_md_summary(self, history: List[MdTradesAggregate]) -> None: + # Snapshot or update? + await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=history) async def run(self) -> None: Log.info(f"{self.fname()} ...") diff --git a/lib/live/mkt_data_client.py b/lib/live/mkt_data_client.py index b91a044..d94aacb 100644 --- a/lib/live/mkt_data_client.py +++ b/lib/live/mkt_data_client.py @@ -10,11 +10,16 @@ from cvttpy_tools.app import App from cvttpy_tools.logger import Log from cvttpy_tools.config import Config from cvttpy_tools.timer import Timer -from cvttpy_tools.timeutils import NanosT, current_seconds +from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT + # --- from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar +from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT +from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate +from cvttpy_trading.trading.exchange_config import ExchangeAccounts + # --- from pairs_trading.lib.live.rest_client import RESTSender @@ -60,19 +65,36 @@ class MdSummary(HistMdBar): ) return res -MdSummaryCallbackT = Callable[[List[MdSummary]], Coroutine] + def create_md_trades_aggregate( + self, + exch_acct: ExchangeAccountNameT, + exch_inst: ExchangeInstrument, + interval_sec: IntervalSecT, + ) -> MdTradesAggregate: + res = MdTradesAggregate( + exch_acct=exch_acct, + exch_inst=exch_inst, + interval_ns=interval_sec * NanoPerSec, + ) + res.set(mdbar=self) + return res + + +MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine] + class MdSummaryCollector(NamedObject): sender_: RESTSender exch_acct_: ExchangeAccountNameT - instrument_id_: InstrumentIdT + exch_inst_: ExchangeInstrument interval_sec_: IntervalSecT history_depth_sec_: IntervalSecT - history_: List[MdSummary] + history_: List[MdTradesAggregate] + callbacks_: List[MdSummaryCallbackT] timer_: Optional[Timer] - + def __init__( self, sender: RESTSender, @@ -83,24 +105,36 @@ class MdSummaryCollector(NamedObject): ) -> None: self.sender_ = sender self.exch_acct_ = exch_acct - self.instrument_id_ = instrument_id + + exch_inst = ExchangeAccounts.instance().get_exchange_instrument( + exch_acct=exch_acct, instrument_id=instrument_id + ) + assert exch_inst is not None, f"Unable to find Exchange instrument for {exch_acct}/{instrument_id}" + self.exch_inst_ = exch_inst self.interval_sec_ = interval_sec self.history_depth_sec_ = history_depth_sec self.history_ = [] self.callbacks_ = [] self.timer_ = None - + def add_callback(self, cb: MdSummaryCallbackT) -> None: self.callbacks_.append(cb) def __hash__(self): - return hash((self.exch_acct_, self.instrument_id_, self.interval_sec_, self.history_depth_sec_)) - + return hash( + ( + self.exch_acct_, + self.exch_inst_.instrument_id(), + self.interval_sec_, + self.history_depth_sec_, + ) + ) + def rqst_data(self) -> Dict[str, Any]: return { "exch_acct": self.exch_acct_, - "instrument_id": self.instrument_id_, + "instrument_id": self.exch_inst_.instrument_id(), "interval_sec": self.interval_sec_, "history_depth_sec": self.history_depth_sec_, } @@ -110,7 +144,9 @@ class MdSummaryCollector(NamedObject): endpoint="md_summary", post_body=self.rqst_data() ) if response.status_code not in (200, 201): - Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}") + Log.error( + f"{self.fname()}: Received error: {response.status_code} - {response.text}" + ) return [] return MdSummary.from_REST_response(response=response) @@ -121,19 +157,29 @@ class MdSummaryCollector(NamedObject): endpoint="md_summary", post_body=rqst_data ) if response.status_code not in (200, 201): - Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}") + Log.error( + f"{self.fname()}: Received error: {response.status_code} - {response.text}" + ) return None res = MdSummary.from_REST_response(response=response) return None if len(res) == 0 else res[-1] def is_empty(self) -> bool: return len(self.history_) == 0 - + async def start(self) -> None: if self.timer_: Log.error(f"{self.fname()}: Timer is already started") return - self.history_ = self.get_history() + mdsum_hist = self.get_history() + self.history_ = [ + mdsum.create_md_trades_aggregate( + exch_acct=self.exch_acct_, + exch_inst=self.exch_inst_, + interval_sec=self.interval_sec_, + ) + for mdsum in mdsum_hist + ] await self.run_callbacks() self.set_timer() @@ -148,27 +194,30 @@ class MdSummaryCollector(NamedObject): def next_load_time(self) -> NanosT: curr_sec = int(current_seconds()) return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2 - + async def _load_new(self) -> None: - + last: Optional[MdSummary] = self.get_last() if not last: Log.warning(f"{self.fname()}: did not get last update") - elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].ts_ns_: - Log.info(f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}") + elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_: + Log.info( + f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}" + ) else: - self.history_.append(last) + self.history_.append(last.create_md_trades_aggregate(exch_acct=self.exch_acct_, exch_inst=self.exch_inst_, interval_sec=self.interval_sec_)) await self.run_callbacks() self.set_timer() - + async def run_callbacks(self) -> None: [await cb(self.history_) for cb in self.callbacks_] - + def stop(self) -> None: if self.timer_: self.timer_.cancel() self.timer_ = None + class CvttRestMktDataClient(NamedObject): config_: Config sender_: RESTSender @@ -181,12 +230,13 @@ class CvttRestMktDataClient(NamedObject): self.sender_ = RESTSender(base_url=base_url) self.collectors_ = set() - async def add_subscription(self, + async def add_subscription( + self, exch_acct: ExchangeAccountNameT, instrument_id: InstrumentIdT, interval_sec: IntervalSecT, history_depth_sec: IntervalSecT, - callback: MdSummaryCallbackT + callback: MdSummaryCallbackT, ) -> None: mdsc = MdSummaryCollector( sender=self.sender_, @@ -199,14 +249,16 @@ class CvttRestMktDataClient(NamedObject): self.collectors_.add(mdsc) await mdsc.start() + if __name__ == "__main__": config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"}) # config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"}) - async def _calback(history: List[MdSummary]) -> None: - Log.info(f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}") + async def _calback(history: List[MdTradesAggregate]) -> None: + Log.info( + f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}" + ) - async def __run() -> None: Log.info("Starting...") cvtt_client = CvttRestMktDataClient(config) @@ -215,10 +267,10 @@ if __name__ == "__main__": instrument_id="PAIR-BTC-USD", interval_sec=60, history_depth_sec=24 * 3600, - callback=_calback + callback=_calback, ) while True: await asyncio.sleep(5) - + asyncio.run(__run()) pass diff --git a/lib/live/ti_sender.py b/lib/live/ti_sender.py new file mode 100644 index 0000000..0138888 --- /dev/null +++ b/lib/live/ti_sender.py @@ -0,0 +1,56 @@ +from enum import Enum + +import requests + +# import aiohttp +from cvttpy_tools.base import NamedObject +from cvttpy_tools.config import Config +from cvttpy_tools.logger import Log +# --- +from cvttpy_trading.trading.trading_instructions import TradingInstructions +# --- +from pairs_trading.lib.live.rest_client import RESTSender +from pairs_trading.apps.pairs_trader import PairsTrader + + +class TradingInstructionsSender(NamedObject): + config_: Config + sender_: RESTSender + pairs_trader_: PairsTrader + + class TradingInstType(str, Enum): + TARGET_POSITION = "TARGET_POSITION" + DIRECT_ORDER = "DIRECT_ORDER" + MARKET_MAKING = "MARKET_MAKING" + NONE = "NONE" + + # config_: Config + # ti_method_: str + # ti_url_: str + # health_check_method_: str + # health_check_url_: str + + def __init__(self, config: Config, pairs_trader: PairsTrader) -> None: + self.config_ = config + base_url = self.config_.get_value("cvtt_base_url", default="") + assert base_url + self.sender_ = RESTSender(base_url=base_url) + self.pairs_trader_ = pairs_trader + + self.book_id_ = self.pairs_trader_.book_id_ + assert self.book_id_, "book_id is required" + + self.strategy_id_ = config.get_value("strategy_id", "") + assert self.strategy_id_, "strategy_id is required" + + + async def send_trading_instructions(self, ti: TradingInstructions) -> None: + + response: requests.Response = self.sender_.send_post( + endpoint="trading_instructions", post_body=ti.to_dict() + ) + if response.status_code not in (200, 201): + Log.error( + f"{self.fname()}: Received error: {response.status_code} - {response.text}" + ) + diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index 5dc814c..cb037ef 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -120,7 +120,6 @@ class PtLiveStrategy(NamedObject): # await self.pt_mkt_data_.on_mkt_data_hist_snapshot(snapshot=aggr) pass # URGENT PtiveStrategy.on_mkt_data_hist_snapshot() - async def on_mkt_data_update(self, aggr: MdTradesAggregate) -> None: # if market_data_df is not None: # self.trading_pair_.market_data_ = market_data_df # self.model_data_policy_.advance() @@ -139,7 +138,6 @@ class PtLiveStrategy(NamedObject): # if len(trading_instructions) > 0: # await self._send_trading_instructions(trading_instructions) # # trades = self._create_trades(prediction=prediction, last_row=market_data_df.iloc[-1]) - pass # URGENT def interval_sec(self) -> IntervalSecT: return self.interval_sec_ diff --git a/lib/pt_strategy/live/ti_sender.py b/lib/pt_strategy/live/ti_sender.py deleted file mode 100644 index 9c00cfc..0000000 --- a/lib/pt_strategy/live/ti_sender.py +++ /dev/null @@ -1,86 +0,0 @@ -import time -from enum import Enum -from typing import Tuple - -# import aiohttp -from cvttpy_tools.app import App -from cvttpy_tools.base import NamedObject -from cvttpy_tools.config import Config -from cvttpy_tools.logger import Log -from cvttpy_tools.timer import Timer -from cvttpy_tools.timeutils import NanoPerSec -from cvttpy_tools.web.rest_client import REST_RequestProcessor - - -class TradingInstructionsSender(NamedObject): - - class TradingInstType(str, Enum): - TARGET_POSITION = "TARGET_POSITION" - DIRECT_ORDER = "DIRECT_ORDER" - MARKET_MAKING = "MARKET_MAKING" - NONE = "NONE" - - config_: Config - ti_method_: str - ti_url_: str - health_check_method_: str - health_check_url_: str - - def __init__(self, config: Config): - self.config_ = config - base_url = config.get_value("url", "ws://localhost:12346/ws") - - self.book_id_ = config.get_value("book_id", "") - assert self.book_id_, "book_id is required" - - self.strategy_id_ = config.get_value("strategy_id", "") - assert self.strategy_id_, "strategy_id is required" - - endpoint_uri = config.get_value("ti_endpoint/url", "/trading_instructions") - endpoint_method = config.get_value("ti_endpoint/method", "POST") - - health_check_uri = config.get_value("health_check_endpoint/url", "/ping") - health_check_method = config.get_value("health_check_endpoint/method", "GET") - - - - self.ti_method_ = endpoint_method - self.ti_url_ = f"{base_url}{endpoint_uri}" - - self.health_check_method_ = health_check_method - self.health_check_url_ = f"{base_url}{health_check_uri}" - - App.instance().add_call(App.Stage.Start, self._set_health_check_timer(), can_run_now=True) - - async def _set_health_check_timer(self) -> None: - # TODO: configurable interval - self.health_check_timer_ = Timer(is_periodic=True, period_interval=15, start_in_sec=0, func=self._health_check) - Log.info(f"{self.fname()} Health check timer set to 15 seconds") - - async def _health_check(self) -> None: - rqst = REST_RequestProcessor(method=self.health_check_method_, url=self.health_check_url_) - async with rqst as (status, msg, headers): - if status != 200: - Log.error(f"{self.fname()} CVTT Service is not responding") - - async def send_tgt_positions(self, strength: float, base_asset: str, quote_asset: str) -> Tuple[int, str]: - instr = { - "type": self.TradingInstType.TARGET_POSITION.value, - "book_id": self.book_id_, - "strategy_id": self.strategy_id_, - "issued_ts_ns": int(time.time() * NanoPerSec), - "data": { - "strength": strength, - "base_asset": base_asset, - "quote_asset": quote_asset, - "user_data": {}, - }, - } - - rqst = REST_RequestProcessor(method=self.ti_method_, url=self.ti_url_, params=instr) - async with rqst as (status, msg, headers): - if status != 200: - raise ConnectionError(f"Failed to send trading instructions: {msg}") - return (status, msg) - -