Compare commits
2 Commits
master
...
20260128_p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a207a13a0 | ||
|
|
38c3c73021 |
@ -81,16 +81,18 @@ class PairSelector(NamedObject):
|
||||
quality = self.engine_.quality_dicts()
|
||||
if fmt == "json":
|
||||
return web.json_response(quality)
|
||||
return web.Response(text=self._render_quality_html(quality), content_type="text/html")
|
||||
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
|
||||
|
||||
async def _on_pair_selection(self, request: web.Request) -> web.Response:
|
||||
fmt = request.query.get("format", "html").lower()
|
||||
pairs = self.engine_.pair_dicts()
|
||||
if fmt == "json":
|
||||
return web.json_response(pairs)
|
||||
return web.Response(text=self._render_pairs_html(pairs), content_type="text/html")
|
||||
return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
|
||||
|
||||
def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str:
|
||||
class HtmlRenderer:
|
||||
@staticmethod
|
||||
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
|
||||
rows = "".join(
|
||||
f"<tr>"
|
||||
f"<td>{q.get('instrument','')}</td>"
|
||||
@ -126,7 +128,8 @@ class PairSelector(NamedObject):
|
||||
</html>
|
||||
"""
|
||||
|
||||
def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str:
|
||||
@staticmethod
|
||||
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
|
||||
if not pairs:
|
||||
body = "<p>No pairs available. Check data quality and try again.</p>"
|
||||
else:
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from readline import insert_text
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
@ -28,6 +29,7 @@ class InstrumentQuality(NamedObject):
|
||||
|
||||
@dataclass
|
||||
class PairStats(NamedObject):
|
||||
tstamp_: pd.Timestamp
|
||||
instrument_a_: ExchangeInstrument
|
||||
instrument_b_: ExchangeInstrument
|
||||
pvalue_eg_: Optional[float]
|
||||
@ -110,7 +112,7 @@ class QualityChecker(NamedObject):
|
||||
|
||||
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
|
||||
now_ts = pd.Timestamp.utcnow()
|
||||
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
|
||||
recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_)
|
||||
if latest_ts <= recency_cutoff:
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
@ -163,6 +165,21 @@ class QualityChecker(NamedObject):
|
||||
return float(prev_nt)
|
||||
return (prev_nt + next_nt) / 2.0
|
||||
|
||||
class PairOfInstruments(NamedObject):
|
||||
intrument_a_: ExchangeInstrument
|
||||
instrument_b_: ExchangeInstrument
|
||||
stats_history_: List[PairStats]
|
||||
|
||||
|
||||
def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None:
|
||||
self.instrument_a_ = instrument_a
|
||||
self.instrument_b_ = instrument_b
|
||||
self.stats_history_ = []
|
||||
|
||||
def add_stats(self, stats: PairStats) -> None:
|
||||
self.stats_history_.append(stats)
|
||||
|
||||
|
||||
|
||||
class PairAnalyzer(NamedObject):
|
||||
price_field_: str
|
||||
@ -172,27 +189,24 @@ class PairAnalyzer(NamedObject):
|
||||
self.price_field_ = price_field
|
||||
self.interval_sec_ = interval_sec
|
||||
|
||||
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
|
||||
instruments = list(series.keys())
|
||||
results: List[PairStats] = []
|
||||
for i in range(len(instruments)):
|
||||
for j in range(i + 1, len(instruments)):
|
||||
inst_a = instruments[i]
|
||||
inst_b = instruments[j]
|
||||
df_a = series[inst_a][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_a"}
|
||||
)
|
||||
df_b = series[inst_b][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_b"}
|
||||
)
|
||||
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
|
||||
"tstamp"
|
||||
)
|
||||
stats = self._compute_stats(inst_a, inst_b, merged)
|
||||
if stats:
|
||||
results.append(stats)
|
||||
self._rank(results)
|
||||
return results
|
||||
def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None:
|
||||
for pair in pairs:
|
||||
inst_a = pair.instrument_a_
|
||||
inst_b = pair.instrument_b_
|
||||
df_a = series[inst_a][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_a"}
|
||||
)
|
||||
df_b = series[inst_b][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_b"}
|
||||
)
|
||||
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
|
||||
"tstamp"
|
||||
)
|
||||
stats = self._compute_stats(inst_a, inst_b, merged)
|
||||
if stats:
|
||||
pair.add_stats(stats)
|
||||
|
||||
self._rank(pairs)
|
||||
|
||||
def _compute_stats(
|
||||
self,
|
||||
@ -250,6 +264,7 @@ class PairAnalyzer(NamedObject):
|
||||
trace_stat = None
|
||||
|
||||
return PairStats(
|
||||
tstamp_=pd.Timestamp.utcnow(),
|
||||
instrument_a_=inst_a,
|
||||
instrument_b_=inst_b,
|
||||
pvalue_eg_=p_eg,
|
||||
@ -258,13 +273,15 @@ class PairAnalyzer(NamedObject):
|
||||
trace_stat_j_=trace_stat,
|
||||
)
|
||||
|
||||
def _rank(self, results: List[PairStats]) -> None:
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
|
||||
for res in results:
|
||||
def _rank(self, pairs: List[PairOfInstruments]) -> None:
|
||||
last_stats: List[PairStats] = [pair.stats_history_[-1] for pair in pairs]
|
||||
|
||||
self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
|
||||
self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
|
||||
self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_")
|
||||
for res in last_stats:
|
||||
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
|
||||
results.sort(key=lambda r: r.composite_rank_)
|
||||
last_stats.sort(key=lambda r: r.composite_rank_)
|
||||
|
||||
@staticmethod
|
||||
def _assign_ranks(
|
||||
@ -293,6 +310,9 @@ class PairSelectionEngine(NamedObject):
|
||||
data_quality_cache_: List[InstrumentQuality]
|
||||
pair_results_cache_: List[PairStats]
|
||||
|
||||
pairs_: List[PairOfInstruments]
|
||||
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
@ -324,6 +344,14 @@ class PairSelectionEngine(NamedObject):
|
||||
self.data_quality_cache_ = []
|
||||
self.pair_results_cache_ = []
|
||||
|
||||
self.create_pairs(instruments)
|
||||
|
||||
def create_pairs(self, instruments: List[ExchangeInstrument]) -> None:
|
||||
self.pairs_ = []
|
||||
for i in range(len(instruments)):
|
||||
for j in range(i + 1, len(instruments)):
|
||||
self.pairs_.append(PairOfInstruments(instruments[i], instruments[j]))
|
||||
|
||||
async def run_once(self) -> None:
|
||||
quality_results: List[InstrumentQuality] = []
|
||||
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
|
||||
@ -338,8 +366,9 @@ class PairSelectionEngine(NamedObject):
|
||||
df = self._to_dataframe(aggr, inst)
|
||||
if len(df) > 0:
|
||||
price_series[inst] = df
|
||||
self.analyzer_.analyze(pairs=self.pairs_, series=price_series)
|
||||
self.data_quality_cache_ = quality_results
|
||||
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
|
||||
self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_]
|
||||
|
||||
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
|
||||
rows: List[Dict[str, Any]] = []
|
||||
@ -388,4 +417,4 @@ class PairSelectionEngine(NamedObject):
|
||||
return res
|
||||
|
||||
def pair_dicts(self) -> List[Dict[str, Any]]:
|
||||
return [p.as_dict() for p in self.pair_results_cache_]
|
||||
return [p.as_dict() for p in [pair.stats_history_[-1] for pair in self.pairs_]]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user