diff --git a/.vscode/settings.json b/.vscode/settings.json index 5bbf820..2a330b4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -108,4 +108,5 @@ // Enable Python-specific code actions "python.analysis.completeFunctionParens": true, "python.analysis.addImport.exactMatchOnly": false, + "workbench.tree.indent": 24, } diff --git a/__DELETE__/strategy/pair_strategy.py b/__DELETE__/strategy/pair_strategy.py index fbed69f..dbefbc4 100644 --- a/__DELETE__/strategy/pair_strategy.py +++ b/__DELETE__/strategy/pair_strategy.py @@ -89,10 +89,10 @@ def main() -> None: print(f"{message_type=} {subscr_id=} {instrument_id}") if message_type == "md_aggregate": aggr = message.get("md_aggregate", []) - print(f"[{aggr['tstmp'][:19]}] *** RLTM *** {message}") + print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}") elif message_type == "historical_md_aggregate": for aggr in message.get("historical_data", []): - print(f"[{aggr['tstmp'][:19]}] *** HIST *** {aggr}") + print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}") else: print(f"Unknown message type: {message_type}") diff --git a/bin/trade_pair.py b/bin/trade_pair.py index 89ae114..883e69b 100644 --- a/bin/trade_pair.py +++ b/bin/trade_pair.py @@ -34,10 +34,10 @@ class PairTradingRunner(NamedObject): # print(f"{message_type=} {subscr_id=} {instrument_id}") # if message_type == "md_aggregate": # aggr = message.get("md_aggregate", []) -# print(f"[{aggr['tstmp'][:19]}] *** RLTM *** {message}") +# print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}") # elif message_type == "historical_md_aggregate": # for aggr in message.get("historical_data", []): -# print(f"[{aggr['tstmp'][:19]}] *** HIST *** {aggr}") +# print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}") # else: # print(f"Unknown message type: {message_type}") diff --git a/lib/cvtt_client/mkt_data.py b/lib/cvtt_client/mkt_data.py index 0c8aff0..8dade62 100644 --- a/lib/cvtt_client/mkt_data.py +++ b/lib/cvtt_client/mkt_data.py @@ -156,10 +156,10 @@ async def main() -> None: print(f"{message_type=} {subscr_id=} {instrument_id}") if message_type == "md_aggregate": aggr = message.get("md_aggregate", []) - print(f"[{aggr['tstmp'][:19]}] *** RLTM *** {message}") + print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}") elif message_type == "historical_md_aggregate": for aggr in message.get("historical_data", []): - print(f"[{aggr['tstmp'][:19]}] *** HIST *** {aggr}") + print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}") else: print(f"Unknown message type: {message_type}") diff --git a/lib/pt_strategy/live_strategy.py b/lib/pt_strategy/live_strategy.py index 68e23a4..67d1960 100644 --- a/lib/pt_strategy/live_strategy.py +++ b/lib/pt_strategy/live_strategy.py @@ -52,11 +52,11 @@ class PtMktDataClient(NamedObject): if message_type == "md_aggregate": aggr = message.get("md_aggregate", {}) await self.live_strategy_.on_mkt_data_update(aggr) - # print(f"[{aggr['tstmp'][:19]}] *** RLTM *** {message}") + # print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}") elif message_type == "historical_md_aggregate": aggr = message.get("historical_data", {}) await self.live_strategy_.on_mkt_data_hist_snapshot(aggr) - # print(f"[{aggr['tstmp'][:19]}] *** HIST *** {aggr}") + # print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}") else: Log.info(f"Unknown message type: {message_type}") diff --git a/lib/pt_strategy/pt_market_data.py b/lib/pt_strategy/pt_market_data.py index ba41597..1b3da1c 100644 --- a/lib/pt_strategy/pt_market_data.py +++ b/lib/pt_strategy/pt_market_data.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Dict, List, Type +from typing import Any, Dict, List, Optional import pandas as pd from cvttpy_base.settings.cvtt_types import JsonDictT @@ -176,7 +176,28 @@ class RealTimeMarketData(PtMarketData): async def on_mkt_data_hist_snapshot(self, snapshot: JsonDictT) -> None: # URGENT # create origin_mkt_data_df_ from snapshot + # verify that the data for both instruments are present + # transform it to market_data_df_ tstamp, close_symbolA, close_symbolB + ''' + # from cvttpy/exchanges/binance/spot/mkt_data.py + values = { + "time_ns": time_ns, + "tstamp": format_nanos_utc(time_ns), + "exchange_id": exch_inst.exchange_id_, + "instrument_id": exch_inst.instrument_id(), + "interval_ns": interval_sec * 1_000_000_000, + "open": float(kline[1]), + "high": float(kline[2]), + "low": float(kline[3]), + "close": float(kline[4]), + "volume": float(kline[5]), + "num_trades": kline[8], + "vwap": float(kline[7]) / float(kline[5]) if float(kline[5]) > 0 else 0.0 # Calculate VWAP + } + ''' + + pass async def on_mkt_data_update(self, update: JsonDictT) -> Optional[pd.DataFrame]: @@ -187,6 +208,23 @@ class RealTimeMarketData(PtMarketData): # add tmp1 to origin_mkt_data_df_ # add tmp2 to market_data_df_ # return market_data_df_ - + ''' + class MdTradesAggregate(NamedObject): + def to_dict(self) -> Dict[str, Any]: + return { + "time_ns": self.time_ns_, + "tstamp": format_nanos_utc(self.time_ns_), + "exchange_id": self.exch_inst_.exchange_id_, + "instrument_id": self.exch_inst_.instrument_id(), + "interval_ns": self.interval_ns_, + "open": self.exch_inst_.get_price(self.open_), + "high": self.exch_inst_.get_price(self.high_), + "low": self.exch_inst_.get_price(self.low_), + "close": self.exch_inst_.get_price(self.close_), + "volume": self.exch_inst_.get_quantity(self.volume_), + "vwap": self.exch_inst_.get_price(self.vwap_), + "num_trades": self.exch_inst_.get_quantity(self.num_trades_), + } + ''' return pd.DataFrame() \ No newline at end of file