227 lines
8.4 KiB
Python
227 lines
8.4 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any, Dict, List
|
|
|
|
from aiohttp import web
|
|
|
|
from cvttpy_tools.app import App
|
|
from cvttpy_tools.base import NamedObject
|
|
from cvttpy_tools.config import CvttAppConfig
|
|
from cvttpy_tools.logger import Log
|
|
from cvttpy_tools.web.rest_service import RestService
|
|
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
|
|
from cvttpy_trading.trading.instrument import ExchangeInstrument
|
|
|
|
from pairs_trading.lib.pair_selector_engine import PairSelectionEngine
|
|
|
|
|
|
class PairSelector(NamedObject):
|
|
instruments_: List[ExchangeInstrument]
|
|
engine_: PairSelectionEngine
|
|
rest_service_: RestService
|
|
|
|
def __init__(self) -> None:
|
|
App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
|
|
App.instance().add_call(App.Stage.Config, self._on_config())
|
|
App.instance().add_call(App.Stage.Run, self.run())
|
|
|
|
async def _on_config(self) -> None:
|
|
cfg = CvttAppConfig.instance()
|
|
self.instruments_ = self._load_instruments(cfg)
|
|
price_field = cfg.get_value("model/stat_model_price", "close")
|
|
|
|
self.engine_ = PairSelectionEngine(
|
|
config=cfg,
|
|
instruments=self.instruments_,
|
|
price_field=price_field,
|
|
)
|
|
|
|
self.rest_service_ = RestService(config_key="/api/REST")
|
|
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
|
|
self.rest_service_.add_handler("GET", "/pair_selection", self._on_pair_selection)
|
|
|
|
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
|
|
instruments_cfg = cfg.get_value("instruments", [])
|
|
instruments: List[ExchangeInstrument] = []
|
|
assert len(instruments_cfg) >= 2, "at least two instruments required"
|
|
for item in instruments_cfg:
|
|
if isinstance(item, str):
|
|
parts = item.split(":", 1)
|
|
if len(parts) != 2:
|
|
raise ValueError(f"invalid instrument format: {item}")
|
|
exch_acct, instrument_id = parts
|
|
elif isinstance(item, dict):
|
|
exch_acct = item.get("exch_acct", "")
|
|
instrument_id = item.get("instrument_id", "")
|
|
if not exch_acct or not instrument_id:
|
|
raise ValueError(f"invalid instrument config: {item}")
|
|
else:
|
|
raise ValueError(f"unsupported instrument entry: {item}")
|
|
|
|
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
|
|
exch_acct=exch_acct, instrument_id=instrument_id
|
|
)
|
|
assert exch_inst is not None, f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
|
|
exch_inst.user_data_["exch_acct"] = exch_acct
|
|
instruments.append(exch_inst)
|
|
return instruments
|
|
|
|
async def run(self) -> None:
|
|
oneshot = App.instance().get_argument("oneshot", False)
|
|
while True:
|
|
await self.engine_.run_once()
|
|
if oneshot:
|
|
break
|
|
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
|
|
await asyncio.sleep(sleep_for)
|
|
|
|
async def _on_data_quality(self, request: web.Request) -> web.Response:
|
|
fmt = request.query.get("format", "html").lower()
|
|
quality = self.engine_.quality_dicts()
|
|
if fmt == "json":
|
|
return web.json_response(quality)
|
|
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
|
|
|
|
async def _on_pair_selection(self, request: web.Request) -> web.Response:
|
|
fmt = request.query.get("format", "html").lower()
|
|
pairs = self.engine_.pair_dicts()
|
|
if fmt == "json":
|
|
return web.json_response(pairs)
|
|
return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
|
|
|
|
class HtmlRenderer:
|
|
@staticmethod
|
|
def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
|
|
rows = "".join(
|
|
f"<tr>"
|
|
f"<td>{q.get('instrument','')}</td>"
|
|
f"<td>{q.get('record_count','')}</td>"
|
|
f"<td>{q.get('latest_tstamp','')}</td>"
|
|
f"<td>{q.get('status','')}</td>"
|
|
f"<td>{q.get('reason','')}</td>"
|
|
f"</tr>"
|
|
for q in sorted(quality, key=lambda x: str(x.get("instrument", "")))
|
|
)
|
|
return f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<meta charset='utf-8'/>
|
|
<title>Data Quality</title>
|
|
<style>
|
|
body {{ font-family: Arial, sans-serif; margin: 20px; }}
|
|
table {{ border-collapse: collapse; width: 100%; }}
|
|
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
|
|
th {{ background: #f2f2f2; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h2>Data Quality</h2>
|
|
<table>
|
|
<thead>
|
|
<tr><th>Instrument</th><th>Records</th><th>Latest</th><th>Status</th><th>Reason</th></tr>
|
|
</thead>
|
|
<tbody>{rows}</tbody>
|
|
</table>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
@staticmethod
|
|
def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
|
|
if not pairs:
|
|
body = "<p>No pairs available. Check data quality and try again.</p>"
|
|
else:
|
|
body_rows = []
|
|
for p in pairs:
|
|
body_rows.append(
|
|
"<tr>"
|
|
f"<td>{p.get('instrument_a','')}</td>"
|
|
f"<td>{p.get('instrument_b','')}</td>"
|
|
f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
|
|
f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
|
|
f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
|
|
f"<td data-value='{p.get('pvalue_eg','')}'>{p.get('pvalue_eg','')}</td>"
|
|
f"<td data-value='{p.get('pvalue_adf','')}'>{p.get('pvalue_adf','')}</td>"
|
|
f"<td data-value='{p.get('pvalue_j','')}'>{p.get('pvalue_j','')}</td>"
|
|
"</tr>"
|
|
)
|
|
body = "\n".join(body_rows)
|
|
|
|
return f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<meta charset='utf-8'/>
|
|
<title>Pair Selection</title>
|
|
<style>
|
|
body {{ font-family: Arial, sans-serif; margin: 20px; }}
|
|
table {{ border-collapse: collapse; width: 100%; }}
|
|
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
|
|
th.sortable {{ cursor: pointer; background: #f2f2f2; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h2>Pair Selection</h2>
|
|
<table id="pairs-table">
|
|
<thead>
|
|
<tr>
|
|
<th>Instrument A</th>
|
|
<th>Instrument B</th>
|
|
<th class="sortable" data-type="num">Rank-EG</th>
|
|
<th class="sortable" data-type="num">Rank-ADF</th>
|
|
<th class="sortable" data-type="num">Rank-J</th>
|
|
<th>EG p-value</th>
|
|
<th>ADF p-value</th>
|
|
<th>Johansen pseudo p</th>
|
|
</tr>
|
|
</thead>
|
|
<tbody>
|
|
{body}
|
|
</tbody>
|
|
</table>
|
|
<script>
|
|
(function() {{
|
|
const table = document.getElementById('pairs-table');
|
|
if (!table) return;
|
|
const getValue = (cell) => {{
|
|
const val = cell.getAttribute('data-value');
|
|
const num = parseFloat(val);
|
|
return isNaN(num) ? val : num;
|
|
}};
|
|
const toggleSort = (index, isNumeric) => {{
|
|
const tbody = table.querySelector('tbody');
|
|
const rows = Array.from(tbody.querySelectorAll('tr'));
|
|
const th = table.querySelectorAll('th')[index];
|
|
const dir = th.getAttribute('data-dir') === 'asc' ? 'desc' : 'asc';
|
|
th.setAttribute('data-dir', dir);
|
|
rows.sort((a, b) => {{
|
|
const va = getValue(a.children[index]);
|
|
const vb = getValue(b.children[index]);
|
|
if (isNumeric && !isNaN(va) && !isNaN(vb)) {{
|
|
return dir === 'asc' ? va - vb : vb - va;
|
|
}}
|
|
return dir === 'asc'
|
|
? String(va).localeCompare(String(vb))
|
|
: String(vb).localeCompare(String(va));
|
|
}});
|
|
tbody.innerHTML = '';
|
|
rows.forEach(r => tbody.appendChild(r));
|
|
}};
|
|
table.querySelectorAll('th.sortable').forEach((th, idx) => {{
|
|
th.addEventListener('click', () => toggleSort(idx, th.dataset.type === 'num'));
|
|
}});
|
|
}})();
|
|
</script>
|
|
</body>
|
|
</html>
|
|
"""
|
|
|
|
|
|
if __name__ == "__main__":
|
|
App()
|
|
CvttAppConfig()
|
|
PairSelector()
|
|
App.instance().run()
|