Compare commits

..

No commits in common. "master" and "v" have entirely different histories.
master ... v

13 changed files with 113 additions and 1334 deletions

1
.gitignore vendored
View File

@@ -9,4 +9,3 @@ cvttpy
# SpecStory explanation file # SpecStory explanation file
.specstory/.what-is-this.md .specstory/.what-is-this.md
results/ results/
tmp/

23
.vscode/launch.json vendored
View File

@@ -17,6 +17,29 @@
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.." "PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
}, },
}, },
{
"name": "-------- Live Pair Trading --------",
},
{
"name": "PAIR TRADER",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/apps/pair_trader.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/..",
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
"MODEL_CONFIG": "vecm"
},
"args": [
// "--config=${workspaceFolder}/configuration/pair_trader.cfg",
"--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_trader",
"--book_id=TEST_BOOK_20250818",
"--instrument_A=COINBASE_AT:PAIR-ADA-USD",
"--instrument_B=COINBASE_AT:PAIR-SOL-USD",
],
},
{ {
"name": "-------- VECM --------", "name": "-------- VECM --------",
}, },

View File

@@ -6,6 +6,7 @@
], ],
"python.testing.cwd": "${workspaceFolder}", "python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true, "python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3",
"python.testing.pytestPath": "python3", "python.testing.pytestPath": "python3",
"python.analysis.extraPaths": [ "python.analysis.extraPaths": [
"${workspaceFolder}", "${workspaceFolder}",
@@ -15,5 +16,4 @@
"python.envFile": "${workspaceFolder}/.env", "python.envFile": "${workspaceFolder}/.env",
"python.testing.debugPort": 3000, "python.testing.debugPort": 3000,
"python.testing.promptToConfigure": false, "python.testing.promptToConfigure": false,
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python"
} }

View File

@@ -1 +0,0 @@
0.0.7

View File

@@ -1,509 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from aiohttp import web
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint # type: ignore
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config, CvttAppConfig
from cvttpy_tools.logger import Log
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
from cvttpy_tools.web.rest_client import RESTSender
from cvttpy_tools.web.rest_service import RestService
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
from pairs_trading.apps.pair_selector.renderer import HtmlRenderer
@dataclass
class InstrumentQuality(NamedObject):
    """Outcome of the data-quality check for a single instrument's aggregates."""

    instrument_: ExchangeInstrument  # instrument the check was run for
    record_count_: int  # number of aggregate records received
    latest_tstamp_: Optional[pd.Timestamp]  # newest record's timestamp; None when no records
    status_: str  # "PASS" or "FAIL"
    reason_: str  # human-readable explanation of the status
