Compare commits

..

1 Commits

Author SHA1 Message Date
Oleg Sheynin
2819fd536a organize by pair name 2026-02-03 20:46:01 +00:00
2 changed files with 46 additions and 24 deletions

View File

@ -37,6 +37,7 @@ class InstrumentQuality(NamedObject):
@dataclass
class PairStats(NamedObject):
pair_name_: str
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
@ -188,13 +189,14 @@ class PairAnalyzer(NamedObject):
def analyze(
self, series: Dict[ExchangeInstrument, pd.DataFrame]
) -> List[PairStats]:
) -> Dict[str, PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
results: Dict[str, PairStats] = {}
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
inst_a, inst_b, pair_name = self._normalized_pair(
instruments[i], instruments[j]
)
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
@ -204,16 +206,16 @@ class PairAnalyzer(NamedObject):
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
stats = self._compute_stats(inst_a, inst_b, pair_name, merged)
if stats:
results.append(stats)
self._rank(results)
return results
results[pair_name] = stats
return self._rank(results)
def _compute_stats(
self,
inst_a: ExchangeInstrument,
inst_b: ExchangeInstrument,
pair_name: str,
merged: pd.DataFrame,
) -> Optional[PairStats]:
if len(merged) < 2:
@ -272,6 +274,7 @@ class PairAnalyzer(NamedObject):
trace_stat = None
return PairStats(
pair_name_=pair_name,
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
@ -280,13 +283,31 @@ class PairAnalyzer(NamedObject):
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
def _rank(self, results: Dict[str, PairStats]) -> Dict[str, PairStats]:
ranked = list(results.values())
self._assign_ranks(ranked, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(ranked, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(ranked, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in ranked:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
ranked.sort(key=lambda r: r.composite_rank_)
return {res.pair_name_: res for res in ranked}
@staticmethod
def _normalized_pair(
inst_a: ExchangeInstrument, inst_b: ExchangeInstrument
) -> Tuple[ExchangeInstrument, ExchangeInstrument, str]:
inst_a_id = PairAnalyzer._pair_label(inst_a.instrument_id())
inst_b_id = PairAnalyzer._pair_label(inst_b.instrument_id())
if inst_a_id <= inst_b_id:
return inst_a, inst_b, f"{inst_a_id}<->{inst_b_id}"
return inst_b, inst_a, f"{inst_b_id}<->{inst_a_id}"
@staticmethod
def _pair_label(instrument_id: str) -> str:
if instrument_id.startswith("PAIR-"):
return instrument_id[len("PAIR-") :]
return instrument_id
@staticmethod
def _assign_ranks(results: List[PairStats], key, attr: str) -> None:
@ -311,7 +332,7 @@ class PairSelectionEngine(NamedObject):
interval_sec_: int
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
pair_results_cache_: Dict[str, PairStats]
def __init__(
self,
@ -344,7 +365,7 @@ class PairSelectionEngine(NamedObject):
self.history_depth_sec_ = history_depth_sec
self.data_quality_cache_ = []
self.pair_results_cache_ = []
self.pair_results_cache_ = {}
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
@ -415,8 +436,11 @@ class PairSelectionEngine(NamedObject):
)
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
def pair_dicts(self) -> Dict[str, Dict[str, Any]]:
return {
pair_name: stats.as_dict()
for pair_name, stats in self.pair_results_cache_.items()
}
class PairSelector(NamedObject):

View File

@ -50,16 +50,15 @@ class HtmlRenderer(NamedObject):
"""
@staticmethod
def render_pairs(pairs: List[Dict[str, Any]]) -> str:
def render_pairs(pairs: Dict[str, Dict[str, Any]]) -> str:
if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>"
else:
body_rows = []
for p in pairs:
for pair_name, p in pairs.items():
body_rows.append(
"<tr>"
f"<td>{p.get('instrument_a','')}</td>"
f"<td>{p.get('instrument_b','')}</td>"
f"<td>{pair_name}</td>"
f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
@ -88,8 +87,7 @@ class HtmlRenderer(NamedObject):
<table id="pairs-table">
<thead>
<tr>
<th>Instrument A</th>
<th>Instrument B</th>
<th>Pair</th>
<th class="sortable" data-type="num">Rank-EG</th>
<th class="sortable" data-type="num">Rank-ADF</th>
<th class="sortable" data-type="num">Rank-J</th>