pairs_trading/apps/pair_selector.py
2026-01-28 23:27:52 +00:00

227 lines
8.4 KiB
Python

from __future__ import annotations
import asyncio
from typing import Any, Dict, List
from aiohttp import web
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.web.rest_service import RestService
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
from cvttpy_trading.trading.instrument import ExchangeInstrument
from pairs_trading.lib.pair_selector_engine import PairSelectionEngine
class PairSelector(NamedObject):
instruments_: List[ExchangeInstrument]
engine_: PairSelectionEngine
rest_service_: RestService
def __init__(self) -> None:
App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
App.instance().add_call(App.Stage.Config, self._on_config())
App.instance().add_call(App.Stage.Run, self.run())
async def _on_config(self) -> None:
cfg = CvttAppConfig.instance()
self.instruments_ = self._load_instruments(cfg)
price_field = cfg.get_value("model/stat_model_price", "close")
self.engine_ = PairSelectionEngine(
config=cfg,
instruments=self.instruments_,
price_field=price_field,
)
self.rest_service_ = RestService(config_key="/api/REST")
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
self.rest_service_.add_handler("GET", "/pair_selection", self._on_pair_selection)
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
instruments_cfg = cfg.get_value("instruments", [])
instruments: List[ExchangeInstrument] = []
assert len(instruments_cfg) >= 2, "at least two instruments required"
for item in instruments_cfg:
if isinstance(item, str):
parts = item.split(":", 1)
if len(parts) != 2:
raise ValueError(f"invalid instrument format: {item}")
exch_acct, instrument_id = parts
elif isinstance(item, dict):
exch_acct = item.get("exch_acct", "")
instrument_id = item.get("instrument_id", "")
if not exch_acct or not instrument_id:
raise ValueError(f"invalid instrument config: {item}")
else:
raise ValueError(f"unsupported instrument entry: {item}")
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert exch_inst is not None, f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
exch_inst.user_data_["exch_acct"] = exch_acct
instruments.append(exch_inst)
return instruments
async def run(self) -> None:
oneshot = App.instance().get_argument("oneshot", False)
while True:
await self.engine_.run_once()
if oneshot:
break
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
await asyncio.sleep(sleep_for)
async def _on_data_quality(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
quality = self.engine_.quality_dicts()
if fmt == "json":
return web.json_response(quality)
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts()
if fmt == "json":
return web.json_response(pairs)
return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
class HtmlRenderer:
@staticmethod
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
rows = "".join(
f"<tr>"
f"<td>{q.get('instrument','')}</td>"
f"<td>{q.get('record_count','')}</td>"
f"<td>{q.get('latest_tstamp','')}</td>"
f"<td>{q.get('status','')}</td>"
f"<td>{q.get('reason','')}</td>"
f"</tr>"
for q in sorted(quality, key=lambda x: str(x.get("instrument", "")))
)
return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Data Quality</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th {{ background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Data Quality</h2>
<table>
<thead>
<tr><th>Instrument</th><th>Records</th><th>Latest</th><th>Status</th><th>Reason</th></tr>
</thead>
<tbody>{rows}</tbody>
</table>
</body>
</html>
"""
@staticmethod
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>"
else:
body_rows = []
for p in pairs:
body_rows.append(
"<tr>"
f"<td>{p.get('instrument_a','')}</td>"
f"<td>{p.get('instrument_b','')}</td>"
f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
f"<td data-value='{p.get('pvalue_eg','')}'>{p.get('pvalue_eg','')}</td>"
f"<td data-value='{p.get('pvalue_adf','')}'>{p.get('pvalue_adf','')}</td>"
f"<td data-value='{p.get('pvalue_j','')}'>{p.get('pvalue_j','')}</td>"
"</tr>"
)
body = "\n".join(body_rows)
return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Pair Selection</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th.sortable {{ cursor: pointer; background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Pair Selection</h2>
<table id="pairs-table">
<thead>
<tr>
<th>Instrument A</th>
<th>Instrument B</th>
<th class="sortable" data-type="num">Rank-EG</th>
<th class="sortable" data-type="num">Rank-ADF</th>
<th class="sortable" data-type="num">Rank-J</th>
<th>EG p-value</th>
<th>ADF p-value</th>
<th>Johansen pseudo p</th>
</tr>
</thead>
<tbody>
{body}
</tbody>
</table>
<script>
(function() {{
const table = document.getElementById('pairs-table');
if (!table) return;
const getValue = (cell) => {{
const val = cell.getAttribute('data-value');
const num = parseFloat(val);
return isNaN(num) ? val : num;
}};
const toggleSort = (index, isNumeric) => {{
const tbody = table.querySelector('tbody');
const rows = Array.from(tbody.querySelectorAll('tr'));
const th = table.querySelectorAll('th')[index];
const dir = th.getAttribute('data-dir') === 'asc' ? 'desc' : 'asc';
th.setAttribute('data-dir', dir);
rows.sort((a, b) => {{
const va = getValue(a.children[index]);
const vb = getValue(b.children[index]);
if (isNumeric && !isNaN(va) && !isNaN(vb)) {{
return dir === 'asc' ? va - vb : vb - va;
}}
return dir === 'asc'
? String(va).localeCompare(String(vb))
: String(vb).localeCompare(String(va));
}});
tbody.innerHTML = '';
rows.forEach(r => tbody.appendChild(r));
}};
table.querySelectorAll('th.sortable').forEach((th, idx) => {{
th.addEventListener('click', () => toggleSort(idx, th.dataset.type === 'num'));
}});
}})();
</script>
</body>
</html>
"""
if __name__ == "__main__":
App()
CvttAppConfig()
PairSelector()
App.instance().run()