Compare commits

..

2 Commits

Author SHA1 Message Date
Oleg Sheynin
8a207a13a0 . 2026-01-30 20:21:44 +00:00
Oleg Sheynin
38c3c73021 stats history for pair selector 2026-01-28 23:27:52 +00:00
8 changed files with 195 additions and 552 deletions

1
.gitignore vendored
View File

@ -9,4 +9,3 @@ cvttpy
# SpecStory explanation file # SpecStory explanation file
.specstory/.what-is-this.md .specstory/.what-is-this.md
results/ results/
tmp/

44
.vscode/launch.json vendored
View File

@ -17,6 +17,50 @@
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.." "PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
}, },
}, },
{
"name": "-------- Live Pair Trading --------",
},
{
"name": "PAIR TRADER",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/apps/pair_trader.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/..",
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
"CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
"MODEL_CONFIG": "vecm",
"PAIR_TRADER_REST_PORT": "54320"
},
"args": [
// "--config=${workspaceFolder}/configuration/pair_trader.cfg",
"--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_trader",
"--book_id=TSTBOOK_PT_20260113",
"--instrument_A=COINBASE_AT:PAIR-ADA-USD",
"--instrument_B=COINBASE_AT:PAIR-SOL-USD",
],
},
{
"name": "PAIR SELECTOR",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/apps/pair_selector.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/..",
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
// "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
"CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
"PAIR_SELECTOR_REST_PORT": "44320"
},
"args": [
// "--config=${workspaceFolder}/configuration/pair_trader.cfg",
"--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_selector",
],
},
{ {
"name": "-------- VECM --------", "name": "-------- VECM --------",
}, },

View File

@ -6,6 +6,7 @@
], ],
"python.testing.cwd": "${workspaceFolder}", "python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true, "python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3",
"python.testing.pytestPath": "python3", "python.testing.pytestPath": "python3",
"python.analysis.extraPaths": [ "python.analysis.extraPaths": [
"${workspaceFolder}", "${workspaceFolder}",
@ -15,5 +16,4 @@
"python.envFile": "${workspaceFolder}/.env", "python.envFile": "${workspaceFolder}/.env",
"python.testing.debugPort": 3000, "python.testing.debugPort": 3000,
"python.testing.promptToConfigure": false, "python.testing.promptToConfigure": false,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python"
} }

View File

@ -1 +1 @@
0.0.7 0.0.6

View File

