dev progress

This commit is contained in:
Oleg Sheynin 2026-01-01 01:36:31 +00:00
parent 69a0b19e9f
commit 842eb3ec62
5 changed files with 155 additions and 149 deletions

View File

@ -1,23 +1,21 @@
from __future__ import annotations
from functools import partial
from typing import Callable, Coroutine, Dict, List
from typing import Callable, Coroutine, List
from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.app import App
from cvttpy_tools.config import Config
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import BookIdT
# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.active_instruments import Instruments
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
# ---
from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy
from pairs_trading.lib.live.mkt_data_client import CvttRestMktDataClient, MdSummary
from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender
from pairs_trading.lib.live.mkt_data_client import CvttRestMktDataClient
from pairs_trading.lib.live.ti_sender import TradingInstructionsSender
# import sys
# print("PYTHONPATH directories:")
@ -25,17 +23,7 @@ from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSend
# print(path)
'''
Config
=======
{
"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456",
"ti_config": {
TODO
},
"strategy_config": {
TODO
}
}
config http://cloud16.cvtt.vpn/apps/pairs_trading
'''
HistMdCbT = Callable[[List[MdTradesAggregate]], Coroutine]
@ -44,6 +32,7 @@ UpdateMdCbT = Callable[[MdTradesAggregate], Coroutine]
class PairsTrader(NamedObject):
config_: CvttAppConfig
instruments_: List[ExchangeInstrument]
book_id_: BookIdT
live_strategy_: PtLiveStrategy
pricer_client_: CvttRestMktDataClient
@ -62,11 +51,18 @@ class PairsTrader(NamedObject):
),
)
App.instance().add_cmdline_arg(
"--book_id",
type=str,
required=True,
help="Book ID"
)
App.instance().add_call(App.Stage.Config, self._on_config())
App.instance().add_call(App.Stage.Run, self.run())
async def _on_config(self) -> None:
self.config_ = CvttAppConfig.instance()
self.book_id_ = App.instance().get_argument(name="book_id")
# ------- PARSE INSTRUMENTS -------
instr_str = App.instance().get_argument("pair", "")
@ -90,13 +86,6 @@ class PairsTrader(NamedObject):
Log.info(f"{self.fname()} Instruments: {self.instruments_[0].details_short()} <==> {self.instruments_[1].details_short()}")
# ------- CREATE CVTT CLIENT -------
ti_config = self.config_.get_subconfig("ti_config", Config(json_src={}))
self.ti_sender_ = TradingInstructionsSender(config=ti_config)
Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# ------- CREATE STRATEGY -------
strategy_config = self.config_.get_subconfig("strategy_config", Config({}))
self.live_strategy_ = PtLiveStrategy(
@ -108,13 +97,11 @@ class PairsTrader(NamedObject):
# # ------- CREATE PRICER CLIENT -------
self.pricer_client_ = CvttRestMktDataClient(config=self.config_)
Log.info(f"{self.fname()} MD client created: {self.pricer_client_}")
# ------- CREATE TRADER CLIENT -------
# URGENT CREATE TRADER CLIENT
# (send TradingInstructions)
# ti_config = self.config_.get_subconfig("ti_config", {})
# self.ti_sender_ = TradingInstructionsSender(config=ti_config)
# Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
self.ti_sender_ = TradingInstructionsSender(config=self.config_, pairs_trader=self)
Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# # ------- CREATE REST SERVER -------
@ -133,10 +120,9 @@ class PairsTrader(NamedObject):
callback=self._on_md_summary
)
async def _on_md_summary(self, history: List[MdSummary]) -> None:
# depth = len(history)
# if depth < 2:
pass # URGENT
async def _on_md_summary(self, history: List[MdTradesAggregate]) -> None:
# Snapshot or update?
await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=history)
async def run(self) -> None:
Log.info(f"{self.fname()} ...")

View File

