diff --git a/.vscode/launch.json b/.vscode/launch.json
index f18f6af..2693199 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -30,9 +30,8 @@
             "env": {
                 "PYTHONPATH": "${workspaceFolder}/..",
                 "CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
-                "MODEL_CONFIG": "vecm",
                 "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
-                // "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
+                "MODEL_CONFIG": "vecm"
            },
            "args": [
                // "--config=${workspaceFolder}/configuration/pair_trader.cfg",
@@ -42,6 +41,25 @@
                "--instrument_B=COINBASE_AT:PAIR-SOL-USD",
            ],
        },
+        {
+            "name": "PAIR SELECTOR",
+            "type": "debugpy",
+            "request": "launch",
+            "python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
+            "program": "${workspaceFolder}/apps/pair_selector.py",
+            "console": "integratedTerminal",
+            "env": {
+                "PYTHONPATH": "${workspaceFolder}/..",
+                "CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
+                // "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
+                "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
+                "PAIR_SELECTOR_REST_PORT": "44320"
+            },
+            "args": [
+                // "--config=${workspaceFolder}/configuration/pair_trader.cfg",
+                "--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_selector",
+            ],
+        },
        {
            "name": "-------- VECM --------",
        },
diff --git a/VERSION b/VERSION
index 05b19b1..fa3de58 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.0.4
\ No newline at end of file
+0.0.5
\ No newline at end of file
diff --git a/apps/pair_selector.py b/apps/pair_selector.py
new file mode 100644
index 0000000..20c6c0d
--- /dev/null
+++ b/apps/pair_selector.py
@@ -0,0 +1,223 @@
+from __future__ import annotations
+
+import asyncio
+from typing import Any, Dict, List
+
+from aiohttp import web
+
+from cvttpy_tools.app import App
+from cvttpy_tools.base import NamedObject
+from cvttpy_tools.config import CvttAppConfig
+from cvttpy_tools.logger import Log
+from cvttpy_tools.web.rest_service import RestService
+from cvttpy_trading.trading.exchange_config import ExchangeAccounts
+from cvttpy_trading.trading.instrument import ExchangeInstrument
+
+from pairs_trading.lib.pair_selector_engine import PairSelectionEngine
+
+
+class PairSelector(NamedObject):
+    instruments_: List[ExchangeInstrument]
+    engine_: PairSelectionEngine
+    rest_service_: RestService
+
+    def __init__(self) -> None:
+        App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
+        App.instance().add_call(App.Stage.Config, self._on_config())
+        App.instance().add_call(App.Stage.Run, self.run())
+
+    async def _on_config(self) -> None:
+        cfg = CvttAppConfig.instance()
+        self.instruments_ = self._load_instruments(cfg)
+        price_field = cfg.get_value("model/stat_model_price", "close")
+
+        self.engine_ = PairSelectionEngine(
+            config=cfg,
+            instruments=self.instruments_,
+            price_field=price_field,
+        )
+
+        self.rest_service_ = RestService(config_key="/api/REST")
+        self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
+        self.rest_service_.add_handler("GET", "/pair_selection", self._on_pair_selection)
+
+    def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
+        instruments_cfg = cfg.get_value("instruments", [])
+        instruments: List[ExchangeInstrument] = []
+        assert len(instruments_cfg) >= 2, "at least two instruments required"
+        for item in instruments_cfg:
+            if isinstance(item, str):
+                parts = item.split(":", 1)
+                if len(parts) != 2:
+                    raise ValueError(f"invalid instrument format: {item}")
+                exch_acct, instrument_id = parts
+            elif isinstance(item, dict):
+                exch_acct = item.get("exch_acct", "")
+                instrument_id = item.get("instrument_id", "")
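+                # a dict entry must provide both exch_acct and instrument_id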
item.get("instrument_id", "") + if not exch_acct or not instrument_id: + raise ValueError(f"invalid instrument config: {item}") + else: + raise ValueError(f"unsupported instrument entry: {item}") + + exch_inst = ExchangeAccounts.instance().get_exchange_instrument( + exch_acct=exch_acct, instrument_id=instrument_id + ) + assert exch_inst is not None, f"no ExchangeInstrument for {exch_acct}:{instrument_id}" + exch_inst.user_data_["exch_acct"] = exch_acct + instruments.append(exch_inst) + return instruments + + async def run(self) -> None: + oneshot = App.instance().get_argument("oneshot", False) + while True: + await self.engine_.run_once() + if oneshot: + break + sleep_for = self.engine_.sleep_seconds_until_next_cycle() + await asyncio.sleep(sleep_for) + + async def _on_data_quality(self, request: web.Request) -> web.Response: + fmt = request.query.get("format", "html").lower() + quality = self.engine_.quality_dicts() + if fmt == "json": + return web.json_response(quality) + return web.Response(text=self._render_quality_html(quality), content_type="text/html") + + async def _on_pair_selection(self, request: web.Request) -> web.Response: + fmt = request.query.get("format", "html").lower() + pairs = self.engine_.pair_dicts() + if fmt == "json": + return web.json_response(pairs) + return web.Response(text=self._render_pairs_html(pairs), content_type="text/html") + + def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str: + rows = "".join( + f"" + f"{q.get('instrument','')}" + f"{q.get('record_count','')}" + f"{q.get('latest_tstamp','')}" + f"{q.get('status','')}" + f"{q.get('reason','')}" + f"" + for q in sorted(quality, key=lambda x: str(x.get("instrument", ""))) + ) + return f""" + + + + + Data Quality + + + +

Data Quality

+ + + + + {rows} +
InstrumentRecordsLatestStatusReason
+ + +""" + + def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str: + if not pairs: + body = "

No pairs available. Check data quality and try again.

" + else: + body_rows = [] + for p in pairs: + body_rows.append( + "" + f"{p.get('instrument_a','')}" + f"{p.get('instrument_b','')}" + f"{p.get('rank_eg','')}" + f"{p.get('rank_adf','')}" + f"{p.get('rank_j','')}" + f"{p.get('pvalue_eg','')}" + f"{p.get('pvalue_adf','')}" + f"{p.get('pvalue_j','')}" + "" + ) + body = "\n".join(body_rows) + + return f""" + + + + + Pair Selection + + + +

Pair Selection

+ + + + + + + + + + + + + + + {body} + +
Instrument AInstrument BRank-EGRank-ADFRank-JEG p-valueADF p-valueJohansen pseudo p
+ + + +""" + + +if __name__ == "__main__": + App() + CvttAppConfig() + PairSelector() + App.instance().run() diff --git a/lib/live/mkt_data_client.py b/lib/live/mkt_data_client.py index b11a6f4..91613d0 100644 --- a/lib/live/mkt_data_client.py +++ b/lib/live/mkt_data_client.py @@ -1,86 +1,82 @@ from __future__ import annotations import asyncio -from typing import Callable, Coroutine, Dict, Any, List, Optional, Set +from typing import Dict, Any, List, Optional, Set import requests from cvttpy_tools.base import NamedObject -from cvttpy_tools.app import App from cvttpy_tools.logger import Log from cvttpy_tools.config import Config from cvttpy_tools.timer import Timer -from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec +from cvttpy_tools.timeutils import NanosT, current_seconds from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT - +from cvttpy_tools.web.rest_client import RESTSender # --- -from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT -from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate +from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary, MdSummaryCallbackT from cvttpy_trading.trading.exchange_config import ExchangeAccounts - # --- -from pairs_trading.lib.live.rest_client import RESTSender -class MdSummary(HistMdBar): - def __init__( - self, - ts_ns: int, - open: float, - high: float, - low: float, - close: float, - volume: float, - vwap: float, - num_trades: int, - ): - super().__init__(ts=ts_ns) - self.open_ = open - self.high_ = high - self.low_ = low - self.close_ = close - self.volume_ = volume - self.vwap_ = vwap - self.num_trades_ = num_trades +# class MdSummary(HistMdBar): +# def __init__( +# self, +# ts_ns: int, +# open: float, +# high: float, +# low: float, +# close: float, +# volume: float, +# vwap: float, +# num_trades: int, +# ): +# super().__init__(ts=ts_ns) +# self.open_ = open +# self.high_ = high +# self.low_ = low +# self.close_ = close +# self.volume_ = volume +# self.vwap_ = vwap +# self.num_trades_ = num_trades - @classmethod - def from_REST_response(cls, response: requests.Response) -> List[MdSummary]: - res: List[MdSummary] = [] - jresp = response.json() - hist_data = jresp.get("historical_data", []) - for hd in hist_data: - res.append( - MdSummary( - ts_ns=hd["time_ns"], - open=hd["open"], - high=hd["high"], - low=hd["low"], - close=hd["close"], - volume=hd["volume"], - vwap=hd["vwap"], - num_trades=hd["num_trades"], - ) - ) - return res +# @classmethod +# def from_REST_response(cls, response: requests.Response) -> List[MdSummary]: +# res: List[MdSummary] = [] +# jresp = response.json() +# hist_data = jresp.get("historical_data", []) +# for hd in hist_data: +# res.append( +# MdSummary( +# ts_ns=hd["time_ns"], +# open=hd["open"], +# high=hd["high"], +# low=hd["low"], +# close=hd["close"], +# volume=hd["volume"], +# vwap=hd["vwap"], +# num_trades=hd["num_trades"], +# ) +# ) +# return res - def create_md_trades_aggregate( - self, - exch_acct: ExchangeAccountNameT, - exch_inst: ExchangeInstrument, - interval_sec: IntervalSecT, - ) -> MdTradesAggregate: - res = MdTradesAggregate( - exch_acct=exch_acct, - exch_inst=exch_inst, - interval_ns=interval_sec * NanoPerSec, - ) - res.set(mdbar=self) - return res +# def create_md_trades_aggregate( +# self, +# exch_acct: ExchangeAccountNameT, +# exch_inst: ExchangeInstrument, +# 
+#         interval_sec: IntervalSecT,
+#     ) -> MdTradesAggregate:
+#         res = MdTradesAggregate(
+#             exch_acct=exch_acct,
+#             exch_inst=exch_inst,
+#             interval_ns=interval_sec * NanoPerSec,
+#         )
+#         res.set(mdbar=self)
+#         return res
 
-MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
+# MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
 
 
 class MdSummaryCollector(NamedObject):
diff --git a/lib/live/rest_client.py b/lib/live/rest_client.py.md
similarity index 99%
rename from lib/live/rest_client.py
rename to lib/live/rest_client.py.md
index f576228..fa84bd0 100644
--- a/lib/live/rest_client.py
+++ b/lib/live/rest_client.py.md
@@ -1,3 +1,4 @@
+```python
 from __future__ import annotations
 
 from typing import Dict
@@ -7,8 +8,6 @@
 import requests
 
 from cvttpy_tools.base import NamedObject
-
-
 class RESTSender(NamedObject):
     session_: requests.Session
     base_url_: str
@@ -58,4 +57,4 @@ class RESTSender(NamedObject):
             raise ConnectionError(
                 f"Failed to send status={excpt.response.status_code} {excpt.response.text}"  # type: ignore
             ) from excpt
-
+```
diff --git a/lib/live/ti_sender.py b/lib/live/ti_sender.py
index 6501bc8..d820973 100644
--- a/lib/live/ti_sender.py
+++ b/lib/live/ti_sender.py
@@ -6,10 +6,10 @@
 import requests
 
 from cvttpy_tools.base import NamedObject
 from cvttpy_tools.config import Config
 from cvttpy_tools.logger import Log
+from cvttpy_tools.web.rest_client import RESTSender
 # ---
 from cvttpy_trading.trading.trading_instructions import TradingInstructions
 # ---
-from pairs_trading.lib.live.rest_client import RESTSender
 from pairs_trading.apps.pair_trader import PairTrader
diff --git a/lib/pair_selector_engine.py b/lib/pair_selector_engine.py
new file mode 100644
index 0000000..4848426
--- /dev/null
+++ b/lib/pair_selector_engine.py
@@ -0,0 +1,391 @@
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional, Tuple
+
+import numpy as np
+import pandas as pd
+from statsmodels.tsa.stattools import adfuller, coint
+from statsmodels.tsa.vector_ar.vecm import coint_johansen
+# ---
+from cvttpy_tools.base import NamedObject
+from cvttpy_tools.config import Config
+from cvttpy_tools.logger import Log
+from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
+from cvttpy_tools.web.rest_client import RESTSender
+# ---
+from cvttpy_trading.trading.instrument import ExchangeInstrument
+from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
+
+
+@dataclass
+class InstrumentQuality(NamedObject):
+    instrument_: ExchangeInstrument
+    record_count_: int
+    latest_tstamp_: Optional[pd.Timestamp]
+    status_: str
+    reason_: str
+
+
+@dataclass
+class PairStats(NamedObject):
+    instrument_a_: ExchangeInstrument
+    instrument_b_: ExchangeInstrument
+    pvalue_eg_: Optional[float]
+    pvalue_adf_: Optional[float]
+    pvalue_j_: Optional[float]
+    trace_stat_j_: Optional[float]
+    rank_eg_: int = 0
+    rank_adf_: int = 0
+    rank_j_: int = 0
+    composite_rank_: int = 0
+
+    def as_dict(self) -> Dict[str, Any]:
+        return {
+            "instrument_a": self.instrument_a_.instrument_id(),
+            "instrument_b": self.instrument_b_.instrument_id(),
+            "pvalue_eg": self.pvalue_eg_,
+            "pvalue_adf": self.pvalue_adf_,
+            "pvalue_j": self.pvalue_j_,
+            "trace_stat_j": self.trace_stat_j_,
+            "rank_eg": self.rank_eg_,
+            "rank_adf": self.rank_adf_,
+            "rank_j": self.rank_j_,
+            "composite_rank": self.composite_rank_,
+        }
+
+
+class DataFetcher(NamedObject):
+    sender_: RESTSender
+    interval_sec_: int
+    history_depth_sec_: int
+
+    def __init__(
+        self,
+        base_url: str,
+        interval_sec: int,
+        history_depth_sec: int,
+    ) -> None:
+        self.sender_ = RESTSender(base_url=base_url)
+        self.interval_sec_ = interval_sec
+        self.history_depth_sec_ = history_depth_sec
+
+    def fetch(self, exch_acct: str, inst: ExchangeInstrument) -> List[MdTradesAggregate]:
+        rqst_data = {
+            "exch_acct": exch_acct,
+            "instrument_id": inst.instrument_id(),
+            "interval_sec": self.interval_sec_,
+            "history_depth_sec": self.history_depth_sec_,
+        }
+        response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
+        if response.status_code not in (200, 201):
+            Log.error(
+                f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}")
+            return []
+        mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
+        return [
+            mdsum.create_md_trades_aggregate(
+                exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
+            )
+            for mdsum in mdsums
+        ]
+
+
+class QualityChecker(NamedObject):
+    interval_sec_: int
+
+    def __init__(self, interval_sec: int) -> None:
+        self.interval_sec_ = interval_sec
+
+    def evaluate(self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]) -> InstrumentQuality:
+        if len(aggr) == 0:
+            return InstrumentQuality(
+                instrument_=inst,
+                record_count_=0,
+                latest_tstamp_=None,
+                status_="FAIL",
+                reason_="no records",
+            )
+
+        aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
+
+        latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
+        now_ts = pd.Timestamp.utcnow()
+        recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
+        if latest_ts <= recency_cutoff:
+            return InstrumentQuality(
+                instrument_=inst,
+                record_count_=len(aggr_sorted),
+                latest_tstamp_=latest_ts,
+                status_="FAIL",
+                reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
+            )
+
+        gaps_ok, reason = self._check_gaps(aggr_sorted)
+        status = "PASS" if gaps_ok else "FAIL"
+        return InstrumentQuality(
+            instrument_=inst,
+            record_count_=len(aggr_sorted),
+            latest_tstamp_=latest_ts,
+            status_=status,
+            reason_=reason,
+        )
+
+    def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
+        NUM_TRADES_THRESHOLD = 50
+        if len(aggr) < 2:
+            return True, "ok"
+
+        interval_ns = self.interval_sec_ * NanoPerSec
+        for idx in range(1, len(aggr)):
+            prev = aggr[idx - 1]
+            curr = aggr[idx]
+            delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
+            missing_intervals = int(delta // interval_ns) - 1
+            if missing_intervals <= 0:
+                continue
+
+            prev_nt = prev.num_trades_
+            next_nt = curr.num_trades_
+            estimate = self._approximate_num_trades(prev_nt, next_nt)
+            if estimate > NUM_TRADES_THRESHOLD:
+                return False, (
+                    f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
+                )
+        return True, "ok"
+
+    @staticmethod
+    def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
+        if prev_nt is None and next_nt is None:
+            return 0.0
+        if prev_nt is None:
+            return float(next_nt)
+        if next_nt is None:
+            return float(prev_nt)
+        return (prev_nt + next_nt) / 2.0
+
+
+class PairAnalyzer(NamedObject):
+    price_field_: str
+    interval_sec_: int
+
+    def __init__(self, price_field: str, interval_sec: int) -> None:
+        self.price_field_ = price_field
+        self.interval_sec_ = interval_sec
+
+    def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
+        instruments = list(series.keys())
+        results: List[PairStats] = []
+        for i in range(len(instruments)):
+            for j in range(i + 1, len(instruments)):
+                inst_a = instruments[i]
+                inst_b = instruments[j]
+                df_a = series[inst_a][["tstamp", "price"]].rename(
+                    columns={"price": "price_a"}
+                )
+                df_b = series[inst_b][["tstamp", "price"]].rename(
+                    columns={"price": "price_b"}
+                )
+                merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
+                    "tstamp"
+                )
+                stats = self._compute_stats(inst_a, inst_b, merged)
+                if stats:
+                    results.append(stats)
+        self._rank(results)
+        return results
+
+    def _compute_stats(
+        self,
+        inst_a: ExchangeInstrument,
+        inst_b: ExchangeInstrument,
+        merged: pd.DataFrame,
+    ) -> Optional[PairStats]:
+        if len(merged) < 2:
+            return None
+        px_a = merged["price_a"].astype(float)
+        px_b = merged["price_b"].astype(float)
+
+        std_a = float(px_a.std())
+        std_b = float(px_b.std())
+        if std_a == 0 or std_b == 0:
+            return None
+
+        z_a = (px_a - float(px_a.mean())) / std_a
+        z_b = (px_b - float(px_b.mean())) / std_b
+
+        p_eg: Optional[float]
+        p_adf: Optional[float]
+        p_j: Optional[float]
+        trace_stat: Optional[float]
+
+        try:
+            p_eg = float(coint(z_a, z_b)[1])
+        except Exception as exc:
+            Log.warning(f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
+            p_eg = None
+
+        try:
+            spread = z_a - z_b
+            p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
+        except Exception as exc:
+            Log.warning(f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
+            p_adf = None
+
+        try:
+            data = np.column_stack([z_a, z_b])
+            res = coint_johansen(data, det_order=0, k_ar_diff=1)
+            trace_stat = float(res.lr1[0])
+            cv10, cv5, cv1 = res.cvt[0]
+            if trace_stat > cv1:
+                p_j = 0.01
+            elif trace_stat > cv5:
+                p_j = 0.05
+            elif trace_stat > cv10:
+                p_j = 0.10
+            else:
+                p_j = 1.0
+        except Exception as exc:
+            Log.warning(f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
+            p_j = None
+            trace_stat = None
+
+        return PairStats(
+            instrument_a_=inst_a,
+            instrument_b_=inst_b,
+            pvalue_eg_=p_eg,
+            pvalue_adf_=p_adf,
+            pvalue_j_=p_j,
+            trace_stat_j_=trace_stat,
+        )
+
+    def _rank(self, results: List[PairStats]) -> None:
+        self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
+        self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
+        self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
+        for res in results:
+            res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
+        results.sort(key=lambda r: r.composite_rank_)
+
+    @staticmethod
+    def _assign_ranks(
+        results: List[PairStats], key, attr: str
+    ) -> None:
+        values = [key(r) for r in results]
+        sorted_vals = sorted([v for v in values if v is not None])
+        for res in results:
+            val = key(res)
+            if val is None:
+                setattr(res, attr, len(sorted_vals) + 1)
+                continue
+            rank = 1 + sum(1 for v in sorted_vals if v < val)
+            setattr(res, attr, rank)
+
+
+class PairSelectionEngine(NamedObject):
+    config_: object
+    instruments_: List[ExchangeInstrument]
+    price_field_: str
+    fetcher_: DataFetcher
+    quality_: QualityChecker
+    analyzer_: PairAnalyzer
+    interval_sec_: int
+    history_depth_sec_: int
+    data_quality_cache_: List[InstrumentQuality]
+    pair_results_cache_: List[PairStats]
+
+    def __init__(
+        self,
+        config: Config,
+        instruments: List[ExchangeInstrument],
+        price_field: str,
+    ) -> None:
+        self.config_ = config
+        self.instruments_ = instruments
+        self.price_field_ = price_field
+
+        interval_sec = int(config.get_value("interval_sec", 0))
+        history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
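+        # required config keys: interval_sec, history_depth_hours, cvtt_base_url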
config.get_value("cvtt_base_url", None) + assert interval_sec > 0, "interval_sec must be > 0" + assert history_depth_sec > 0, "history_depth_sec must be > 0" + assert base_url, "cvtt_base_url must be set" + + self.fetcher_ = DataFetcher( + base_url=base_url, + interval_sec=interval_sec, + history_depth_sec=history_depth_sec, + ) + self.quality_ = QualityChecker(interval_sec=interval_sec) + self.analyzer_ = PairAnalyzer(price_field=price_field, interval_sec=interval_sec) + + self.interval_sec_ = interval_sec + self.history_depth_sec_ = history_depth_sec + + self.data_quality_cache_ = [] + self.pair_results_cache_ = [] + + async def run_once(self) -> None: + quality_results: List[InstrumentQuality] = [] + price_series: Dict[ExchangeInstrument, pd.DataFrame] = {} + + for inst in self.instruments_: + exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_ + aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst) + q = self.quality_.evaluate(inst, aggr) + quality_results.append(q) + if q.status_ != "PASS": + continue + df = self._to_dataframe(aggr, inst) + if len(df) > 0: + price_series[inst] = df + self.data_quality_cache_ = quality_results + self.pair_results_cache_ = self.analyzer_.analyze(price_series) + + def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame: + rows: List[Dict[str, Any]] = [] + for item in aggr: + rows.append( + { + "tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True), + "price": self._extract_price(item, inst), + "num_trades": item.num_trades_, + } + ) + df = pd.DataFrame(rows) + return df.sort_values("tstamp").reset_index(drop=True) + + def _extract_price(self, aggr: MdTradesAggregate, inst: ExchangeInstrument) -> float: + price_field = self.price_field_ + # MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_ + field_map = { + "open": aggr.open_, + "high": aggr.high_, + "low": aggr.low_, + "close": aggr.close_, + "vwap": aggr.vwap_, + } + raw = field_map.get(price_field, aggr.close_) + return inst.get_price(raw) + + def sleep_seconds_until_next_cycle(self) -> float: + now_ns = current_nanoseconds() + interval_ns = self.interval_sec_ * NanoPerSec + next_boundary = (now_ns // interval_ns + 1) * interval_ns + return max(0.0, (next_boundary - now_ns) / NanoPerSec) + + def quality_dicts(self) -> List[Dict[str, Any]]: + res: List[Dict[str, Any]] = [] + for q in self.data_quality_cache_: + res.append( + { + "instrument": q.instrument_.instrument_id(), + "record_count": q.record_count_, + "latest_tstamp": q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None, + "status": q.status_, + "reason": q.reason_, + } + ) + return res + + def pair_dicts(self) -> List[Dict[str, Any]]: + return [p.as_dict() for p in self.pair_results_cache_]