Compare commits
2 Commits
master
...
20260128_p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a207a13a0 | ||
|
|
38c3c73021 |
@ -81,16 +81,18 @@ class PairSelector(NamedObject):
|
|||||||
quality = self.engine_.quality_dicts()
|
quality = self.engine_.quality_dicts()
|
||||||
if fmt == "json":
|
if fmt == "json":
|
||||||
return web.json_response(quality)
|
return web.json_response(quality)
|
||||||
return web.Response(text=self._render_quality_html(quality), content_type="text/html")
|
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
|
||||||
|
|
||||||
async def _on_pair_selection(self, request: web.Request) -> web.Response:
|
async def _on_pair_selection(self, request: web.Request) -> web.Response:
    """Serve the engine's current pair dictionaries as JSON or as an HTML page.

    The ``format`` query parameter (case-insensitive, default ``"html"``) selects
    the representation: ``"json"`` returns the raw list, anything else returns the
    page produced by ``HtmlRenderer.render_pair_selection_html``.
    """
    requested_format = request.query.get("format", "html").lower()
    pair_rows = self.engine_.pair_dicts()

    if requested_format != "json":
        page = HtmlRenderer.render_pair_selection_html(pair_rows)
        return web.Response(text=page, content_type="text/html")

    return web.json_response(pair_rows)
