280 lines
8.9 KiB
Python
280 lines
8.9 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Callable, Coroutine, Dict, Any, List, Optional, Set
|
|
|
|
import requests
|
|
|
|
from cvttpy_tools.base import NamedObject
|
|
from cvttpy_tools.app import App
|
|
from cvttpy_tools.logger import Log
|
|
from cvttpy_tools.config import Config
|
|
from cvttpy_tools.timer import Timer
|
|
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
|
|
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
|
|
|
|
# ---
|
|
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
|
|
from cvttpy_trading.trading.instrument import ExchangeInstrument
|
|
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
|
|
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
|
|
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
|
|
|
|
# ---
|
|
from pairs_trading.lib.live.rest_client import RESTSender
|
|
|
|
|
|
class MdSummary(HistMdBar):
|
|
def __init__(
|
|
self,
|
|
ts_ns: int,
|
|
open: float,
|
|
high: float,
|
|
low: float,
|
|
close: float,
|
|
volume: float,
|
|
vwap: float,
|
|
num_trades: int,
|
|
):
|
|
super().__init__(ts=ts_ns)
|
|
self.open_ = open
|
|
self.high_ = high
|
|
self.low_ = low
|
|
self.close_ = close
|
|
self.volume_ = volume
|
|
self.vwap_ = vwap
|
|
self.num_trades_ = num_trades
|
|
|
|
@classmethod
|
|
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
|
|
res: List[MdSummary] = []
|
|
jresp = response.json()
|
|
hist_data = jresp.get("historical_data", [])
|
|
for hd in hist_data:
|
|
res.append(
|
|
MdSummary(
|
|
ts_ns=hd["time_ns"],
|
|
open=hd["open"],
|
|
high=hd["high"],
|
|
low=hd["low"],
|
|
close=hd["close"],
|
|
volume=hd["volume"],
|
|
vwap=hd["vwap"],
|
|
num_trades=hd["num_trades"],
|
|
)
|
|
)
|
|
return res
|
|
|
|
def create_md_trades_aggregate(
|
|
self,
|
|
exch_acct: ExchangeAccountNameT,
|
|
exch_inst: ExchangeInstrument,
|
|
interval_sec: IntervalSecT,
|
|
) -> MdTradesAggregate:
|
|
res = MdTradesAggregate(
|
|
exch_acct=exch_acct,
|
|
exch_inst=exch_inst,
|
|
interval_ns=interval_sec * NanoPerSec,
|
|
)
|
|
res.set(mdbar=self)
|
|
return res
|
|
|
|
|
|
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
|
|
|
|
|
|
class MdSummaryCollector(NamedObject):
|
|
sender_: RESTSender
|
|
exch_acct_: ExchangeAccountNameT
|
|
exch_inst_: ExchangeInstrument
|
|
interval_sec_: IntervalSecT
|
|
history_depth_sec_: IntervalSecT
|
|
|
|
history_: List[MdTradesAggregate]
|
|
|
|
callbacks_: List[MdSummaryCallbackT]
|
|
timer_: Optional[Timer]
|
|
|
|
def __init__(
|
|
self,
|
|
sender: RESTSender,
|
|
exch_acct: ExchangeAccountNameT,
|
|
instrument_id: InstrumentIdT,
|
|
interval_sec: IntervalSecT,
|
|
history_depth_sec: IntervalSecT,
|
|
) -> None:
|
|
self.sender_ = sender
|
|
self.exch_acct_ = exch_acct
|
|
|
|
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
|
|
exch_acct=exch_acct, instrument_id=instrument_id
|
|
)
|
|
assert exch_inst is not None, f"Unable to find Exchange instrument for {exch_acct}/{instrument_id}"
|
|
self.exch_inst_ = exch_inst
|
|
self.interval_sec_ = interval_sec
|
|
self.history_depth_sec_ = history_depth_sec
|
|
|
|
self.history_ = []
|
|
self.callbacks_ = []
|
|
self.timer_ = None
|
|
|
|
def add_callback(self, cb: MdSummaryCallbackT) -> None:
|
|
self.callbacks_.append(cb)
|
|
|
|
def __hash__(self):
|
|
return hash(
|
|
(
|
|
self.exch_acct_,
|
|
self.exch_inst_.instrument_id(),
|
|
self.interval_sec_,
|
|
self.history_depth_sec_,
|
|
)
|
|
)
|
|
|
|
def rqst_data(self) -> Dict[str, Any]:
|
|
return {
|
|
"exch_acct": self.exch_acct_,
|
|
"instrument_id": self.exch_inst_.instrument_id(),
|
|
"interval_sec": self.interval_sec_,
|
|
"history_depth_sec": self.history_depth_sec_,
|
|
}
|
|
|
|
def get_history(self) -> List[MdSummary]:
|
|
response: requests.Response = self.sender_.send_post(
|
|
endpoint="md_summary", post_body=self.rqst_data()
|
|
)
|
|
if response.status_code not in (200, 201):
|
|
Log.error(
|
|
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
|
|
)
|
|
return []
|
|
return MdSummary.from_REST_response(response=response)
|
|
|
|
def get_last(self) -> Optional[MdSummary]:
|
|
Log.info(f"{self.fname()}: for {self.exch_inst_.details_short()}")
|
|
rqst_data = self.rqst_data()
|
|
rqst_data["history_depth_sec"] = self.interval_sec_ * 2
|
|
response: requests.Response = self.sender_.send_post(
|
|
endpoint="md_summary", post_body=rqst_data
|
|
)
|
|
if response.status_code not in (200, 201):
|
|
Log.error(
|
|
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
|
|
)
|
|
return None
|
|
res = MdSummary.from_REST_response(response=response)
|
|
return None if len(res) == 0 else res[-1]
|
|
|
|
def is_empty(self) -> bool:
|
|
return len(self.history_) == 0
|
|
|
|
async def start(self) -> None:
|
|
if self.timer_:
|
|
Log.error(f"{self.fname()}: Timer is already started")
|
|
return
|
|
mdsum_hist = self.get_history()
|
|
self.history_ = [
|
|
mdsum.create_md_trades_aggregate(
|
|
exch_acct=self.exch_acct_,
|
|
exch_inst=self.exch_inst_,
|
|
interval_sec=self.interval_sec_,
|
|
)
|
|
for mdsum in mdsum_hist
|
|
]
|
|
await self.run_callbacks()
|
|
self.set_timer()
|
|
|
|
def set_timer(self):
|
|
if self.timer_:
|
|
self.timer_.cancel()
|
|
start_in = self.next_load_time() - current_seconds()
|
|
self.timer_ = Timer(
|
|
start_in_sec=start_in,
|
|
func=self._load_new,
|
|
)
|
|
Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")
|
|
|
|
def next_load_time(self) -> NanosT:
|
|
curr_sec = int(current_seconds())
|
|
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2
|
|
|
|
async def _load_new(self) -> None:
|
|
|
|
last: Optional[MdSummary] = self.get_last()
|
|
if not last:
|
|
Log.warning(f"{self.fname()}: did not get last update")
|
|
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_:
|
|
Log.info(
|
|
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
|
|
)
|
|
else:
|
|
self.history_.append(last.create_md_trades_aggregate(exch_acct=self.exch_acct_, exch_inst=self.exch_inst_, interval_sec=self.interval_sec_))
|
|
await self.run_callbacks()
|
|
self.set_timer()
|
|
|
|
async def run_callbacks(self) -> None:
|
|
[await cb(self.history_) for cb in self.callbacks_]
|
|
|
|
def stop(self) -> None:
|
|
if self.timer_:
|
|
self.timer_.cancel()
|
|
self.timer_ = None
|
|
|
|
|
|
class CvttRestMktDataClient(NamedObject):
|
|
config_: Config
|
|
sender_: RESTSender
|
|
collectors_: Set[MdSummaryCollector]
|
|
|
|
def __init__(self, config: Config) -> None:
|
|
self.config_ = config
|
|
base_url = self.config_.get_value("cvtt_base_url", default="")
|
|
assert base_url
|
|
self.sender_ = RESTSender(base_url=base_url)
|
|
self.collectors_ = set()
|
|
|
|
async def add_subscription(
|
|
self,
|
|
exch_acct: ExchangeAccountNameT,
|
|
instrument_id: InstrumentIdT,
|
|
interval_sec: IntervalSecT,
|
|
history_depth_sec: IntervalSecT,
|
|
callback: MdSummaryCallbackT,
|
|
) -> None:
|
|
mdsc = MdSummaryCollector(
|
|
sender=self.sender_,
|
|
exch_acct=exch_acct,
|
|
instrument_id=instrument_id,
|
|
interval_sec=interval_sec,
|
|
history_depth_sec=history_depth_sec,
|
|
)
|
|
mdsc.add_callback(callback)
|
|
self.collectors_.add(mdsc)
|
|
await mdsc.start()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
|
|
# config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"})
|
|
|
|
async def _calback(history: List[MdTradesAggregate]) -> None:
|
|
Log.info(
|
|
f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}"
|
|
)
|
|
|
|
async def __run() -> None:
|
|
Log.info("Starting...")
|
|
cvtt_client = CvttRestMktDataClient(config)
|
|
await cvtt_client.add_subscription(
|
|
exch_acct="COINBASE_AT",
|
|
instrument_id="PAIR-BTC-USD",
|
|
interval_sec=60,
|
|
history_depth_sec=24 * 3600,
|
|
callback=_calback,
|
|
)
|
|
while True:
|
|
await asyncio.sleep(5)
|
|
|
|
asyncio.run(__run())
|
|
pass
|