Compare commits

...

2 Commits

Author SHA1 Message Date
Oleg Sheynin
8a207a13a0 . 2026-01-30 20:21:44 +00:00
Oleg Sheynin
38c3c73021 stats history for pair selector 2026-01-28 23:27:52 +00:00
2 changed files with 66 additions and 34 deletions

View File

@ -81,16 +81,18 @@ class PairSelector(NamedObject):
quality = self.engine_.quality_dicts() quality = self.engine_.quality_dicts()
if fmt == "json": if fmt == "json":
return web.json_response(quality) return web.json_response(quality)
return web.Response(text=self._render_quality_html(quality), content_type="text/html") return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
async def _on_pair_selection(self, request: web.Request) -> web.Response: async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower() fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts() pairs = self.engine_.pair_dicts()
if fmt == "json": if fmt == "json":
return web.json_response(pairs) return web.json_response(pairs)
return web.Response(text=self._render_pairs_html(pairs), content_type="text/html") return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str: class HtmlRenderer:
@staticmethod
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
rows = "".join( rows = "".join(
f"<tr>" f"<tr>"
f"<td>{q.get('instrument','')}</td>" f"<td>{q.get('instrument','')}</td>"
@ -126,7 +128,8 @@ class PairSelector(NamedObject):
</html> </html>
""" """
def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str: @staticmethod
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
if not pairs: if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>" body = "<p>No pairs available. Check data quality and try again.</p>"
else: else:

View File

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from readline import insert_text
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
import numpy as np import numpy as np
@ -28,6 +29,7 @@ class InstrumentQuality(NamedObject):
@dataclass @dataclass
class PairStats(NamedObject): class PairStats(NamedObject):
tstamp_: pd.Timestamp
instrument_a_: ExchangeInstrument instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float] pvalue_eg_: Optional[float]
@ -110,7 +112,7 @@ class QualityChecker(NamedObject):
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True) latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow() now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_) recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_)
if latest_ts <= recency_cutoff: if latest_ts <= recency_cutoff:
return InstrumentQuality( return InstrumentQuality(
instrument_=inst, instrument_=inst,
@ -163,6 +165,21 @@ class QualityChecker(NamedObject):
return float(prev_nt) return float(prev_nt)
return (prev_nt + next_nt) / 2.0 return (prev_nt + next_nt) / 2.0
class PairOfInstruments(NamedObject):
intrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
stats_history_: List[PairStats]
def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None:
self.instrument_a_ = instrument_a
self.instrument_b_ = instrument_b
self.stats_history_ = []
def add_stats(self, stats: PairStats) -> None:
self.stats_history_.append(stats)
class PairAnalyzer(NamedObject): class PairAnalyzer(NamedObject):
price_field_: str price_field_: str
@ -172,27 +189,24 @@ class PairAnalyzer(NamedObject):
self.price_field_ = price_field self.price_field_ = price_field
self.interval_sec_ = interval_sec self.interval_sec_ = interval_sec
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]: def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None:
instruments = list(series.keys()) for pair in pairs:
results: List[PairStats] = [] inst_a = pair.instrument_a_
for i in range(len(instruments)): inst_b = pair.instrument_b_
for j in range(i + 1, len(instruments)): df_a = series[inst_a][["tstamp", "price"]].rename(
inst_a = instruments[i] columns={"price": "price_a"}
inst_b = instruments[j] )
df_a = series[inst_a][["tstamp", "price"]].rename( df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_a"} columns={"price": "price_b"}
) )
df_b = series[inst_b][["tstamp", "price"]].rename( merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
columns={"price": "price_b"} "tstamp"
) )
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values( stats = self._compute_stats(inst_a, inst_b, merged)
"tstamp" if stats:
) pair.add_stats(stats)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats: self._rank(pairs)
results.append(stats)
self._rank(results)
return results
def _compute_stats( def _compute_stats(
self, self,
@ -250,6 +264,7 @@ class PairAnalyzer(NamedObject):
trace_stat = None trace_stat = None
return PairStats( return PairStats(
tstamp_=pd.Timestamp.utcnow(),
instrument_a_=inst_a, instrument_a_=inst_a,
instrument_b_=inst_b, instrument_b_=inst_b,
pvalue_eg_=p_eg, pvalue_eg_=p_eg,
@ -258,13 +273,15 @@ class PairAnalyzer(NamedObject):
trace_stat_j_=trace_stat, trace_stat_j_=trace_stat,
) )
def _rank(self, results: List[PairStats]) -> None: def _rank(self, pairs: List[PairOfInstruments]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_") last_stats: List[PairStats] = [pair.stats_history_[-1] for pair in pairs]
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_") self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
for res in results: self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in last_stats:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_ res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_) last_stats.sort(key=lambda r: r.composite_rank_)
@staticmethod @staticmethod
def _assign_ranks( def _assign_ranks(
@ -293,6 +310,9 @@ class PairSelectionEngine(NamedObject):
data_quality_cache_: List[InstrumentQuality] data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats] pair_results_cache_: List[PairStats]
pairs_: List[PairOfInstruments]
def __init__( def __init__(
self, self,
config: Config, config: Config,
@ -324,6 +344,14 @@ class PairSelectionEngine(NamedObject):
self.data_quality_cache_ = [] self.data_quality_cache_ = []
self.pair_results_cache_ = [] self.pair_results_cache_ = []
self.create_pairs(instruments)
def create_pairs(self, instruments: List[ExchangeInstrument]) -> None:
self.pairs_ = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
self.pairs_.append(PairOfInstruments(instruments[i], instruments[j]))
async def run_once(self) -> None: async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = [] quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {} price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
@ -338,8 +366,9 @@ class PairSelectionEngine(NamedObject):
df = self._to_dataframe(aggr, inst) df = self._to_dataframe(aggr, inst)
if len(df) > 0: if len(df) > 0:
price_series[inst] = df price_series[inst] = df
self.analyzer_.analyze(pairs=self.pairs_, series=price_series)
self.data_quality_cache_ = quality_results self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series) self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_]
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame: def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
rows: List[Dict[str, Any]] = [] rows: List[Dict[str, Any]] = []
@ -388,4 +417,4 @@ class PairSelectionEngine(NamedObject):
return res return res
def pair_dicts(self) -> List[Dict[str, Any]]: def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_] return [p.as_dict() for p in [pair.stats_history_[-1] for pair in self.pairs_]]