@dataclass
class PairStats(NamedObject):
    """Cointegration statistics and ranking results for one instrument pair."""

    instrument_a_: ExchangeInstrument  # first leg of the pair
    instrument_b_: ExchangeInstrument  # second leg of the pair
    pvalue_eg_: Optional[float]  # Engle-Granger cointegration p-value; None if the test failed
    pvalue_adf_: Optional[float]  # ADF p-value of the z-score spread; None if the test failed
    pvalue_j_: Optional[float]  # Johansen pseudo p-value bucketed from critical values
    trace_stat_j_: Optional[float]  # Johansen trace statistic; None if the test failed
    rank_eg_: int = 0  # competition rank by pvalue_eg_ (1 = best)
    rank_adf_: int = 0  # competition rank by pvalue_adf_
    rank_j_: int = 0  # competition rank by pvalue_j_
    composite_rank_: int = 0  # sum of the three ranks; lower is better

    def as_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable view of the stats for REST/HTML output."""
        return {
            "instrument_a": self.instrument_a_.instrument_id(),
            "instrument_b": self.instrument_b_.instrument_id(),
            "pvalue_eg": self.pvalue_eg_,
            "pvalue_adf": self.pvalue_adf_,
            "pvalue_j": self.pvalue_j_,
            "trace_stat_j": self.trace_stat_j_,
            "rank_eg": self.rank_eg_,
            "rank_adf": self.rank_adf_,
            "rank_j": self.rank_j_,
            "composite_rank": self.composite_rank_,
        }
class DataFetcher(NamedObject):
    """Fetches trade-aggregate history for one instrument over the REST API."""

    sender_: RESTSender  # client for the md_summary endpoint
    interval_sec_: int  # aggregation interval requested, seconds
    history_depth_sec_: int  # how far back to request, seconds

    def __init__(
        self,
        base_url: str,
        interval_sec: int,
        history_depth_sec: int,
    ) -> None:
        self.sender_ = RESTSender(base_url=base_url)
        self.interval_sec_ = interval_sec
        self.history_depth_sec_ = history_depth_sec

    def fetch(
        self, exch_acct: str, inst: ExchangeInstrument
    ) -> List[MdTradesAggregate]:
        """POST an md_summary request for *inst* and convert the response.

        Returns an empty list (after logging) on any non-2xx response.
        """
        response = self.sender_.send_post(
            endpoint="md_summary",
            post_body={
                "exch_acct": exch_acct,
                "instrument_id": inst.instrument_id(),
                "interval_sec": self.interval_sec_,
                "history_depth_sec": self.history_depth_sec_,
            },
        )
        if response.status_code in (200, 201):
            mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
            aggregates: List[MdTradesAggregate] = []
            for mdsum in mdsums:
                aggregates.append(
                    mdsum.create_md_trades_aggregate(
                        exch_acct=exch_acct,
                        exch_inst=inst,
                        interval_sec=self.interval_sec_,
                    )
                )
            return aggregates
        Log.error(
            f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}"
        )
        return []
class QualityChecker(NamedObject):
    """Validates fetched aggregates: presence, recency, and gap size."""

    interval_sec_: int  # expected spacing between consecutive aggregates, seconds

    def __init__(self, interval_sec: int) -> None:
        self.interval_sec_ = interval_sec

    def evaluate(
        self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]
    ) -> InstrumentQuality:
        """Return a PASS/FAIL quality verdict for *inst* based on its aggregates.

        FAIL reasons: no records, stale newest record (older than two
        intervals), or an unacceptable data gap (see _check_gaps).
        """
        if len(aggr) == 0:
            return InstrumentQuality(
                instrument_=inst,
                record_count_=0,
                latest_tstamp_=None,
                status_="FAIL",
                reason_="no records",
            )
        aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
        latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
        now_ts = pd.Timestamp.utcnow()
        # Newest record must be younger than two intervals, otherwise the feed is stale.
        recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
        if latest_ts <= recency_cutoff:
            return InstrumentQuality(
                instrument_=inst,
                record_count_=len(aggr_sorted),
                latest_tstamp_=latest_ts,
                status_="FAIL",
                reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
            )
        gaps_ok, reason = self._check_gaps(aggr_sorted)
        status = "PASS" if gaps_ok else "FAIL"
        return InstrumentQuality(
            instrument_=inst,
            record_count_=len(aggr_sorted),
            latest_tstamp_=latest_ts,
            status_=status,
            reason_=reason,
        )

    def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
        """Scan time-sorted aggregates for missing intervals.

        A gap is tolerated when the estimated trade activity around it is
        small; it fails the check when the estimate exceeds the threshold.
        Returns (ok, reason).
        """
        NUM_TRADES_THRESHOLD = 50  # gaps with fewer estimated trades are tolerated
        if len(aggr) < 2:
            return True, "ok"
        interval_ns = self.interval_sec_ * NanoPerSec
        for idx in range(1, len(aggr)):
            prev = aggr[idx - 1]
            curr = aggr[idx]
            delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
            # delta of exactly one interval means consecutive records; more means missing bars.
            missing_intervals = int(delta // interval_ns) - 1
            if missing_intervals <= 0:
                continue
            prev_nt = prev.num_trades_
            next_nt = curr.num_trades_
            estimate = self._approximate_num_trades(prev_nt, next_nt)
            if estimate > NUM_TRADES_THRESHOLD:
                return False, (
                    f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
                )
        return True, "ok"

    @staticmethod
    def _approximate_num_trades(prev_nt: Optional[int], next_nt: Optional[int]) -> float:
        """Estimate trade count inside a gap as the mean of its neighbours.

        Tolerates missing (None) counts on either side; 0.0 when both are None.
        """
        if prev_nt is None and next_nt is None:
            return 0.0
        if prev_nt is None:
            return float(next_nt)
        if next_nt is None:
            return float(prev_nt)
        return (prev_nt + next_nt) / 2.0
class PairAnalyzer(NamedObject):
    """Runs cointegration tests on every instrument pair and ranks the results."""

    price_field_: str  # which bar field serves as the price (e.g. "close", "vwap")
    interval_sec_: int  # sampling interval of the input series, seconds

    def __init__(self, price_field: str, interval_sec: int) -> None:
        self.price_field_ = price_field
        self.interval_sec_ = interval_sec

    def analyze(
        self, series: Dict[ExchangeInstrument, pd.DataFrame]
    ) -> List[PairStats]:
        """Compute PairStats for every unordered pair of instruments in *series*.

        Each DataFrame is expected to carry 'tstamp' and 'price' columns; only
        timestamps present in both legs (inner join) are used. The returned
        list is ranked and sorted by composite rank (best first).
        """
        instruments = list(series.keys())
        results: List[PairStats] = []
        for i in range(len(instruments)):
            for j in range(i + 1, len(instruments)):
                inst_a = instruments[i]
                inst_b = instruments[j]
                df_a = series[inst_a][["tstamp", "price"]].rename(
                    columns={"price": "price_a"}
                )
                df_b = series[inst_b][["tstamp", "price"]].rename(
                    columns={"price": "price_b"}
                )
                merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
                    "tstamp"
                )
                stats = self._compute_stats(inst_a, inst_b, merged)
                if stats:
                    results.append(stats)
        self._rank(results)
        return results

    def _compute_stats(
        self,
        inst_a: ExchangeInstrument,
        inst_b: ExchangeInstrument,
        merged: pd.DataFrame,
    ) -> Optional[PairStats]:
        """Run EG, ADF and Johansen tests on one merged pair.

        Returns None when the pair is untestable (fewer than 2 overlapping
        points or a constant price series). Each individual test failure is
        logged and recorded as a None p-value rather than aborting.
        """
        if len(merged) < 2:
            return None
        px_a = merged["price_a"].astype(float)
        px_b = merged["price_b"].astype(float)
        std_a = float(px_a.std())
        std_b = float(px_b.std())
        if std_a == 0 or std_b == 0:
            # A constant series cannot be z-scored (division by zero below).
            return None
        # z-score both legs so the tests operate on comparable scales.
        z_a = (px_a - float(px_a.mean())) / std_a
        z_b = (px_b - float(px_b.mean())) / std_b
        p_eg: Optional[float]
        p_adf: Optional[float]
        p_j: Optional[float]
        trace_stat: Optional[float]
        try:
            # Engle-Granger: element [1] of the result tuple is the p-value.
            p_eg = float(coint(z_a, z_b)[1])
        except Exception as exc:
            Log.warning(
                f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
            )
            p_eg = None
        try:
            # ADF on the z-score spread as a direct stationarity check.
            spread = z_a - z_b
            p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
        except Exception as exc:
            Log.warning(
                f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
            )
            p_adf = None
        try:
            data = np.column_stack([z_a, z_b])
            res = coint_johansen(data, det_order=0, k_ar_diff=1)
            trace_stat = float(res.lr1[0])  # trace statistic for rank 0
            # cvt[0] holds the 90%/95%/99% critical values for rank 0.
            cv10, cv5, cv1 = res.cvt[0]
            # Johansen yields no p-value directly; bucket the trace statistic
            # against the critical values into a pseudo p-value.
            if trace_stat > cv1:
                p_j = 0.01
            elif trace_stat > cv5:
                p_j = 0.05
            elif trace_stat > cv10:
                p_j = 0.10
            else:
                p_j = 1.0
        except Exception as exc:
            Log.warning(
                f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
            )
            p_j = None
            trace_stat = None
        return PairStats(
            instrument_a_=inst_a,
            instrument_b_=inst_b,
            pvalue_eg_=p_eg,
            pvalue_adf_=p_adf,
            pvalue_j_=p_j,
            trace_stat_j_=trace_stat,
        )

    def _rank(self, results: List[PairStats]) -> None:
        """Assign per-test competition ranks, sum them, and sort ascending in place."""
        self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
        self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
        self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
        for res in results:
            res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
        results.sort(key=lambda r: r.composite_rank_)

    @staticmethod
    def _assign_ranks(results: List[PairStats], key, attr: str) -> None:
        """Competition-rank *results* by key(r) ascending, storing rank in *attr*.

        Ties share a rank ("1, 2, 2, 4" style). None values are penalized with
        rank len(valid)+1, i.e. one past all valid entries.
        """
        values = [key(r) for r in results]
        sorted_vals = sorted([v for v in values if v is not None])
        for res in results:
            val = key(res)
            if val is None:
                # Missing statistic: worst possible rank.
                setattr(res, attr, len(sorted_vals) + 1)
                continue
            rank = 1 + sum(1 for v in sorted_vals if v < val)  # ties share a rank
            setattr(res, attr, rank)
class PairSelectionEngine(NamedObject):
    """Orchestrates one analysis cycle: fetch, quality-check, analyze, cache."""

    config_: Config  # application configuration (was annotated 'object')
    instruments_: List[ExchangeInstrument]  # universe of instruments to pair up
    price_field_: str  # bar field used as price
    fetcher_: DataFetcher
    quality_: QualityChecker
    analyzer_: PairAnalyzer
    interval_sec_: int  # aggregation interval, seconds
    history_depth_sec_: int  # lookback window, seconds
    data_quality_cache_: List[InstrumentQuality]  # latest per-instrument verdicts
    pair_results_cache_: List[PairStats]  # latest ranked pair results

    def __init__(
        self,
        config: Config,
        instruments: List[ExchangeInstrument],
        price_field: str,
    ) -> None:
        """Read interval/history/url settings from *config* and build components.

        NOTE(review): config validation uses assert, which is stripped under
        python -O — consider raising instead; confirm deployment flags.
        """
        self.config_ = config
        self.instruments_ = instruments
        self.price_field_ = price_field
        interval_sec = int(config.get_value("interval_sec", 0))
        history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
        base_url = config.get_value("cvtt_base_url", None)
        assert interval_sec > 0, "interval_sec must be > 0"
        assert history_depth_sec > 0, "history_depth_sec must be > 0"
        assert base_url, "cvtt_base_url must be set"
        self.fetcher_ = DataFetcher(
            base_url=base_url,
            interval_sec=interval_sec,
            history_depth_sec=history_depth_sec,
        )
        self.quality_ = QualityChecker(interval_sec=interval_sec)
        self.analyzer_ = PairAnalyzer(
            price_field=price_field, interval_sec=interval_sec
        )
        self.interval_sec_ = interval_sec
        self.history_depth_sec_ = history_depth_sec
        self.data_quality_cache_ = []
        self.pair_results_cache_ = []

    async def run_once(self) -> None:
        """Run one full cycle and refresh both caches.

        Only instruments that PASS the quality check (and yield a non-empty
        frame) participate in pair analysis.
        """
        quality_results: List[InstrumentQuality] = []
        price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
        for inst in self.instruments_:
            # Prefer the account stashed on the instrument, fall back to its exchange id.
            exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
            aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
            q = self.quality_.evaluate(inst, aggr)
            quality_results.append(q)
            if q.status_ != "PASS":
                continue
            df = self._to_dataframe(aggr, inst)
            if len(df) > 0:
                price_series[inst] = df
        self.data_quality_cache_ = quality_results
        self.pair_results_cache_ = self.analyzer_.analyze(price_series)

    def _to_dataframe(
        self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument
    ) -> pd.DataFrame:
        """Convert aggregates to a tstamp-sorted frame of (tstamp, price, num_trades)."""
        rows: List[Dict[str, Any]] = []
        for item in aggr:
            rows.append(
                {
                    "tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
                    "price": self._extract_price(item, inst),
                    "num_trades": item.num_trades_,
                }
            )
        df = pd.DataFrame(rows)
        return df.sort_values("tstamp").reset_index(drop=True)

    def _extract_price(
        self, aggr: MdTradesAggregate, inst: ExchangeInstrument
    ) -> float:
        """Pick the configured bar field from *aggr*; unknown fields fall back to close."""
        price_field = self.price_field_
        # MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
        field_map = {
            "open": aggr.open_,
            "high": aggr.high_,
            "low": aggr.low_,
            "close": aggr.close_,
            "vwap": aggr.vwap_,
        }
        raw = field_map.get(price_field, aggr.close_)
        return inst.get_price(raw)

    def sleep_seconds_until_next_cycle(self) -> float:
        """Seconds until the next wall-clock boundary of interval_sec_ (>= 0)."""
        now_ns = current_nanoseconds()
        interval_ns = self.interval_sec_ * NanoPerSec
        next_boundary = (now_ns // interval_ns + 1) * interval_ns
        return max(0.0, (next_boundary - now_ns) / NanoPerSec)

    def quality_dicts(self) -> List[Dict[str, Any]]:
        """Return the cached quality verdicts as JSON-serializable dicts."""
        res: List[Dict[str, Any]] = []
        for q in self.data_quality_cache_:
            res.append(
                {
                    "instrument": q.instrument_.instrument_id(),
                    "record_count": q.record_count_,
                    "latest_tstamp": (
                        q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None
                    ),
                    "status": q.status_,
                    "reason": q.reason_,
                }
            )
        return res

    def pair_dicts(self) -> List[Dict[str, Any]]:
        """Return the cached pair results as JSON-serializable dicts."""
        return [p.as_dict() for p in self.pair_results_cache_]
class PairSelector(NamedObject):
    """App entry object: wires config, the selection engine, and REST endpoints."""

    instruments_: List[ExchangeInstrument]
    engine_: PairSelectionEngine
    rest_service_: RestService

    def __init__(self) -> None:
        # NOTE(review): self._on_config() / self.run() are called here, so
        # coroutine OBJECTS are registered — presumably the App framework
        # awaits them at the given stage; confirm against cvttpy_tools.app.
        App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
        App.instance().add_call(App.Stage.Config, self._on_config())
        App.instance().add_call(App.Stage.Run, self.run())

    async def _on_config(self) -> None:
        """Config-stage hook: load instruments, build the engine, register REST routes."""
        cfg = CvttAppConfig.instance()
        self.instruments_ = self._load_instruments(cfg)
        price_field = cfg.get_value("model/stat_model_price", "close")
        self.engine_ = PairSelectionEngine(
            config=cfg,
            instruments=self.instruments_,
            price_field=price_field,
        )
        self.rest_service_ = RestService(config_key="/api/REST")
        self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
        self.rest_service_.add_handler(
            "GET", "/pair_selection", self._on_pair_selection
        )

    def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
        """Resolve configured instruments into ExchangeInstrument objects.

        Accepts either "exch_acct:instrument_id" strings or dicts with
        'exch_acct' and 'instrument_id' keys; requires at least two entries.
        Raises ValueError on malformed entries.
        """
        instruments_cfg = cfg.get_value("instruments", [])
        instruments: List[ExchangeInstrument] = []
        assert len(instruments_cfg) >= 2, "at least two instruments required"
        for item in instruments_cfg:
            if isinstance(item, str):
                # Split only on the first ':' — instrument ids may contain ':'.
                parts = item.split(":", 1)
                if len(parts) != 2:
                    raise ValueError(f"invalid instrument format: {item}")
                exch_acct, instrument_id = parts
            elif isinstance(item, dict):
                exch_acct = item.get("exch_acct", "")
                instrument_id = item.get("instrument_id", "")
                if not exch_acct or not instrument_id:
                    raise ValueError(f"invalid instrument config: {item}")
            else:
                raise ValueError(f"unsupported instrument entry: {item}")
            exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
                exch_acct=exch_acct, instrument_id=instrument_id
            )
            assert (
                exch_inst is not None
            ), f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
            # Remember the account so run_once() can route fetches correctly.
            exch_inst.user_data_["exch_acct"] = exch_acct
            instruments.append(exch_inst)
        return instruments

    async def run(self) -> None:
        """Main loop: run one cycle, then sleep to the next interval boundary.

        With --oneshot, exit after a single cycle.
        """
        oneshot = App.instance().get_argument("oneshot", False)
        while True:
            await self.engine_.run_once()
            if oneshot:
                break
            sleep_for = self.engine_.sleep_seconds_until_next_cycle()
            await asyncio.sleep(sleep_for)

    async def _on_data_quality(self, request: web.Request) -> web.Response:
        """GET /data_quality — HTML by default, JSON with ?format=json."""
        fmt = request.query.get("format", "html").lower()
        quality = self.engine_.quality_dicts()
        if fmt == "json":
            return web.json_response(quality)
        return web.Response(
            text=HtmlRenderer.render_data_quality(quality), content_type="text/html"
        )

    async def _on_pair_selection(self, request: web.Request) -> web.Response:
        """GET /pair_selection — HTML by default, JSON with ?format=json."""
        fmt = request.query.get("format", "html").lower()
        pairs = self.engine_.pair_dicts()
        if fmt == "json":
            return web.json_response(pairs)
        return web.Response(
            text=HtmlRenderer.render_pairs(pairs), content_type="text/html"
        )
if __name__ == "__main__":
    # Construct the framework singletons in dependency order, then run the app.
    App()
    CvttAppConfig()
    PairSelector()
    App.instance().run()

View File

@@ -1,394 +0,0 @@
```python
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen  # type: ignore
# ---
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
from cvttpy_tools.web.rest_client import RESTSender
# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
@dataclass
class InstrumentQuality(NamedObject):
instrument_: ExchangeInstrument
record_count_: int
latest_tstamp_: Optional[pd.Timestamp]
status_: str
reason_: str
@dataclass
class PairStats(NamedObject):
instrument_a_: ExchangeInstrument
instrument_b_: ExchangeInstrument
pvalue_eg_: Optional[float]
pvalue_adf_: Optional[float]
pvalue_j_: Optional[float]
trace_stat_j_: Optional[float]
rank_eg_: int = 0
rank_adf_: int = 0
rank_j_: int = 0
composite_rank_: int = 0
def as_dict(self) -> Dict[str, Any]:
return {
"instrument_a": self.instrument_a_.instrument_id(),
"instrument_b": self.instrument_b_.instrument_id(),
"pvalue_eg": self.pvalue_eg_,
"pvalue_adf": self.pvalue_adf_,
"pvalue_j": self.pvalue_j_,
"trace_stat_j": self.trace_stat_j_,
"rank_eg": self.rank_eg_,
"rank_adf": self.rank_adf_,
"rank_j": self.rank_j_,
"composite_rank": self.composite_rank_,
}
class DataFetcher(NamedObject):
sender_: RESTSender
interval_sec_: int
history_depth_sec_: int
def __init__(
self,
base_url: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = RESTSender(base_url=base_url)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
def fetch(self, exch_acct: str, inst: ExchangeInstrument) -> List[MdTradesAggregate]:
rqst_data = {
"exch_acct": exch_acct,
"instrument_id": inst.instrument_id(),
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
if response.status_code not in (200, 201):
Log.error(
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}")
return []
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
return [
mdsum.create_md_trades_aggregate(
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
)
for mdsum in mdsums
]
class QualityChecker(NamedObject):
interval_sec_: int
def __init__(self, interval_sec: int) -> None:
self.interval_sec_ = interval_sec
def evaluate(self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]) -> InstrumentQuality:
if len(aggr) == 0:
return InstrumentQuality(
instrument_=inst,
record_count_=0,
latest_tstamp_=None,
status_="FAIL",
reason_="no records",
)
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
now_ts = pd.Timestamp.utcnow()
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
if latest_ts <= recency_cutoff:
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_="FAIL",
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
)
gaps_ok, reason = self._check_gaps(aggr_sorted)
status = "PASS" if gaps_ok else "FAIL"
return InstrumentQuality(
instrument_=inst,
record_count_=len(aggr_sorted),
latest_tstamp_=latest_ts,
status_=status,
reason_=reason,
)
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
NUM_TRADES_THRESHOLD = 50
if len(aggr) < 2:
return True, "ok"
interval_ns = self.interval_sec_ * NanoPerSec
for idx in range(1, len(aggr)):
prev = aggr[idx - 1]
curr = aggr[idx]
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
missing_intervals = int(delta // interval_ns) - 1
if missing_intervals <= 0:
continue
prev_nt = prev.num_trades_
next_nt = curr.num_trades_
estimate = self._approximate_num_trades(prev_nt, next_nt)
if estimate > NUM_TRADES_THRESHOLD:
return False, (
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
)
return True, "ok"
@staticmethod
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
if prev_nt is None and next_nt is None:
return 0.0
if prev_nt is None:
return float(next_nt)
if next_nt is None:
return float(prev_nt)
return (prev_nt + next_nt) / 2.0
class PairAnalyzer(NamedObject):
price_field_: str
interval_sec_: int
def __init__(self, price_field: str, interval_sec: int) -> None:
self.price_field_ = price_field
self.interval_sec_ = interval_sec
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
instruments = list(series.keys())
results: List[PairStats] = []
for i in range(len(instruments)):
for j in range(i + 1, len(instruments)):
inst_a = instruments[i]
inst_b = instruments[j]
df_a = series[inst_a][["tstamp", "price"]].rename(
columns={"price": "price_a"}
)
df_b = series[inst_b][["tstamp", "price"]].rename(
columns={"price": "price_b"}
)
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
"tstamp"
)
stats = self._compute_stats(inst_a, inst_b, merged)
if stats:
results.append(stats)
self._rank(results)
return results
def _compute_stats(
self,
inst_a: ExchangeInstrument,
inst_b: ExchangeInstrument,
merged: pd.DataFrame,
) -> Optional[PairStats]:
if len(merged) < 2:
return None
px_a = merged["price_a"].astype(float)
px_b = merged["price_b"].astype(float)
std_a = float(px_a.std())
std_b = float(px_b.std())
if std_a == 0 or std_b == 0:
return None
z_a = (px_a - float(px_a.mean())) / std_a
z_b = (px_b - float(px_b.mean())) / std_b
p_eg: Optional[float]
p_adf: Optional[float]
p_j: Optional[float]
trace_stat: Optional[float]
try:
p_eg = float(coint(z_a, z_b)[1])
except Exception as exc:
Log.warning(f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_eg = None
try:
spread = z_a - z_b
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
except Exception as exc:
Log.warning(f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_adf = None
try:
data = np.column_stack([z_a, z_b])
res = coint_johansen(data, det_order=0, k_ar_diff=1)
trace_stat = float(res.lr1[0])
cv10, cv5, cv1 = res.cvt[0]
if trace_stat > cv1:
p_j = 0.01
elif trace_stat > cv5:
p_j = 0.05
elif trace_stat > cv10:
p_j = 0.10
else:
p_j = 1.0
except Exception as exc:
Log.warning(f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
p_j = None
trace_stat = None
return PairStats(
instrument_a_=inst_a,
instrument_b_=inst_b,
pvalue_eg_=p_eg,
pvalue_adf_=p_adf,
pvalue_j_=p_j,
trace_stat_j_=trace_stat,
)
def _rank(self, results: List[PairStats]) -> None:
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
for res in results:
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
results.sort(key=lambda r: r.composite_rank_)
@staticmethod
def _assign_ranks(
results: List[PairStats], key, attr: str
) -> None:
values = [key(r) for r in results]
sorted_vals = sorted([v for v in values if v is not None])
for res in results:
val = key(res)
if val is None:
setattr(res, attr, len(sorted_vals) + 1)
continue
rank = 1 + sum(1 for v in sorted_vals if v < val)
setattr(res, attr, rank)
class PairSelectionEngine(NamedObject):
config_: object
instruments_: List[ExchangeInstrument]
price_field_: str
fetcher_: DataFetcher
quality_: QualityChecker
analyzer_: PairAnalyzer
interval_sec_: int
history_depth_sec_: int
data_quality_cache_: List[InstrumentQuality]
pair_results_cache_: List[PairStats]
def __init__(
self,
config: Config,
instruments: List[ExchangeInstrument],
price_field: str,
) -> None:
self.config_ = config
self.instruments_ = instruments
self.price_field_ = price_field
interval_sec = int(config.get_value("interval_sec", 0))
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
base_url = config.get_value("cvtt_base_url", None)
assert interval_sec > 0, "interval_sec must be > 0"
assert history_depth_sec > 0, "history_depth_sec must be > 0"
assert base_url, "cvtt_base_url must be set"
self.fetcher_ = DataFetcher(
base_url=base_url,
interval_sec=interval_sec,
history_depth_sec=history_depth_sec,
)
self.quality_ = QualityChecker(interval_sec=interval_sec)
self.analyzer_ = PairAnalyzer(price_field=price_field, interval_sec=interval_sec)
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.data_quality_cache_ = []
self.pair_results_cache_ = []
async def run_once(self) -> None:
quality_results: List[InstrumentQuality] = []
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
for inst in self.instruments_:
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
q = self.quality_.evaluate(inst, aggr)
quality_results.append(q)
if q.status_ != "PASS":
continue
df = self._to_dataframe(aggr, inst)
if len(df) > 0:
price_series[inst] = df
self.data_quality_cache_ = quality_results
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
rows: List[Dict[str, Any]] = []
for item in aggr:
rows.append(
{
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
"price": self._extract_price(item, inst),
"num_trades": item.num_trades_,
}
)
df = pd.DataFrame(rows)
return df.sort_values("tstamp").reset_index(drop=True)
def _extract_price(self, aggr: MdTradesAggregate, inst: ExchangeInstrument) -> float:
price_field = self.price_field_
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
field_map = {
"open": aggr.open_,
"high": aggr.high_,
"low": aggr.low_,
"close": aggr.close_,
"vwap": aggr.vwap_,
}
raw = field_map.get(price_field, aggr.close_)
return inst.get_price(raw)
def sleep_seconds_until_next_cycle(self) -> float:
now_ns = current_nanoseconds()
interval_ns = self.interval_sec_ * NanoPerSec
next_boundary = (now_ns // interval_ns + 1) * interval_ns
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
def quality_dicts(self) -> List[Dict[str, Any]]:
res: List[Dict[str, Any]] = []
for q in self.data_quality_cache_:
res.append(
{
"instrument": q.instrument_.instrument_id(),
"record_count": q.record_count_,
"latest_tstamp": q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None,
"status": q.status_,
"reason": q.reason_,
}
)
return res
def pair_dicts(self) -> List[Dict[str, Any]]:
return [p.as_dict() for p in self.pair_results_cache_]
```

View File

@@ -1,140 +0,0 @@
from __future__ import annotations
from typing import Any, Dict, List
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
class HtmlRenderer(NamedObject):
    """Renders data-quality and pair-selection results as standalone HTML pages."""

    def __init__(self) -> None:
        # All rendering happens through staticmethods; no instance state is needed.
        pass

    @staticmethod
    def render_data_quality(quality: List[Dict[str, Any]]) -> str:
        """Return an HTML page with one table row per quality dict.

        Rows are sorted by instrument id; missing keys render as empty cells.
        """
        rows = "".join(
            f"<tr>"
            f"<td>{q.get('instrument','')}</td>"
            f"<td>{q.get('record_count','')}</td>"
            f"<td>{q.get('latest_tstamp','')}</td>"
            f"<td>{q.get('status','')}</td>"
            f"<td>{q.get('reason','')}</td>"
            f"</tr>"
            for q in sorted(quality, key=lambda x: str(x.get("instrument", "")))
        )
        # Double braces below are f-string escapes for literal CSS braces.
        return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Data Quality</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th {{ background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Data Quality</h2>
<table>
<thead>
<tr><th>Instrument</th><th>Records</th><th>Latest</th><th>Status</th><th>Reason</th></tr>
</thead>
<tbody>{rows}</tbody>
</table>
</body>
</html>
"""

    @staticmethod
    def render_pairs(pairs: List[Dict[str, Any]]) -> str:
        """Return an HTML page listing pair stats with client-side sortable rank columns.

        data-value attributes carry the raw values the embedded JS sorts on.
        """
        if not pairs:
            body = "<p>No pairs available. Check data quality and try again.</p>"
        else:
            body_rows = []
            for p in pairs:
                body_rows.append(
                    "<tr>"
                    f"<td>{p.get('instrument_a','')}</td>"
                    f"<td>{p.get('instrument_b','')}</td>"
                    f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
                    f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
                    f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
                    f"<td data-value='{p.get('pvalue_eg','')}'>{p.get('pvalue_eg','')}</td>"
                    f"<td data-value='{p.get('pvalue_adf','')}'>{p.get('pvalue_adf','')}</td>"
                    f"<td data-value='{p.get('pvalue_j','')}'>{p.get('pvalue_j','')}</td>"
                    "</tr>"
                )
            body = "\n".join(body_rows)
        # Double braces below are f-string escapes for literal CSS/JS braces.
        return f"""
<!DOCTYPE html>
<html>
<head>
<meta charset='utf-8'/>
<title>Pair Selection</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
th.sortable {{ cursor: pointer; background: #f2f2f2; }}
</style>
</head>
<body>
<h2>Pair Selection</h2>
<table id="pairs-table">
<thead>
<tr>
<th>Instrument A</th>
<th>Instrument B</th>
<th class="sortable" data-type="num">Rank-EG</th>
<th class="sortable" data-type="num">Rank-ADF</th>
<th class="sortable" data-type="num">Rank-J</th>
<th>EG p-value</th>
<th>ADF p-value</th>
<th>Johansen pseudo p</th>
</tr>
</thead>
<tbody>
{body}
</tbody>
</table>
<script>
(function() {{
const table = document.getElementById('pairs-table');
if (!table) return;
const getValue = (cell) => {{
const val = cell.getAttribute('data-value');
const num = parseFloat(val);
return isNaN(num) ? val : num;
}};
const toggleSort = (index, isNumeric) => {{
const tbody = table.querySelector('tbody');
const rows = Array.from(tbody.querySelectorAll('tr'));
const th = table.querySelectorAll('th')[index];
const dir = th.getAttribute('data-dir') === 'asc' ? 'desc' : 'asc';
th.setAttribute('data-dir', dir);
rows.sort((a, b) => {{
const va = getValue(a.children[index]);
const vb = getValue(b.children[index]);
if (isNumeric && !isNaN(va) && !isNaN(vb)) {{
return dir === 'asc' ? va - vb : vb - va;
}}
return dir === 'asc'
? String(va).localeCompare(String(vb))
: String(vb).localeCompare(String(va));
}});
tbody.innerHTML = '';
rows.forEach(r => tbody.appendChild(r));
}};
table.querySelectorAll('th.sortable').forEach((th, idx) => {{
th.addEventListener('click', () => toggleSort(idx, th.dataset.type === 'num'));
}});
}})();
</script>
</body>
</html>
"""

