"""REST polling client for interval market-data summaries.

`MdSummaryCollector` loads an initial history of bars from the ``md_summary``
REST endpoint, then re-polls shortly after every interval boundary and hands
the accumulated history to the registered async callbacks.
`CvttRestMktDataClient` owns the shared `RESTSender` and the set of collectors.
"""
from __future__ import annotations

import asyncio
from typing import Callable, Coroutine, Dict, Any, List, Optional, Set

import requests

from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT

# ---
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.exchange_config import ExchangeAccounts

# ---
from pairs_trading.lib.live.rest_client import RESTSender


class MdSummary(HistMdBar):
    """One summary bar as delivered by the ``md_summary`` REST endpoint."""

    def __init__(
        self,
        ts_ns: int,
        open: float,
        high: float,
        low: float,
        close: float,
        volume: float,
        vwap: float,
        num_trades: int,
    ):
        """Store the bar fields; the timestamp is forwarded to `HistMdBar`."""
        super().__init__(ts=ts_ns)
        self.open_ = open
        self.high_ = high
        self.low_ = low
        self.close_ = close
        self.volume_ = volume
        self.vwap_ = vwap
        self.num_trades_ = num_trades

    @classmethod
    def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
        """Build one bar per entry of the response's ``historical_data`` list.

        Args:
            response: HTTP response whose JSON body is read.

        Returns:
            Bars in the order they appear in the payload; an empty list when
            the ``historical_data`` key is absent.

        Raises:
            KeyError: if an entry lacks one of the expected fields.
        """
        payload = response.json()
        # Build through ``cls`` so that subclasses get instances of their own type.
        return [
            cls(
                ts_ns=entry["time_ns"],
                open=entry["open"],
                high=entry["high"],
                low=entry["low"],
                close=entry["close"],
                volume=entry["volume"],
                vwap=entry["vwap"],
                num_trades=entry["num_trades"],
            )
            for entry in payload.get("historical_data", [])
        ]

    def create_md_trades_aggregate(
        self,
        exch_acct: ExchangeAccountNameT,
        exch_inst: ExchangeInstrument,
        interval_sec: IntervalSecT,
    ) -> MdTradesAggregate:
        """Wrap this bar into an `MdTradesAggregate`.

        The aggregate is created with the interval converted to nanoseconds
        and then populated from this bar via ``set(mdbar=self)``.
        """
        aggregate = MdTradesAggregate(
            exch_acct=exch_acct,
            exch_inst=exch_inst,
            interval_ns=interval_sec * NanoPerSec,
        )
        aggregate.set(mdbar=self)
        return aggregate


# Async callback that receives the collector's full history list.
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]


