Compare commits

..

9 Commits

Author SHA1 Message Date
Oleg Sheynin
73135ee8c2 before refactoring 2026-02-03 19:35:42 +00:00
Oleg Sheynin
e4a3795793 progress 0.0.7 2026-02-01 23:36:46 +00:00
Oleg Sheynin
f311315ef8 . 2026-01-31 20:11:07 +00:00
Oleg Sheynin
76f9a80ad6 fix 2026-01-28 01:00:17 +00:00
Oleg Sheynin
bf25eb7fb5 progress 0.0.5 2026-01-26 21:46:50 +00:00
Oleg Sheynin
f2a5d6a7ad progress 0.0.4 2026-01-24 20:35:59 +00:00
Oleg Sheynin
b9d479ae8c progress 0.0.4 2026-01-23 20:15:24 +00:00
Oleg Sheynin
e6ae62ebb6 progress 0.0.3 2026-01-22 23:52:17 +00:00
Oleg Sheynin
170e48d646 minor 2026-01-19 18:04:05 +00:00
14 changed files with 1146 additions and 117 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ cvttpy
# SpecStory explanation file # SpecStory explanation file
.specstory/.what-is-this.md .specstory/.what-is-this.md
results/ results/
tmp/

25
.vscode/launch.json vendored
View File

@ -17,31 +17,6 @@
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.." "PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
}, },
}, },
{
"name": "-------- Live Pair Trading --------",
},
{
"name": "PAIR TRADER",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/apps/pair_trader.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/..",
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
"MODEL_CONFIG": "vecm",
"CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456",
// "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456",
},
"args": [
// "--config=${workspaceFolder}/configuration/pair_trader.cfg",
"--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_trader",
"--book_id=TSTBOOK_PT_20260113",
"--instrument_A=COINBASE_AT:PAIR-ADA-USD",
"--instrument_B=COINBASE_AT:PAIR-SOL-USD",
],
},
{ {
"name": "-------- VECM --------", "name": "-------- VECM --------",
}, },

View File

@ -6,7 +6,6 @@
], ],
"python.testing.cwd": "${workspaceFolder}", "python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true, "python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3",
"python.testing.pytestPath": "python3", "python.testing.pytestPath": "python3",
"python.analysis.extraPaths": [ "python.analysis.extraPaths": [
"${workspaceFolder}", "${workspaceFolder}",
@ -16,4 +15,5 @@
"python.envFile": "${workspaceFolder}/.env", "python.envFile": "${workspaceFolder}/.env",
"python.testing.debugPort": 3000, "python.testing.debugPort": 3000,
"python.testing.promptToConfigure": false, "python.testing.promptToConfigure": false,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python"
} }

View File

@ -1,2 +0,0 @@
cvttpy_tools: 1.3.4
cvttpy_trading: 2.4.1

View File

@ -1 +1 @@
0.0.2 0.0.7

View File