View File

@@ -141,11 +141,12 @@ class PairTrader(NamedObject):
) )
async def _on_md_summary(self, history: List[MdTradesAggregate], exch_inst: ExchangeInstrument) -> None: async def _on_md_summary(self, history: List[MdTradesAggregate], exch_inst: ExchangeInstrument) -> None:
Log.info(f"{self.fname()}: got {exch_inst.details_short()} data") # URGENT before calling stragegy, make sure that **BOTH** instruments market data is combined.
Log.info(f"DEBUG got {exch_inst.details_short()} data")
self.latest_history_[exch_inst] = history self.latest_history_[exch_inst] = history
if len(self.latest_history_) == 2: if len(self.latest_history_) == 2:
from itertools import chain from itertools import chain
all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.aggr_time_ns_) all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.time_ns_)
await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs) await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs)
self.latest_history_ = {} self.latest_history_ = {}

View File

@ -1,186 +0,0 @@
#!/usr/bin/env bash
# Release/distribution script: clones the project repository, tags a release
# version from the VERSION file, and rsyncs the source tree to the
# configured distribution hosts.
# ---------------- Settings
repo=git@cloud21.cvtt.vpn:/works/git/cvtt2/research/pairs_trading.git # source repository to release from
dist_root=/home/cvttdist/software/cvtt2 # root path on distribution hosts
dist_user=cvttdist # remote user for rsync/ssh
dist_host="cloud21.cvtt.vpn" # default host (overwritten per-location below)
dist_ssh_port="22" # ssh port used by rsync's -e option
dist_locations="cloud21.cvtt.vpn:22 hs01.cvtt.vpn:22" # space-separated host:port targets
version_file="VERSION" # CSV file: "<version>,<what's new>"
prj=pairs_trading # default project name
brnch=master # default branch to release from
interactive=N # Y = prompt/confirm each step (-i flag)
# ---------------- Settings
# ---------------- cmdline
# Print command-line usage to stdout and abort the script.
# Fix: the original message had unbalanced brackets
# ("[-b <branch (master)> -i (interactive)").
usage() {
    echo "Usage: $0 [-b <branch (master)>] [-i (interactive)]"
    exit 1
}
# Parse command-line flags: -b <branch> selects the release branch,
# -i enables interactive (prompt/confirm) mode.
while getopts "b:i" opt; do
    case ${opt} in
        b )
            # override the default branch
            brnch=$OPTARG
            ;;
        i )
            interactive=Y
            ;;
        \? )
            # unknown flag
            echo "Invalid option: -$OPTARG" >&2
            usage
            ;;
        : )
            # flag given without its required argument (e.g. bare -b)
            echo "Option -$OPTARG requires an argument." >&2
            usage
            ;;
    esac