class MdSummaryCollector(NamedObject):
    """Polls the ``md_summary`` endpoint for one instrument and interval.

    NOTE(review): `__hash__` is defined without a matching `__eq__` in this
    class, so (unless `NamedObject` provides one) two collectors with equal
    parameters still compare unequal inside a set - confirm this is intended.
    """

    # Seconds added past the interval boundary before polling; presumably
    # gives the server time to publish the finished bar - TODO confirm.
    _LOAD_DELAY_SEC = 2

    sender_: RESTSender
    exch_acct_: ExchangeAccountNameT
    exch_inst_: ExchangeInstrument
    interval_sec_: IntervalSecT
    history_depth_sec_: IntervalSecT

    history_: List[MdTradesAggregate]
    callbacks_: List[MdSummaryCallbackT]
    timer_: Optional[Timer]

    def __init__(
        self,
        sender: RESTSender,
        exch_acct: ExchangeAccountNameT,
        instrument_id: InstrumentIdT,
        interval_sec: IntervalSecT,
        history_depth_sec: IntervalSecT,
    ) -> None:
        """Resolve the exchange instrument and initialise empty state.

        Raises:
            AssertionError: if no instrument is found for the account/id pair.
        """
        self.sender_ = sender
        self.exch_acct_ = exch_acct

        exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
            exch_acct=exch_acct, instrument_id=instrument_id
        )
        assert exch_inst is not None, f"Unable to find Exchange instrument for {exch_acct}/{instrument_id}"
        self.exch_inst_ = exch_inst

        self.interval_sec_ = interval_sec
        self.history_depth_sec_ = history_depth_sec

        self.history_ = []
        self.callbacks_ = []
        self.timer_ = None

    def add_callback(self, cb: MdSummaryCallbackT) -> None:
        """Register a coroutine function to be awaited with the history."""
        self.callbacks_.append(cb)

    def __hash__(self):
        """Hash on account, instrument id, interval and history depth."""
        return hash(
            (
                self.exch_acct_,
                self.exch_inst_.instrument_id(),
                self.interval_sec_,
                self.history_depth_sec_,
            )
        )

    def rqst_data(self) -> Dict[str, Any]:
        """Return the POST body describing this subscription."""
        return {
            "exch_acct": self.exch_acct_,
            "instrument_id": self.exch_inst_.instrument_id(),
            "interval_sec": self.interval_sec_,
            "history_depth_sec": self.history_depth_sec_,
        }

    def get_history(self) -> List[MdSummary]:
        """Fetch the full configured history depth.

        Returns:
            The parsed bars, or an empty list when the status code is neither
            200 nor 201 (the error is logged).
        """
        response: requests.Response = self.sender_.send_post(
            endpoint="md_summary", post_body=self.rqst_data()
        )
        if response.status_code not in (200, 201):
            Log.error(
                f"{self.fname()}: Received error: {response.status_code} - {response.text}"
            )
            return []
        return MdSummary.from_REST_response(response=response)

    def get_last(self) -> Optional[MdSummary]:
        """Fetch the most recent bar.

        Two intervals of history are requested and the last returned bar is
        used.

        Returns:
            The last bar of the response, or ``None`` on a non-200/201 status
            or an empty payload.
        """
        Log.info(f"{self.fname()}: for {self.exch_inst_.details_short()}")
        rqst_data = self.rqst_data()
        rqst_data["history_depth_sec"] = self.interval_sec_ * 2
        response: requests.Response = self.sender_.send_post(
            endpoint="md_summary", post_body=rqst_data
        )
        if response.status_code not in (200, 201):
            Log.error(
                f"{self.fname()}: Received error: {response.status_code} - {response.text}"
            )
            return None
        bars = MdSummary.from_REST_response(response=response)
        return bars[-1] if bars else None

    def is_empty(self) -> bool:
        """Return True when no history has been collected yet."""
        return len(self.history_) == 0

    async def start(self) -> None:
        """Load the initial history, notify callbacks and arm the poll timer.

        Does nothing except log an error when a timer is already set.
        """
        if self.timer_:
            Log.error(f"{self.fname()}: Timer is already started")
            return
        # NOTE(review): get_history() is a synchronous call made from a
        # coroutine; it presumably blocks the event loop while the request is
        # in flight - confirm this is acceptable.
        mdsum_hist = self.get_history()
        self.history_ = [
            mdsum.create_md_trades_aggregate(
                exch_acct=self.exch_acct_,
                exch_inst=self.exch_inst_,
                interval_sec=self.interval_sec_,
            )
            for mdsum in mdsum_hist
        ]
        await self.run_callbacks()
        self.set_timer()

    def set_timer(self):
        """Cancel any existing timer and schedule the next `_load_new` run."""
        if self.timer_:
            self.timer_.cancel()
        start_in = self.next_load_time() - current_seconds()
        self.timer_ = Timer(
            start_in_sec=start_in,
            func=self._load_new,
        )
        Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")

    def next_load_time(self) -> int:
        """Return the time, in seconds, of the next poll.

        The value is the next multiple of the interval after the current time
        plus `_LOAD_DELAY_SEC`. It is expressed on the same clock as
        `current_seconds()` (seconds, not nanoseconds).
        """
        curr_sec = int(current_seconds())
        interval_start = curr_sec - curr_sec % self.interval_sec_
        return interval_start + self.interval_sec_ + self._LOAD_DELAY_SEC

    async def _load_new(self) -> None:
        """Timer callback: fetch the latest bar, store it if new, then re-arm.

        A bar is appended (and callbacks run) only when the history is empty
        or the bar's timestamp is newer than the last stored aggregate.
        The timer is re-armed even when the fetch or a callback raises, so one
        failure does not silently end polling; the exception is re-raised.
        """
        try:
            last: Optional[MdSummary] = self.get_last()
            if last is None:
                Log.warning(f"{self.fname()}: did not get last update")
            elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_:
                Log.info(
                    f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
                )
            else:
                # NOTE(review): history_ is only ever appended to here; it
                # looks like it grows beyond history_depth_sec_ over time -
                # confirm whether trimming is expected.
                self.history_.append(
                    last.create_md_trades_aggregate(
                        exch_acct=self.exch_acct_,
                        exch_inst=self.exch_inst_,
                        interval_sec=self.interval_sec_,
                    )
                )
                await self.run_callbacks()
        except Exception:
            # Deliberately `Exception`, not a bare/`finally` clause:
            # asyncio.CancelledError derives from BaseException, so a
            # cancelled run must not schedule another one.
            self.set_timer()
            raise
        self.set_timer()

    async def run_callbacks(self) -> None:
        """Await every registered callback, in order, with the history list."""
        for cb in self.callbacks_:
            await cb(self.history_)

    def stop(self) -> None:
        """Cancel the pending timer, if any, and forget it."""
        if self.timer_:
            self.timer_.cancel()
            self.timer_ = None


