diff --git a/VERSION b/VERSION index 7bcd0e3..6812f81 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.2 \ No newline at end of file +0.0.3 \ No newline at end of file diff --git a/apps/pair_trader.py b/apps/pair_trader.py index acda879..c43d2a7 100644 --- a/apps/pair_trader.py +++ b/apps/pair_trader.py @@ -145,7 +145,7 @@ class PairTrader(NamedObject): self.latest_history_[exch_inst] = history if len(self.latest_history_) == 2: from itertools import chain - all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.time_ns_) + all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.aggr_time_ns_) await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs) self.latest_history_ = {} diff --git a/lib/live/mkt_data_client.py b/lib/live/mkt_data_client.py index bb6eae6..99a36c3 100644 --- a/lib/live/mkt_data_client.py +++ b/lib/live/mkt_data_client.py @@ -163,6 +163,7 @@ class MdSummaryCollector(NamedObject): ) return None res = MdSummary.from_REST_response(response=response) + Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}") return None if len(res) == 0 else res[-1] def is_empty(self) -> bool: @@ -196,14 +197,14 @@ class MdSummaryCollector(NamedObject): def next_load_time(self) -> NanosT: curr_sec = int(current_seconds()) - return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2 + return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 5 async def _load_new(self) -> None: last: Optional[MdSummary] = self.get_last() if not last: Log.warning(f"{self.fname()}: did not get last update") - elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_: + elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_: Log.info( f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}" ) diff --git a/lib/live/rest_client.py b/lib/live/rest_client.py index a5ed6c1..f576228 100644 --- a/lib/live/rest_client.py +++ b/lib/live/rest_client.py @@ -1,18 +1,12 @@ from __future__ import annotations -import asyncio -from typing import Callable, Dict, Any, List, Optional +from typing import Dict import time import requests from cvttpy_tools.base import NamedObject -from cvttpy_tools.logger import Log -from cvttpy_tools.config import Config -from cvttpy_tools.timer import Timer -from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds -from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar class RESTSender(NamedObject): diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index 1520308..c23cde7 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -9,7 +9,7 @@ from cvttpy_tools.base import NamedObject from cvttpy_tools.app import App from cvttpy_tools.config import Config from cvttpy_tools.settings.cvtt_types import IntervalSecT -from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec +from cvttpy_tools.timeutils import NanosT, SecPerHour, current_nanoseconds, NanoPerSec, format_nanos_utc from cvttpy_tools.logger import Log # --- @@ -132,16 +132,26 @@ class PtLiveStrategy(NamedObject): await self._send_trading_instructions(trading_instructions) def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: - curr_ns = current_nanoseconds() - LAG_THRESHOLD = 5 * NanoPerSec - if len(hist_aggr) == 0: Log.warning(f"{self.fname()} list of aggregates IS EMPTY") return False + + ALLOWED_LAG_SEC = 5.0 + curr_ns = current_nanoseconds() + LAG_THRESHOLD = NanosT((self.interval_sec() + ALLOWED_LAG_SEC) * NanoPerSec) + # MAYBE check market data length - lag_ns = curr_ns - hist_aggr[-1].time_ns_ + lag_ns = curr_ns - hist_aggr[-1].aggr_time_ns_ if lag_ns > LAG_THRESHOLD: - Log.warning(f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()} Lagging {int(lag_ns/NanoPerSec)} seconds") + Log.warning( + f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}" + f" Lagging {int(lag_ns/NanoPerSec)} seconds:" + f"\n{len(hist_aggr)} records" + f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}" + f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}" + # f" {hist_aggr[-1].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-1].aggr_time_ns_)}" + # f" {hist_aggr[-2].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-2].aggr_time_ns_)}" + ) return False return True @@ -163,8 +173,8 @@ class PtLiveStrategy(NamedObject): rows.append( { # convert nanoseconds → tz-aware pandas timestamp - "tstamp": pd.to_datetime(aggr.time_ns_, unit="ns", utc=True), - "time_ns": aggr.time_ns_, + "tstamp": pd.to_datetime(aggr.aggr_time_ns_, unit="ns", utc=True), + "time_ns": aggr.aggr_time_ns_, "symbol": exch_inst.instrument_id().split("-", 1)[1], "exchange_id": exch_inst.exchange_id_, "instrument_id": exch_inst.instrument_id(),