progress 0.0.3

This commit is contained in:
Oleg Sheynin 2026-01-22 23:52:17 +00:00
parent 170e48d646
commit d40bb7a1b4
5 changed files with 24 additions and 19 deletions

View File

@ -1 +1 @@
0.0.2 0.0.3

View File

@ -145,7 +145,7 @@ class PairTrader(NamedObject):
self.latest_history_[exch_inst] = history self.latest_history_[exch_inst] = history
if len(self.latest_history_) == 2: if len(self.latest_history_) == 2:
from itertools import chain from itertools import chain
all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.time_ns_) all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.aggr_time_ns_)
await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs) await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs)
self.latest_history_ = {} self.latest_history_ = {}

View File

@ -163,6 +163,7 @@ class MdSummaryCollector(NamedObject):
) )
return None return None
res = MdSummary.from_REST_response(response=response) res = MdSummary.from_REST_response(response=response)
Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}")
return None if len(res) == 0 else res[-1] return None if len(res) == 0 else res[-1]
def is_empty(self) -> bool: def is_empty(self) -> bool:
@ -196,14 +197,14 @@ class MdSummaryCollector(NamedObject):
def next_load_time(self) -> NanosT: def next_load_time(self) -> NanosT:
curr_sec = int(current_seconds()) curr_sec = int(current_seconds())
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2 return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 5
async def _load_new(self) -> None: async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last() last: Optional[MdSummary] = self.get_last()
if not last: if not last:
Log.warning(f"{self.fname()}: did not get last update") Log.warning(f"{self.fname()}: did not get last update")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_: elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_:
Log.info( Log.info(
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}" f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
) )

View File

@ -1,18 +1,12 @@
from __future__ import annotations from __future__ import annotations
import asyncio from typing import Dict
from typing import Callable, Dict, Any, List, Optional
import time import time
import requests import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject): class RESTSender(NamedObject):

View File

@ -9,7 +9,7 @@ from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.settings.cvtt_types import IntervalSecT from cvttpy_tools.settings.cvtt_types import IntervalSecT
from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec from cvttpy_tools.timeutils import NanosT, SecPerHour, current_nanoseconds, NanoPerSec, format_nanos_utc
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
# --- # ---
@ -132,16 +132,26 @@ class PtLiveStrategy(NamedObject):
await self._send_trading_instructions(trading_instructions) await self._send_trading_instructions(trading_instructions)
def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool:
curr_ns = current_nanoseconds()
LAG_THRESHOLD = 5 * NanoPerSec
if len(hist_aggr) == 0: if len(hist_aggr) == 0:
Log.warning(f"{self.fname()} list of aggregates IS EMPTY") Log.warning(f"{self.fname()} list of aggregates IS EMPTY")
return False return False
ALLOWED_LAG_SEC = 5.0
curr_ns = current_nanoseconds()
LAG_THRESHOLD = NanosT((self.interval_sec() + ALLOWED_LAG_SEC) * NanoPerSec)
# MAYBE check market data length # MAYBE check market data length
lag_ns = curr_ns - hist_aggr[-1].time_ns_ lag_ns = curr_ns - hist_aggr[-1].aggr_time_ns_
if lag_ns > LAG_THRESHOLD: if lag_ns > LAG_THRESHOLD:
Log.warning(f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()} Lagging {int(lag_ns/NanoPerSec)} seconds") Log.warning(
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
f" Lagging {int(lag_ns/NanoPerSec)} seconds:"
f"\n{len(hist_aggr)} records"
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
# f" {hist_aggr[-1].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-1].aggr_time_ns_)}"
# f" {hist_aggr[-2].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-2].aggr_time_ns_)}"
)
return False return False
return True return True
@ -163,8 +173,8 @@ class PtLiveStrategy(NamedObject):
rows.append( rows.append(
{ {
# convert nanoseconds → tz-aware pandas timestamp # convert nanoseconds → tz-aware pandas timestamp
"tstamp": pd.to_datetime(aggr.time_ns_, unit="ns", utc=True), "tstamp": pd.to_datetime(aggr.aggr_time_ns_, unit="ns", utc=True),
"time_ns": aggr.time_ns_, "time_ns": aggr.aggr_time_ns_,
"symbol": exch_inst.instrument_id().split("-", 1)[1], "symbol": exch_inst.instrument_id().split("-", 1)[1],
"exchange_id": exch_inst.exchange_id_, "exchange_id": exch_inst.exchange_id_,
"instrument_id": exch_inst.instrument_id(), "instrument_id": exch_inst.instrument_id(),