diff --git a/apps/pairs_trader.py b/apps/pairs_trader.py index c2fcd07..6d1d93b 100644 --- a/apps/pairs_trader.py +++ b/apps/pairs_trader.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import Callable, Coroutine, List +import aiohttp.web as web from cvttpy_tools.app import App from cvttpy_tools.config import Config @@ -8,6 +9,8 @@ from cvttpy_tools.base import NamedObject from cvttpy_tools.config import CvttAppConfig from cvttpy_tools.logger import Log from cvttpy_tools.settings.cvtt_types import BookIdT +from cvttpy_tools.web.rest_service import RestService + # --- from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate @@ -17,11 +20,6 @@ from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy from pairs_trading.lib.live.mkt_data_client import CvttRestMktDataClient from pairs_trading.lib.live.ti_sender import TradingInstructionsSender -# import sys -# print("PYTHONPATH directories:") -# for path in sys.path: -# print(path) - ''' config http://cloud16.cvtt.vpn/apps/pairs_trading ''' @@ -36,6 +34,7 @@ class PairsTrader(NamedObject): live_strategy_: PtLiveStrategy pricer_client_: CvttRestMktDataClient + rest_service_: RestService def __init__(self) -> None: self.instruments_ = [] @@ -105,7 +104,16 @@ class PairsTrader(NamedObject): # # ------- CREATE REST SERVER ------- - # URGENT CREATE REST SERVER for dashboard communications + self.rest_service_ = RestService( + config_key=f"/api/REST" + ) + + # --- Strategy Handlers + self.rest_service_.add_handler( + method="POST", + url="/api/strategy", + handler=self._on_api_request, + ) async def subscribe_md(self) -> None: for exch_inst in self.instruments_: @@ -124,6 +132,9 @@ class PairsTrader(NamedObject): # Snapshot or update? await self.live_strategy_.on_mkt_data_hist_snapshot(hist_aggr=history) + async def _on_api_request(self, request: web.Request) -> web.Response: + return web.Response() # TODO add API request handler implementation + async def run(self) -> None: Log.info(f"{self.fname()} ...") pass diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index cb037ef..d7de592 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -12,6 +12,7 @@ from cvttpy_tools.app import App from cvttpy_tools.config import Config from cvttpy_tools.settings.cvtt_types import IntervalSecT from cvttpy_tools.timeutils import SecPerHour +from cvttpy_tools.logger import Log # --- from cvttpy_trading.trading.instrument import ExchangeInstrument @@ -117,27 +118,36 @@ class PtLiveStrategy(NamedObject): self, hist_aggr: List[MdTradesAggregate] ) -> None: # Log.info(f"on_mkt_data_hist_snapshot: {aggr}") - # await self.pt_mkt_data_.on_mkt_data_hist_snapshot(snapshot=aggr) - pass # URGENT PtiveStrategy.on_mkt_data_hist_snapshot() + if not self._is_md_actual(hist_aggr=hist_aggr): + return - # if market_data_df is not None: - # self.trading_pair_.market_data_ = market_data_df - # self.model_data_policy_.advance() - # prediction = self.trading_pair_.run( - # market_data_df, self.model_data_policy_.advance() - # ) - # self.predictions_ = pd.concat( - # [self.predictions_, prediction.to_df()], ignore_index=True - # ) + market_data_df: Optional[pd.DataFrame] = self._create_md_pdf(hist_aggr=hist_aggr) + if market_data_df is None: + Log.warning(f"{self.fname()} Unable to create market data df") + return - # trading_instructions: List[TradingInstruction] = ( - # self._create_trading_instructions( - # prediction=prediction, last_row=market_data_df.iloc[-1] - # ) - # ) - # if len(trading_instructions) > 0: - # await self._send_trading_instructions(trading_instructions) - # # trades = self._create_trades(prediction=prediction, last_row=market_data_df.iloc[-1]) + self.trading_pair_.market_data_ = market_data_df + self.model_data_policy_.advance() + prediction = self.trading_pair_.run( + market_data_df, self.model_data_policy_.advance() + ) + self.predictions_df_ = pd.concat( + [self.predictions_df_, prediction.to_df()], ignore_index=True + ) + + trading_instructions: List[TradingInstruction] = ( + self._create_trading_instructions( + prediction=prediction, last_row=market_data_df.iloc[-1] + ) + ) + if len(trading_instructions) > 0: + await self._send_trading_instructions(trading_instructions) + + def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: + return False # URGENT _is_md_actual + + def _create_md_pdf(self, hist_aggr: List[MdTradesAggregate]) -> Optional[pd.DataFrame]: + return None # URGENT _create_md_pdf def interval_sec(self) -> IntervalSecT: return self.interval_sec_