From 002f797751a93898716abc9663d4171113612ecd Mon Sep 17 00:00:00 2001 From: Oleg Sheynin Date: Thu, 1 Jan 2026 22:12:04 +0000 Subject: [PATCH] dev progress --- apps/pairs_trader.py | 4 +- lib/pt_strategy/live/live_strategy.py | 141 +++++++++++++------------- 2 files changed, 75 insertions(+), 70 deletions(-) diff --git a/apps/pairs_trader.py b/apps/pairs_trader.py index 6d1d93b..933aa45 100644 --- a/apps/pairs_trader.py +++ b/apps/pairs_trader.py @@ -34,6 +34,7 @@ class PairsTrader(NamedObject): live_strategy_: PtLiveStrategy pricer_client_: CvttRestMktDataClient + ti_sender_: TradingInstructionsSender rest_service_: RestService def __init__(self) -> None: @@ -100,8 +101,7 @@ class PairsTrader(NamedObject): # ------- CREATE TRADER CLIENT ------- self.ti_sender_ = TradingInstructionsSender(config=self.config_, pairs_trader=self) - Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") - + Log.info(f"{self.fname()} TI sebder created: {self.ti_sender_}") # # ------- CREATE REST SERVER ------- self.rest_service_ = RestService( diff --git a/lib/pt_strategy/live/live_strategy.py b/lib/pt_strategy/live/live_strategy.py index d7de592..a651ff4 100644 --- a/lib/pt_strategy/live/live_strategy.py +++ b/lib/pt_strategy/live/live_strategy.py @@ -17,28 +17,29 @@ from cvttpy_tools.logger import Log # --- from cvttpy_trading.trading.instrument import ExchangeInstrument from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate - +from cvttpy_trading.trading.trading_instructions import TradingInstructions # --- from pairs_trading.lib.pt_strategy.model_data_policy import ModelDataPolicy from pairs_trading.lib.pt_strategy.pt_model import Prediction from pairs_trading.lib.pt_strategy.trading_pair import PairState, TradingPair from pairs_trading.apps.pairs_trader import PairsTrader + """ --config=pair.cfg --pair=PAIR-BTC-USDT:COINBASE_AT,PAIR-ETH-USDT:COINBASE_AT """ -class TradingInstructionType(Enum): - TARGET_POSITION = "TARGET_POSITION" +# class TradingInstructionType(Enum): +# TARGET_POSITION = "TARGET_POSITION" -@dataclass -class TradingInstruction(NamedObject): - type_: TradingInstructionType - exch_instr_: ExchangeInstrument - specifics_: Dict[str, Any] +# @dataclass +# class TradingInstruction(NamedObject): +# type_: TradingInstructionType +# exch_instr_: ExchangeInstrument +# specifics_: Dict[str, Any] class PtLiveStrategy(NamedObject): @@ -135,12 +136,12 @@ class PtLiveStrategy(NamedObject): [self.predictions_df_, prediction.to_df()], ignore_index=True ) - trading_instructions: List[TradingInstruction] = ( + trading_instructions: Optional[TradingInstructions] = ( self._create_trading_instructions( prediction=prediction, last_row=market_data_df.iloc[-1] ) ) - if len(trading_instructions) > 0: + if trading_instructions is not None: await self._send_trading_instructions(trading_instructions) def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool: @@ -156,15 +157,16 @@ class PtLiveStrategy(NamedObject): return self.history_depth_sec_ async def _send_trading_instructions( - self, trading_instructions: List[TradingInstruction] + self, trading_instructions: TradingInstructions ) -> None: + await self.pairs_trader_.ti_sender_.send_trading_instructions(trading_instructions) pass # URGENT implement _send_trading_instructions def _create_trading_instructions( self, prediction: Prediction, last_row: pd.Series - ) -> List[TradingInstruction]: + ) -> Optional[TradingInstructions]: pair = self.trading_pair_ - trd_instructions: List[TradingInstruction] = [] + res: Optional[TradingInstructions] scaled_disequilibrium = prediction.scaled_disequilibrium_ abs_scaled_disequilibrium = abs(scaled_disequilibrium) @@ -188,73 +190,76 @@ class PtLiveStrategy(NamedObject): def _create_open_trade_instructions( self, pair: TradingPair, row: pd.Series, prediction: Prediction - ) -> List[TradingInstruction]: + ) -> Optional[TradingInstructions]: + ti: Optional[TradingInstructions] = None scaled_disequilibrium = prediction.scaled_disequilibrium_ - if scaled_disequilibrium > 0: - side_a = "SELL" - trd_inst_a = TradingInstruction( - type_=TradingInstructionType.TARGET_POSITION, - exch_instr_=pair.get_instrument_a(), - specifics_={"side": "SELL", "strength": -1}, - ) - side_b = "BUY" - else: - side_a = "BUY" - side_b = "SELL" + # if scaled_disequilibrium > 0: + # side_a = "SELL" + # trd_inst_a = TradingInstruction( + # type_=TradingInstructionType.TARGET_POSITION, + # exch_instr_=pair.get_instrument_a(), + # specifics_={"side": "SELL", "strength": -1}, + # ) + # side_b = "BUY" + # else: + # side_a = "BUY" + # side_b = "SELL" - colname_a, colname_b = pair.exec_prices_colnames() - px_a = row[f"{colname_a}"] - px_b = row[f"{colname_b}"] + # colname_a, colname_b = pair.exec_prices_colnames() + # px_a = row[f"{colname_a}"] + # px_b = row[f"{colname_b}"] - tstamp = row["tstamp"] - diseqlbrm = prediction.disequilibrium_ - scaled_disequilibrium = prediction.scaled_disequilibrium_ + # tstamp = row["tstamp"] + # diseqlbrm = prediction.disequilibrium_ + # scaled_disequilibrium = prediction.scaled_disequilibrium_ - df = self._trades_df() + # df = self._trades_df() - # save closing sides - pair.user_data_["open_side_a"] = side_a # used in oustanding positions - pair.user_data_["open_side_b"] = side_b - pair.user_data_["open_px_a"] = px_a - pair.user_data_["open_px_b"] = px_b - pair.user_data_["open_tstamp"] = tstamp + # # save closing sides + # pair.user_data_["open_side_a"] = side_a # used in oustanding positions + # pair.user_data_["open_side_b"] = side_b + # pair.user_data_["open_px_a"] = px_a + # pair.user_data_["open_px_b"] = px_b + # pair.user_data_["open_tstamp"] = tstamp - pair.user_data_["close_side_a"] = side_b # used for closing trades - pair.user_data_["close_side_b"] = side_a + # pair.user_data_["close_side_a"] = side_b # used for closing trades + # pair.user_data_["close_side_b"] = side_a - # create opening trades - df.loc[len(df)] = { - "time": tstamp, - "symbol": pair.symbol_a_, - "side": side_a, - "action": "OPEN", - "price": px_a, - "disequilibrium": diseqlbrm, - "signed_scaled_disequilibrium": scaled_disequilibrium, - "scaled_disequilibrium": abs(scaled_disequilibrium), - # "pair": pair, - } - df.loc[len(df)] = { - "time": tstamp, - "symbol": pair.symbol_b_, - "side": side_b, - "action": "OPEN", - "price": px_b, - "disequilibrium": diseqlbrm, - "scaled_disequilibrium": abs(scaled_disequilibrium), - "signed_scaled_disequilibrium": scaled_disequilibrium, - # "pair": pair, - } - ti: List[TradingInstruction] = self._create_trading_instructions( - prediction=prediction, last_row=row - ) + # # create opening trades + # df.loc[len(df)] = { + # "time": tstamp, + # "symbol": pair.symbol_a_, + # "side": side_a, + # "action": "OPEN", + # "price": px_a, + # "disequilibrium": diseqlbrm, + # "signed_scaled_disequilibrium": scaled_disequilibrium, + # "scaled_disequilibrium": abs(scaled_disequilibrium), + # # "pair": pair, + # } + # df.loc[len(df)] = { + # "time": tstamp, + # "symbol": pair.symbol_b_, + # "side": side_b, + # "action": "OPEN", + # "price": px_b, + # "disequilibrium": diseqlbrm, + # "scaled_disequilibrium": abs(scaled_disequilibrium), + # "signed_scaled_disequilibrium": scaled_disequilibrium, + # # "pair": pair, + # } + # ti: List[TradingInstruction] = self._create_trading_instructions( + # prediction=prediction, last_row=row + # ) return ti def _create_close_trade_instructions( self, pair: TradingPair, row: pd.Series # , prediction: Prediction - ) -> List[TradingInstruction]: - return [] # URGENT implement _create_close_trade_instructions + ) -> Optional[TradingInstructions]: + ti: Optional[TradingInstructions] = None + # URGENT implement _create_close_trade_instructions + return ti def _handle_outstanding_positions(self) -> Optional[pd.DataFrame]: trades = None