|
||||||
|
|
||||||
def _render_quality_html(self, quality: List[Dict[str, Any]]) -> str:
|
class HtmlRenderer:
|
||||||
|
@staticmethod
|
||||||
|
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
|
||||||
rows = "".join(
|
rows = "".join(
|
||||||
f"<tr>"
|
f"<tr>"
|
||||||
f"<td>{q.get('instrument','')}</td>"
|
f"<td>{q.get('instrument','')}</td>"
|
||||||
@ -126,7 +128,8 @@ class PairSelector(NamedObject):
|
|||||||
</html>
|
</html>
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _render_pairs_html(self, pairs: List[Dict[str, Any]]) -> str:
|
@staticmethod
|
||||||
|
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
|
||||||
if not pairs:
|
if not pairs:
|
||||||
body = "<p>No pairs available. Check data quality and try again.</p>"
|
body = "<p>No pairs available. Check data quality and try again.</p>"
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from readline import insert_text
|
||||||
from typing import Any, Dict, List, Optional, Tuple
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@ -28,6 +29,7 @@ class InstrumentQuality(NamedObject):
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class PairStats(NamedObject):
|
class PairStats(NamedObject):
|
||||||
|
tstamp_: pd.Timestamp
|
||||||
instrument_a_: ExchangeInstrument
|
instrument_a_: ExchangeInstrument
|
||||||
instrument_b_: ExchangeInstrument
|
instrument_b_: ExchangeInstrument
|
||||||
pvalue_eg_: Optional[float]
|
pvalue_eg_: Optional[float]
|
||||||
@ -110,7 +112,7 @@ class QualityChecker(NamedObject):
|
|||||||
|
|
||||||
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
|
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
|
||||||
now_ts = pd.Timestamp.utcnow()
|
now_ts = pd.Timestamp.utcnow()
|
||||||
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
|
recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_)
|
||||||
if latest_ts <= recency_cutoff:
|
if latest_ts <= recency_cutoff:
|
||||||
return InstrumentQuality(
|
return InstrumentQuality(
|
||||||
instrument_=inst,
|
instrument_=inst,
|
||||||
@ -163,6 +165,21 @@ class QualityChecker(NamedObject):
|
|||||||
return float(prev_nt)
|
return float(prev_nt)
|
||||||
return (prev_nt + next_nt) / 2.0
|
return (prev_nt + next_nt) / 2.0
|
||||||
|
|
||||||
|
class PairOfInstruments(NamedObject):
    """Two instruments together with the stats computed for them over time.

    ``stats_history_`` holds ``PairStats`` objects in the order they were passed to
    ``add_stats``; the last element is the most recently added one.
    """

    # Fixed: this annotation was spelled ``intrument_a_`` while __init__ assigns
    # ``instrument_a_``, so the declared and the real attribute names disagreed.
    instrument_a_: ExchangeInstrument
    instrument_b_: ExchangeInstrument
    stats_history_: List[PairStats]

    def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None:
        """Store both instruments and start with an empty stats history."""
        self.instrument_a_ = instrument_a
        self.instrument_b_ = instrument_b
        self.stats_history_ = []

    def add_stats(self, stats: PairStats) -> None:
        """Append ``stats`` to the end of this pair's history."""
        self.stats_history_.append(stats)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PairAnalyzer(NamedObject):
|
class PairAnalyzer(NamedObject):
|
||||||
price_field_: str
|
price_field_: str
|
||||||
@ -172,27 +189,24 @@ class PairAnalyzer(NamedObject):
|
|||||||
self.price_field_ = price_field
|
self.price_field_ = price_field
|
||||||
self.interval_sec_ = interval_sec
|
self.interval_sec_ = interval_sec
|
||||||
|
|
||||||
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
|
def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None:
    """Compute new stats for each pair that has price data, then rank the pairs.

    For every pair, both instruments' frames are reduced to their ``tstamp`` and
    ``price`` columns, inner-joined on ``tstamp``, sorted by ``tstamp`` and passed
    to ``_compute_stats``. A truthy result is appended to the pair's history.

    Args:
        pairs: Pairs to analyze; their ``stats_history_`` lists are extended in place.
        series: Price frames keyed by instrument. An instrument may be absent.

    Returns:
        None. Results are stored on the ``pairs`` objects.
    """
    for pair in pairs:
        inst_a = pair.instrument_a_
        inst_b = pair.instrument_b_

        # The pair list is not derived from ``series``, so either leg can be missing
        # here; indexing ``series`` unconditionally would raise KeyError.
        if inst_a not in series or inst_b not in series:
            continue

        df_a = series[inst_a][["tstamp", "price"]].rename(
            columns={"price": "price_a"}
        )
        df_b = series[inst_b][["tstamp", "price"]].rename(
            columns={"price": "price_b"}
        )
        merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
            "tstamp"
        )
        stats = self._compute_stats(inst_a, inst_b, merged)
        if stats:
            pair.add_stats(stats)

    # Ranking reads each pair's latest stats, so only pairs that have at least one
    # history entry are handed over.
    self._rank([pair for pair in pairs if pair.stats_history_])
|
||||||
results.append(stats)
|
|
||||||
self._rank(results)
|
|
||||||
return results
|
|
||||||
|
|
||||||
def _compute_stats(
|
def _compute_stats(
|
||||||
self,
|
self,
|
||||||
@ -250,6 +264,7 @@ class PairAnalyzer(NamedObject):
|
|||||||
trace_stat = None
|
trace_stat = None
|
||||||
|
|
||||||
return PairStats(
|
return PairStats(
|
||||||
|
tstamp_=pd.Timestamp.utcnow(),
|
||||||
instrument_a_=inst_a,
|
instrument_a_=inst_a,
|
||||||
instrument_b_=inst_b,
|
instrument_b_=inst_b,
|
||||||
pvalue_eg_=p_eg,
|
pvalue_eg_=p_eg,
|
||||||
@ -258,13 +273,15 @@ class PairAnalyzer(NamedObject):
|
|||||||
trace_stat_j_=trace_stat,
|
trace_stat_j_=trace_stat,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _rank(self, results: List[PairStats]) -> None:
|
def _rank(self, pairs: List[PairOfInstruments]) -> None:
    """Assign per-test ranks and a composite rank to each pair's latest stats.

    The most recent ``PairStats`` of every pair is ranked by ``pvalue_eg_``,
    ``pvalue_adf_`` and ``pvalue_j_`` via ``_assign_ranks``; ``composite_rank_`` is
    the sum of the three resulting ranks.

    Args:
        pairs: Pairs whose latest stats are ranked. Pairs with an empty
            ``stats_history_`` are ignored.
    """
    # A pair has no history until add_stats() has been called for it; an unguarded
    # ``stats_history_[-1]`` would raise IndexError for such a pair.
    last_stats: List[PairStats] = [
        pair.stats_history_[-1] for pair in pairs if pair.stats_history_
    ]

    self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
    self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
    self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_")

    for res in last_stats:
        res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_

    # NOTE(review): this orders only the local list; ``pairs`` keeps its incoming
    # order. Confirm whether callers expect the pairs themselves to come back sorted.
    last_stats.sort(key=lambda r: r.composite_rank_)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _assign_ranks(
|
def _assign_ranks(
|
||||||
@ -292,6 +309,9 @@ class PairSelectionEngine(NamedObject):
|
|||||||
history_depth_sec_: int
|
history_depth_sec_: int
|
||||||
data_quality_cache_: List[InstrumentQuality]
|
data_quality_cache_: List[InstrumentQuality]
|
||||||
pair_results_cache_: List[PairStats]
|
pair_results_cache_: List[PairStats]
|
||||||
|
|
||||||
|
pairs_: List[PairOfInstruments]
|
||||||
|
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -323,7 +343,15 @@ class PairSelectionEngine(NamedObject):
|
|||||||
|
|
||||||
self.data_quality_cache_ = []
|
self.data_quality_cache_ = []
|
||||||
self.pair_results_cache_ = []
|
self.pair_results_cache_ = []
|
||||||
|
|
||||||
|
self.create_pairs(instruments)
|
||||||
|
|
||||||
|
def create_pairs(self, instruments: List[ExchangeInstrument]) -> None:
    """Rebuild ``self.pairs_`` with one ``PairOfInstruments`` per instrument pair.

    Every combination of two distinct positions in ``instruments`` yields one
    pair, with the earlier instrument as the first leg. Any previous content of
    ``self.pairs_`` is replaced.

    Args:
        instruments: Instruments to combine; fewer than two yields an empty list.
    """
    from itertools import combinations

    # combinations() emits (i, j) with i < j in the same order as the nested index
    # loops it replaces.
    self.pairs_ = [
        PairOfInstruments(first, second)
        for first, second in combinations(instruments, 2)
    ]
|
||||||
|
|
||||||
async def run_once(self) -> None:
|
async def run_once(self) -> None:
|
||||||
quality_results: List[InstrumentQuality] = []
|
quality_results: List[InstrumentQuality] = []
|
||||||
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
|
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
|
||||||
@ -338,8 +366,9 @@ class PairSelectionEngine(NamedObject):
|
|||||||
df = self._to_dataframe(aggr, inst)
|
df = self._to_dataframe(aggr, inst)
|
||||||
if len(df) > 0:
|
if len(df) > 0:
|
||||||
price_series[inst] = df
|
price_series[inst] = df
|
||||||
|
self.analyzer_.analyze(pairs=self.pairs_, series=price_series)
|
||||||
self.data_quality_cache_ = quality_results
|
self.data_quality_cache_ = quality_results
|
||||||
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
|
self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_]
|
||||||
|
|
||||||
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
|
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
|
||||||
rows: List[Dict[str, Any]] = []
|
rows: List[Dict[str, Any]] = []
|
||||||
@ -388,4 +417,4 @@ class PairSelectionEngine(NamedObject):
|
|||||||
return res
|
return res
|
||||||
|
|
||||||
def pair_dicts(self) -> List[Dict[str, Any]]:
|
def pair_dicts(self) -> List[Dict[str, Any]]:
    """Return the latest stats of every pair as dictionaries.

    Each pair in ``self.pairs_`` contributes ``as_dict()`` of the last element of
    its ``stats_history_``. Pairs with no stats yet are left out, because indexing
    an empty history with ``[-1]`` would raise IndexError.
    """
    return [
        pair.stats_history_[-1].as_dict()
        for pair in self.pairs_
        if pair.stats_history_
    ]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user