diff --git a/.vscode/launch.json b/.vscode/launch.json index f18f6af..b067598 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -31,8 +31,8 @@ "PYTHONPATH": "${workspaceFolder}/..", "CONFIG_SERVICE": "cloud16.cvtt.vpn:6789", "MODEL_CONFIG": "vecm", - "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456", - // "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456", + // "CVTT_URL": "http://cvtt-tester-01.cvtt.vpn:23456", + "CVTT_URL": "http://dev-server-02.cvtt.vpn:23456", }, "args": [ // "--config=${workspaceFolder}/configuration/pair_trader.cfg", diff --git a/LIBRARIES b/LIBRARIES deleted file mode 100644 index fc4f358..0000000 --- a/LIBRARIES +++ /dev/null @@ -1,2 +0,0 @@ -cvttpy_tools: 1.3.4 -cvttpy_trading: 2.4.1 \ No newline at end of file diff --git a/VERSION b/VERSION index 6812f81..05b19b1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.3 \ No newline at end of file +0.0.4 \ No newline at end of file diff --git a/lib/live/mkt_data_client.py b/lib/live/mkt_data_client.py index 99a36c3..b11a6f4 100644 --- a/lib/live/mkt_data_client.py +++ b/lib/live/mkt_data_client.py @@ -196,8 +196,9 @@ class MdSummaryCollector(NamedObject): Log.info(f"{self.fname()} Timer for {self.exch_inst_.details_short()} is set to run in {start_in} sec") def next_load_time(self) -> NanosT: + ALLOW_LAG_SEC = 1 curr_sec = int(current_seconds()) - return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + 5 + return (curr_sec - curr_sec % self.interval_sec_) + self.interval_sec_ + ALLOW_LAG_SEC async def _load_new(self) -> None: diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index c23cde7..57c1357 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -42,14 +42,14 @@ class PtLiveStrategy(NamedObject): # for presentation: history of prediction values and trading signals predictions_df_: pd.DataFrame trading_signals_df_: pd.DataFrame + allowed_md_lag_sec_: int + def __init__( self, config: Config, pairs_trader: PairTrader, ): - # import copy - # self.config_ = Config(json_src=copy.deepcopy(config.data())) self.config_ = config self.pairs_trader_ = pairs_trader @@ -83,6 +83,8 @@ class PtLiveStrategy(NamedObject): ) assert self.history_depth_sec_ > 0, "history_depth_hours cannot be 0" + self.allowed_md_lag_sec_ = self.config_.get_value("allowed_md_lag_sec", 3) + await self.pairs_trader_.subscribe_md() self.open_threshold_ = self.config_.get_value( @@ -136,23 +138,29 @@ class PtLiveStrategy(NamedObject): Log.warning(f"{self.fname()} list of aggregates IS EMPTY") return False - ALLOWED_LAG_SEC = 5.0 curr_ns = current_nanoseconds() - LAG_THRESHOLD = NanosT((self.interval_sec() + ALLOWED_LAG_SEC) * NanoPerSec) # MAYBE check market data length - lag_ns = curr_ns - hist_aggr[-1].aggr_time_ns_ - if lag_ns > LAG_THRESHOLD: + + # at 18:05:01 we should see data for 18:04:00 + lag_sec = (curr_ns - hist_aggr[-1].aggr_time_ns_) / NanoPerSec - self.interval_sec() + if lag_sec > self.allowed_md_lag_sec_: Log.warning( f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}" - f" Lagging {int(lag_ns/NanoPerSec)} seconds:" + f" Lagging {int(lag_sec)} > {self.allowed_md_lag_sec_} seconds:" f"\n{len(hist_aggr)} records" f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}" f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}" - # f" {hist_aggr[-1].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-1].aggr_time_ns_)}" - # f" {hist_aggr[-2].exch_inst_.base_asset_id_}: {format_nanos_utc(hist_aggr[-2].aggr_time_ns_)}" ) return False + else: + Log.info( + f"{self.fname()} {hist_aggr[-1].exch_inst_.details_short()}" + f" Lag {int(lag_sec)} <= {self.allowed_md_lag_sec_} seconds" + f"\n{len(hist_aggr)} records" + f"\n{hist_aggr[-1].exch_inst_.base_asset_id_}: {hist_aggr[-1].tstamp()}" + f"\n{hist_aggr[-2].exch_inst_.base_asset_id_}: {hist_aggr[-2].tstamp()}" + ) return True def _create_md_df(self, hist_aggr: List[MdTradesAggregate]) -> pd.DataFrame: