Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73135ee8c2 | ||
|
|
e4a3795793 | ||
|
|
f311315ef8 | ||
|
|
76f9a80ad6 | ||
|
|
bf25eb7fb5 | ||
|
|
f2a5d6a7ad | ||
|
|
b9d479ae8c | ||
|
|
e6ae62ebb6 | ||
|
|
170e48d646 | ||
|
|
d5f00f557b |
1
.gitignore
vendored
1
.gitignore
vendored
@ -9,3 +9,4 @@ cvttpy
|
||||
# SpecStory explanation file
|
||||
.specstory/.what-is-this.md
|
||||
results/
|
||||
tmp/
|
||||
|
||||
23
.vscode/launch.json
vendored
23
.vscode/launch.json
vendored
@ -17,29 +17,6 @@
|
||||
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "-------- Live Pair Trading --------",
|
||||
},
|
||||
{
|
||||
"name": "PAIR TRADER",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
|
||||
"program": "${workspaceFolder}/apps/pair_trader.py",
|
||||
"console": "integratedTerminal",
|
||||
"env": {
|
||||
"PYTHONPATH": "${workspaceFolder}/..",
|
||||
"CONFIG_SERVICE": "cloud16.cvtt.vpn:6789",
|
||||
"MODEL_CONFIG": "vecm"
|
||||
},
|
||||
"args": [
|
||||
// "--config=${workspaceFolder}/configuration/pair_trader.cfg",
|
||||
"--config=http://cloud16.cvtt.vpn:6789/apps/pairs_trading/pair_trader",
|
||||
"--book_id=TEST_BOOK_20250818",
|
||||
"--instrument_A=COINBASE_AT:PAIR-ADA-USD",
|
||||
"--instrument_B=COINBASE_AT:PAIR-SOL-USD",
|
||||
],
|
||||
},
|
||||
{
|
||||
"name": "-------- VECM --------",
|
||||
},
|
||||
|
||||
2
.vscode/settings.json
vendored
2
.vscode/settings.json
vendored
@ -6,7 +6,6 @@
|
||||
],
|
||||
"python.testing.cwd": "${workspaceFolder}",
|
||||
"python.testing.autoTestDiscoverOnSaveEnabled": true,
|
||||
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3",
|
||||
"python.testing.pytestPath": "python3",
|
||||
"python.analysis.extraPaths": [
|
||||
"${workspaceFolder}",
|
||||
@ -16,4 +15,5 @@
|
||||
"python.envFile": "${workspaceFolder}/.env",
|
||||
"python.testing.debugPort": 3000,
|
||||
"python.testing.promptToConfigure": false,
|
||||
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python"
|
||||
}
|
||||
509
apps/pair_selector/pair_selector.py
Normal file
509
apps/pair_selector/pair_selector.py
Normal file
@ -0,0 +1,509 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from aiohttp import web
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from statsmodels.tsa.stattools import adfuller, coint # type: ignore
|
||||
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
|
||||
|
||||
|
||||
from cvttpy_tools.app import App
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.config import Config, CvttAppConfig
|
||||
from cvttpy_tools.logger import Log
|
||||
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
|
||||
from cvttpy_tools.web.rest_client import RESTSender
|
||||
from cvttpy_tools.web.rest_service import RestService
|
||||
|
||||
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
|
||||
from cvttpy_trading.trading.instrument import ExchangeInstrument
|
||||
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
|
||||
|
||||
from pairs_trading.apps.pair_selector.renderer import HtmlRenderer
|
||||
|
||||
|
||||
@dataclass
|
||||
class InstrumentQuality(NamedObject):
|
||||
instrument_: ExchangeInstrument
|
||||
record_count_: int
|
||||
latest_tstamp_: Optional[pd.Timestamp]
|
||||
status_: str
|
||||
reason_: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class PairStats(NamedObject):
|
||||
instrument_a_: ExchangeInstrument
|
||||
instrument_b_: ExchangeInstrument
|
||||
pvalue_eg_: Optional[float]
|
||||
pvalue_adf_: Optional[float]
|
||||
pvalue_j_: Optional[float]
|
||||
trace_stat_j_: Optional[float]
|
||||
rank_eg_: int = 0
|
||||
rank_adf_: int = 0
|
||||
rank_j_: int = 0
|
||||
composite_rank_: int = 0
|
||||
|
||||
def as_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"instrument_a": self.instrument_a_.instrument_id(),
|
||||
"instrument_b": self.instrument_b_.instrument_id(),
|
||||
"pvalue_eg": self.pvalue_eg_,
|
||||
"pvalue_adf": self.pvalue_adf_,
|
||||
"pvalue_j": self.pvalue_j_,
|
||||
"trace_stat_j": self.trace_stat_j_,
|
||||
"rank_eg": self.rank_eg_,
|
||||
"rank_adf": self.rank_adf_,
|
||||
"rank_j": self.rank_j_,
|
||||
"composite_rank": self.composite_rank_,
|
||||
}
|
||||
|
||||
|
||||
class DataFetcher(NamedObject):
|
||||
sender_: RESTSender
|
||||
interval_sec_: int
|
||||
history_depth_sec_: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
interval_sec: int,
|
||||
history_depth_sec: int,
|
||||
) -> None:
|
||||
self.sender_ = RESTSender(base_url=base_url)
|
||||
self.interval_sec_ = interval_sec
|
||||
self.history_depth_sec_ = history_depth_sec
|
||||
|
||||
def fetch(
|
||||
self, exch_acct: str, inst: ExchangeInstrument
|
||||
) -> List[MdTradesAggregate]:
|
||||
rqst_data = {
|
||||
"exch_acct": exch_acct,
|
||||
"instrument_id": inst.instrument_id(),
|
||||
"interval_sec": self.interval_sec_,
|
||||
"history_depth_sec": self.history_depth_sec_,
|
||||
}
|
||||
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
|
||||
if response.status_code not in (200, 201):
|
||||
Log.error(
|
||||
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}"
|
||||
)
|
||||
return []
|
||||
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
|
||||
return [
|
||||
mdsum.create_md_trades_aggregate(
|
||||
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
|
||||
)
|
||||
for mdsum in mdsums
|
||||
]
|
||||
|
||||
|
||||
class QualityChecker(NamedObject):
|
||||
interval_sec_: int
|
||||
|
||||
def __init__(self, interval_sec: int) -> None:
|
||||
self.interval_sec_ = interval_sec
|
||||
|
||||
def evaluate(
|
||||
self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]
|
||||
) -> InstrumentQuality:
|
||||
if len(aggr) == 0:
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=0,
|
||||
latest_tstamp_=None,
|
||||
status_="FAIL",
|
||||
reason_="no records",
|
||||
)
|
||||
|
||||
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
|
||||
|
||||
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
|
||||
now_ts = pd.Timestamp.utcnow()
|
||||
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
|
||||
if latest_ts <= recency_cutoff:
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=len(aggr_sorted),
|
||||
latest_tstamp_=latest_ts,
|
||||
status_="FAIL",
|
||||
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
|
||||
)
|
||||
|
||||
gaps_ok, reason = self._check_gaps(aggr_sorted)
|
||||
status = "PASS" if gaps_ok else "FAIL"
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=len(aggr_sorted),
|
||||
latest_tstamp_=latest_ts,
|
||||
status_=status,
|
||||
reason_=reason,
|
||||
)
|
||||
|
||||
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
|
||||
NUM_TRADES_THRESHOLD = 50
|
||||
if len(aggr) < 2:
|
||||
return True, "ok"
|
||||
|
||||
interval_ns = self.interval_sec_ * NanoPerSec
|
||||
for idx in range(1, len(aggr)):
|
||||
prev = aggr[idx - 1]
|
||||
curr = aggr[idx]
|
||||
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
|
||||
missing_intervals = int(delta // interval_ns) - 1
|
||||
if missing_intervals <= 0:
|
||||
continue
|
||||
|
||||
prev_nt = prev.num_trades_
|
||||
next_nt = curr.num_trades_
|
||||
estimate = self._approximate_num_trades(prev_nt, next_nt)
|
||||
if estimate > NUM_TRADES_THRESHOLD:
|
||||
return False, (
|
||||
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
|
||||
)
|
||||
return True, "ok"
|
||||
|
||||
@staticmethod
|
||||
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
|
||||
if prev_nt is None and next_nt is None:
|
||||
return 0.0
|
||||
if prev_nt is None:
|
||||
return float(next_nt)
|
||||
if next_nt is None:
|
||||
return float(prev_nt)
|
||||
return (prev_nt + next_nt) / 2.0
|
||||
|
||||
|
||||
class PairAnalyzer(NamedObject):
|
||||
price_field_: str
|
||||
interval_sec_: int
|
||||
|
||||
def __init__(self, price_field: str, interval_sec: int) -> None:
|
||||
self.price_field_ = price_field
|
||||
self.interval_sec_ = interval_sec
|
||||
|
||||
def analyze(
|
||||
self, series: Dict[ExchangeInstrument, pd.DataFrame]
|
||||
) -> List[PairStats]:
|
||||
instruments = list(series.keys())
|
||||
results: List[PairStats] = []
|
||||
for i in range(len(instruments)):
|
||||
for j in range(i + 1, len(instruments)):
|
||||
inst_a = instruments[i]
|
||||
inst_b = instruments[j]
|
||||
df_a = series[inst_a][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_a"}
|
||||
)
|
||||
df_b = series[inst_b][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_b"}
|
||||
)
|
||||
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
|
||||
"tstamp"
|
||||
)
|
||||
stats = self._compute_stats(inst_a, inst_b, merged)
|
||||
if stats:
|
||||
results.append(stats)
|
||||
self._rank(results)
|
||||
return results
|
||||
|
||||
def _compute_stats(
|
||||
self,
|
||||
inst_a: ExchangeInstrument,
|
||||
inst_b: ExchangeInstrument,
|
||||
merged: pd.DataFrame,
|
||||
) -> Optional[PairStats]:
|
||||
if len(merged) < 2:
|
||||
return None
|
||||
px_a = merged["price_a"].astype(float)
|
||||
px_b = merged["price_b"].astype(float)
|
||||
|
||||
std_a = float(px_a.std())
|
||||
std_b = float(px_b.std())
|
||||
if std_a == 0 or std_b == 0:
|
||||
return None
|
||||
|
||||
z_a = (px_a - float(px_a.mean())) / std_a
|
||||
z_b = (px_b - float(px_b.mean())) / std_b
|
||||
|
||||
p_eg: Optional[float]
|
||||
p_adf: Optional[float]
|
||||
p_j: Optional[float]
|
||||
trace_stat: Optional[float]
|
||||
|
||||
try:
|
||||
p_eg = float(coint(z_a, z_b)[1])
|
||||
except Exception as exc:
|
||||
Log.warning(
|
||||
f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
|
||||
)
|
||||
p_eg = None
|
||||
|
||||
try:
|
||||
spread = z_a - z_b
|
||||
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
|
||||
except Exception as exc:
|
||||
Log.warning(
|
||||
f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
|
||||
)
|
||||
p_adf = None
|
||||
|
||||
try:
|
||||
data = np.column_stack([z_a, z_b])
|
||||
res = coint_johansen(data, det_order=0, k_ar_diff=1)
|
||||
trace_stat = float(res.lr1[0])
|
||||
cv10, cv5, cv1 = res.cvt[0]
|
||||
if trace_stat > cv1:
|
||||
p_j = 0.01
|
||||
elif trace_stat > cv5:
|
||||
p_j = 0.05
|
||||
elif trace_stat > cv10:
|
||||
p_j = 0.10
|
||||
else:
|
||||
p_j = 1.0
|
||||
except Exception as exc:
|
||||
Log.warning(
|
||||
f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}"
|
||||
)
|
||||
p_j = None
|
||||
trace_stat = None
|
||||
|
||||
return PairStats(
|
||||
instrument_a_=inst_a,
|
||||
instrument_b_=inst_b,
|
||||
pvalue_eg_=p_eg,
|
||||
pvalue_adf_=p_adf,
|
||||
pvalue_j_=p_j,
|
||||
trace_stat_j_=trace_stat,
|
||||
)
|
||||
|
||||
def _rank(self, results: List[PairStats]) -> None:
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
|
||||
for res in results:
|
||||
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
|
||||
results.sort(key=lambda r: r.composite_rank_)
|
||||
|
||||
@staticmethod
|
||||
def _assign_ranks(results: List[PairStats], key, attr: str) -> None:
|
||||
values = [key(r) for r in results]
|
||||
sorted_vals = sorted([v for v in values if v is not None])
|
||||
for res in results:
|
||||
val = key(res)
|
||||
if val is None:
|
||||
setattr(res, attr, len(sorted_vals) + 1)
|
||||
continue
|
||||
rank = 1 + sum(1 for v in sorted_vals if v < val)
|
||||
setattr(res, attr, rank)
|
||||
|
||||
|
||||
class PairSelectionEngine(NamedObject):
|
||||
config_: object
|
||||
instruments_: List[ExchangeInstrument]
|
||||
price_field_: str
|
||||
fetcher_: DataFetcher
|
||||
quality_: QualityChecker
|
||||
analyzer_: PairAnalyzer
|
||||
interval_sec_: int
|
||||
history_depth_sec_: int
|
||||
data_quality_cache_: List[InstrumentQuality]
|
||||
pair_results_cache_: List[PairStats]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
instruments: List[ExchangeInstrument],
|
||||
price_field: str,
|
||||
) -> None:
|
||||
self.config_ = config
|
||||
self.instruments_ = instruments
|
||||
self.price_field_ = price_field
|
||||
|
||||
interval_sec = int(config.get_value("interval_sec", 0))
|
||||
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
|
||||
base_url = config.get_value("cvtt_base_url", None)
|
||||
assert interval_sec > 0, "interval_sec must be > 0"
|
||||
assert history_depth_sec > 0, "history_depth_sec must be > 0"
|
||||
assert base_url, "cvtt_base_url must be set"
|
||||
|
||||
self.fetcher_ = DataFetcher(
|
||||
base_url=base_url,
|
||||
interval_sec=interval_sec,
|
||||
history_depth_sec=history_depth_sec,
|
||||
)
|
||||
self.quality_ = QualityChecker(interval_sec=interval_sec)
|
||||
self.analyzer_ = PairAnalyzer(
|
||||
price_field=price_field, interval_sec=interval_sec
|
||||
)
|
||||
|
||||
self.interval_sec_ = interval_sec
|
||||
self.history_depth_sec_ = history_depth_sec
|
||||
|
||||
self.data_quality_cache_ = []
|
||||
self.pair_results_cache_ = []
|
||||
|
||||
async def run_once(self) -> None:
|
||||
quality_results: List[InstrumentQuality] = []
|
||||
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
|
||||
|
||||
for inst in self.instruments_:
|
||||
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
|
||||
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
|
||||
q = self.quality_.evaluate(inst, aggr)
|
||||
quality_results.append(q)
|
||||
if q.status_ != "PASS":
|
||||
continue
|
||||
df = self._to_dataframe(aggr, inst)
|
||||
if len(df) > 0:
|
||||
price_series[inst] = df
|
||||
self.data_quality_cache_ = quality_results
|
||||
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
|
||||
|
||||
def _to_dataframe(
|
||||
self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument
|
||||
) -> pd.DataFrame:
|
||||
rows: List[Dict[str, Any]] = []
|
||||
for item in aggr:
|
||||
rows.append(
|
||||
{
|
||||
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
|
||||
"price": self._extract_price(item, inst),
|
||||
"num_trades": item.num_trades_,
|
||||
}
|
||||
)
|
||||
df = pd.DataFrame(rows)
|
||||
return df.sort_values("tstamp").reset_index(drop=True)
|
||||
|
||||
def _extract_price(
|
||||
self, aggr: MdTradesAggregate, inst: ExchangeInstrument
|
||||
) -> float:
|
||||
price_field = self.price_field_
|
||||
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
|
||||
field_map = {
|
||||
"open": aggr.open_,
|
||||
"high": aggr.high_,
|
||||
"low": aggr.low_,
|
||||
"close": aggr.close_,
|
||||
"vwap": aggr.vwap_,
|
||||
}
|
||||
raw = field_map.get(price_field, aggr.close_)
|
||||
return inst.get_price(raw)
|
||||
|
||||
def sleep_seconds_until_next_cycle(self) -> float:
|
||||
now_ns = current_nanoseconds()
|
||||
interval_ns = self.interval_sec_ * NanoPerSec
|
||||
next_boundary = (now_ns // interval_ns + 1) * interval_ns
|
||||
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
|
||||
|
||||
def quality_dicts(self) -> List[Dict[str, Any]]:
|
||||
res: List[Dict[str, Any]] = []
|
||||
for q in self.data_quality_cache_:
|
||||
res.append(
|
||||
{
|
||||
"instrument": q.instrument_.instrument_id(),
|
||||
"record_count": q.record_count_,
|
||||
"latest_tstamp": (
|
||||
q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None
|
||||
),
|
||||
"status": q.status_,
|
||||
"reason": q.reason_,
|
||||
}
|
||||
)
|
||||
return res
|
||||
|
||||
def pair_dicts(self) -> List[Dict[str, Any]]:
|
||||
return [p.as_dict() for p in self.pair_results_cache_]
|
||||
|
||||
|
||||
class PairSelector(NamedObject):
|
||||
instruments_: List[ExchangeInstrument]
|
||||
engine_: PairSelectionEngine
|
||||
rest_service_: RestService
|
||||
|
||||
def __init__(self) -> None:
|
||||
App.instance().add_cmdline_arg("--oneshot", action="store_true", default=False)
|
||||
App.instance().add_call(App.Stage.Config, self._on_config())
|
||||
App.instance().add_call(App.Stage.Run, self.run())
|
||||
|
||||
async def _on_config(self) -> None:
|
||||
cfg = CvttAppConfig.instance()
|
||||
self.instruments_ = self._load_instruments(cfg)
|
||||
price_field = cfg.get_value("model/stat_model_price", "close")
|
||||
|
||||
self.engine_ = PairSelectionEngine(
|
||||
config=cfg,
|
||||
instruments=self.instruments_,
|
||||
price_field=price_field,
|
||||
)
|
||||
|
||||
self.rest_service_ = RestService(config_key="/api/REST")
|
||||
self.rest_service_.add_handler("GET", "/data_quality", self._on_data_quality)
|
||||
self.rest_service_.add_handler(
|
||||
"GET", "/pair_selection", self._on_pair_selection
|
||||
)
|
||||
|
||||
def _load_instruments(self, cfg: CvttAppConfig) -> List[ExchangeInstrument]:
|
||||
instruments_cfg = cfg.get_value("instruments", [])
|
||||
instruments: List[ExchangeInstrument] = []
|
||||
assert len(instruments_cfg) >= 2, "at least two instruments required"
|
||||
for item in instruments_cfg:
|
||||
if isinstance(item, str):
|
||||
parts = item.split(":", 1)
|
||||
if len(parts) != 2:
|
||||
raise ValueError(f"invalid instrument format: {item}")
|
||||
exch_acct, instrument_id = parts
|
||||
elif isinstance(item, dict):
|
||||
exch_acct = item.get("exch_acct", "")
|
||||
instrument_id = item.get("instrument_id", "")
|
||||
if not exch_acct or not instrument_id:
|
||||
raise ValueError(f"invalid instrument config: {item}")
|
||||
else:
|
||||
raise ValueError(f"unsupported instrument entry: {item}")
|
||||
|
||||
exch_inst = ExchangeAccounts.instance().get_exchange_instrument(
|
||||
exch_acct=exch_acct, instrument_id=instrument_id
|
||||
)
|
||||
assert (
|
||||
exch_inst is not None
|
||||
), f"no ExchangeInstrument for {exch_acct}:{instrument_id}"
|
||||
exch_inst.user_data_["exch_acct"] = exch_acct
|
||||
instruments.append(exch_inst)
|
||||
return instruments
|
||||
|
||||
async def run(self) -> None:
|
||||
oneshot = App.instance().get_argument("oneshot", False)
|
||||
while True:
|
||||
await self.engine_.run_once()
|
||||
if oneshot:
|
||||
break
|
||||
sleep_for = self.engine_.sleep_seconds_until_next_cycle()
|
||||
await asyncio.sleep(sleep_for)
|
||||
|
||||
async def _on_data_quality(self, request: web.Request) -> web.Response:
|
||||
fmt = request.query.get("format", "html").lower()
|
||||
quality = self.engine_.quality_dicts()
|
||||
if fmt == "json":
|
||||
return web.json_response(quality)
|
||||
return web.Response(
|
||||
text=HtmlRenderer.render_data_quality(quality), content_type="text/html"
|
||||
)
|
||||
|
||||
async def _on_pair_selection(self, request: web.Request) -> web.Response:
|
||||
fmt = request.query.get("format", "html").lower()
|
||||
pairs = self.engine_.pair_dicts()
|
||||
if fmt == "json":
|
||||
return web.json_response(pairs)
|
||||
return web.Response(
|
||||
text=HtmlRenderer.render_pairs(pairs), content_type="text/html"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
App()
|
||||
CvttAppConfig()
|
||||
PairSelector()
|
||||
App.instance().run()
|
||||
394
apps/pair_selector/pair_selector_engine.py.md
Normal file
394
apps/pair_selector/pair_selector_engine.py.md
Normal file
@ -0,0 +1,394 @@
|
||||
```python
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from statsmodels.tsa.stattools import adfuller, coint
|
||||
from statsmodels.tsa.vector_ar.vecm import coint_johansen
|
||||
from statsmodels.tsa.vector_ar.vecm import coint_johansen # type: ignore
|
||||
# ---
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.config import Config
|
||||
from cvttpy_tools.logger import Log
|
||||
from cvttpy_tools.timeutils import NanoPerSec, SecPerHour, current_nanoseconds
|
||||
from cvttpy_tools.web.rest_client import RESTSender
|
||||
# ---
|
||||
from cvttpy_trading.trading.instrument import ExchangeInstrument
|
||||
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary
|
||||
|
||||
|
||||
@dataclass
|
||||
class InstrumentQuality(NamedObject):
|
||||
instrument_: ExchangeInstrument
|
||||
record_count_: int
|
||||
latest_tstamp_: Optional[pd.Timestamp]
|
||||
status_: str
|
||||
reason_: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class PairStats(NamedObject):
|
||||
instrument_a_: ExchangeInstrument
|
||||
instrument_b_: ExchangeInstrument
|
||||
pvalue_eg_: Optional[float]
|
||||
pvalue_adf_: Optional[float]
|
||||
pvalue_j_: Optional[float]
|
||||
trace_stat_j_: Optional[float]
|
||||
rank_eg_: int = 0
|
||||
rank_adf_: int = 0
|
||||
rank_j_: int = 0
|
||||
composite_rank_: int = 0
|
||||
|
||||
def as_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"instrument_a": self.instrument_a_.instrument_id(),
|
||||
"instrument_b": self.instrument_b_.instrument_id(),
|
||||
"pvalue_eg": self.pvalue_eg_,
|
||||
"pvalue_adf": self.pvalue_adf_,
|
||||
"pvalue_j": self.pvalue_j_,
|
||||
"trace_stat_j": self.trace_stat_j_,
|
||||
"rank_eg": self.rank_eg_,
|
||||
"rank_adf": self.rank_adf_,
|
||||
"rank_j": self.rank_j_,
|
||||
"composite_rank": self.composite_rank_,
|
||||
}
|
||||
|
||||
|
||||
class DataFetcher(NamedObject):
|
||||
sender_: RESTSender
|
||||
interval_sec_: int
|
||||
history_depth_sec_: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
interval_sec: int,
|
||||
history_depth_sec: int,
|
||||
) -> None:
|
||||
self.sender_ = RESTSender(base_url=base_url)
|
||||
self.interval_sec_ = interval_sec
|
||||
self.history_depth_sec_ = history_depth_sec
|
||||
|
||||
def fetch(self, exch_acct: str, inst: ExchangeInstrument) -> List[MdTradesAggregate]:
|
||||
rqst_data = {
|
||||
"exch_acct": exch_acct,
|
||||
"instrument_id": inst.instrument_id(),
|
||||
"interval_sec": self.interval_sec_,
|
||||
"history_depth_sec": self.history_depth_sec_,
|
||||
}
|
||||
response = self.sender_.send_post(endpoint="md_summary", post_body=rqst_data)
|
||||
if response.status_code not in (200, 201):
|
||||
Log.error(
|
||||
f"{self.fname()}: error {response.status_code} for {inst.details_short()}: {response.text}")
|
||||
return []
|
||||
mdsums: List[MdSummary] = MdSummary.from_REST_response(response=response)
|
||||
return [
|
||||
mdsum.create_md_trades_aggregate(
|
||||
exch_acct=exch_acct, exch_inst=inst, interval_sec=self.interval_sec_
|
||||
)
|
||||
for mdsum in mdsums
|
||||
]
|
||||
|
||||
|
||||
class QualityChecker(NamedObject):
|
||||
interval_sec_: int
|
||||
|
||||
def __init__(self, interval_sec: int) -> None:
|
||||
self.interval_sec_ = interval_sec
|
||||
|
||||
def evaluate(self, inst: ExchangeInstrument, aggr: List[MdTradesAggregate]) -> InstrumentQuality:
|
||||
if len(aggr) == 0:
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=0,
|
||||
latest_tstamp_=None,
|
||||
status_="FAIL",
|
||||
reason_="no records",
|
||||
)
|
||||
|
||||
aggr_sorted = sorted(aggr, key=lambda a: a.aggr_time_ns_)
|
||||
|
||||
latest_ts = pd.to_datetime(aggr_sorted[-1].aggr_time_ns_, unit="ns", utc=True)
|
||||
now_ts = pd.Timestamp.utcnow()
|
||||
recency_cutoff = now_ts - pd.Timedelta(seconds=2 * self.interval_sec_)
|
||||
if latest_ts <= recency_cutoff:
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=len(aggr_sorted),
|
||||
latest_tstamp_=latest_ts,
|
||||
status_="FAIL",
|
||||
reason_=f"stale: latest {latest_ts} <= cutoff {recency_cutoff}",
|
||||
)
|
||||
|
||||
gaps_ok, reason = self._check_gaps(aggr_sorted)
|
||||
status = "PASS" if gaps_ok else "FAIL"
|
||||
return InstrumentQuality(
|
||||
instrument_=inst,
|
||||
record_count_=len(aggr_sorted),
|
||||
latest_tstamp_=latest_ts,
|
||||
status_=status,
|
||||
reason_=reason,
|
||||
)
|
||||
|
||||
def _check_gaps(self, aggr: List[MdTradesAggregate]) -> Tuple[bool, str]:
|
||||
NUM_TRADES_THRESHOLD = 50
|
||||
if len(aggr) < 2:
|
||||
return True, "ok"
|
||||
|
||||
interval_ns = self.interval_sec_ * NanoPerSec
|
||||
for idx in range(1, len(aggr)):
|
||||
prev = aggr[idx - 1]
|
||||
curr = aggr[idx]
|
||||
delta = curr.aggr_time_ns_ - prev.aggr_time_ns_
|
||||
missing_intervals = int(delta // interval_ns) - 1
|
||||
if missing_intervals <= 0:
|
||||
continue
|
||||
|
||||
prev_nt = prev.num_trades_
|
||||
next_nt = curr.num_trades_
|
||||
estimate = self._approximate_num_trades(prev_nt, next_nt)
|
||||
if estimate > NUM_TRADES_THRESHOLD:
|
||||
return False, (
|
||||
f"gap of {missing_intervals} interval(s), est num_trades={estimate} > {NUM_TRADES_THRESHOLD}"
|
||||
)
|
||||
return True, "ok"
|
||||
|
||||
@staticmethod
|
||||
def _approximate_num_trades(prev_nt: int, next_nt: int) -> float:
|
||||
if prev_nt is None and next_nt is None:
|
||||
return 0.0
|
||||
if prev_nt is None:
|
||||
return float(next_nt)
|
||||
if next_nt is None:
|
||||
return float(prev_nt)
|
||||
return (prev_nt + next_nt) / 2.0
|
||||
|
||||
|
||||
class PairAnalyzer(NamedObject):
|
||||
price_field_: str
|
||||
interval_sec_: int
|
||||
|
||||
def __init__(self, price_field: str, interval_sec: int) -> None:
|
||||
self.price_field_ = price_field
|
||||
self.interval_sec_ = interval_sec
|
||||
|
||||
def analyze(self, series: Dict[ExchangeInstrument, pd.DataFrame]) -> List[PairStats]:
|
||||
instruments = list(series.keys())
|
||||
results: List[PairStats] = []
|
||||
for i in range(len(instruments)):
|
||||
for j in range(i + 1, len(instruments)):
|
||||
inst_a = instruments[i]
|
||||
inst_b = instruments[j]
|
||||
df_a = series[inst_a][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_a"}
|
||||
)
|
||||
df_b = series[inst_b][["tstamp", "price"]].rename(
|
||||
columns={"price": "price_b"}
|
||||
)
|
||||
merged = pd.merge(df_a, df_b, on="tstamp", how="inner").sort_values(
|
||||
"tstamp"
|
||||
)
|
||||
stats = self._compute_stats(inst_a, inst_b, merged)
|
||||
if stats:
|
||||
results.append(stats)
|
||||
self._rank(results)
|
||||
return results
|
||||
|
||||
def _compute_stats(
|
||||
self,
|
||||
inst_a: ExchangeInstrument,
|
||||
inst_b: ExchangeInstrument,
|
||||
merged: pd.DataFrame,
|
||||
) -> Optional[PairStats]:
|
||||
if len(merged) < 2:
|
||||
return None
|
||||
px_a = merged["price_a"].astype(float)
|
||||
px_b = merged["price_b"].astype(float)
|
||||
|
||||
std_a = float(px_a.std())
|
||||
std_b = float(px_b.std())
|
||||
if std_a == 0 or std_b == 0:
|
||||
return None
|
||||
|
||||
z_a = (px_a - float(px_a.mean())) / std_a
|
||||
z_b = (px_b - float(px_b.mean())) / std_b
|
||||
|
||||
p_eg: Optional[float]
|
||||
p_adf: Optional[float]
|
||||
p_j: Optional[float]
|
||||
trace_stat: Optional[float]
|
||||
|
||||
try:
|
||||
p_eg = float(coint(z_a, z_b)[1])
|
||||
except Exception as exc:
|
||||
Log.warning(f"{self.fname()}: EG failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
|
||||
p_eg = None
|
||||
|
||||
try:
|
||||
spread = z_a - z_b
|
||||
p_adf = float(adfuller(spread, maxlag=1, regression="c")[1])
|
||||
except Exception as exc:
|
||||
Log.warning(f"{self.fname()}: ADF failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
|
||||
p_adf = None
|
||||
|
||||
try:
|
||||
data = np.column_stack([z_a, z_b])
|
||||
res = coint_johansen(data, det_order=0, k_ar_diff=1)
|
||||
trace_stat = float(res.lr1[0])
|
||||
cv10, cv5, cv1 = res.cvt[0]
|
||||
if trace_stat > cv1:
|
||||
p_j = 0.01
|
||||
elif trace_stat > cv5:
|
||||
p_j = 0.05
|
||||
elif trace_stat > cv10:
|
||||
p_j = 0.10
|
||||
else:
|
||||
p_j = 1.0
|
||||
except Exception as exc:
|
||||
Log.warning(f"{self.fname()}: Johansen failed for {inst_a.details_short()}/{inst_b.details_short()}: {exc}")
|
||||
p_j = None
|
||||
trace_stat = None
|
||||
|
||||
return PairStats(
|
||||
instrument_a_=inst_a,
|
||||
instrument_b_=inst_b,
|
||||
pvalue_eg_=p_eg,
|
||||
pvalue_adf_=p_adf,
|
||||
pvalue_j_=p_j,
|
||||
trace_stat_j_=trace_stat,
|
||||
)
|
||||
|
||||
def _rank(self, results: List[PairStats]) -> None:
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_eg_, attr="rank_eg_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_adf_, attr="rank_adf_")
|
||||
self._assign_ranks(results, key=lambda r: r.pvalue_j_, attr="rank_j_")
|
||||
for res in results:
|
||||
res.composite_rank_ = res.rank_eg_ + res.rank_adf_ + res.rank_j_
|
||||
results.sort(key=lambda r: r.composite_rank_)
|
||||
|
||||
@staticmethod
|
||||
def _assign_ranks(
|
||||
results: List[PairStats], key, attr: str
|
||||
) -> None:
|
||||
values = [key(r) for r in results]
|
||||
sorted_vals = sorted([v for v in values if v is not None])
|
||||
for res in results:
|
||||
val = key(res)
|
||||
if val is None:
|
||||
setattr(res, attr, len(sorted_vals) + 1)
|
||||
continue
|
||||
rank = 1 + sum(1 for v in sorted_vals if v < val)
|
||||
setattr(res, attr, rank)
|
||||
|
||||
|
||||
class PairSelectionEngine(NamedObject):
|
||||
config_: object
|
||||
instruments_: List[ExchangeInstrument]
|
||||
price_field_: str
|
||||
fetcher_: DataFetcher
|
||||
quality_: QualityChecker
|
||||
analyzer_: PairAnalyzer
|
||||
interval_sec_: int
|
||||
history_depth_sec_: int
|
||||
data_quality_cache_: List[InstrumentQuality]
|
||||
pair_results_cache_: List[PairStats]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
instruments: List[ExchangeInstrument],
|
||||
price_field: str,
|
||||
) -> None:
|
||||
self.config_ = config
|
||||
self.instruments_ = instruments
|
||||
self.price_field_ = price_field
|
||||
|
||||
interval_sec = int(config.get_value("interval_sec", 0))
|
||||
history_depth_sec = int(config.get_value("history_depth_hours", 0)) * SecPerHour
|
||||
base_url = config.get_value("cvtt_base_url", None)
|
||||
assert interval_sec > 0, "interval_sec must be > 0"
|
||||
assert history_depth_sec > 0, "history_depth_sec must be > 0"
|
||||
assert base_url, "cvtt_base_url must be set"
|
||||
|
||||
self.fetcher_ = DataFetcher(
|
||||
base_url=base_url,
|
||||
interval_sec=interval_sec,
|
||||
history_depth_sec=history_depth_sec,
|
||||
)
|
||||
self.quality_ = QualityChecker(interval_sec=interval_sec)
|
||||
self.analyzer_ = PairAnalyzer(price_field=price_field, interval_sec=interval_sec)
|
||||
|
||||
self.interval_sec_ = interval_sec
|
||||
self.history_depth_sec_ = history_depth_sec
|
||||
|
||||
self.data_quality_cache_ = []
|
||||
self.pair_results_cache_ = []
|
||||
|
||||
async def run_once(self) -> None:
|
||||
quality_results: List[InstrumentQuality] = []
|
||||
price_series: Dict[ExchangeInstrument, pd.DataFrame] = {}
|
||||
|
||||
for inst in self.instruments_:
|
||||
exch_acct = inst.user_data_.get("exch_acct") or inst.exchange_id_
|
||||
aggr = self.fetcher_.fetch(exch_acct=exch_acct, inst=inst)
|
||||
q = self.quality_.evaluate(inst, aggr)
|
||||
quality_results.append(q)
|
||||
if q.status_ != "PASS":
|
||||
continue
|
||||
df = self._to_dataframe(aggr, inst)
|
||||
if len(df) > 0:
|
||||
price_series[inst] = df
|
||||
self.data_quality_cache_ = quality_results
|
||||
self.pair_results_cache_ = self.analyzer_.analyze(price_series)
|
||||
|
||||
def _to_dataframe(self, aggr: List[MdTradesAggregate], inst: ExchangeInstrument) -> pd.DataFrame:
|
||||
rows: List[Dict[str, Any]] = []
|
||||
for item in aggr:
|
||||
rows.append(
|
||||
{
|
||||
"tstamp": pd.to_datetime(item.aggr_time_ns_, unit="ns", utc=True),
|
||||
"price": self._extract_price(item, inst),
|
||||
"num_trades": item.num_trades_,
|
||||
}
|
||||
)
|
||||
df = pd.DataFrame(rows)
|
||||
return df.sort_values("tstamp").reset_index(drop=True)
|
||||
|
||||
def _extract_price(self, aggr: MdTradesAggregate, inst: ExchangeInstrument) -> float:
|
||||
price_field = self.price_field_
|
||||
# MdTradesAggregate inherits hist bar with fields open_, high_, low_, close_, vwap_
|
||||
field_map = {
|
||||
"open": aggr.open_,
|
||||
"high": aggr.high_,
|
||||
"low": aggr.low_,
|
||||
"close": aggr.close_,
|
||||
"vwap": aggr.vwap_,
|
||||
}
|
||||
raw = field_map.get(price_field, aggr.close_)
|
||||
return inst.get_price(raw)
|
||||
|
||||
def sleep_seconds_until_next_cycle(self) -> float:
|
||||
now_ns = current_nanoseconds()
|
||||
interval_ns = self.interval_sec_ * NanoPerSec
|
||||
next_boundary = (now_ns // interval_ns + 1) * interval_ns
|
||||
return max(0.0, (next_boundary - now_ns) / NanoPerSec)
|
||||
|
||||
def quality_dicts(self) -> List[Dict[str, Any]]:
|
||||
res: List[Dict[str, Any]] = []
|
||||
for q in self.data_quality_cache_:
|
||||
res.append(
|
||||
{
|
||||
"instrument": q.instrument_.instrument_id(),
|
||||
"record_count": q.record_count_,
|
||||
"latest_tstamp": q.latest_tstamp_.isoformat() if q.latest_tstamp_ else None,
|
||||
"status": q.status_,
|
||||
"reason": q.reason_,
|
||||
}
|
||||
)
|
||||
return res
|
||||
|
||||
def pair_dicts(self) -> List[Dict[str, Any]]:
|
||||
return [p.as_dict() for p in self.pair_results_cache_]
|
||||
```
|
||||
140
apps/pair_selector/renderer.py
Normal file
140
apps/pair_selector/renderer.py
Normal file
@ -0,0 +1,140 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List
|
||||
|
||||
|
||||
from cvttpy_tools.app import App
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.config import CvttAppConfig
|
||||
|
||||
|
||||
class HtmlRenderer(NamedObject):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def render_data_quality(quality: List[Dict[str, Any]]) -> str:
|
||||
rows = "".join(
|
||||
f"<tr>"
|
||||
f"<td>{q.get('instrument','')}</td>"
|
||||
f"<td>{q.get('record_count','')}</td>"
|
||||
f"<td>{q.get('latest_tstamp','')}</td>"
|
||||
f"<td>{q.get('status','')}</td>"
|
||||
f"<td>{q.get('reason','')}</td>"
|
||||
f"</tr>"
|
||||
for q in sorted(quality, key=lambda x: str(x.get("instrument", "")))
|
||||
)
|
||||
return f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset='utf-8'/>
|
||||
<title>Data Quality</title>
|
||||
<style>
|
||||
body {{ font-family: Arial, sans-serif; margin: 20px; }}
|
||||
table {{ border-collapse: collapse; width: 100%; }}
|
||||
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
|
||||
th {{ background: #f2f2f2; }}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h2>Data Quality</h2>
|
||||
<table>
|
||||
<thead>
|
||||
<tr><th>Instrument</th><th>Records</th><th>Latest</th><th>Status</th><th>Reason</th></tr>
|
||||
</thead>
|
||||
<tbody>{rows}</tbody>
|
||||
</table>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def render_pairs(pairs: List[Dict[str, Any]]) -> str:
|
||||
if not pairs:
|
||||
body = "<p>No pairs available. Check data quality and try again.</p>"
|
||||
else:
|
||||
body_rows = []
|
||||
for p in pairs:
|
||||
body_rows.append(
|
||||
"<tr>"
|
||||
f"<td>{p.get('instrument_a','')}</td>"
|
||||
f"<td>{p.get('instrument_b','')}</td>"
|
||||
f"<td data-value='{p.get('rank_eg',0)}'>{p.get('rank_eg','')}</td>"
|
||||
f"<td data-value='{p.get('rank_adf',0)}'>{p.get('rank_adf','')}</td>"
|
||||
f"<td data-value='{p.get('rank_j',0)}'>{p.get('rank_j','')}</td>"
|
||||
f"<td data-value='{p.get('pvalue_eg','')}'>{p.get('pvalue_eg','')}</td>"
|
||||
f"<td data-value='{p.get('pvalue_adf','')}'>{p.get('pvalue_adf','')}</td>"
|
||||
f"<td data-value='{p.get('pvalue_j','')}'>{p.get('pvalue_j','')}</td>"
|
||||
"</tr>"
|
||||
)
|
||||
body = "\n".join(body_rows)
|
||||
|
||||
return f"""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset='utf-8'/>
|
||||
<title>Pair Selection</title>
|
||||
<style>
|
||||
body {{ font-family: Arial, sans-serif; margin: 20px; }}
|
||||
table {{ border-collapse: collapse; width: 100%; }}
|
||||
th, td {{ border: 1px solid #ccc; padding: 8px; text-align: left; }}
|
||||
th.sortable {{ cursor: pointer; background: #f2f2f2; }}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h2>Pair Selection</h2>
|
||||
<table id="pairs-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Instrument A</th>
|
||||
<th>Instrument B</th>
|
||||
<th class="sortable" data-type="num">Rank-EG</th>
|
||||
<th class="sortable" data-type="num">Rank-ADF</th>
|
||||
<th class="sortable" data-type="num">Rank-J</th>
|
||||
<th>EG p-value</th>
|
||||
<th>ADF p-value</th>
|
||||
<th>Johansen pseudo p</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{body}
|
||||
</tbody>
|
||||
</table>
|
||||
<script>
|
||||
(function() {{
|
||||
const table = document.getElementById('pairs-table');
|
||||
if (!table) return;
|
||||
const getValue = (cell) => {{
|
||||
const val = cell.getAttribute('data-value');
|
||||
const num = parseFloat(val);
|
||||
return isNaN(num) ? val : num;
|
||||
}};
|
||||
const toggleSort = (index, isNumeric) => {{
|
||||
const tbody = table.querySelector('tbody');
|
||||
const rows = Array.from(tbody.querySelectorAll('tr'));
|
||||
const th = table.querySelectorAll('th')[index];
|
||||
const dir = th.getAttribute('data-dir') === 'asc' ? 'desc' : 'asc';
|
||||
th.setAttribute('data-dir', dir);
|
||||
rows.sort((a, b) => {{
|
||||
const va = getValue(a.children[index]);
|
||||
const vb = getValue(b.children[index]);
|
||||
if (isNumeric && !isNaN(va) && !isNaN(vb)) {{
|
||||
return dir === 'asc' ? va - vb : vb - va;
|
||||
}}
|
||||
return dir === 'asc'
|
||||
? String(va).localeCompare(String(vb))
|
||||
: String(vb).localeCompare(String(va));
|
||||
}});
|
||||
tbody.innerHTML = '';
|
||||
rows.forEach(r => tbody.appendChild(r));
|
||||
}};
|
||||
table.querySelectorAll('th.sortable').forEach((th, idx) => {{
|
||||
th.addEventListener('click', () => toggleSort(idx, th.dataset.type === 'num'));
|
||||
}});
|
||||
}})();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
@ -141,12 +141,11 @@ class PairTrader(NamedObject):
|
||||
)
|
||||
|
||||
async def _on_md_summary(self, history: List[MdTradesAggregate], exch_inst: ExchangeInstrument) -> None:
|
||||
# URGENT before calling stragegy, make sure that **BOTH** instruments market data is combined.
|
||||
Log.info(f"DEBUG got {exch_inst.details_short()} data")
|
||||
Log.info(f"{self.fname()}: got {exch_inst.details_short()} data")
|
||||
self.latest_history_[exch_inst] = history
|
||||
if len(self.latest_history_) == 2:
|
||||
from itertools import chain
|
||||
all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.time_ns_)
|
||||
all_aggrs = sorted(list(chain.from_iterable(self.latest_history_.values())), key=lambda X: X.aggr_time_ns_)
|
||||
|
||||
await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=all_aggrs)
|
||||
self.latest_history_ = {}
|
||||
|
||||
186
build-dist.sh
Executable file
186
build-dist.sh
Executable file
@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# ---------------- Settings
|
||||
|
||||
repo=git@cloud21.cvtt.vpn:/works/git/cvtt2/research/pairs_trading.git
|
||||
|
||||
dist_root=/home/cvttdist/software/cvtt2
|
||||
dist_user=cvttdist
|
||||
dist_host="cloud21.cvtt.vpn"
|
||||
dist_ssh_port="22"
|
||||
|
||||
dist_locations="cloud21.cvtt.vpn:22 hs01.cvtt.vpn:22"
|
||||
version_file="VERSION"
|
||||
|
||||
prj=pairs_trading
|
||||
brnch=master
|
||||
interactive=N
|
||||
|
||||
# ---------------- Settings
|
||||
|
||||
# ---------------- cmdline
|
||||
|
||||
usage() {
|
||||
echo "Usage: $0 [-b <branch (master)> -i (interactive)"
|
||||
exit 1
|
||||
}
|
||||
|
||||
while getopts "b:i" opt; do
|
||||
case ${opt} in
|
||||
b )
|
||||
brnch=$OPTARG
|
||||
;;
|
||||
i )
|
||||
interactive=Y
|
||||
;;
|
||||
\? )
|
||||
echo "Invalid option: -$OPTARG" >&2
|
||||
usage
|
||||
;;
|
||||
: )
|
||||
echo "Option -$OPTARG requires an argument." >&2
|
||||
usage
|
||||
;;
|
||||
esac
|
||||
done
|
||||
# ---------------- cmdline
|
||||
|
||||
confirm() {
|
||||
if [ "${interactive}" == "Y" ]; then
|
||||
echo "--------------------------------"
|
||||
echo -n "Press <Enter> to continue" && read
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
if [ "${interactive}" == "Y" ]; then
|
||||
echo -n "Enter project [${prj}]: "
|
||||
read project
|
||||
if [ "${project}" == "" ]
|
||||
then
|
||||
project=${prj}
|
||||
fi
|
||||
else
|
||||
project=${prj}
|
||||
fi
|
||||
|
||||
# repo=${git_repo_arr[${project}]}
|
||||
if [ -z ${repo} ]; then
|
||||
echo "ERROR: Project repository for ${project} not found"
|
||||
exit -1
|
||||
fi
|
||||
echo "Project repo: ${repo}"
|
||||
|
||||
if [ "${interactive}" == "Y" ]; then
|
||||
echo -n "Enter branch to build release from [${brnch}]: "
|
||||
read branch
|
||||
if [ "${branch}" == "" ]
|
||||
then
|
||||
branch=${brnch}
|
||||
fi
|
||||
else
|
||||
branch=${brnch}
|
||||
fi
|
||||
|
||||
tmp_dir=$(mktemp -d)
|
||||
function cleanup {
|
||||
cd ${HOME}
|
||||
rm -rf ${tmp_dir}
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
|
||||
prj_dir="${tmp_dir}/${prj}"
|
||||
|
||||
cmd_arr=()
|
||||
Cmd="git clone ${repo} ${prj_dir}"
|
||||
cmd_arr+=("${Cmd}")
|
||||
|
||||
Cmd="cd ${prj_dir}"
|
||||
cmd_arr+=("${Cmd}")
|
||||
|
||||
if [ "${interactive}" == "Y" ]; then
|
||||
echo "------------------------------------"
|
||||
echo "The following commands will execute:"
|
||||
echo "------------------------------------"
|
||||
for cmd in "${cmd_arr[@]}"
|
||||
do
|
||||
echo ${cmd}
|
||||
done
|
||||
fi
|
||||
|
||||
confirm
|
||||
|
||||
for cmd in "${cmd_arr[@]}"
|
||||
do
|
||||
echo ${cmd} && eval ${cmd}
|
||||
done
|
||||
|
||||
Cmd="git checkout ${branch}"
|
||||
echo ${Cmd} && eval ${Cmd}
|
||||
if [ "${?}" != "0" ]; then
|
||||
echo "ERROR: Branch ${branch} is not found"
|
||||
cd ${HOME} && rm -rf ${tmp_dir}
|
||||
exit -1
|
||||
fi
|
||||
|
||||
|
||||
release_version=$(cat ${version_file} | awk -F',' '{print $1}')
|
||||
whats_new=$(cat ${version_file} | awk -F',' '{print $2}')
|
||||
|
||||
|
||||
echo "--------------------------------"
|
||||
echo "Version file: ${version_file}"
|
||||
echo "Release version: ${release_version}"
|
||||
|
||||
confirm
|
||||
|
||||
version_tag="v${release_version}"
|
||||
if [ "$(git tag -l "${version_tag}")" != "" ]; then
|
||||
version_tag="${version_tag}.$(date +%Y%m%d_%H%M)"
|
||||
fi
|
||||
version_comment="'${version_tag} ${project} ${branch} $(date +%Y-%m-%d)\n${whats_new}'"
|
||||
|
||||
cmd_arr=()
|
||||
Cmd="git tag -a ${version_tag} -m ${version_comment}"
|
||||
cmd_arr+=("${Cmd}")
|
||||
|
||||
Cmd="git push origin --tags"
|
||||
cmd_arr+=("${Cmd}")
|
||||
|
||||
Cmd="rm -rf .git"
|
||||
cmd_arr+=("${Cmd}")
|
||||
|
||||
SourceLoc=../${project}
|
||||
|
||||
dist_path="${dist_root}/${project}/${release_version}"
|
||||
|
||||
for dist_loc in ${dist_locations}; do
|
||||
dhp=(${dist_loc//:/ })
|
||||
dist_host=${dhp[0]}
|
||||
dist_port=${dhp[1]}
|
||||
Cmd="rsync -avzh"
|
||||
Cmd="${Cmd} --rsync-path=\"mkdir -p ${dist_path}"
|
||||
Cmd="${Cmd} && rsync\" -e \"ssh -p ${dist_ssh_port}\""
|
||||
Cmd="${Cmd} $SourceLoc ${dist_user}@${dist_host}:${dist_path}/"
|
||||
cmd_arr+=("${Cmd}")
|
||||
done
|
||||
|
||||
if [ "${interactive}" == "Y" ]; then
|
||||
echo "------------------------------------"
|
||||
echo "The following commands will execute:"
|
||||
echo "------------------------------------"
|
||||
for cmd in "${cmd_arr[@]}"
|
||||
do
|
||||
echo ${cmd}
|
||||
done
|
||||
fi
|
||||
|
||||
confirm
|
||||
|
||||
for cmd in "${cmd_arr[@]}"
|
||||
do
|
||||
pwd && echo ${cmd} && eval ${cmd}
|
||||
done
|
||||
|
||||
echo "$0 Done ${project} ${release_version}"
|
||||
@ -1,86 +1,82 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Callable, Coroutine, Dict, Any, List, Optional, Set
|
||||
from typing import Dict, Any, List, Optional, Set
|
||||
|
||||
import requests
|
||||
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.app import App
|
||||
from cvttpy_tools.logger import Log
|
||||
from cvttpy_tools.config import Config
|
||||
from cvttpy_tools.timer import Timer
|
||||
from cvttpy_tools.timeutils import NanosT, current_seconds, NanoPerSec
|
||||
from cvttpy_tools.timeutils import NanosT, current_seconds
|
||||
from cvttpy_tools.settings.cvtt_types import InstrumentIdT, IntervalSecT
|
||||
|
||||
from cvttpy_tools.web.rest_client import RESTSender
|
||||
# ---
|
||||
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
|
||||
from cvttpy_trading.trading.instrument import ExchangeInstrument
|
||||
from cvttpy_trading.trading.accounting.exch_account import ExchangeAccountNameT
|
||||
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
|
||||
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate, MdSummary, MdSummaryCallbackT
|
||||
from cvttpy_trading.trading.exchange_config import ExchangeAccounts
|
||||
|
||||
# ---
|
||||
from pairs_trading.lib.live.rest_client import RESTSender
|
||||
|
||||
|
||||
class MdSummary(HistMdBar):
|
||||
def __init__(
|
||||
self,
|
||||
ts_ns: int,
|
||||
open: float,
|
||||
high: float,
|
||||
low: float,
|
||||
close: float,
|
||||
volume: float,
|
||||
vwap: float,
|
||||
num_trades: int,
|
||||
):
|
||||
super().__init__(ts=ts_ns)
|
||||
self.open_ = open
|
||||
self.high_ = high
|
||||
self.low_ = low
|
||||
self.close_ = close
|
||||
self.volume_ = volume
|
||||
self.vwap_ = vwap
|
||||
self.num_trades_ = num_trades
|
||||
# class MdSummary(HistMdBar):
|
||||
# def __init__(
|
||||
# self,
|
||||
# ts_ns: int,
|
||||
# open: float,
|
||||
# high: float,
|
||||
# low: float,
|
||||
# close: float,
|
||||
# volume: float,
|
||||
# vwap: float,
|
||||
# num_trades: int,
|
||||
# ):
|
||||
# super().__init__(ts=ts_ns)
|
||||
# self.open_ = open
|
||||
# self.high_ = high
|
||||
# self.low_ = low
|
||||
# self.close_ = close
|
||||
# self.volume_ = volume
|
||||
# self.vwap_ = vwap
|
||||
# self.num_trades_ = num_trades
|
||||
|
||||
@classmethod
|
||||
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
|
||||
res: List[MdSummary] = []
|
||||
jresp = response.json()
|
||||
hist_data = jresp.get("historical_data", [])
|
||||
for hd in hist_data:
|
||||
res.append(
|
||||
MdSummary(
|
||||
ts_ns=hd["time_ns"],
|
||||
open=hd["open"],
|
||||
high=hd["high"],
|
||||
low=hd["low"],
|
||||
close=hd["close"],
|
||||
volume=hd["volume"],
|
||||
vwap=hd["vwap"],
|
||||
num_trades=hd["num_trades"],
|
||||
)
|
||||
)
|
||||
return res
|
||||
# @classmethod
|
||||
# def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
|
||||
# res: List[MdSummary] = []
|
||||
# jresp = response.json()
|
||||
# hist_data = jresp.get("historical_data", [])
|
||||
# for hd in hist_data:
|
||||
# res.append(
|
||||
# MdSummary(
|
||||
# ts_ns=hd["time_ns"],
|
||||
# open=hd["open"],
|
||||
# high=hd["high"],
|
||||
# low=hd["low"],
|
||||
# close=hd["close"],
|
||||
# volume=hd["volume"],
|
||||
# vwap=hd["vwap"],
|
||||
# num_trades=hd["num_trades"],
|
||||
# )
|
||||
# )
|
||||
# return res
|
||||
|
||||
def create_md_trades_aggregate(
|
||||
self,
|
||||
exch_acct: ExchangeAccountNameT,
|
||||
exch_inst: ExchangeInstrument,
|
||||
interval_sec: IntervalSecT,
|
||||
) -> MdTradesAggregate:
|
||||
res = MdTradesAggregate(
|
||||
exch_acct=exch_acct,
|
||||
exch_inst=exch_inst,
|
||||
interval_ns=interval_sec * NanoPerSec,
|
||||
)
|
||||
res.set(mdbar=self)
|
||||
return res
|
||||
# def create_md_trades_aggregate(
|
||||
# self,
|
||||
# exch_acct: ExchangeAccountNameT,
|
||||
# exch_inst: ExchangeInstrument,
|
||||
# interval_sec: IntervalSecT,
|
||||
# ) -> MdTradesAggregate:
|
||||
# res = MdTradesAggregate(
|
||||
# exch_acct=exch_acct,
|
||||
# exch_inst=exch_inst,
|
||||
# interval_ns=interval_sec * NanoPerSec,
|
||||
# )
|
||||
# res.set(mdbar=self)
|
||||
# return res
|
||||
|
||||
|
||||
MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
|
||||
# MdSummaryCallbackT = Callable[[List[MdTradesAggregate]], Coroutine]
|
||||
|
||||
|
||||
class MdSummaryCollector(NamedObject):
|
||||
@ -163,6 +159,7 @@ class MdSummaryCollector(NamedObject):
|
||||
)
|
||||
return None
|
||||
res = MdSummary.from_REST_response(response=response)
|
||||
Log.info(f"DEBUG *** {self.exch_inst_.base_asset_id_}: {res[-1].tstamp_}")
|
||||
return None if len(res) == 0 else res[-1]
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
@ -195,15 +192,16 @@ class MdSummaryCollector(NamedObject):
|
||||
Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec")
|
||||
|
||||
def next_load_time(self) -> NanosT:
|
||||
ALLOW_LAG_SEC = 1
|
||||
curr_sec = int(current_seconds())
|
||||
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 2
|
||||
return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + ALLOW_LAG_SEC
|
||||
|
||||
async def _load_new(self) -> None:
|
||||
|
||||
last: Optional[MdSummary] = self.get_last()
|
||||
if not last:
|
||||
Log.warning(f"{self.fname()}: did not get last update")
|
||||
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].time_ns_:
|
||||
elif not self.is_empty() and last.ts_ns_ <= self.history_[-1].aggr_time_ns_:
|
||||
Log.info(
|
||||
f"{self.fname()}: Received {last}. Already Have: {self.history_[-1]}"
|
||||
)
|
||||
|
||||
@ -1,19 +1,12 @@
|
||||
```python
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Callable, Dict, Any, List, Optional
|
||||
from typing import Dict
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.logger import Log
|
||||
from cvttpy_tools.config import Config
|
||||
from cvttpy_tools.timer import Timer
|
||||
|
||||
from cvttpy_tools.timeutils import NanoPerSec, NanosT, current_nanoseconds, current_seconds
|
||||
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
|
||||
|
||||
|
||||
class RESTSender(NamedObject):
|
||||
session_: requests.Session
|
||||
@ -64,4 +57,4 @@ class RESTSender(NamedObject):
|
||||
raise ConnectionError(
|
||||
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
|
||||
) from excpt
|
||||
|
||||
```
|
||||
@ -6,10 +6,10 @@ import requests
|
||||
from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.config import Config
|
||||
from cvttpy_tools.logger import Log
|
||||
from cvttpy_tools.web.rest_client import RESTSender
|
||||
# ---
|
||||
from cvttpy_trading.trading.trading_instructions import TradingInstructions
|
||||
# ---
|
||||
from pairs_trading.lib.live.rest_client import RESTSender
|
||||
from pairs_trading.apps.pair_trader import PairTrader
|
||||
|
||||
|
||||
|
||||
@ -9,7 +9,7 @@ from cvttpy_tools.base import NamedObject
|
||||
from cvttpy_tools.app import App
|
||||
from cvttpy_tools.config import Config
|
||||
from cvttpy_tools.settings.cvtt_types import IntervalSecT
|
||||
from cvttpy_tools.timeutils import SecPerHour, current_nanoseconds, NanoPerSec
|
||||
from cvttpy_tools.timeutils import NanosT, SecPerHour, current_nanoseconds, NanoPerSec, format_nanos_utc
|
||||
from cvttpy_tools.logger import Log
|
||||
|
||||
# ---
|
||||
@ -42,14 +42,14 @@ class PtLiveStrategy(NamedObject):
|
||||
# for presentation: history of prediction values and trading signals
|
||||
predictions_df_: pd.DataFrame
|
||||
trading_signals_df_: pd.DataFrame
|
||||
allowed_md_lag_sec_: int
|
||||
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
pairs_trader: PairTrader,
|
||||
):
|
||||
# import copy
|
||||
# self.config_ = Config(json_src=copy.deepcopy(config.data()))
|
||||
self.config_ = config
|
||||
|
||||
self.pairs_trader_ = pairs_trader
|
||||
@ -83,7 +83,7 @@ class PtLiveStrategy(NamedObject):
|
||||
)
|
||||
assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0"
|
||||
|
||||
await self.pairs_trader_.subscribe_md()
|
||||
self.allowed_md_lag_sec_ = self.config_.get_value("allowed_md_lag_sec", 3)
|
||||
|
||||
self.open_threshold_ = self.config_.get_value(
|
||||
"model/disequilibrium/open_trshld", 0.0
|
||||
@ -99,6 +99,9 @@ class PtLiveStrategy(NamedObject):
|
||||
self.close_threshold_ > 0
|
||||
), "disequilibrium/close_trshld must be greater than 0"
|
||||
|
||||
await self.pairs_trader_.subscribe_md()
|
||||
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.classname()}: trading_pair={self.trading_pair_}, mdp={self.model_data_policy_.__class__.__name__}, "
|
||||
|
||||
@ -132,17 +135,33 @@ class PtLiveStrategy(NamedObject):
|
||||
await self._send_trading_instructions(trading_instructions)
|
||||
|
||||
def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool:
|
||||
curr_ns = current_nanoseconds()
|
||||
LAG_THRESHOLD = 5 * NanoPerSec
|
||||
|
||||
if len(hist_aggr) == 0:
|
||||
Log.warning(f"{self.fname()} list of aggregates IS EMPTY")
|
||||
return False
|
||||
|
||||
curr_ns = current_nanoseconds()
|
||||
|
||||
# MAYBE check market data length
|
||||
lag_ns = curr_ns - hist_aggr[-1].time_ns_
|
||||
if lag_ns > LAG_THRESHOLD:
|
||||
Log.warning(f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()} Lagging {int(lag_ns/NanoPerSec)} seconds")
|
||||
|
||||
# at 18:05:01 we should see data for 18:04:00
|
||||
lag_sec = (curr_ns - hist_aggr[-1].aggr_time_ns_) / NanoPerSec - self.interval_sec()
|
||||
if lag_sec > self.allowed_md_lag_sec_:
|
||||
Log.warning(
|
||||
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
|
||||
f" Lagging {int(lag_sec)} > {self.allowed_md_lag_sec_} seconds:"
|
||||
f"\n{len(hist_aggr)} records"
|
||||
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
|
||||
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
|
||||
)
|
||||
return False
|
||||
else:
|
||||
Log.info(
|
||||
f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}"
|
||||
f" Lag {int(lag_sec)} <= {self.allowed_md_lag_sec_} seconds"
|
||||
f"\n{len(hist_aggr)} records"
|
||||
f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}"
|
||||
f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}"
|
||||
)
|
||||
return True
|
||||
|
||||
def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame:
|
||||
@ -163,8 +182,8 @@ class PtLiveStrategy(NamedObject):
|
||||
rows.append(
|
||||
{
|
||||
# convert nanoseconds → tz-aware pandas timestamp
|
||||
"tstamp": pd.to_datetime(aggr.time_ns_, unit="ns", utc=True),
|
||||
"time_ns": aggr.time_ns_,
|
||||
"tstamp": pd.to_datetime(aggr.aggr_time_ns_, unit="ns", utc=True),
|
||||
"time_ns": aggr.aggr_time_ns_,
|
||||
"symbol": exch_inst.instrument_id().split("-", 1)[1],
|
||||
"exchange_id": exch_inst.exchange_id_,
|
||||
"instrument_id": exch_inst.instrument_id(),
|
||||
@ -270,6 +289,7 @@ class PtLiveStrategy(NamedObject):
|
||||
issued_ts_ns=current_nanoseconds(),
|
||||
data=TargetPositionSignal(
|
||||
strength=side_a * self._strength(scaled_disequilibrium),
|
||||
exchange_id=pair.get_instrument_a().exchange_id_,
|
||||
base_asset=pair.get_instrument_a().base_asset_id_,
|
||||
quote_asset=pair.get_instrument_a().quote_asset_id_,
|
||||
user_data={}
|
||||
@ -284,6 +304,7 @@ class PtLiveStrategy(NamedObject):
|
||||
issued_ts_ns=current_nanoseconds(),
|
||||
data=TargetPositionSignal(
|
||||
strength=side_b * self._strength(scaled_disequilibrium),
|
||||
exchange_id=pair.get_instrument_b().exchange_id_,
|
||||
base_asset=pair.get_instrument_b().base_asset_id_,
|
||||
quote_asset=pair.get_instrument_b().quote_asset_id_,
|
||||
user_data={}
|
||||
@ -304,6 +325,7 @@ class PtLiveStrategy(NamedObject):
|
||||
issued_ts_ns=current_nanoseconds(),
|
||||
data=TargetPositionSignal(
|
||||
strength=0,
|
||||
exchange_id=pair.get_instrument_a().exchange_id_,
|
||||
base_asset=pair.get_instrument_a().base_asset_id_,
|
||||
quote_asset=pair.get_instrument_a().quote_asset_id_,
|
||||
user_data={}
|
||||
@ -318,6 +340,7 @@ class PtLiveStrategy(NamedObject):
|
||||
issued_ts_ns=current_nanoseconds(),
|
||||
data=TargetPositionSignal(
|
||||
strength=0,
|
||||
exchange_id=pair.get_instrument_b().exchange_id_,
|
||||
base_asset=pair.get_instrument_b().base_asset_id_,
|
||||
quote_asset=pair.get_instrument_b().quote_asset_id_,
|
||||
user_data={}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user