pairs_trading/lib/client/cvtt_client.py
Oleg Sheynin 38e1621b2f progress
2025-12-19 23:04:31 +00:00

214 lines
6.2 KiB
Python

from __future__ import annotations
from typing import Dict, Any, List, Optional
import time
import requests
from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject):
session_: requests.Session
base_url_: str
def __init__(self, base_url: str) -> None:
self.base_url_ = base_url
self.session_ = requests.Session()
def is_ready(self) -> bool:
"""Checks if the server is up and responding"""
url = f"{self.base_url_}/ping"
try:
response = self.session_.get(url)
response.raise_for_status()
return True
except requests.exceptions.RequestException:
return False
def send_post(self, endpoint: str, post_body: Dict) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(
method="POST",
url=url,
json=post_body,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
def send_get(self, endpoint: str) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(method="GET", url=url)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
class MdSummary(HistMdBar):
def __init__(
self,
ts_ns: int,
open: float,
high: float,
low: float,
close: float,
volume: float,
vwap: float,
num_trades: int,
):
super().__init__(ts=ts_ns)
self.open_ = open
self.high_ = high
self.low_ = low
self.close_ = close
self.volume_ = volume
self.vwap_ = vwap
self.num_trades_ = num_trades
@classmethod
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
res: List[MdSummary] = []
jresp = response.json()
hist_data = jresp.get("historical_data", [])
for hd in hist_data:
res.append(
MdSummary(
ts_ns=hd["time_ns"],
open=hd["open"],
high=hd["high"],
low=hd["low"],
close=hd["close"],
volume=hd["volume"],
vwap=hd["vwap"],
num_trades=hd["num_trades"],
)
)
return res
class MdSummaryCollector(NamedObject):
sender_: RESTSender
exch_acct_: str
instrument_id_: str
interval_sec_: int
history_depth_sec_: int
history_: List[MdSummary]
timer_: Optional[Timer]
def __init__(
self,
sender: RESTSender,
exch_acct: str,
instrument_id: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = sender
self.exch_acct_ = exch_acct
self.instrument_id_ = instrument_id
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.history_depth_sec_ = []
self.timer_ = None
def rqst_data(self) -> Dict[str, Any]:
return {
"exch_acct": self.exch_acct_,
"instrument_id": self.instrument_id_,
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
def get_history(self) -> List[MdSummary]:
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=self.rqst_data()
)
return MdSummary.from_REST_response(response=response)
def get_last(self) -> Optional[MdSummary]:
rqst_data = self.rqst_data()
rqst_data["history_depth_sec"] = self.interval_sec_
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=rqst_data
)
res = MdSummary.from_REST_response(response=response)
return None if len(res) == 0 else res[-1]
async def start(self) -> None:
if self.timer_:
Log.error(f"{self.fname()}: Timer is already started")
return
self.history_ = self.get_history()
self.timer_ = Timer(
start_in_sec=self.interval_sec_,
is_periodic=True,
period_interval=self.interval_sec_,
func=self._load_new,
)
async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last()
if not last:
# URGENT logging
return
if last.ts_ns_ <= self.history_[-1].ts_ns_:
# URGENT logging
return
self.history_.append(last)
# URGENT implement notification
def stop(self) -> None:
if self.timer_:
self.timer_.cancel()
self.timer_ = None
class CvttRESTClient(NamedObject):
config_: Config
sender_: RESTSender
def __init__(self, config: Config) -> None:
self.config_ = config
base_url = self.config_.get_value("cvtt_base_url", default="")
assert base_url
self.sender_ = RESTSender(base_url=base_url)
if __name__ == "__main__":
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
cvtt_client = CvttRESTClient(config)
mdsc = MdSummaryCollector(
sender=cvtt_client.sender_,
exch_acct="COINBASE_AT",
instrument_id="PAIR-BTC-USD",
interval_sec=60,
history_depth_sec=24 * 3600,
)
hist = mdsc.get_history()
last = mdsc.get_last()
pass