from __future__ import annotations from typing import Dict, Any, List, Optional import time import requests from cvttpy_tools.base import NamedObject from cvttpy_tools.logger import Log from cvttpy_tools.config import Config from cvttpy_tools.timer import Timer from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar class RESTSender(NamedObject): session_: requests.Session base_url_: str def __init__(self, base_url: str) -> None: self.base_url_ = base_url self.session_ = requests.Session() def is_ready(self) -> bool: """Checks if the server is up and responding""" url = f"{self.base_url_}/ping" try: response = self.session_.get(url) response.raise_for_status() return True except requests.exceptions.RequestException: return False def send_post(self, endpoint: str, post_body: Dict) -> requests.Response: while not self.is_ready(): print("Waiting for FrontGateway to start...") time.sleep(5) url = f"{self.base_url_}/{endpoint}" try: return self.session_.request( method="POST", url=url, json=post_body, headers={"Content-Type": "application/json"}, ) except requests.exceptions.RequestException as excpt: raise ConnectionError( f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore ) from excpt def send_get(self, endpoint: str) -> requests.Response: while not self.is_ready(): print("Waiting for FrontGateway to start...") time.sleep(5) url = f"{self.base_url_}/{endpoint}" try: return self.session_.request(method="GET", url=url) except requests.exceptions.RequestException as excpt: raise ConnectionError( f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore ) from excpt class MdSummary(HistMdBar): def __init__( self, ts_ns: int, open: float, high: float, low: float, close: float, volume: float, vwap: float, num_trades: int, ): super().__init__(ts=ts_ns) self.open_ = open self.high_ = high self.low_ = low self.close_ = close self.volume_ = volume self.vwap_ = vwap self.num_trades_ = num_trades @classmethod def from_REST_response(cls, response: requests.Response) -> List[MdSummary]: res: List[MdSummary] = [] jresp = response.json() hist_data = jresp.get("historical_data", []) for hd in hist_data: res.append( MdSummary( ts_ns=hd["time_ns"], open=hd["open"], high=hd["high"], low=hd["low"], close=hd["close"], volume=hd["volume"], vwap=hd["vwap"], num_trades=hd["num_trades"], ) ) return res class MdSummaryCollector(NamedObject): sender_: RESTSender exch_acct_: str instrument_id_: str interval_sec_: int history_depth_sec_: int history_: List[MdSummary] timer_: Optional[Timer] def __init__( self, sender: RESTSender, exch_acct: str, instrument_id: str, interval_sec: int, history_depth_sec: int, ) -> None: self.sender_ = sender self.exch_acct_ = exch_acct self.instrument_id_ = instrument_id self.interval_sec_ = interval_sec self.history_depth_sec_ = history_depth_sec self.history_depth_sec_ = [] self.timer_ = None def rqst_data(self) -> Dict[str, Any]: return { "exch_acct": self.exch_acct_, "instrument_id": self.instrument_id_, "interval_sec": self.interval_sec_, "history_depth_sec": self.history_depth_sec_, } def get_history(self) -> List[MdSummary]: response: requests.Response = self.sender_.send_post( endpoint="md_summary", post_body=self.rqst_data() ) return MdSummary.from_REST_response(response=response) def get_last(self) -> Optional[MdSummary]: rqst_data = self.rqst_data() rqst_data["history_depth_sec"] = self.interval_sec_ response: requests.Response = self.sender_.send_post( endpoint="md_summary", post_body=rqst_data ) res = MdSummary.from_REST_response(response=response) return None if len(res) == 0 else res[-1] async def start(self) -> None: if self.timer_: Log.error(f"{self.fname()}: Timer is already started") return self.history_ = self.get_history() self.timer_ = Timer( start_in_sec=self.interval_sec_, is_periodic=True, period_interval=self.interval_sec_, func=self._load_new, ) async def _load_new(self) -> None: last: Optional[MdSummary] = self.get_last() if not last: # URGENT logging return if last.ts_ns_ <= self.history_[-1].ts_ns_: # URGENT logging return self.history_.append(last) # URGENT implement notification def stop(self) -> None: if self.timer_: self.timer_.cancel() self.timer_ = None class CvttRESTClient(NamedObject): config_: Config sender_: RESTSender def __init__(self, config: Config) -> None: self.config_ = config base_url = self.config_.get_value("cvtt_base_url", default="") assert base_url self.sender_ = RESTSender(base_url=base_url) if __name__ == "__main__": config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"}) cvtt_client = CvttRESTClient(config) mdsc = MdSummaryCollector( sender=cvtt_client.sender_, exch_acct="COINBASE_AT", instrument_id="PAIR-BTC-USD", interval_sec=60, history_depth_sec=24 * 3600, ) hist = mdsc.get_history() last = mdsc.get_last() pass