@ -0,0 +1,509 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from aiohttp import web
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint # type: ignore
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config, CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
from cvttpy_tools.web.rest_client import RESTSender
from cvttpy_tools.web.rest_service import RestService
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
from pairs_trading.apps.pair_selector.renderer import HtmlRenderer
@dataclass
class InstrumentQuality(NamedObject):
instrument_: ExchangeInstrument
record_count_: int
latest_tstamp_: Optional[pd.Timestamp]
status_: str
reason_: str
@dataclass
class PairStats(NamedObject):
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
pvalue_adf_: Optional[float]
pvalue_j_: Optional[float]
trace_stat_j_: Optional[float]
rank_eg_: int = 0
rank_adf_: int = 0
rank_j_: int = 0
composite_rank_: int = 0
def as_dict(self) -> Dict[str, Any]:
return {
"instrument_a": self.instrument_a_.instrument_id(),
"instrument_b": self.instrument_b_.instrument_id(),
"pvalue_eg": self.pvalue_eg_,
"pvalue_adf": self.pvalue_adf_,
"pvalue_j": self.pvalue_j_,
"trace_stat_j": self.trace_stat_j_,
"rank_eg": self.rank_eg_,
"rank_adf": self.rank_adf_,
"rank_j": self.rank_j_,
"composite_rank": self.composite_rank_,
}
class DataFetcher(NamedObject):
sender_: RESTSender
interval_sec_: int
history_depth_sec_: int
def __init__(
self,
base_url: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = RESTSender(base_url=base_url)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
def fetch(
self, exch_acct: str, inst: ExchangeInstrument
) -> List[MdTradesAggregate]:
rqst_data = {
"exch_acct": exch_acct,
"instrument_id": inst.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}"
)
return []
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
return [
mdsum.create_md_trades_aggregate(
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
)
for mdsum in mdsums
]
class QualityChecker(NamedObject):
interval_sec_: int
def __init__(self, interval_sec: int) -> None:
self.interval_sec_ = interval_sec
def evaluate(
self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]
) -> InstrumentQuality:
if len(aggr) == 0:
return InstrumentQuality(
instrument_=inst,
record_count_=0,
latest_tstamp_=None,
status_="FAIL",
reason_="no records",
)
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
if latest_ts <= recency_cutoff:
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_="FAIL",
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
)
gaps_ok, reason = self._check_gaps(aggr_sorted)
status = "PASS" if gaps_ok else "FAIL"
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_=status,
reason_=reason,
)
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
NUM_TRADES_THRESHOLD = 50
if len(aggr) < 2:
return True, "ok"
interval_ns = self.interval_sec_ * NanoPerSec
for idx in range(1, len(aggr)):
prev = aggr[idx - 1]
curr = aggr[idx]
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
missing_intervals = int(delta // interval_ns) - 1
if missing_intervals <= 0:
continue
prev_nt = prev.num_trades_
next_nt = curr.num_trades_
estimate = self._approximate_num_trades(prev_nt, next_nt)
if estimate > NUM_TRADES_THRESHOLD:
return False, (
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
)
return True, "ok"
@staticmethod
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
if prev_nt is None and next_nt is None:
return 0.0
if prev_nt is None:
return float(next_nt)
if next_nt is None:
return float(prev_nt)
return (prev_nt + next_nt) / 2.0
class PairAnalyzer(NamedObject):
price_field_: str
interval_sec_: int
def __init__(self, price_field: str, interval_sec: int) -> None:
self.price_field_ = price_field
self.interval_sec_ = interval_sec
def analyze(
self, series: Dict[ExchangeInstrument, pd.DataFrame]
) -> List[PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
results.append(stats)
self._rank(results)
return results
def _compute_stats(
self,
inst_a: ExchangeInstrument,
inst_b: ExchangeInstrument,
merged: pd.DataFrame,
) -> Optional[PairStats]:
if len(merged) < 2:
return None
px_a = merged["price_a"].astype(float)
px_b = merged["price_b"].astype(float)
std_a = float(px_a.std())
std_b = float(px_b.std())
if std_a == 0 or std_b == 0:
return None
z_a = (px_a - float(px_a.mean())) / std_a
z_b = (px_b - float(px_b.mean())) / std_b
p_eg: Optional[float]
p_adf: Optional[float]
p_j: Optional[float]
trace_stat: Optional[float]
try:
p_eg = float(coint(z_a, z_b)[1])
except Exception as exc:
Log.warning(
f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_eg = None
try:
spread = z_a - z_b
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
except Exception as exc:
Log.warning(
f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_adf = None
try:
data = np.column_stack([z_a, z_b])
res = coint_johansen(data, det_order=0, k_ar_diff=1)
trace_stat = float(res.lr1[0])
cv10, cv5, cv1 = res.cvt[0]
if trace_stat > cv1:
p_j = 0.01
elif trace_stat > cv5:
p_j = 0.05
elif trace_stat > cv10:
p_j = 0.10
else:
p_j = 1.0
except Exception as exc:
Log.warning(
f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
)
p_j = None
trace_stat = None
return PairStats(
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
pvalue_adf_=p_adf,
pvalue_j_=p_j,
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
@staticmethod
def _assign_ranks(results: List[PairStats], key, attr: str) -> None:
values = [key(r) for r in results]
sorted_vals = sorted([v for v in values if v is not None])
for res in results:
val = key(res)
if val is None:
setattr(res, attr, len(sorted_vals) + 1)
continue
rank = 1 + sum(1 for v in sorted_vals if v < val)
setattr(res, attr, rank)
class PairSelectionEngine(NamedObject):
config_: object
instruments_: List[ExchangeInstrument]
price_field_: str
fetcher_: DataFetcher
quality_: QualityChecker
analyzer_: PairAnalyzer
interval_sec_: int
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
def __init__(
self,
config: Config,
instruments: List[ExchangeInstrument],
price_field: str,
) -> None:
self.config_ = config
self.instruments_ = instruments
self.price_field_ = price_field
interval_sec = int(config.get_value("interval_sec", 0))
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
base_url = config.get_value("cvtt_base_url", None)
assert interval_sec > 0, "interval_sec must be > 0"
assert history_depth_sec > 0, "history_depth_sec must be > 0"
assert base_url, "cvtt_base_url must be set"
self.fetcher_ = DataFetcher(
base_url=base_url,
interval_sec=interval_sec,
history_depth_sec=history_depth_sec,
)
self.quality_ = QualityChecker(interval_sec=interval_sec)
self.analyzer_ = PairAnalyzer(
price_field=price_field, interval_sec=interval_sec
)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.data_quality_cache_ = []
self.pair_results_cache_ = []
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
for inst in self.instruments_:
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
q = self.quality_.evaluate(inst, aggr)
quality_results.append(q)
if q.status_ != "PASS":
continue
df = self._to_dataframe(aggr, inst)
if len(df) > 0:
price_series[inst] = df
self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
def _to_dataframe(
self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument
) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for item in aggr:
rows.append(
{
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
"price": self._extract_price(item, inst),
"num_trades": item.num_trades_,
}
)
df = pd.DataFrame(rows)
return df.sort_values("tstamp").reset_index(drop=True)
def _extract_price(
self, aggr: MdTradesAggregate, inst: ExchangeInstrument
) -> float:
price_field = self.price_field_
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
field_map = {
"open": aggr.open_,
"high": aggr.high_,
"low": aggr.low_,
"close": aggr.close_,
"vwap": aggr.vwap_,
}
raw = field_map.get(price_field, aggr.close_)
return inst.get_price(raw)
def sleep_seconds_until_next_cycle(self) -> float:
now_ns = current_nanoseconds()
interval_ns = self.interval_sec_ * NanoPerSec
next_boundary = (now_ns // interval_ns + 1) * interval_ns
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
def quality_dicts(self) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for q in self.data_quality_cache_:
res.append(
{
"instrument": q.instrument_.instrument_id(),
"record_count": q.record_count_,
"latest_tstamp": (
q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None
),
"status": q.status_,
"reason": q.reason_,
}
)
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
class PairSelector(NamedObject):
instruments_: List[ExchangeInstrument]
engine_: PairSelectionEngine
rest_service_: RestService
def __init__(self) -> None:
App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
App.instance().add_call(App.Stage.Config, self._on_config())
App.instance().add_call(App.Stage.Run, self.run())
async def _on_config(self) -> None:
cfg = CvttAppConfig.instance()
self.instruments_ = self._load_instruments(cfg)
price_field = cfg.get_value("model/stat_model_price", "close")
self.engine_ = PairSelectionEngine(
config=cfg,
instruments=self.instruments_,
price_field=price_field,
)
self.rest_service_ = RestService(config_key="/api/REST")
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
self.rest_service_.add_handler(
"GET", "/pair_selection", self._on_pair_selection
)
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
instruments_cfg = cfg.get_value("instruments", [])
instruments: List[ExchangeInstrument] = []
assert len(instruments_cfg) >= 2, "at least two instruments required"
for item in instruments_cfg:
if isinstance(item, str):
parts = item.split(":", 1)
if len(parts) != 2:
raise ValueError(f"invalid instrument format: {item}")
exch_acct, instrument_id = parts
elif isinstance(item, dict):
exch_acct = item.get("exch_acct", "")
instrument_id = item.get("instrument_id", "")
if not exch_acct or not instrument_id:
raise ValueError(f"invalid instrument config: {item}")
else:
raise ValueError(f"unsupported instrument entry: {item}")
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
exch_acct=exch_acct, instrument_id=instrument_id
)
assert (
exch_inst is not None
), f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
exch_inst.user_data_["exch_acct"] = exch_acct
instruments.append(exch_inst)
return instruments
async def run(self) -> None:
oneshot = App.instance().get_argument("oneshot", False)
while True:
await self.engine_.run_once()
if oneshot:
break
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
await asyncio.sleep(sleep_for)
async def _on_data_quality(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
quality = self.engine_.quality_dicts()
if fmt == "json":
return web.json_response(quality)
return web.Response(
text=HtmlRenderer.render_data_quality(quality), content_type="text/html"
)
async def _on_pair_selection(self, request: web.Request) -> web.Response:
fmt = request.query.get("format", "html").lower()
pairs = self.engine_.pair_dicts()
if fmt == "json":
return web.json_response(pairs)
return web.Response(
text=HtmlRenderer.render_pairs(pairs), content_type="text/html"
)
if __name__ == "__main__":
App()
CvttAppConfig()
PairSelector()
App.instance().run()

View File

@ -0,0 +1,394 @@
```python
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
# ---
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
from cvttpy_tools.web.rest_client import RESTSender
# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
@dataclass
class InstrumentQuality(NamedObject):
instrument_: ExchangeInstrument
record_count_: int
latest_tstamp_: Optional[pd.Timestamp]
status_: str
reason_: str
@dataclass
class PairStats(NamedObject):
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
pvalue_adf_: Optional[float]
pvalue_j_: Optional[float]
trace_stat_j_: Optional[float]
rank_eg_: int = 0
rank_adf_: int = 0
rank_j_: int = 0
composite_rank_: int = 0
def as_dict(self) -> Dict[str, Any]:
return {
"instrument_a": self.instrument_a_.instrument_id(),
"instrument_b": self.instrument_b_.instrument_id(),
"pvalue_eg": self.pvalue_eg_,
"pvalue_adf": self.pvalue_adf_,
"pvalue_j": self.pvalue_j_,
"trace_stat_j": self.trace_stat_j_,
"rank_eg": self.rank_eg_,
"rank_adf": self.rank_adf_,
"rank_j": self.rank_j_,
"composite_rank": self.composite_rank_,
}
class DataFetcher(NamedObject):
sender_: RESTSender
interval_sec_: int
history_depth_sec_: int
def __init__(
self,
base_url: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = RESTSender(base_url=base_url)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
def fetch(self, exch_acct: str, inst: ExchangeInstrument) -> List[MdTradesAggregate]:
rqst_data = {
"exch_acct": exch_acct,
"instrument_id": inst.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}")
return []
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
return [
mdsum.create_md_trades_aggregate(
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
)
for mdsum in mdsums
]
class QualityChecker(NamedObject):
interval_sec_: int
def __init__(self, interval_sec: int) -> None:
self.interval_sec_ = interval_sec
def evaluate(self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]) -> InstrumentQuality:
if len(aggr) == 0:
return InstrumentQuality(
instrument_=inst,
record_count_=0,
latest_tstamp_=None,
status_="FAIL",
reason_="no records",
)
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
if latest_ts <= recency_cutoff:
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_="FAIL",
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
)
gaps_ok, reason = self._check_gaps(aggr_sorted)
status = "PASS" if gaps_ok else "FAIL"
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_=status,
reason_=reason,
)
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
NUM_TRADES_THRESHOLD = 50
if len(aggr) < 2:
return True, "ok"
interval_ns = self.interval_sec_ * NanoPerSec
for idx in range(1, len(aggr)):
prev = aggr[idx - 1]
curr = aggr[idx]
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
missing_intervals = int(delta // interval_ns) - 1
if missing_intervals <= 0:
continue
prev_nt = prev.num_trades_
next_nt = curr.num_trades_
estimate = self._approximate_num_trades(prev_nt, next_nt)
if estimate > NUM_TRADES_THRESHOLD:
return False, (
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
)
return True, "ok"
@staticmethod
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
if prev_nt is None and next_nt is None:
return 0.0
if prev_nt is None:
return float(next_nt)
if next_nt is None:
return float(prev_nt)
return (prev_nt + next_nt) / 2.0
class PairAnalyzer(NamedObject):
price_field_: str
interval_sec_: int
def __init__(self, price_field: str, interval_sec: int) -> None:
self.price_field_ = price_field
self.interval_sec_ = interval_sec
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
results.append(stats)
self._rank(results)
return results
def _compute_stats(
self,
inst_a: ExchangeInstrument,
inst_b: ExchangeInstrument,
merged: pd.DataFrame,
) -> Optional[PairStats]:
if len(merged) < 2:
return None
px_a = merged["price_a"].astype(float)
px_b = merged["price_b"].astype(float)
std_a = float(px_a.std())
std_b = float(px_b.std())
if std_a == 0 or std_b == 0:
return None
z_a = (px_a - float(px_a.mean())) / std_a
z_b = (px_b - float(px_b.mean())) / std_b
p_eg: Optional[float]
p_adf: Optional[float]
p_j: Optional[float]
trace_stat: Optional[float]
try:
p_eg = float(coint(z_a, z_b)[1])
except Exception as exc:
Log.warning(f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_eg = None
try:
spread = z_a - z_b
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
except Exception as exc:
Log.warning(f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_adf = None
try:
data = np.column_stack([z_a, z_b])
res = coint_johansen(data, det_order=0, k_ar_diff=1)
trace_stat = float(res.lr1[0])
cv10, cv5, cv1 = res.cvt[0]
if trace_stat > cv1:
p_j = 0.01
elif trace_stat > cv5:
p_j = 0.05
elif trace_stat > cv10:
p_j = 0.10
else:
p_j = 1.0
except Exception as exc:
Log.warning(f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_j = None
trace_stat = None
return PairStats(
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
pvalue_adf_=p_adf,
pvalue_j_=p_j,
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
@staticmethod
def _assign_ranks(
results: List[PairStats], key, attr: str
) -> None:
values = [key(r) for r in results]
sorted_vals = sorted([v for v in values if v is not None])
for res in results:
val = key(res)
if val is None:
setattr(res, attr, len(sorted_vals) + 1)
continue
rank = 1 + sum(1 for v in sorted_vals if v < val)
setattr(res, attr, rank)
class PairSelectionEngine(NamedObject):
config_: object
instruments_: List[ExchangeInstrument]
price_field_: str
fetcher_: DataFetcher
quality_: QualityChecker
analyzer_: PairAnalyzer
interval_sec_: int
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
def __init__(
self,
config: Config,
instruments: List[ExchangeInstrument],
price_field: str,
) -> None:
self.config_ = config
self.instruments_ = instruments
self.price_field_ = price_field
interval_sec = int(config.get_value("interval_sec", 0))
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
base_url = config.get_value("cvtt_base_url", None)
assert interval_sec > 0, "interval_sec must be > 0"
assert history_depth_sec > 0, "history_depth_sec must be > 0"
assert base_url, "cvtt_base_url must be set"
self.fetcher_ = DataFetcher(
base_url=base_url,
interval_sec=interval_sec,
history_depth_sec=history_depth_sec,
)
self.quality_ = QualityChecker(interval_sec=interval_sec)
self.analyzer_ = PairAnalyzer(price_field=price_field, interval_sec=interval_sec)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.data_quality_cache_ = []
self.pair_results_cache_ = []
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
for inst in self.instruments_:
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
q = self.quality_.evaluate(inst, aggr)
quality_results.append(q)
if q.status_ != "PASS":
continue
df = self._to_dataframe(aggr, inst)
if len(df) > 0:
price_series[inst] = df
self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for item in aggr:
rows.append(
{
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
"price": self._extract_price(item, inst),
"num_trades": item.num_trades_,
}
)
df = pd.DataFrame(rows)
return df.sort_values("tstamp").reset_index(drop=True)
def _extract_price(self, aggr: MdTradesAggregate, inst: ExchangeInstrument) -> float:
price_field = self.price_field_
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
field_map = {
"open": aggr.open_,
"high": aggr.high_,
"low": aggr.low_,
"close": aggr.close_,
"vwap": aggr.vwap_,
}
raw = field_map.get(price_field, aggr.close_)
return inst.get_price(raw)
def sleep_seconds_until_next_cycle(self) -> float:
now_ns = current_nanoseconds()
interval_ns = self.interval_sec_ * NanoPerSec
next_boundary = (now_ns // interval_ns + 1) * interval_ns
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
def quality_dicts(self) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for q in self.data_quality_cache_:
res.append(
{
"instrument": q.instrument_.instrument_id(),
"record_count": q.record_count_,
"latest_tstamp": q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None,
"status": q.status_,
"reason": q.reason_,
}
)
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
```

View File

@ -0,0 +1,140 @@
from __future__ import annotations
from typing import Any, Dict, List
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
class HtmlRenderer(NamedObject):
def __init__(self) -> None:
pass
@staticmethod
def render_data_quality(quality: List[Dict[str, Any]]) -> str:
rows = "".join(
f"<tr>"
f"<td>{q.get('instrument','')}</td>"
f"<td>{q.get('record_count','')}</td>"
f"<td>{q.get('latest_tstamp','')}</td>"
f"<td>{q.get('status','')}</td>"
f"<td>{q.get('reason','')}</td>"
f"</tr>"
for q in sorted(quality, key=lambda x: str(x.get("instrument", "")))
)
return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Data Quality</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th {{ background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Data Quality</h2>
<table>
<thead>
<tr><th>Instrument</th><th>Records</th><th>Latest</th><th>Status</th><th>Reason</th></tr>
</thead>
<tbody>{rows}</tbody>
</table>
</body>
</html>
"""
@staticmethod
def render_pairs(pairs: List[Dict[str, Any]]) -> str:
if not pairs:
body = "<p>No pairs available. Check data quality and try again.</p>"
else:
body_rows = []
for p in pairs:
body_rows.append(
"<tr>"
f"<td>{p.get('instrument_a','')}</td>"
f"<td>{p.get('instrument_b','')}</td>"
f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
f"<td data-value='{p.get('pvalue_eg','')}'>{p.get('pvalue_eg','')}</td>"
f"<td data-value='{p.get('pvalue_adf','')}'>{p.get('pvalue_adf','')}</td>"
f"<td data-value='{p.get('pvalue_j','')}'>{p.get('pvalue_j','')}</td>"
"</tr>"
)
body = "\n".join(body_rows)
return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Pair Selection</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th.sortable {{ cursor: pointer; background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Pair Selection</h2>
<table id="pairs-table">
<thead>
<tr>
<th>Instrument A</th>
<th>Instrument B</th>
<th class="sortable" data-type="num">Rank-EG</th>
<th class="sortable" data-type="num">Rank-ADF</th>
<th class="sortable" data-type="num">Rank-J</th>
<th>EG p-value</th>
<th>ADF p-value</th>
<th>Johansen pseudo p</th>
</tr>
</thead>
<tbody>
{body}
</tbody>
</table>
<script>
(function() {{
const table = document.getElementById('pairs-table');
if (!table) return;
const getValue = (cell) => {{
const val = cell.getAttribute('data-value');
const num = parseFloat(val);
return isNaN(num) ? val : num;
}};
const toggleSort = (index, isNumeric) => {{
const tbody = table.querySelector('tbody');
const rows = Array.from(tbody.querySelectorAll('tr'));
const th = table.querySelectorAll('th')[index];
const dir = th.getAttribute('data-dir') === 'asc' ? 'desc' : 'asc';
th.setAttribute('data-dir', dir);
rows.sort((a, b) => {{
const va = getValue(a.children[index]);
const vb = getValue(b.children[index]);
if (isNumeric && !isNaN(va) && !isNaN(vb)) {{
return dir === 'asc' ? va - vb : vb - va;
}}
return dir === 'asc'
? String(va).localeCompare(String(vb))
: String(vb).localeCompare(String(va));
}});
tbody.innerHTML = '';
rows.forEach(r => tbody.appendChild(r));
}};
table.querySelectorAll('th.sortable').forEach((th, idx) => {{
th.addEventListener('click', () => toggleSort(idx, th.dataset.type === 'num'));
}});
}})();
</script>
</body>
</html>
"""

View File

@ -141,12 +141,11 @@ class PairTrader(NamedObject):
) )
async def _on_md_summary(self, history: List[MdTradesAggregate], exch_inst: ExchangeInstrument) -> None: async def _on_md_summary(self, history: List[MdTradesAggregate], exch_inst: ExchangeInstrument) -> None:
# URGENT before calling stragegy, make sure that **BOTH** instruments market data is combined.
Log.info(f"{self.fname()}: got {exch_inst.details_short()} data") Log.info(f"{self.fname()}: got {exch_inst.details_short()} data")
self.latest_history_[exch_inst] = history self.latest_history_[exch_inst] = history
if len(self.latest_history_) == 2: if len(self.latest_history_) == 2:
from itertools import chain from itertools import chain
all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.time_ns_) all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.aggr_time_ns_)
await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs) await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs)
self.latest_history_ = {} self.latest_history_ = {}

View File

@ -136,6 +136,9 @@ echo "Release version: ${release_version}"
confirm confirm
version_tag="v${release_version}" version_tag="v${release_version}"
if [ "$(git tag -l "${version_tag}")" != "" ]; then
version_tag="${version_tag}.$(date +%Y%m%d_%H%M)"
fi
version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'" version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'"
cmd_arr=() cmd_arr=()

View File

@ -1,86 +1,82 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from typing import Callable, Coroutine, Dict, Any, List, Optional, Set from typing import Dict, Any, List, Optional, Set
import requests import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec from cvttpy_tools.timeutils import NanosT, current_seconds
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
from cvttpy_tools.web.rest_client import RESTSender
# --- # ---
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary, MdSummaryCallbackT
from cvttpy_trading.trading.exchange_config import ExchangeAccounts from cvttpy_trading.trading.exchange_config import ExchangeAccounts
# --- # ---
from pairs_trading.lib.live.rest_client import RESTSender
class MdSummary(HistMdBar): # class MdSummary(HistMdBar):
def __init__( # def __init__(
self, # self,
ts_ns: int, # ts_ns: int,
open: float, # open: float,
high: float, # high: float,
low: float, # low: float,
close: float, # close: float,
volume: float, # volume: float,
vwap: float, # vwap: float,
num_trades: int, # num_trades: int,
): # ):
super().__init__(ts=ts_ns) # super().__init__(ts=ts_ns)
self.open_ = open # self.open_ = open
self.high_ = high # self.high_ = high
self.low_ = low # self.low_ = low
self.close_ = close # self.close_ = close
self.volume_ = volume # self.volume_ = volume
self.vwap_ = vwap # self.vwap_ = vwap
self.num_trades_ = num_trades # self.num_trades_ = num_trades
@classmethod # @classmethod
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]: # def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
res: List[MdSummary] = [] # res: List[MdSummary] = []
jresp = response.json() # jresp = response.json()
hist_data = jresp.get("historical_data", []) # hist_data = jresp.get("historical_data", [])
for hd in hist_data: # for hd in hist_data:
res.append( # res.append(
MdSummary( # MdSummary(
ts_ns=hd["time_ns"], # ts_ns=hd["time_ns"],
open=hd["open"], # open=hd["open"],
high=hd["high"], # high=hd["high"],
low=hd["low"], # low=hd["low"],
close=hd["close"], # close=hd["close"],
volume=hd["volume"], # volume=hd["volume"],
vwap=hd["vwap"], # vwap=hd["vwap"],
num_trades=hd["num_trades"], # num_trades=hd["num_trades"],
) # )
) # )
return res # return res
def create_md_trades_aggregate( # def create_md_trades_aggregate(
self, # self,
exch_acct: ExchangeAccountNameT, # exch_acct: ExchangeAccountNameT,
exch_inst: ExchangeInstrument, # exch_inst: ExchangeInstrument,
interval_sec: IntervalSecT, # interval_sec: IntervalSecT,
) -> MdTradesAggregate: # ) -> MdTradesAggregate:
res = MdTradesAggregate( # res = MdTradesAggregate(
exch_acct=exch_acct, # exch_acct=exch_acct,
exch_inst=exch_inst, # exch_inst=exch_inst,
interval_ns=interval_sec * NanoPerSec, # interval_ns=interval_sec * NanoPerSec,
) # )
res.set(mdbar=self) # res.set(mdbar=self)
return res # return res
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine] # MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
class MdSummaryCollector(NamedObject): class MdSummaryCollector(NamedObject):
@ -163,6 +159,7 @@ class MdSummaryCollector(NamedObject):
) )
return None return None
res = MdSummary.from_REST_response(response=response) res = MdSummary.from_REST_response(response=response)
Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}")
return None if len(res) == 0 else res[-1] return None if len(res) == 0 else res[-1]
def is_empty(self) -> bool: def is_empty(self) -> bool:
@ -195,15 +192,16 @@ class MdSummaryCollector(NamedObject):
Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec") Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")
def next_load_time(self) -> NanosT: def next_load_time(self) -> NanosT:
ALLOW_LAG_SEC = 1
curr_sec = int(current_seconds()) curr_sec = int(current_seconds())
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2 return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + ALLOW_LAG_SEC
async def _load_new(self) -> None: async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last() last: Optional[MdSummary] = self.get_last()
if not last: if not last:
Log.warning(f"{self.fname()}: did not get last update") Log.warning(f"{self.fname()}: did not get last update")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_: elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_:
Log.info( Log.info(
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}" f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
) )