@ -1,19 +1,98 @@
from __future__ import annotations from __future__ import annotations
import asyncio
from typing import Any, Dict, List from typing import Any, Dict, List
from aiohttp import web
from cvttpy_tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig from cvttpy_tools.config import CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.web.rest_service import RestService
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
from cvttpy_trading.trading.instrument import ExchangeInstrument
from pairs_trading.lib.pair_selector_engine import PairSelectionEngine
class HtmlRenderer(NamedObject): class PairSelector(NamedObject):
instruments_: List[ExchangeInstrument]
engine_: PairSelectionEngine
rest_service_: RestService
def __init__(self) -> None: def __init__(self) -> None:
pass App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
App.instance().add_call(App.Stage.Config, self._on_config())
App.instance().add_call(App.Stage.Run, self.run())
async def _on_config(self) -> None:
cfg = CvttAppConfig.instance()
self.instruments_ = self._load_instruments(cfg)
price_field = cfg.get_value("model/stat_model_price", "close")
self.engine_ = PairSelectionEngine(
config=cfg,
instruments=self.instruments_,
price_field=price_field,
)
self.rest_service_ = RestService(config_key="/api/REST")
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
self.rest_service_.add_handler("GET", "/pair_selection", self._on_pair_selection)
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
instruments_cfg = cfg.get_value("instruments", [])
instruments: List[ExchangeInstrument] = []
assert len(instruments_cfg) >= 2, "at least two instruments required"
for item in instruments_cfg:
if isinstance(item, str):
parts = item.split(":", 1)
if len(parts) != 2:
raise ValueError(f"invalid instrument format: {item}")
exch_acct, instrument_id = parts
elif isinstance(item, dict):
exch_acct = item.get("exch_acct", "")
instrument_id = item.get("instrument_id", "")
if not exch_acct or not instrument_id:
raise ValueError(f"invalid instrument config: {item}")
else:
raise ValueError(f"unsupported instrument entry: {item}")
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert exch_inst is not None, f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
exch_inst.user_data_["exch_acct"] = exch_acct
instruments.append(exch_inst)
return instruments
async def run(self) -> None:
oneshot = App.instance().get_argument("oneshot", False)
while True:
await self.engine_.run_once()
if oneshot:
break
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
await asyncio.sleep(sleep_for)
async def _on_data_quality(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
quality = self.engine_.quality_dicts()
if fmt == "json":
return web.json_response(quality)
return web.Response(text=HtmlRenderer.render_data_quality_html(quality), content_type="text/html")
async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts()
if fmt == "json":
return web.json_response(pairs)
return web.Response(text=HtmlRenderer.render_pair_selection_html(pairs), content_type="text/html")
class HtmlRenderer:
@staticmethod @staticmethod
def render_data_quality(quality: List[Dict[str, Any]]) -> str: def render_data_quality_html(quality: List[Dict[str, Any]]) -> str:
rows = "".join( rows = "".join(
f"<tr>" f"<tr>"
f"<td>{q.get('instrument','')}</td>" f"<td>{q.get('instrument','')}</td>"
@ -50,7 +129,7 @@ class HtmlRenderer(NamedObject):
""" """
@staticmethod @staticmethod
def render_pairs(pairs: List[Dict[str, Any]]) -> str: def render_pair_selection_html(pairs: List[Dict[str, Any]]) -> str:
if not pairs: if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>" body = "<p>No pairs available. Check data quality and try again.</p>"
else: else:
@ -138,3 +217,10 @@ class HtmlRenderer(NamedObject):
</body> </body>
</html> </html>
""" """
if __name__ == "__main__":
App()
CvttAppConfig()
PairSelector()
App.instance().run()

View File

@ -1,509 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from aiohttp import web
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint # type: ignore
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config, CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
from cvttpy_tools.web.rest_client import RESTSender
from cvttpy_tools.web.rest_service import RestService
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
from pairs_trading.apps.pair_selector.renderer import HtmlRenderer
@dataclass
class InstrumentQuality(NamedObject):
instrument_: ExchangeInstrument
record_count_: int
latest_tstamp_: Optional[pd.Timestamp]
status_: str
reason_: str
@dataclass
class PairStats(NamedObject):
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
pvalue_adf_: Optional[float]
pvalue_j_: Optional[float]
trace_stat_j_: Optional[float]
rank_eg_: int = 0
rank_adf_: int = 0
rank_j_: int = 0
composite_rank_: int = 0
def as_dict(self) -> Dict[str, Any]:
return {
"instrument_a": self.instrument_a_.instrument_id(),
"instrument_b": self.instrument_b_.instrument_id(),
"pvalue_eg": self.pvalue_eg_,
"pvalue_adf": self.pvalue_adf_,
"pvalue_j": self.pvalue_j_,
"trace_stat_j": self.trace_stat_j_,
"rank_eg": self.rank_eg_,
"rank_adf": self.rank_adf_,
"rank_j": self.rank_j_,
"composite_rank": self.composite_rank_,
}
class DataFetcher(NamedObject):
sender_: RESTSender
interval_sec_: int
history_depth_sec_: int
def __init__(
self,
base_url: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = RESTSender(base_url=base_url)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
def fetch(
self, exch_acct: str, inst: ExchangeInstrument
) -> List[MdTradesAggregate]:
rqst_data = {
"exch_acct": exch_acct,
"instrument_id": inst.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}"
)
return []
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
return [
mdsum.create_md_trades_aggregate(
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
)
for mdsum in mdsums
]
class QualityChecker(NamedObject):
interval_sec_: int
def __init__(self, interval_sec: int) -> None:
self.interval_sec_ = interval_sec
def evaluate(
self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]
) -> InstrumentQuality:
if len(aggr) == 0:
return InstrumentQuality(
instrument_=inst,
record_count_=0,
latest_tstamp_=None,
status_="FAIL",
reason_="no records",
)
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
if latest_ts <= recency_cutoff:
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_="FAIL",
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
)
gaps_ok, reason = self._check_gaps(aggr_sorted)
status = "PASS" if gaps_ok else "FAIL"
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_=status,
reason_=reason,
)
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
NUM_TRADES_THRESHOLD = 50
if len(aggr) < 2:
return True, "ok"
interval_ns = self.interval_sec_ * NanoPerSec
for idx in range(1, len(aggr)):
prev = aggr[idx - 1]
curr = aggr[idx]
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
missing_intervals = int(delta // interval_ns) - 1
if missing_intervals <= 0:
continue
prev_nt = prev.num_trades_
next_nt = curr.num_trades_
estimate = self._approximate_num_trades(prev_nt, next_nt)
if estimate > NUM_TRADES_THRESHOLD:
return False, (
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
)
return True, "ok"
@staticmethod
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
if prev_nt is None and next_nt is None:
return 0.0
if prev_nt is None:
return float(next_nt)
if next_nt is None:
return float(prev_nt)
return (prev_nt + next_nt) / 2.0
class PairAnalyzer(NamedObject):
price_field_: str
interval_sec_: int
def __init__(self, price_field: str, interval_sec: int) -> None:
self.price_field_ = price_field
self.interval_sec_ = interval_sec
def analyze(
self, series: Dict[ExchangeInstrument, pd.DataFrame]
) -> List[PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
results.append(stats)
self._rank(results)
return results
def _compute_stats(
self,
inst_a: ExchangeInstrument,
inst_b: ExchangeInstrument,
merged: pd.DataFrame,
) -> Optional[PairStats]:
if len(merged) < 2:
return None
px_a = merged["price_a"].astype(float)
px_b = merged["price_b"].astype(float)
std_a = float(px_a.std())
std_b = float(px_b.std())
if std_a == 0 or std_b == 0:
return None
z_a = (px_a - float(px_a.mean())) / std_a
z_b = (px_b - float(px_b.mean())) / std_b
p_eg: Optional[float]
p_adf: Optional[float]
p_j: Optional[float]
trace_stat: Optional[float]
try:
p_eg = float(coint(z_a, z_b)[1])
except Exception as exc:
Log.warning(
f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_eg = None
try:
spread = z_a - z_b
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
except Exception as exc:
Log.warning(
f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_adf = None
try:
data = np.column_stack([z_a, z_b])
res = coint_johansen(data, det_order=0, k_ar_diff=1)
trace_stat = float(res.lr1[0])
cv10, cv5, cv1 = res.cvt[0]
if trace_stat > cv1:
p_j = 0.01
elif trace_stat > cv5:
p_j = 0.05
elif trace_stat > cv10:
p_j = 0.10
else:
p_j = 1.0
except Exception as exc:
Log.warning(
f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_j = None
trace_stat = None
return PairStats(
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
pvalue_adf_=p_adf,
pvalue_j_=p_j,
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
@staticmethod
def _assign_ranks(results: List[PairStats], key, attr: str) -> None:
values = [key(r) for r in results]
sorted_vals = sorted([v for v in values if v is not None])
for res in results:
val = key(res)
if val is None:
setattr(res, attr, len(sorted_vals) + 1)
continue
rank = 1 + sum(1 for v in sorted_vals if v < val)
setattr(res, attr, rank)
class PairSelectionEngine(NamedObject):
config_: object
instruments_: List[ExchangeInstrument]
price_field_: str
fetcher_: DataFetcher
quality_: QualityChecker
analyzer_: PairAnalyzer
interval_sec_: int
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
def __init__(
self,
config: Config,
instruments: List[ExchangeInstrument],
price_field: str,
) -> None:
self.config_ = config
self.instruments_ = instruments
self.price_field_ = price_field
interval_sec = int(config.get_value("interval_sec", 0))
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
base_url = config.get_value("cvtt_base_url", None)
assert interval_sec > 0, "interval_sec must be > 0"
assert history_depth_sec > 0, "history_depth_sec must be > 0"
assert base_url, "cvtt_base_url must be set"
self.fetcher_ = DataFetcher(
base_url=base_url,
interval_sec=interval_sec,
history_depth_sec=history_depth_sec,
)
self.quality_ = QualityChecker(interval_sec=interval_sec)
self.analyzer_ = PairAnalyzer(
price_field=price_field, interval_sec=interval_sec
)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.data_quality_cache_ = []
self.pair_results_cache_ = []
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
for inst in self.instruments_:
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
q = self.quality_.evaluate(inst, aggr)
quality_results.append(q)
if q.status_ != "PASS":
continue
df = self._to_dataframe(aggr, inst)
if len(df) > 0:
price_series[inst] = df
self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
def _to_dataframe(
self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument
) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for item in aggr:
rows.append(
{
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
"price": self._extract_price(item, inst),
"num_trades": item.num_trades_,
}
)
df = pd.DataFrame(rows)
return df.sort_values("tstamp").reset_index(drop=True)
def _extract_price(
self, aggr: MdTradesAggregate, inst: ExchangeInstrument
) -> float:
price_field = self.price_field_
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
field_map = {
"open": aggr.open_,
"high": aggr.high_,
"low": aggr.low_,
"close": aggr.close_,
"vwap": aggr.vwap_,
}
raw = field_map.get(price_field, aggr.close_)
return inst.get_price(raw)
def sleep_seconds_until_next_cycle(self) -> float:
now_ns = current_nanoseconds()
interval_ns = self.interval_sec_ * NanoPerSec
next_boundary = (now_ns // interval_ns + 1) * interval_ns
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
def quality_dicts(self) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for q in self.data_quality_cache_:
res.append(
{
"instrument": q.instrument_.instrument_id(),
"record_count": q.record_count_,
"latest_tstamp": (
q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None
),
"status": q.status_,
"reason": q.reason_,
}
)
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
class PairSelector(NamedObject):
instruments_: List[ExchangeInstrument]
engine_: PairSelectionEngine
rest_service_: RestService
def __init__(self) -> None:
App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
App.instance().add_call(App.Stage.Config, self._on_config())
App.instance().add_call(App.Stage.Run, self.run())
async def _on_config(self) -> None:
cfg = CvttAppConfig.instance()
self.instruments_ = self._load_instruments(cfg)
price_field = cfg.get_value("model/stat_model_price", "close")
self.engine_ = PairSelectionEngine(
config=cfg,
instruments=self.instruments_,
price_field=price_field,
)
self.rest_service_ = RestService(config_key="/api/REST")
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
self.rest_service_.add_handler(
"GET", "/pair_selection", self._on_pair_selection
)
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
instruments_cfg = cfg.get_value("instruments", [])
instruments: List[ExchangeInstrument] = []
assert len(instruments_cfg) >= 2, "at least two instruments required"
for item in instruments_cfg:
if isinstance(item, str):
parts = item.split(":", 1)
if len(parts) != 2:
raise ValueError(f"invalid instrument format: {item}")
exch_acct, instrument_id = parts
elif isinstance(item, dict):
exch_acct = item.get("exch_acct", "")
instrument_id = item.get("instrument_id", "")
if not exch_acct or not instrument_id:
raise ValueError(f"invalid instrument config: {item}")
else:
raise ValueError(f"unsupported instrument entry: {item}")
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert (
exch_inst is not None
), f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
exch_inst.user_data_["exch_acct"] = exch_acct
instruments.append(exch_inst)
return instruments
async def run(self) -> None:
oneshot = App.instance().get_argument("oneshot", False)
while True:
await self.engine_.run_once()
if oneshot:
break
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
await asyncio.sleep(sleep_for)
async def _on_data_quality(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
quality = self.engine_.quality_dicts()
if fmt == "json":
return web.json_response(quality)
return web.Response(
text=HtmlRenderer.render_data_quality(quality), content_type="text/html"
)
async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts()
if fmt == "json":
return web.json_response(pairs)
return web.Response(
text=HtmlRenderer.render_pairs(pairs), content_type="text/html"
)
if __name__ == "__main__":
App()
CvttAppConfig()
PairSelector()
App.instance().run()

View File

@ -136,9 +136,6 @@ echo "Release version: ${release_version}"
confirm confirm
version_tag="v${release_version}" version_tag="v${release_version}"
if [ "$(git tag -l "${version_tag}")" != "" ]; then
version_tag="${version_tag}.$(date +%Y%m%d_%H%M)"
fi
version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'" version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'"
cmd_arr=() cmd_arr=()

View File

@ -1,13 +1,12 @@
```python
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from readline import insert_text
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
# --- # ---
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
@ -30,6 +29,7 @@ class InstrumentQuality(NamedObject):
@dataclass @dataclass
class PairStats(NamedObject): class PairStats(NamedObject):
tstamp_: pd.Timestamp
instrument_a_: ExchangeInstrument instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float] pvalue_eg_: Optional[float]
@ -112,7 +112,7 @@ class QualityChecker(NamedObject):
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True) latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow() now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_) recency_cutoff = now_ts - pd.Timedelta(seconds=10 * self.interval_sec_)
if latest_ts <= recency_cutoff: if latest_ts <= recency_cutoff:
return InstrumentQuality( return InstrumentQuality(
instrument_=inst, instrument_=inst,
@ -165,6 +165,21 @@ class QualityChecker(NamedObject):
return float(prev_nt) return float(prev_nt)
return (prev_nt + next_nt) / 2.0 return (prev_nt + next_nt) / 2.0
class PairOfInstruments(NamedObject):
intrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
stats_history_: List[PairStats]
def __init__(self, instrument_a: ExchangeInstrument, instrument_b: ExchangeInstrument) -> None:
self.instrument_a_ = instrument_a
self.instrument_b_ = instrument_b
self.stats_history_ = []
def add_stats(self, stats: PairStats) -> None:
self.stats_history_.append(stats)
class PairAnalyzer(NamedObject): class PairAnalyzer(NamedObject):
price_field_: str price_field_: str
@ -174,27 +189,24 @@ class PairAnalyzer(NamedObject):
self.price_field_ = price_field self.price_field_ = price_field
self.interval_sec_ = interval_sec self.interval_sec_ = interval_sec
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]: def analyze(self, pairs: List[PairOfInstruments], series: Dict[ExchangeInstrument, pd.DataFrame]) -> None:
instruments = list(series.keys()) for pair in pairs:
results: List[PairStats] = [] inst_a = pair.instrument_a_
for i in range(len(instruments)): inst_b = pair.instrument_b_
for j in range(i + 1, len(instruments)): df_a = series[inst_a][["tstamp", "price"]].rename(
inst_a = instruments[i] columns={"price": "price_a"}
inst_b = instruments[j] )
df_a = series[inst_a][["tstamp", "price"]].rename( df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_a"} columns={"price": "price_b"}
) )
df_b = series[inst_b][["tstamp", "price"]].rename( merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
columns={"price": "price_b"} "tstamp"
) )
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values( stats = self._compute_stats(inst_a, inst_b, merged)
"tstamp" if stats:
) pair.add_stats(stats)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats: self._rank(pairs)
results.append(stats)
self._rank(results)
return results
def _compute_stats( def _compute_stats(
self, self,
@ -252,6 +264,7 @@ class PairAnalyzer(NamedObject):
trace_stat = None trace_stat = None
return PairStats( return PairStats(
tstamp_=pd.Timestamp.utcnow(),
instrument_a_=inst_a, instrument_a_=inst_a,
instrument_b_=inst_b, instrument_b_=inst_b,
pvalue_eg_=p_eg, pvalue_eg_=p_eg,
@ -260,13 +273,15 @@ class PairAnalyzer(NamedObject):
trace_stat_j_=trace_stat, trace_stat_j_=trace_stat,
) )
def _rank(self, results: List[PairStats]) -> None: def _rank(self, pairs: List[PairOfInstruments]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_") last_stats: List[PairStats] = [pair.stats_history_[-1] for pair in pairs]
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_") self._assign_ranks(last_stats, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
for res in results: self._assign_ranks(last_stats, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(last_stats, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in last_stats:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_ res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_) last_stats.sort(key=lambda r: r.composite_rank_)
@staticmethod @staticmethod
def _assign_ranks( def _assign_ranks(
@ -295,6 +310,9 @@ class PairSelectionEngine(NamedObject):
data_quality_cache_: List[InstrumentQuality] data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats] pair_results_cache_: List[PairStats]
pairs_: List[PairOfInstruments]
def __init__( def __init__(
self, self,
config: Config, config: Config,
@ -326,6 +344,14 @@ class PairSelectionEngine(NamedObject):
self.data_quality_cache_ = [] self.data_quality_cache_ = []
self.pair_results_cache_ = [] self.pair_results_cache_ = []
self.create_pairs(instruments)
def create_pairs(self, instruments: List[ExchangeInstrument]) -> None:
self.pairs_ = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
self.pairs_.append(PairOfInstruments(instruments[i], instruments[j]))
async def run_once(self) -> None: async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = [] quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {} price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
@ -340,8 +366,9 @@ class PairSelectionEngine(NamedObject):
df = self._to_dataframe(aggr, inst) df = self._to_dataframe(aggr, inst)
if len(df) > 0: if len(df) > 0:
price_series[inst] = df price_series[inst] = df
self.analyzer_.analyze(pairs=self.pairs_, series=price_series)
self.data_quality_cache_ = quality_results self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series) self.pair_results_cache_ = [pair.stats_history_[-1] for pair in self.pairs_]
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame: def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
rows: List[Dict[str, Any]] = [] rows: List[Dict[str, Any]] = []
@ -390,5 +417,4 @@ class PairSelectionEngine(NamedObject):
return res return res
def pair_dicts(self) -> List[Dict[str, Any]]: def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_] return [p.as_dict() for p in [pair.stats_history_[-1] for pair in self.pairs_]]
```