pairs_trading/lib/live/rest_client.py.md
2026-01-26 21:46:50 +00:00

61 lines
1.9 KiB
Markdown

```python
from __future__ import annotations
from typing import Dict
import time
import requests
from cvttpy_tools.base import NamedObject
class RESTSender(NamedObject):
session_: requests.Session
base_url_: str
def __init__(self, base_url: str) -> None:
self.base_url_ = base_url
self.session_ = requests.Session()
def is_ready(self) -> bool:
"""Checks if the server is up and responding"""
url = f"{self.base_url_}/ping"
try:
response = self.session_.get(url)
response.raise_for_status()
return True
except requests.exceptions.RequestException:
return False
def send_post(self, endpoint: str, post_body: Dict) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(
method="POST",
url=url,
json=post_body,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
def send_get(self, endpoint: str) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(method="GET", url=url)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
```