done
# ---------------- cmdline
# Pause for operator acknowledgement, but only in interactive mode;
# in non-interactive runs this is a no-op.
confirm() {
    [ "${interactive}" == "Y" ] || return 0
    echo "--------------------------------"
    echo -n "Press <Enter> to continue" && read
}
# Resolve the project name: prompt in interactive mode (empty answer keeps
# the default), otherwise use the configured default.
if [ "${interactive}" == "Y" ]; then
    echo -n "Enter project [${prj}]: "
    read project
    if [ "${project}" == "" ]
    then
        project=${prj}
    fi
else
    project=${prj}
fi
# repo=${git_repo_arr[${project}]}
# Sanity check: the repo must be configured (the per-project lookup above is
# commented out, so this currently only guards the hard-coded value).
if [ -z ${repo} ]; then
    echo "ERROR: Project repository for ${project} not found"
    exit -1
fi
echo "Project repo: ${repo}"
# Resolve the branch the same way: prompt interactively, else use the default.
if [ "${interactive}" == "Y" ]; then
    echo -n "Enter branch to build release from [${brnch}]: "
    read branch
    if [ "${branch}" == "" ]
    then
        branch=${brnch}
    fi
else
    branch=${brnch}
fi
# Work in a throwaway directory; the EXIT trap guarantees it is removed
# on any termination path (success, error, or interrupt).
tmp_dir=$(mktemp -d)
function cleanup {
    cd ${HOME}
    rm -rf ${tmp_dir}
}
trap cleanup EXIT
# Build the list of commands to run (clone into the temp dir, then cd there).
# Commands are collected first so they can be shown to the operator before
# execution in interactive mode.
prj_dir="${tmp_dir}/${prj}"
cmd_arr=()
Cmd="git clone ${repo} ${prj_dir}"
cmd_arr+=("${Cmd}")
Cmd="cd ${prj_dir}"
cmd_arr+=("${Cmd}")
if [ "${interactive}" == "Y" ]; then
    echo "------------------------------------"
    echo "The following commands will execute:"
    echo "------------------------------------"
    for cmd in "${cmd_arr[@]}"
    do
        echo ${cmd}
    done
