from __future__ import annotations

import copy
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, cast
from enum import Enum

import pandas as pd

# ---
from cvttpy_tools.base import NamedObject
from cvttpy_tools.app import App
from cvttpy_tools.config import Config
from cvttpy_tools.settings.cvtt_types import IntervalSecT
from cvttpy_tools.timeutils import SecPerHour
from cvttpy_tools.logger import Log

# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.trading.mkt_data.md_summary import MdTradesAggregate
from cvttpy_trading.trading.trading_instructions import TradingInstructions

# ---
from pairs_trading.lib.pt_strategy.model_data_policy import ModelDataPolicy
from pairs_trading.lib.pt_strategy.pt_model import Prediction
from pairs_trading.lib.pt_strategy.trading_pair import PairState, TradingPair
from pairs_trading.apps.pairs_trader import PairsTrader

"""
--config=pair.cfg
--pair=PAIR-BTC-USDT:COINBASE_AT,PAIR-ETH-USDT:COINBASE_AT
"""

# class TradingInstructionType(Enum):
#     TARGET_POSITION = "TARGET_POSITION"

# @dataclass
# class TradingInstruction(NamedObject):
#     type_: TradingInstructionType
#     exch_instr_: ExchangeInstrument
#     specifics_: Dict[str, Any]


class PtLiveStrategy(NamedObject):
    """Live pairs-trading strategy driven by historical market-data snapshots.

    On every snapshot the strategy builds a market-data frame, runs the
    trading pair's model to get a prediction, and compares the absolute scaled
    disequilibrium against the configured open/close thresholds to decide
    whether trading instructions should be sent through the pairs trader.

    Several steps are still stubs (marked ``URGENT`` below), so the class is a
    work in progress.
    """

    config_: Config
    instruments_: List[ExchangeInstrument]

    # The attributes below are populated by _on_config(), not by __init__().
    interval_sec_: IntervalSecT
    history_depth_sec_: IntervalSecT
    open_threshold_: float
    close_threshold_: float

    trading_pair_: TradingPair
    model_data_policy_: ModelDataPolicy
    pairs_trader_: PairsTrader

    # ti_sender_: TradingInstructionsSender

    # for presentation: history of prediction values and trading signals
    predictions_df_: pd.DataFrame
    trading_signals_df_: pd.DataFrame

    def __init__(
        self,
        config: Config,
        instruments: List[ExchangeInstrument],
        pairs_trader: PairsTrader,
    ):
        """Build the trading pair and schedule configuration.

        Args:
            config: strategy configuration; a deep copy of its data is kept so
                later modifications do not leak back to the caller.
            instruments: exchange instruments forming the pair.
            pairs_trader: owning trader, used for market-data subscription and
                for sending trading instructions.
        """
        self.trading_pair_ = TradingPair(
            config=cast(Dict[str, Any], config.data()),
            instruments=[{"instrument_id": ei.instrument_id()} for ei in instruments],
        )
        self.predictions_df_ = pd.DataFrame()
        self.trading_signals_df_ = pd.DataFrame()
        self.pairs_trader_ = pairs_trader

        # modified config must be passed to PtMarketData
        self.config_ = Config(json_src=copy.deepcopy(config.data()))
        self.instruments_ = instruments

        # NOTE(review): a coroutine object (not the function) is handed to
        # add_call - presumably what App expects; confirm against App.add_call.
        App.instance().add_call(
            stage=App.Stage.Config, func=self._on_config(), can_run_now=True
        )

    async def _on_config(self) -> None:
        """Read settings from the config, subscribe to market data, create the policy.

        Raises:
            AssertionError: if either dis-equilibrium threshold is not
                strictly positive.
        """
        self.interval_sec_ = self.config_.get_value("interval_sec", 0)
        self.history_depth_sec_ = (
            self.config_.get_value("history_depth_hours", 0) * SecPerHour
        )

        await self.pairs_trader_.subscribe_md()

        self.model_data_policy_ = ModelDataPolicy.create(
            self.config_, is_real_time=True, pair=self.trading_pair_
        )

        self.open_threshold_ = self.config_.get_value(
            "dis-equilibrium_open_trshld", 0.0
        )
        self.close_threshold_ = self.config_.get_value(
            "dis-equilibrium_close_trshld", 0.0
        )
        assert (
            self.open_threshold_ > 0
        ), "dis-equilibrium_open_trshld must be greater than 0"
        assert (
            self.close_threshold_ > 0
        ), "dis-equilibrium_close_trshld must be greater than 0"

    def __repr__(self) -> str:
        """Return a short description naming the pair and the data-policy class."""
        # model_data_policy_ is only assigned in _on_config(); tolerate repr()
        # being called (e.g. from logging) before configuration has run.
        policy = getattr(self, "model_data_policy_", None)
        policy_name = (
            policy.__class__.__name__ if policy is not None else "<not configured>"
        )
        return (
            f"{self.classname()}: trading_pair={self.trading_pair_}, "
            f"mdp={policy_name}, "
        )

    async def on_mkt_data_hist_snapshot(
        self, hist_aggr: List[MdTradesAggregate]
    ) -> None:
        """Process one historical market-data snapshot.

        Builds the market-data frame, runs the pair model, appends the
        prediction to ``predictions_df_`` and sends trading instructions when
        any are produced. Returns early when the snapshot is not considered
        actual or the frame cannot be built.
        """
        # Log.info(f"on_mkt_data_hist_snapshot: {aggr}")
        if not self._is_md_actual(hist_aggr=hist_aggr):
            return

        market_data_df: Optional[pd.DataFrame] = self._create_md_pdf(
            hist_aggr=hist_aggr
        )
        if market_data_df is None:
            Log.warning(f"{self.fname()} Unable to create market data df")
            return

        self.trading_pair_.market_data_ = market_data_df

        # Advance the policy exactly once per snapshot. The previous code
        # called advance() twice (once discarding the result).
        # NOTE(review): confirm a double step was not intended.
        data_params = self.model_data_policy_.advance()
        prediction = self.trading_pair_.run(market_data_df, data_params)

        self.predictions_df_ = pd.concat(
            [self.predictions_df_, prediction.to_df()], ignore_index=True
        )

        trading_instructions: Optional[TradingInstructions] = (
            self._create_trading_instructions(
                prediction=prediction, last_row=market_data_df.iloc[-1]
            )
        )
        if trading_instructions is not None:
            await self._send_trading_instructions(trading_instructions)

    def _is_md_actual(self, hist_aggr: List[MdTradesAggregate]) -> bool:
        """Stub: always returns False, so snapshots are currently ignored."""
        return False  # URGENT _is_md_actual

    def _create_md_pdf(
        self, hist_aggr: List[MdTradesAggregate]
    ) -> Optional[pd.DataFrame]:
        """Stub: always returns None (no market-data frame is built yet)."""
        return None  # URGENT _create_md_pdf

    def interval_sec(self) -> IntervalSecT:
        """Return the configured "interval_sec" value."""
        return self.interval_sec_

    def history_depth_sec(self) -> IntervalSecT:
        """Return the history depth in seconds ("history_depth_hours" * SecPerHour)."""
        return self.history_depth_sec_

    async def _send_trading_instructions(
        self, trading_instructions: TradingInstructions
    ) -> None:
        """Forward the instructions to the pairs trader's instruction sender."""
        # URGENT implement _send_trading_instructions
        await self.pairs_trader_.ti_sender_.send_trading_instructions(
            trading_instructions
        )

    def _create_trading_instructions(
        self, prediction: Prediction, last_row: pd.Series
    ) -> Optional[TradingInstructions]:
        """Decide whether to open or close the pair for this prediction.

        Args:
            prediction: model output; only ``scaled_disequilibrium_`` is read here.
            last_row: most recent market-data row.

        Returns:
            Instructions to open (pair closed and |scaled disequilibrium| >=
            open threshold) or to close (pair open and either |scaled
            disequilibrium| <= close threshold or the pair's stop/close
            conditions hold); ``None`` when no action is required.
        """
        pair = self.trading_pair_
        # Must default to None: in the common case no branch below fires, and
        # returning an unassigned local would raise UnboundLocalError.
        trd_instructions: Optional[TradingInstructions] = None

        abs_scaled_disequilibrium = abs(prediction.scaled_disequilibrium_)

        if pair.is_closed():
            if abs_scaled_disequilibrium >= self.open_threshold_:
                trd_instructions = self._create_open_trade_instructions(
                    pair, row=last_row, prediction=prediction
                )
        elif pair.is_open():
            # Short-circuit keeps the original order: the stop/close check is
            # evaluated only when the threshold test fails.
            if (
                abs_scaled_disequilibrium <= self.close_threshold_
                or pair.to_stop_close_conditions(predicted_row=last_row)
            ):
                trd_instructions = self._create_close_trade_instructions(
                    pair, row=last_row
                )

        return trd_instructions

    def _create_open_trade_instructions(
        self, pair: TradingPair, row: pd.Series, prediction: Prediction
    ) -> Optional[TradingInstructions]:
        """Stub: always returns None (opening instructions are not built yet)."""
        ti: Optional[TradingInstructions] = None
        # URGENT implement _create_open_trade_instructions
        # Draft of the intended logic, condensed from the earlier
        # commented-out sketch (not wired in):
        #   if prediction.scaled_disequilibrium_ > 0:
        #       side_a, side_b = "SELL", "BUY"
        #       trd_inst_a = TradingInstruction(
        #           type_=TradingInstructionType.TARGET_POSITION,
        #           exch_instr_=pair.get_instrument_a(),
        #           specifics_={"side": "SELL", "strength": -1},
        #       )
        #   else:
        #       side_a, side_b = "BUY", "SELL"
        #   ...then record open/close sides, prices and timestamp in
        #   pair.user_data_ the same way _create_open_trades() does.
        return ti

    def _create_close_trade_instructions(
        self, pair: TradingPair, row: pd.Series  # , prediction: Prediction
    ) -> Optional[TradingInstructions]:
        """Stub: always returns None (closing instructions are not built yet)."""
        ti: Optional[TradingInstructions] = None
        # URGENT implement _create_close_trade_instructions
        return ti

    def _handle_outstanding_positions(self) -> Optional[pd.DataFrame]:
        """Deal with a pair that is still open.

        If the config has the "close_outstanding_positions" key, closing trades
        are created from the second-to-last market-data row and reported to
        the pair; otherwise both legs are registered as outstanding positions.

        Returns:
            The closing trades frame when one was created, else ``None``.
        """
        pair = self.trading_pair_

        if pair.user_data_["state"] != PairState.OPEN:
            return None

        print(f"{pair}: *** Position is NOT CLOSED. ***")

        trades: Optional[pd.DataFrame] = None
        if self.config_.key_exists("close_outstanding_positions"):
            # NOTE(review): uses iloc[-2] here but iloc[-1] in the branch
            # below - confirm this is intentional.
            close_position_row = pd.Series(pair.market_data_.iloc[-2])
            # close_position_row["disequilibrium"] = 0.0
            # close_position_row["scaled_disequilibrium"] = 0.0
            # close_position_row["signed_scaled_disequilibrium"] = 0.0

            trades = self._create_close_trades(
                pair=pair, row=close_position_row, prediction=None
            )
            if trades is not None:
                trades["status"] = PairState.CLOSE_POSITION.name
                print(f"CLOSE_POSITION TRADES:\n{trades}")
                pair.user_data_["state"] = PairState.CLOSE_POSITION
                pair.on_close_trades(trades)
        else:
            last_row = pair.market_data_.iloc[-1]
            pair.add_outstanding_position(
                symbol=pair.symbol_a_,
                open_side=pair.user_data_["open_side_a"],
                open_px=pair.user_data_["open_px_a"],
                open_tstamp=pair.user_data_["open_tstamp"],
                last_mkt_data_row=last_row,
            )
            pair.add_outstanding_position(
                symbol=pair.symbol_b_,
                open_side=pair.user_data_["open_side_b"],
                open_px=pair.user_data_["open_px_b"],
                open_tstamp=pair.user_data_["open_tstamp"],
                last_mkt_data_row=last_row,
            )
        return trades

    def _trades_df(self) -> pd.DataFrame:
        """Return an empty trades frame with the expected columns and dtypes."""
        types = {
            "time": "datetime64[ns]",
            "action": "string",
            "symbol": "string",
            "side": "string",
            "price": "float64",
            "disequilibrium": "float64",
            "scaled_disequilibrium": "float64",
            "signed_scaled_disequilibrium": "float64",
            # "pair": "object",
        }
        columns = list(types.keys())
        return pd.DataFrame(columns=columns).astype(types)

    @staticmethod
    def _append_trade(
        df: pd.DataFrame,
        *,
        tstamp: Any,
        symbol: str,
        side: str,
        action: str,
        price: Any,
        disequilibrium: float,
        scaled_disequilibrium: float,
        signed_scaled_disequilibrium: float,
    ) -> None:
        """Append one trade row to *df* in place (shared by open/close builders)."""
        df.loc[len(df)] = {
            "time": tstamp,
            "symbol": symbol,
            "side": side,
            "action": action,
            "price": price,
            "disequilibrium": disequilibrium,
            "scaled_disequilibrium": scaled_disequilibrium,
            "signed_scaled_disequilibrium": signed_scaled_disequilibrium,
            # "pair": pair,
        }

    def _create_open_trades(
        self, pair: TradingPair, row: pd.Series, prediction: Prediction
    ) -> Optional[pd.DataFrame]:
        """Create the two opening trades (one per leg) for *pair*.

        Leg A is sold and leg B bought when the disequilibrium is positive,
        and the reverse otherwise. Opening sides, prices and timestamp, plus
        the sides needed to close later, are stored in ``pair.user_data_``.

        Returns:
            A two-row trades frame with action "OPEN".
        """
        colname_a, colname_b = pair.exec_prices_colnames()

        tstamp = row["tstamp"]
        diseqlbrm = prediction.disequilibrium_
        scaled_disequilibrium = prediction.scaled_disequilibrium_
        px_a = row[f"{colname_a}"]
        px_b = row[f"{colname_b}"]

        # creating the trades
        df = self._trades_df()

        # Use the already-bound tstamp: nesting row["tstamp"] inside a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        print(f"OPEN_TRADES: {tstamp} {scaled_disequilibrium=}")
        if diseqlbrm > 0:
            side_a = "SELL"
            side_b = "BUY"
        else:
            side_a = "BUY"
            side_b = "SELL"

        # save closing sides
        pair.user_data_["open_side_a"] = side_a  # used in outstanding positions
        pair.user_data_["open_side_b"] = side_b
        pair.user_data_["open_px_a"] = px_a
        pair.user_data_["open_px_b"] = px_b
        pair.user_data_["open_tstamp"] = tstamp

        pair.user_data_["close_side_a"] = side_b  # used for closing trades
        pair.user_data_["close_side_b"] = side_a

        # create opening trades
        for symbol, side, price in (
            (pair.symbol_a_, side_a, px_a),
            (pair.symbol_b_, side_b, px_b),
        ):
            self._append_trade(
                df,
                tstamp=tstamp,
                symbol=symbol,
                side=side,
                action="OPEN",
                price=price,
                disequilibrium=diseqlbrm,
                scaled_disequilibrium=abs(scaled_disequilibrium),
                signed_scaled_disequilibrium=scaled_disequilibrium,
            )
        return df

    def _create_close_trades(
        self, pair: TradingPair, row: pd.Series, prediction: Optional[Prediction] = None
    ) -> Optional[pd.DataFrame]:
        """Create the two closing trades (one per leg) for *pair*.

        Closing sides are taken from ``pair.user_data_`` as stored when the
        position was opened. With no *prediction* the disequilibrium columns
        are filled with 0.0. The open/close bookkeeping keys are removed from
        ``pair.user_data_`` afterwards.

        Returns:
            A two-row trades frame with action "CLOSE".

        Raises:
            KeyError: if the bookkeeping keys are missing from
                ``pair.user_data_`` (no position was recorded as opened).
        """
        colname_a, colname_b = pair.exec_prices_colnames()

        tstamp = row["tstamp"]
        if prediction is not None:
            diseqlbrm = prediction.disequilibrium_
            signed_scaled_disequilibrium = prediction.scaled_disequilibrium_
            scaled_disequilibrium = abs(prediction.scaled_disequilibrium_)
        else:
            diseqlbrm = 0.0
            signed_scaled_disequilibrium = 0.0
            scaled_disequilibrium = 0.0
        px_a = row[f"{colname_a}"]
        px_b = row[f"{colname_b}"]

        # creating the trades
        df = self._trades_df()

        # create closing trades
        for symbol, side_key, price in (
            (pair.symbol_a_, "close_side_a", px_a),
            (pair.symbol_b_, "close_side_b", px_b),
        ):
            self._append_trade(
                df,
                tstamp=tstamp,
                symbol=symbol,
                side=pair.user_data_[side_key],
                action="CLOSE",
                price=price,
                disequilibrium=diseqlbrm,
                scaled_disequilibrium=scaled_disequilibrium,
                signed_scaled_disequilibrium=signed_scaled_disequilibrium,
            )

        # The position is closed: drop the bookkeeping saved at open time.
        for key in (
            "close_side_a",
            "close_side_b",
            "open_tstamp",
            "open_px_a",
            "open_px_b",
            "open_side_a",
            "open_side_b",
        ):
            del pair.user_data_[key]
        return df