pairs_trading/lib/live/mkt_data_client.py
2026-01-22 23:52:17 +00:00

281 lines
9.0 KiB
Python

from __future__ import annotations
import asyncio
from typing import Callable, Coroutine, Dict, Any, List, Optional, Set
import requests
from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
# ---
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
# ---
from pairs_trading.lib.live.rest_client import RESTSender
class MdSummary(HistMdBar):
def __init__(
self,
ts_ns: int,
open: float,
high: float,
low: float,
close: float,
volume: float,
vwap: float,
num_trades: int,
):
super().__init__(ts=ts_ns)
self.open_ = open
self.high_ = high
self.low_ = low
self.close_ = close
self.volume_ = volume
self.vwap_ = vwap
self.num_trades_ = num_trades
@classmethod
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
res: List[MdSummary] = []
jresp = response.json()
hist_data = jresp.get("historical_data", [])
for hd in hist_data:
res.append(
MdSummary(
ts_ns=hd["time_ns"],
open=hd["open"],
high=hd["high"],
low=hd["low"],
close=hd["close"],
volume=hd["volume"],
vwap=hd["vwap"],
num_trades=hd["num_trades"],
)
)
return res
def create_md_trades_aggregate(
self,
exch_acct: ExchangeAccountNameT,
exch_inst: ExchangeInstrument,
interval_sec: IntervalSecT,
) -> MdTradesAggregate:
res = MdTradesAggregate(
exch_acct=exch_acct,
exch_inst=exch_inst,
interval_ns=interval_sec * NanoPerSec,
)
res.set(mdbar=self)
return res
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
class MdSummaryCollector(NamedObject):
sender_: RESTSender
exch_acct_: ExchangeAccountNameT
exch_inst_: ExchangeInstrument
interval_sec_: IntervalSecT
history_depth_sec_: IntervalSecT
history_: List[MdTradesAggregate]
callbacks_: List[MdSummaryCallbackT]
timer_: Optional[Timer]
def __init__(
self,
sender: RESTSender,
exch_acct: ExchangeAccountNameT,
instrument_id: InstrumentIdT,
interval_sec: IntervalSecT,
history_depth_sec: IntervalSecT,
) -> None:
self.sender_ = sender
self.exch_acct_ = exch_acct
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert exch_inst is not None, f"Unable to find Exchange instrument for {exch_acct}/{instrument_id}"
self.exch_inst_ = exch_inst
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.history_ = []
self.callbacks_ = []
self.timer_ = None
def add_callback(self, cb: MdSummaryCallbackT) -> None:
self.callbacks_.append(cb)
def __hash__(self):
return hash(
(
self.exch_acct_,
self.exch_inst_.instrument_id(),
self.interval_sec_,
self.history_depth_sec_,
)
)
def rqst_data(self) -> Dict[str, Any]:
return {
"exch_acct": self.exch_acct_,
"instrument_id": self.exch_inst_.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
def get_history(self) -> List[MdSummary]:
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=self.rqst_data()
)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
)
return []
return MdSummary.from_REST_response(response=response)
def get_last(self) -> Optional[MdSummary]:
Log.info(f"{self.fname()}: for {self.exch_inst_.details_short()}")
rqst_data = self.rqst_data()
rqst_data["history_depth_sec"] = self.interval_sec_ * 2
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=rqst_data
)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
)
return None
res = MdSummary.from_REST_response(response=response)
Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}")
return None if len(res) == 0 else res[-1]
def is_empty(self) -> bool:
return len(self.history_) == 0
async def start(self) -> None:
if self.timer_:
Log.error(f"{self.fname()}: Timer is already started")
return
mdsum_hist = self.get_history()
self.history_ = [
mdsum.create_md_trades_aggregate(
exch_acct=self.exch_acct_,
exch_inst=self.exch_inst_,
interval_sec=self.interval_sec_,
)
for mdsum in mdsum_hist
]
await self.run_callbacks()
self.set_timer()
def set_timer(self):
if self.timer_:
self.timer_.cancel()
start_in = self.next_load_time() - current_seconds()
self.timer_ = Timer(
start_in_sec=start_in,
func=self._load_new,
)
Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")
def next_load_time(self) -> NanosT:
curr_sec = int(current_seconds())
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 5
async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last()
if not last:
Log.warning(f"{self.fname()}: did not get last update")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_:
Log.info(
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
)
else:
self.history_.append(last.create_md_trades_aggregate(exch_acct=self.exch_acct_, exch_inst=self.exch_inst_, interval_sec=self.interval_sec_))
await self.run_callbacks()
self.set_timer()
async def run_callbacks(self) -> None:
[await cb(self.history_) for cb in self.callbacks_]
def stop(self) -> None:
if self.timer_:
self.timer_.cancel()
self.timer_ = None
class CvttRestMktDataClient(NamedObject):
config_: Config
sender_: RESTSender
collectors_: Set[MdSummaryCollector]
def __init__(self, config: Config) -> None:
self.config_ = config
base_url = self.config_.get_value("cvtt_base_url", default="")
assert base_url
self.sender_ = RESTSender(base_url=base_url)
self.collectors_ = set()
async def add_subscription(
self,
exch_acct: ExchangeAccountNameT,
instrument_id: InstrumentIdT,
interval_sec: IntervalSecT,
history_depth_sec: IntervalSecT,
callback: MdSummaryCallbackT,
) -> None:
mdsc = MdSummaryCollector(
sender=self.sender_,
exch_acct=exch_acct,
instrument_id=instrument_id,
interval_sec=interval_sec,
history_depth_sec=history_depth_sec,
)
mdsc.add_callback(callback)
self.collectors_.add(mdsc)
await mdsc.start()
if __name__ == "__main__":
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
# config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"})
async def _calback(history: List[MdTradesAggregate]) -> None:
Log.info(
f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}"
)
async def __run() -> None:
Log.info("Starting...")
cvtt_client = CvttRestMktDataClient(config)
await cvtt_client.add_subscription(
exch_acct="COINBASE_AT",
instrument_id="PAIR-BTC-USD",
interval_sec=60,
history_depth_sec=24 * 3600,
callback=_calback,
)
while True:
await asyncio.sleep(5)
asyncio.run(__run())
pass