fi
confirm
# Execute the collected commands in order, echoing each before running it.
for cmd in "${cmd_arr[@]}"
do
    echo ${cmd} && eval ${cmd}
done
# Switch to the requested release branch; bail out (and clean the temp dir
# explicitly, in addition to the EXIT trap) if it does not exist.
Cmd="git checkout ${branch}"
echo ${Cmd} && eval ${Cmd}
if [ "${?}" != "0" ]; then
    echo "ERROR: Branch ${branch} is not found"
    cd ${HOME} && rm -rf ${tmp_dir}
    exit -1
fi
# The VERSION file is a one-line CSV: field 1 is the version number,
# field 2 is a free-text "what's new" note used in the tag message.
release_version=$(cat ${version_file} | awk -F',' '{print $1}')
whats_new=$(cat ${version_file} | awk -F',' '{print $2}')
echo "--------------------------------"
echo "Version file: ${version_file}"
echo "Release version: ${release_version}"
confirm
# If the tag already exists, disambiguate with a date/time suffix rather
# than failing or moving the existing tag.
version_tag="v${release_version}"
if [ "$(git tag -l "${version_tag}")" != "" ]; then
    version_tag="${version_tag}.$(date +%Y%m%d_%H%M)"
fi
version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'"
# Collect the release commands: tag, push tags, strip the .git metadata from
# the export, then rsync the tree to every distribution location.
cmd_arr=()
Cmd="git tag -a ${version_tag} -m ${version_comment}"
cmd_arr+=("${Cmd}")
Cmd="git push origin --tags"
cmd_arr+=("${Cmd}")
Cmd="rm -rf .git"
cmd_arr+=("${Cmd}")
SourceLoc=../${project}
dist_path="${dist_root}/${project}/${release_version}"
for dist_loc in ${dist_locations}; do
    # Split "host:port" into its two parts.
    dhp=(${dist_loc//:/ })
    dist_host=${dhp[0]}
    dist_port=${dhp[1]}
    # NOTE(review): dist_port is parsed but never used below — the ssh command
    # uses the global dist_ssh_port instead; confirm whether per-location
    # ports were intended.
    # --rsync-path first creates the target directory on the remote side.
    Cmd="rsync -avzh"
    Cmd="${Cmd} --rsync-path=\"mkdir -p ${dist_path}"
    Cmd="${Cmd} && rsync\" -e \"ssh -p ${dist_ssh_port}\""
    Cmd="${Cmd} $SourceLoc ${dist_user}@${dist_host}:${dist_path}/"
    cmd_arr+=("${Cmd}")
done
if [ "${interactive}" == "Y" ]; then
    echo "------------------------------------"
    echo "The following commands will execute:"
    echo "------------------------------------"
    for cmd in "${cmd_arr[@]}"
    do
        echo ${cmd}
    done
fi
confirm
# Run the release commands in order, echoing each one first.
for cmd in "${cmd_arr[@]}"
do
    pwd && echo ${cmd} && eval ${cmd}
done
echo "$0 Done ${project} ${release_version}"

View File

@ -1,82 +1,86 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from typing import Dict, Any, List, Optional, Set from typing import Callable, Coroutine, Dict, Any, List, Optional, Set
import requests import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanosT, current_seconds from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
from cvttpy_tools.web.rest_client import RESTSender
# --- # ---
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary, MdSummaryCallbackT from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.exchange_config import ExchangeAccounts from cvttpy_trading.trading.exchange_config import ExchangeAccounts
# --- # ---
from pairs_trading.lib.live.rest_client import RESTSender
# class MdSummary(HistMdBar): class MdSummary(HistMdBar):
# def __init__( def __init__(
# self, self,
# ts_ns: int, ts_ns: int,
# open: float, open: float,
# high: float, high: float,
# low: float, low: float,
# close: float, close: float,
# volume: float, volume: float,
# vwap: float, vwap: float,
# num_trades: int, num_trades: int,
# ): ):
# super().__init__(ts=ts_ns) super().__init__(ts=ts_ns)
# self.open_ = open self.open_ = open
# self.high_ = high self.high_ = high
# self.low_ = low self.low_ = low
# self.close_ = close self.close_ = close
# self.volume_ = volume self.volume_ = volume
# self.vwap_ = vwap self.vwap_ = vwap
# self.num_trades_ = num_trades self.num_trades_ = num_trades
# @classmethod @classmethod
# def from_REST_response(cls, response: requests.Response) -> List[MdSummary]: def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
# res: List[MdSummary] = [] res: List[MdSummary] = []
# jresp = response.json() jresp = response.json()
# hist_data = jresp.get("historical_data", []) hist_data = jresp.get("historical_data", [])
# for hd in hist_data: for hd in hist_data:
# res.append( res.append(
# MdSummary( MdSummary(
# ts_ns=hd["time_ns"], ts_ns=hd["time_ns"],
# open=hd["open"], open=hd["open"],
# high=hd["high"], high=hd["high"],
# low=hd["low"], low=hd["low"],
# close=hd["close"], close=hd["close"],
# volume=hd["volume"], volume=hd["volume"],
# vwap=hd["vwap"], vwap=hd["vwap"],
# num_trades=hd["num_trades"], num_trades=hd["num_trades"],
# ) )
# ) )
# return res return res
# def create_md_trades_aggregate( def create_md_trades_aggregate(
# self, self,
# exch_acct: ExchangeAccountNameT, exch_acct: ExchangeAccountNameT,
# exch_inst: ExchangeInstrument, exch_inst: ExchangeInstrument,
# interval_sec: IntervalSecT, interval_sec: IntervalSecT,
# ) -> MdTradesAggregate: ) -> MdTradesAggregate:
# res = MdTradesAggregate( res = MdTradesAggregate(
# exch_acct=exch_acct, exch_acct=exch_acct,
# exch_inst=exch_inst, exch_inst=exch_inst,
# interval_ns=interval_sec * NanoPerSec, interval_ns=interval_sec * NanoPerSec,
# ) )
# res.set(mdbar=self) res.set(mdbar=self)
# return res return res
# MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine] MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
class MdSummaryCollector(NamedObject): class MdSummaryCollector(NamedObject):
@ -159,7 +163,6 @@ class MdSummaryCollector(NamedObject):
) )
return None return None
res = MdSummary.from_REST_response(response=response) res = MdSummary.from_REST_response(response=response)
Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}")
return None if len(res) == 0 else res[-1] return None if len(res) == 0 else res[-1]
def is_empty(self) -> bool: def is_empty(self) -> bool:
@ -192,16 +195,15 @@ class MdSummaryCollector(NamedObject):
Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec") Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")
def next_load_time(self) -> NanosT: def next_load_time(self) -> NanosT:
ALLOW_LAG_SEC = 1
curr_sec = int(current_seconds()) curr_sec = int(current_seconds())
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + ALLOW_LAG_SEC return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2
async def _load_new(self) -> None: async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last() last: Optional[MdSummary] = self.get_last()
if not last: if not last:
Log.warning(f"{self.fname()}: did not get last update") Log.warning(f"{self.fname()}: did not get last update")
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_: elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_:
Log.info( Log.info(
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}" f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
) )