class CvttRestMktDataClient(NamedObject):
    """Creates and keeps `MdSummaryCollector`s that share one `RESTSender`."""

    config_: Config
    sender_: RESTSender
    collectors_: Set[MdSummaryCollector]

    def __init__(self, config: Config) -> None:
        """Create the REST sender from the ``cvtt_base_url`` config value.

        Raises:
            AssertionError: if ``cvtt_base_url`` is missing or empty.
        """
        self.config_ = config
        base_url = self.config_.get_value("cvtt_base_url", default="")
        assert base_url, "cvtt_base_url is missing or empty in the configuration"
        self.sender_ = RESTSender(base_url=base_url)
        self.collectors_ = set()

    async def add_subscription(
        self,
        exch_acct: ExchangeAccountNameT,
        instrument_id: InstrumentIdT,
        interval_sec: IntervalSecT,
        history_depth_sec: IntervalSecT,
        callback: MdSummaryCallbackT,
    ) -> None:
        """Create a collector for the given parameters, register it and start it.

        Args:
            exch_acct: exchange account name used to look up the instrument.
            instrument_id: instrument to subscribe to.
            interval_sec: bar interval in seconds.
            history_depth_sec: depth of the initial history request, in seconds.
            callback: coroutine function awaited with the history list.
        """
        mdsc = MdSummaryCollector(
            sender=self.sender_,
            exch_acct=exch_acct,
            instrument_id=instrument_id,
            interval_sec=interval_sec,
            history_depth_sec=history_depth_sec,
        )
        mdsc.add_callback(callback)
        self.collectors_.add(mdsc)
        await mdsc.start()


if __name__ == "__main__":
    config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
    # config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"})

    async def _callback(history: List[MdTradesAggregate]) -> None:
        """Log the history length and its most recent entry."""
        Log.info(
            f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}"
        )

    async def _run() -> None:
        """Subscribe to one instrument and keep the event loop alive."""
        Log.info("Starting...")
        cvtt_client = CvttRestMktDataClient(config)
        await cvtt_client.add_subscription(
            exch_acct="COINBASE_AT",
            instrument_id="PAIR-BTC-USD",
            interval_sec=60,
            history_depth_sec=24 * 3600,
            callback=_callback,
        )
        # Keep the loop running so the collector's timer can fire.
        while True:
            await asyncio.sleep(5)

    asyncio.run(_run())