From 38c3c730211a46737bb162afcefc06a60f0c2db3 Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Wed, 28 Jan 2026 23:27:52 +0000 Subject: [PATCH] stats history for pair selector --- .vscode/launch.json | 4 +- apps/pair_selector.py | 11 +++-- lib/pair_selector_engine.py | 89 ++++++++++++++++++++++++------------- 3 files changed, 68 insertions(+), 36 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 050412a..a2f0620 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -52,8 +52,8 @@ "env": { "PYTHONPATH": "${workspaceFolder}/..", "CONFIG_SERVICE": "cloud16.cvtt.vpn:6789", - // "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456", - "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456", + "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456", + // "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456", "PAIR_SELECTOR_REST_PORT": "44320" }, "args": [ diff --git a/apps/pair_selector.py b/apps/pair_selector.py index 20c6c0d..d65f871 100644 --- a/apps/pair_selector.py +++ b/apps/pair_selector.py @@ -81,16 +81,18 @@ class PairSelector(NamedObject): quality = self.engine_.quality_dicts() if fmt == "json": return web.json_response(quality) - return web.Response(text=self._render_quality_html(quality), content_type="text/html") + return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html") async def _on_pair_selection(self, request: web.Request) -> web.Response: fmt = request.query.get("format", "html").lower() pairs = self.engine_.pair_dicts() if fmt == "json": return web.json_response(pairs) - return web.Response(text=self._render_pairs_html(pairs), content_type="text/html") + return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html") - def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str: +class HtmlRenderer: + @staticmethod + def render_data_quality_html(quality: List[Dict[str, Any]]) -> str: rows = "".join( f"" f"{q.get('instrument','')}" @@ -126,7 +128,8 @@ class PairSelector(NamedObject): """ - def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str: + @staticmethod + def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str: if not pairs: body = "

No pairs available. Check data quality and try again.

" else: diff --git a/lib/pair_selector_engine.py b/lib/pair_selector_engine.py index 4848426..4eeb768 100644 --- a/lib/pair_selector_engine.py +++ b/lib/pair_selector_engine.py @@ -1,5 +1,6 @@ from __future__ import annotations from dataclasses import dataclass +from readline import insert_text from typing import Any, Dict, List, Optional, Tuple import numpy as np @@ -28,6 +29,7 @@ class InstrumentQuality(NamedObject): @dataclass class PairStats(NamedObject): + tstamp_: pd.Timestamp instrument_a_: ExchangeInstrument instrument_b_: ExchangeInstrument pvalue_eg_: Optional[float] @@ -110,7 +112,7 @@ class QualityChecker(NamedObject): latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True) now_ts = pd.Timestamp.utcnow() - recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_) + recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_) if latest_ts <= recency_cutoff: return InstrumentQuality( instrument_=inst, @@ -163,6 +165,21 @@ class QualityChecker(NamedObject): return float(prev_nt) return (prev_nt + next_nt) / 2.0 +class PairOfInstruments(NamedObject): + intrument_a_: ExchangeInstrument + instrument_b_: ExchangeInstrument + stats_history_: List[PairStats] + + + def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None: + self.instrument_a_ = instrument_a + self.instrument_b_ = instrument_b + self.stats_history_ = [] + + def add_stats(self, stats: PairStats) -> None: + self.stats_history_.append(stats) + + class PairAnalyzer(NamedObject): price_field_: str @@ -172,27 +189,24 @@ class PairAnalyzer(NamedObject): self.price_field_ = price_field self.interval_sec_ = interval_sec - def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]: - instruments = list(series.keys()) - results: List[PairStats] = [] - for i in range(len(instruments)): - for j in range(i + 1, len(instruments)): - inst_a = instruments[i] - inst_b = instruments[j] - df_a = series[inst_a][["tstamp", "price"]].rename( - columns={"price": "price_a"} - ) - df_b = series[inst_b][["tstamp", "price"]].rename( - columns={"price": "price_b"} - ) - merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values( - "tstamp" - ) - stats = self._compute_stats(inst_a, inst_b, merged) - if stats: - results.append(stats) - self._rank(results) - return results + def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None: + for pair in pairs: + inst_a = pair.instrument_a_ + inst_b = pair.instrument_b_ + df_a = series[inst_a][["tstamp", "price"]].rename( + columns={"price": "price_a"} + ) + df_b = series[inst_b][["tstamp", "price"]].rename( + columns={"price": "price_b"} + ) + merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values( + "tstamp" + ) + stats = self._compute_stats(inst_a, inst_b, merged) + if stats: + pair.add_stats(stats) + + self._rank(pairs) def _compute_stats( self, @@ -250,6 +264,7 @@ class PairAnalyzer(NamedObject): trace_stat = None return PairStats( + tstamp_=pd.Timestamp.utcnow(), instrument_a_=inst_a, instrument_b_=inst_b, pvalue_eg_=p_eg, @@ -258,13 +273,15 @@ class PairAnalyzer(NamedObject): trace_stat_j_=trace_stat, ) - def _rank(self, results: List[PairStats]) -> None: - self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_") - self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_") - self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_") - for res in results: + def _rank(self, pairs: List[PairOfInstruments]) -> None: + last_stats: List[PairStats] = [pair.stats_history_[-1] for pair in pairs] + + self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_") + self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_") + self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_") + for res in last_stats: res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_ - results.sort(key=lambda r: r.composite_rank_) + last_stats.sort(key=lambda r: r.composite_rank_) @staticmethod def _assign_ranks( @@ -292,6 +309,9 @@ class PairSelectionEngine(NamedObject): history_depth_sec_: int data_quality_cache_: List[InstrumentQuality] pair_results_cache_: List[PairStats] + + pairs_: List[PairOfInstruments] + def __init__( self, @@ -323,7 +343,15 @@ class PairSelectionEngine(NamedObject): self.data_quality_cache_ = [] self.pair_results_cache_ = [] + + self.create_pairs(instruments) + def create_pairs(self, instruments: List[ExchangeInstrument]) -> None: + self.pairs_ = [] + for i in range(len(instruments)): + for j in range(i + 1, len(instruments)): + self.pairs_.append(PairOfInstruments(instruments[i], instruments[j])) + async def run_once(self) -> None: quality_results: List[InstrumentQuality] = [] price_series: Dict[ExchangeInstrument, pd.DataFrame] = {} @@ -338,8 +366,9 @@ class PairSelectionEngine(NamedObject): df = self._to_dataframe(aggr, inst) if len(df) > 0: price_series[inst] = df + self.analyzer_.analyze(pairs=self.pairs_, series=price_series) self.data_quality_cache_ = quality_results - self.pair_results_cache_ = self.analyzer_.analyze(price_series) + self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_] def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame: rows: List[Dict[str, Any]] = [] @@ -388,4 +417,4 @@ class PairSelectionEngine(NamedObject): return res def pair_dicts(self) -> List[Dict[str, Any]]: - return [p.as_dict() for p in self.pair_results_cache_] + return [p.as_dict() for p in [pair.stats_history_[-1] for pair in self.pairs_]]