View File

@ -1,12 +1,19 @@
```python
from __future__ import annotations from __future__ import annotations
from typing import Dict import asyncio
from typing import Callable, Dict, Any, List, Optional
import time import time
import requests import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject): class RESTSender(NamedObject):
session_: requests.Session session_: requests.Session
@ -57,4 +64,4 @@ class RESTSender(NamedObject):
raise ConnectionError( raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt ) from excpt
```

View File

@ -6,10 +6,10 @@ import requests
from cvttpy_tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
from cvttpy_tools.web.rest_client import RESTSender
# --- # ---
from cvttpy_trading.trading.trading_instructions import TradingInstructions from cvttpy_trading.trading.trading_instructions import TradingInstructions
# --- # ---
from pairs_trading.lib.live.rest_client import RESTSender
from pairs_trading.apps.pair_trader import PairTrader from pairs_trading.apps.pair_trader import PairTrader

View File

@ -9,7 +9,7 @@ from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.settings.cvtt_types import IntervalSecT from cvttpy_tools.settings.cvtt_types import IntervalSecT
from cvttpy_tools.timeutils import NanosT, SecPerHour, current_nanoseconds, NanoPerSec, format_nanos_utc from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec
from cvttpy_tools.logger import Log from cvttpy_tools.logger import Log
# --- # ---
@ -42,14 +42,14 @@ class PtLiveStrategy(NamedObject):
# for presentation: history of prediction values and trading signals # for presentation: history of prediction values and trading signals
predictions_df_: pd.DataFrame predictions_df_: pd.DataFrame
trading_signals_df_: pd.DataFrame trading_signals_df_: pd.DataFrame
allowed_md_lag_sec_: int
def __init__( def __init__(
self, self,
config: Config, config: Config,
pairs_trader: PairTrader, pairs_trader: PairTrader,
): ):
# import copy
# self.config_ = Config(json_src=copy.deepcopy(config.data()))
self.config_ = config self.config_ = config
self.pairs_trader_ = pairs_trader self.pairs_trader_ = pairs_trader
@ -83,7 +83,7 @@ class PtLiveStrategy(NamedObject):
) )
assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0" assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0"
self.allowed_md_lag_sec_ = self.config_.get_value("allowed_md_lag_sec", 3) await self.pairs_trader_.subscribe_md()
self.open_threshold_ = self.config_.get_value( self.open_threshold_ = self.config_.get_value(
"model/disequilibrium/open_trshld", 0.0 "model/disequilibrium/open_trshld", 0.0
@ -99,9 +99,6 @@ class PtLiveStrategy(NamedObject):
self.close_threshold_ > 0 self.close_threshold_ > 0
), "disequilibrium/close_trshld must be greater than 0" ), "disequilibrium/close_trshld must be greater than 0"
await self.pairs_trader_.subscribe_md()
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, " return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, "
@ -135,33 +132,17 @@ class PtLiveStrategy(NamedObject):
await self._send_trading_instructions(trading_instructions) await self._send_trading_instructions(trading_instructions)
def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool:
curr_ns = current_nanoseconds()
LAG_THRESHOLD = 5 * NanoPerSec
if len(hist_aggr) == 0: if len(hist_aggr) == 0:
Log.warning(f"{self.fname()} list of aggregates IS EMPTY") Log.warning(f"{self.fname()} list of aggregates IS EMPTY")
return False return False
curr_ns = current_nanoseconds()
# MAYBE check market data length # MAYBE check market data length
lag_ns = curr_ns - hist_aggr[-1].time_ns_
# at 18:05:01 we should see data for 18:04:00 if lag_ns > LAG_THRESHOLD:
lag_sec = (curr_ns - hist_aggr[-1].aggr_time_ns_) / NanoPerSec - self.interval_sec() Log.warning(f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()} Lagging {int(lag_ns/NanoPerSec)} seconds")
if lag_sec > self.allowed_md_lag_sec_:
Log.warning(
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
f" Lagging {int(lag_sec)} > {self.allowed_md_lag_sec_} seconds:"
f"\n{len(hist_aggr)} records"
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
)
return False return False
else:
Log.info(
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
f" Lag {int(lag_sec)} <= {self.allowed_md_lag_sec_} seconds"
f"\n{len(hist_aggr)} records"
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
)
return True return True
def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame: def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame:
@ -182,8 +163,8 @@ class PtLiveStrategy(NamedObject):
rows.append( rows.append(
{ {
# convert nanoseconds → tz-aware pandas timestamp # convert nanoseconds → tz-aware pandas timestamp
"tstamp": pd.to_datetime(aggr.aggr_time_ns_, unit="ns", utc=True), "tstamp": pd.to_datetime(aggr.time_ns_, unit="ns", utc=True),
"time_ns": aggr.aggr_time_ns_, "time_ns": aggr.time_ns_,
"symbol": exch_inst.instrument_id().split("-", 1)[1], "symbol": exch_inst.instrument_id().split("-", 1)[1],
"exchange_id": exch_inst.exchange_id_, "exchange_id": exch_inst.exchange_id_,
"instrument_id": exch_inst.instrument_id(), "instrument_id": exch_inst.instrument_id(),
@ -289,7 +270,6 @@ class PtLiveStrategy(NamedObject):
issued_ts_ns=current_nanoseconds(), issued_ts_ns=current_nanoseconds(),
data=TargetPositionSignal( data=TargetPositionSignal(
strength=side_a * self._strength(scaled_disequilibrium), strength=side_a * self._strength(scaled_disequilibrium),
exchange_id=pair.get_instrument_a().exchange_id_,
base_asset=pair.get_instrument_a().base_asset_id_, base_asset=pair.get_instrument_a().base_asset_id_,
quote_asset=pair.get_instrument_a().quote_asset_id_, quote_asset=pair.get_instrument_a().quote_asset_id_,
user_data={} user_data={}
@ -304,7 +284,6 @@ class PtLiveStrategy(NamedObject):
issued_ts_ns=current_nanoseconds(), issued_ts_ns=current_nanoseconds(),
data=TargetPositionSignal( data=TargetPositionSignal(
strength=side_b * self._strength(scaled_disequilibrium), strength=side_b * self._strength(scaled_disequilibrium),
exchange_id=pair.get_instrument_b().exchange_id_,
base_asset=pair.get_instrument_b().base_asset_id_, base_asset=pair.get_instrument_b().base_asset_id_,
quote_asset=pair.get_instrument_b().quote_asset_id_, quote_asset=pair.get_instrument_b().quote_asset_id_,
user_data={} user_data={}
@ -325,7 +304,6 @@ class PtLiveStrategy(NamedObject):
issued_ts_ns=current_nanoseconds(), issued_ts_ns=current_nanoseconds(),
data=TargetPositionSignal( data=TargetPositionSignal(
strength=0, strength=0,
exchange_id=pair.get_instrument_a().exchange_id_,
base_asset=pair.get_instrument_a().base_asset_id_, base_asset=pair.get_instrument_a().base_asset_id_,
quote_asset=pair.get_instrument_a().quote_asset_id_, quote_asset=pair.get_instrument_a().quote_asset_id_,
user_data={} user_data={}
@ -340,7 +318,6 @@ class PtLiveStrategy(NamedObject):
issued_ts_ns=current_nanoseconds(), issued_ts_ns=current_nanoseconds(),
data=TargetPositionSignal( data=TargetPositionSignal(
strength=0, strength=0,
exchange_id=pair.get_instrument_b().exchange_id_,
base_asset=pair.get_instrument_b().base_asset_id_, base_asset=pair.get_instrument_b().base_asset_id_,
quote_asset=pair.get_instrument_b().quote_asset_id_, quote_asset=pair.get_instrument_b().quote_asset_id_,
user_data={} user_data={}