@ -10,11 +10,16 @@ from cvttpy_tools.app import App
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanosT, current_seconds
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
# ---
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
# ---
from pairs_trading.lib.live.rest_client import RESTSender
@ -60,16 +65,33 @@ class MdSummary(HistMdBar):
)
return res
MdSummaryCallbackT = Callable[[List[MdSummary]], Coroutine]
def create_md_trades_aggregate(
self,
exch_acct: ExchangeAccountNameT,
exch_inst: ExchangeInstrument,
interval_sec: IntervalSecT,
) -> MdTradesAggregate:
res = MdTradesAggregate(
exch_acct=exch_acct,
exch_inst=exch_inst,
interval_ns=interval_sec * NanoPerSec,
)
res.set(mdbar=self)
return res
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
class MdSummaryCollector(NamedObject):
sender_: RESTSender
exch_acct_: ExchangeAccountNameT
instrument_id_: InstrumentIdT
exch_inst_: ExchangeInstrument
interval_sec_: IntervalSecT
history_depth_sec_: IntervalSecT
history_: List[MdSummary]
history_: List[MdTradesAggregate]
callbacks_: List[MdSummaryCallbackT]
timer_: Optional[Timer]
@ -83,7 +105,12 @@ class MdSummaryCollector(NamedObject):
) -> None:
self.sender_ = sender
self.exch_acct_ = exch_acct
self.instrument_id_ = instrument_id
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert exch_inst is not None, f"Unable to find Exchange instrument for {exch_acct}/{instrument_id}"
self.exch_inst_ = exch_inst
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
@ -95,12 +122,19 @@ class MdSummaryCollector(NamedObject):
self.callbacks_.append(cb)
def __hash__(self):
return hash((self.exch_acct_, self.instrument_id_, self.interval_sec_, self.history_depth_sec_))
return hash(
(
self.exch_acct_,
self.exch_inst_.instrument_id(),
self.interval_sec_,
self.history_depth_sec_,
)
)
def rqst_data(self) -> Dict[str, Any]:
return {
"exch_acct": self.exch_acct_,
"instrument_id": self.instrument_id_,
"instrument_id": self.exch_inst_.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
@ -110,7 +144,9 @@ class MdSummaryCollector(NamedObject):
endpoint="md_summary", post_body=self.rqst_data()
)
if response.status_code not in (200, 201):
Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}")
Log.error(
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
)
return []
return MdSummary.from_REST_response(response=response)
@ -121,7 +157,9 @@ class MdSummaryCollector(NamedObject):
endpoint="md_summary", post_body=rqst_data
)
if response.status_code not in (200, 201):
Log.error(f"{self.fname()}: Received error: {response.status_code} - {response.text}")
Log.error(
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
)
return None
res = MdSummary.from_REST_response(response=response)
return None if len(res) == 0 else res[-1]
@ -133,7 +171,15 @@ class MdSummaryCollector(NamedObject):
if self.timer_:
Log.error(f"{self.fname()}: Timer is already started")
return
self.history_ = self.get_history()
mdsum_hist = self.get_history()
self.history_ = [
mdsum.create_md_trades_aggregate(
exch_acct=self.exch_acct_,
exch_inst=self.exch_inst_,
interval_sec=self.interval_sec_,
)
for mdsum in mdsum_hist
]
await self.run_callbacks()
self.set_timer()
@ -154,10 +200,12 @@ class MdSummaryCollector(NamedObject):
last: Optional[MdSummary] = self.get_last()
if not last:
Log.warning(f"{self.fname()}: did not get last update")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].ts_ns_:
Log.info(f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_:
Log.info(
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
)
else:
self.history_.append(last)
self.history_.append(last.create_md_trades_aggregate(exch_acct=self.exch_acct_, exch_inst=self.exch_inst_, interval_sec=self.interval_sec_))
await self.run_callbacks()
self.set_timer()
@ -169,6 +217,7 @@ class MdSummaryCollector(NamedObject):
self.timer_.cancel()
self.timer_ = None
class CvttRestMktDataClient(NamedObject):
config_: Config
sender_: RESTSender
@ -181,12 +230,13 @@ class CvttRestMktDataClient(NamedObject):
self.sender_ = RESTSender(base_url=base_url)
self.collectors_ = set()
async def add_subscription(self,
async def add_subscription(
self,
exch_acct: ExchangeAccountNameT,
instrument_id: InstrumentIdT,
interval_sec: IntervalSecT,
history_depth_sec: IntervalSecT,
callback: MdSummaryCallbackT
callback: MdSummaryCallbackT,
) -> None:
mdsc = MdSummaryCollector(
sender=self.sender_,
@ -199,13 +249,15 @@ class CvttRestMktDataClient(NamedObject):
self.collectors_.add(mdsc)
await mdsc.start()
if __name__ == "__main__":
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
# config = Config(json_src={"cvtt_base_url": "http://dev-server-02.cvtt.vpn:23456"})
async def _calback(history: List[MdSummary]) -> None:
Log.info(f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}")
async def _calback(history: List[MdTradesAggregate]) -> None:
Log.info(
f"MdSummary Hist Length is {len(history)}. Last summary: {history[-1] if len(history) > 0 else '[]'}"
)
async def __run() -> None:
Log.info("Starting...")
@ -215,7 +267,7 @@ if __name__ == "__main__":
instrument_id="PAIR-BTC-USD",
interval_sec=60,
history_depth_sec=24 * 3600,
callback=_calback
callback=_calback,
)
while True:
await asyncio.sleep(5)

56
lib/live/ti_sender.py Normal file
View File

@ -0,0 +1,56 @@
from enum import Enum
import requests
# import aiohttp
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
# ---
from cvttpy_trading.trading.trading_instructions import TradingInstructions
# ---
from pairs_trading.lib.live.rest_client import RESTSender
from pairs_trading.apps.pairs_trader import PairsTrader
class TradingInstructionsSender(NamedObject):
config_: Config
sender_: RESTSender
pairs_trader_: PairsTrader
class TradingInstType(str, Enum):
TARGET_POSITION = "TARGET_POSITION"
DIRECT_ORDER = "DIRECT_ORDER"
MARKET_MAKING = "MARKET_MAKING"
NONE = "NONE"
# config_: Config
# ti_method_: str
# ti_url_: str
# health_check_method_: str
# health_check_url_: str
def __init__(self, config: Config, pairs_trader: PairsTrader) -> None:
self.config_ = config
base_url = self.config_.get_value("cvtt_base_url", default="")
assert base_url
self.sender_ = RESTSender(base_url=base_url)
self.pairs_trader_ = pairs_trader
self.book_id_ = self.pairs_trader_.book_id_
assert self.book_id_, "book_id is required"
self.strategy_id_ = config.get_value("strategy_id", "")
assert self.strategy_id_, "strategy_id is required"
async def send_trading_instructions(self, ti: TradingInstructions) -> None:
response: requests.Response = self.sender_.send_post(
endpoint="trading_instructions", post_body=ti.to_dict()
)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: Received error: {response.status_code} - {response.text}"
)

View File

@ -120,7 +120,6 @@ class PtLiveStrategy(NamedObject):
# await self.pt_mkt_data_.on_mkt_data_hist_snapshot(snapshot=aggr)
pass # URGENT PtiveStrategy.on_mkt_data_hist_snapshot()
async def on_mkt_data_update(self, aggr: MdTradesAggregate) -> None:
# if market_data_df is not None:
# self.trading_pair_.market_data_ = market_data_df
# self.model_data_policy_.advance()
@ -139,7 +138,6 @@ class PtLiveStrategy(NamedObject):
# if len(trading_instructions) > 0:
# await self._send_trading_instructions(trading_instructions)
# # trades = self._create_trades(prediction=prediction, last_row=market_data_df.iloc[-1])
pass # URGENT
def interval_sec(self) -> IntervalSecT:
return self.interval_sec_

View File

@ -1,86 +0,0 @@
import time
from enum import Enum
from typing import Tuple
# import aiohttp
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanoPerSec
from cvttpy_tools.web.rest_client import REST_RequestProcessor
class TradingInstructionsSender(NamedObject):
class TradingInstType(str, Enum):
TARGET_POSITION = "TARGET_POSITION"
DIRECT_ORDER = "DIRECT_ORDER"
MARKET_MAKING = "MARKET_MAKING"
NONE = "NONE"
config_: Config
ti_method_: str
ti_url_: str
health_check_method_: str
health_check_url_: str
def __init__(self, config: Config):
self.config_ = config
base_url = config.get_value("url", "ws://localhost:12346/ws")
self.book_id_ = config.get_value("book_id", "")
assert self.book_id_, "book_id is required"
self.strategy_id_ = config.get_value("strategy_id", "")
assert self.strategy_id_, "strategy_id is required"
endpoint_uri = config.get_value("ti_endpoint/url", "/trading_instructions")
endpoint_method = config.get_value("ti_endpoint/method", "POST")
health_check_uri = config.get_value("health_check_endpoint/url", "/ping")
health_check_method = config.get_value("health_check_endpoint/method", "GET")
self.ti_method_ = endpoint_method
self.ti_url_ = f"{base_url}{endpoint_uri}"
self.health_check_method_ = health_check_method
self.health_check_url_ = f"{base_url}{health_check_uri}"
App.instance().add_call(App.Stage.Start, self._set_health_check_timer(), can_run_now=True)
async def _set_health_check_timer(self) -> None:
# TODO: configurable interval
self.health_check_timer_ = Timer(is_periodic=True, period_interval=15, start_in_sec=0, func=self._health_check)
Log.info(f"{self.fname()} Health check timer set to 15 seconds")
async def _health_check(self) -> None:
rqst = REST_RequestProcessor(method=self.health_check_method_, url=self.health_check_url_)
async with rqst as (status, msg, headers):
if status != 200:
Log.error(f"{self.fname()} CVTT Service is not responding")
async def send_tgt_positions(self, strength: float, base_asset: str, quote_asset: str) -> Tuple[int, str]:
instr = {
"type": self.TradingInstType.TARGET_POSITION.value,
"book_id": self.book_id_,
"strategy_id": self.strategy_id_,
"issued_ts_ns": int(time.time() * NanoPerSec),
"data": {
"strength": strength,
"base_asset": base_asset,
"quote_asset": quote_asset,
"user_data": {},
},
}
rqst = REST_RequestProcessor(method=self.ti_method_, url=self.ti_url_, params=instr)
async with rqst as (status, msg, headers):
if status != 200:
raise ConnectionError(f"Failed to send trading instructions: {msg}")
return (status, msg)