stats history for pair selector

This commit is contained in:
Oleg Sheynin 2026-01-28 23:27:52 +00:00
parent 76f9a80ad6
commit 38c3c73021
3 changed files with 68 additions and 36 deletions

4
.vscode/launch.json vendored
View File

@ -52,8 +52,8 @@
"env": {
"PYTHONPATH": "${workspaceFolder}/..",
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
// "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
"CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
"CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
// "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
"PAIR_SELECTOR_REST_PORT": "44320"
},
"args": [

View File

@ -81,16 +81,18 @@ class PairSelector(NamedObject):
quality = self.engine_.quality_dicts()
if fmt == "json":
return web.json_response(quality)
return web.Response(text=self._render_quality_html(quality), content_type="text/html")
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts()
if fmt == "json":
return web.json_response(pairs)
return web.Response(text=self._render_pairs_html(pairs), content_type="text/html")
return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str:
class HtmlRenderer:
@staticmethod
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
rows = "".join(
f"<tr>"
f"<td>{q.get('instrument','')}</td>"
@ -126,7 +128,8 @@ class PairSelector(NamedObject):
</html>
"""
def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str:
@staticmethod
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>"
else:

View File

@ -1,5 +1,6 @@
from __future__ import annotations
from dataclasses import dataclass
from readline import insert_text
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
@ -28,6 +29,7 @@ class InstrumentQuality(NamedObject):
@dataclass
class PairStats(NamedObject):
tstamp_: pd.Timestamp
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
@ -110,7 +112,7 @@ class QualityChecker(NamedObject):
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_)
if latest_ts <= recency_cutoff:
return InstrumentQuality(
instrument_=inst,
@ -163,6 +165,21 @@ class QualityChecker(NamedObject):
return float(prev_nt)
return (prev_nt + next_nt) / 2.0
class PairOfInstruments(NamedObject):
intrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
stats_history_: List[PairStats]
def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None:
self.instrument_a_ = instrument_a
self.instrument_b_ = instrument_b
self.stats_history_ = []
def add_stats(self, stats: PairStats) -> None:
self.stats_history_.append(stats)
class PairAnalyzer(NamedObject):
price_field_: str
@ -172,27 +189,24 @@ class PairAnalyzer(NamedObject):
self.price_field_ = price_field
self.interval_sec_ = interval_sec
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
results.append(stats)
self._rank(results)
return results
def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None:
for pair in pairs:
inst_a = pair.instrument_a_
inst_b = pair.instrument_b_
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
pair.add_stats(stats)
self._rank(pairs)
def _compute_stats(
self,
@ -250,6 +264,7 @@ class PairAnalyzer(NamedObject):
trace_stat = None
return PairStats(
tstamp_=pd.Timestamp.utcnow(),
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
@ -258,13 +273,15 @@ class PairAnalyzer(NamedObject):
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
def _rank(self, pairs: List[PairOfInstruments]) -> None:
last_stats: List[PairStats] = [pair.stats_history_[-1] for pair in pairs]
self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in last_stats:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
last_stats.sort(key=lambda r: r.composite_rank_)
@staticmethod
def _assign_ranks(
@ -292,6 +309,9 @@ class PairSelectionEngine(NamedObject):
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
pairs_: List[PairOfInstruments]
def __init__(
self,
@ -323,7 +343,15 @@ class PairSelectionEngine(NamedObject):
self.data_quality_cache_ = []
self.pair_results_cache_ = []
self.create_pairs(instruments)
def create_pairs(self, instruments: List[ExchangeInstrument]) -> None:
self.pairs_ = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
self.pairs_.append(PairOfInstruments(instruments[i], instruments[j]))
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
@ -338,8 +366,9 @@ class PairSelectionEngine(NamedObject):
df = self._to_dataframe(aggr, inst)
if len(df) > 0:
price_series[inst] = df
self.analyzer_.analyze(pairs=self.pairs_, series=price_series)
self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_]
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
@ -388,4 +417,4 @@ class PairSelectionEngine(NamedObject):
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
return [p.as_dict() for p in [pair.stats_history_[-1] for pair in self.pairs_]]