View File

@ -1,19 +1,12 @@
```python
from __future__ import annotations from __future__ import annotations
import asyncio from typing import Dict
from typing import Callable, Dict, Any, List, Optional
import time import time
import requests import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject): class RESTSender(NamedObject):
session_: requests.Session session_: requests.Session
@ -64,4 +57,4 @@ class RESTSender(NamedObject):
raise ConnectionError( raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt ) from excpt
```

View File

@ -6,10 +6,10 @@ import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
from cvttpy_tools.web.rest_client import RESTSender
# --- # ---
from cvttpy_trading.trading.trading_instructions import TradingInstructions from cvttpy_trading.trading.trading_instructions import TradingInstructions
# --- # ---
from pairs_trading.lib.live.rest_client import RESTSender
from pairs_trading.apps.pair_trader import PairTrader from pairs_trading.apps.pair_trader import PairTrader

View File

@ -9,7 +9,7 @@ from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.settings.cvtt_types import IntervalSecT from cvttpy_tools.settings.cvtt_types import IntervalSecT
from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec from cvttpy_tools.timeutils import NanosT, SecPerHour, current_nanoseconds, NanoPerSec, format_nanos_utc
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
# --- # ---
@ -42,14 +42,14 @@ class PtLiveStrategy(NamedObject):
# for presentation: history of prediction values and trading signals # for presentation: history of prediction values and trading signals
predictions_df_: pd.DataFrame predictions_df_: pd.DataFrame
trading_signals_df_: pd.DataFrame trading_signals_df_: pd.DataFrame
allowed_md_lag_sec_: int
def __init__( def __init__(
self, self,
config: Config, config: Config,
pairs_trader: PairTrader, pairs_trader: PairTrader,
): ):
# import copy
# self.config_ = Config(json_src=copy.deepcopy(config.data()))
self.config_ = config self.config_ = config
self.pairs_trader_ = pairs_trader self.pairs_trader_ = pairs_trader
@ -83,7 +83,7 @@ class PtLiveStrategy(NamedObject):
) )
assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0" assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0"
await self.pairs_trader_.subscribe_md() self.allowed_md_lag_sec_ = self.config_.get_value("allowed_md_lag_sec", 3)
self.open_threshold_ = self.config_.get_value( self.open_threshold_ = self.config_.get_value(
"model/disequilibrium/open_trshld", 0.0 "model/disequilibrium/open_trshld", 0.0
@ -99,6 +99,9 @@ class PtLiveStrategy(NamedObject):
self.close_threshold_ > 0 self.close_threshold_ > 0
), "disequilibrium/close_trshld must be greater than 0" ), "disequilibrium/close_trshld must be greater than 0"
await self.pairs_trader_.subscribe_md()
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, " return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, "
@ -132,17 +135,33 @@ class PtLiveStrategy(NamedObject):
await self._send_trading_instructions(trading_instructions) await self._send_trading_instructions(trading_instructions)
def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool:
curr_ns = current_nanoseconds()
LAG_THRESHOLD = 5 * NanoPerSec
if len(hist_aggr) == 0: if len(hist_aggr) == 0:
Log.warning(f"{self.fname()} list of aggregates IS EMPTY") Log.warning(f"{self.fname()} list of aggregates IS EMPTY")
return False return False
curr_ns = current_nanoseconds()
# MAYBE check market data length # MAYBE check market data length
lag_ns = curr_ns - hist_aggr[-1].time_ns_
if lag_ns > LAG_THRESHOLD: # at 18:05:01 we should see data for 18:04:00
Log.warning(f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()} Lagging {int(lag_ns/NanoPerSec)} seconds") lag_sec = (curr_ns - hist_aggr[-1].aggr_time_ns_) / NanoPerSec - self.interval_sec()
if lag_sec > self.allowed_md_lag_sec_:
Log.warning(
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
f" Lagging {int(lag_sec)} > {self.allowed_md_lag_sec_} seconds:"
f"\n{len(hist_aggr)} records"
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
)
return False return False
else:
Log.info(
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
f" Lag {int(lag_sec)} <= {self.allowed_md_lag_sec_} seconds"
f"\n{len(hist_aggr)} records"
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
)
return True return True
def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame: def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame:
@ -163,8 +182,8 @@ class PtLiveStrategy(NamedObject):
rows.append( rows.append(
{ {
# convert nanoseconds → tz-aware pandas timestamp # convert nanoseconds → tz-aware pandas timestamp
"tstamp": pd.to_datetime(aggr.time_ns_, unit="ns", utc=True), "tstamp": pd.to_datetime(aggr.aggr_time_ns_, unit="ns", utc=True),
"time_ns": aggr.time_ns_, "time_ns": aggr.aggr_time_ns_,
"symbol": exch_inst.instrument_id().split("-", 1)[1], "symbol": exch_inst.instrument_id().split("-", 1)[1],
"exchange_id": exch_inst.exchange_id_, "exchange_id": exch_inst.exchange_id_,
"instrument_id": exch_inst.instrument_id(), "instrument_id": exch_inst.instrument_id(),