progress: stop signals

This commit is contained in:
Oleg Sheynin 2025-07-19 01:04:09 +00:00
parent ca9fff8d88
commit c776c95d69
8 changed files with 243 additions and 166 deletions

View File

@ -45,7 +45,6 @@ Each configuration dictionary specifies:
- `instruments`: A list of symbols to consider for forming trading pairs. - `instruments`: A list of symbols to consider for forming trading pairs.
- `trading_hours`: Defines the session start and end times, crucial for equity markets. - `trading_hours`: Defines the session start and end times, crucial for equity markets.
- `price_column`: The column in the data to be used as the price (e.g., "close"). - `price_column`: The column in the data to be used as the price (e.g., "close").
- `zero_threshold`: A small value to handle potential division by zero.
- `dis-equilibrium_open_trshld`: The threshold (in standard deviations) of the dis-equilibrium for opening a trade. - `dis-equilibrium_open_trshld`: The threshold (in standard deviations) of the dis-equilibrium for opening a trade.
- `dis-equilibrium_close_trshld`: The threshold (in standard deviations) of the dis-equilibrium for closing an open trade. - `dis-equilibrium_close_trshld`: The threshold (in standard deviations) of the dis-equilibrium for closing an open trade.
- `training_minutes`: The length of the rolling window (in minutes) used to train the model (e.g., calculate cointegration, mean, and standard deviation of the dis-equilibrium). - `training_minutes`: The length of the rolling window (in minutes) used to train the model (e.g., calculate cointegration, mean, and standard deviation of the dis-equilibrium).

View File

@ -7,24 +7,27 @@
"db_table_name": "md_1min_bars", "db_table_name": "md_1min_bars",
"exchange_id": "BNBSPOT", "exchange_id": "BNBSPOT",
"instrument_id_pfx": "PAIR-", "instrument_id_pfx": "PAIR-",
"trading_hours": {
"begin_session": "00:00:00",
"end_session": "23:59:00",
"timezone": "UTC"
},
"price_column": "close",
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120,
"funding_per_pair": 2000.0, "funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"price_column": "close",
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit", "fit_method_class": "pt_trading.sliding_fit.SlidingFit",
# "fit_method_class": "pt_trading.static_fit.StaticFit",
"close_outstanding_positions": true, # ====== Stop Conditions ======
"trading_hours": { "stop_close_conditions": {
"begin_session": "06:00:00", "profit": 2.0,
"end_session": "16:00:00", "loss": -0.5
"timezone": "America/New_York"
} }
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "21:30:00",
"timezone": "America/New_York"
}
} }

View File

@ -7,24 +7,20 @@
"db_table_name": "md_1min_bars", "db_table_name": "md_1min_bars",
"exchange_id": "ALPACA", "exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-", "instrument_id_pfx": "STOCK-",
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
"price_column": "close",
"funding_per_pair": 2000.0,
"zero_threshold": 1e-10,
#
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 150,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
"exclude_instruments": ["CAN"], "exclude_instruments": ["CAN"],
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"price_column": "close",
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
# ====== Stop Conditions ====== # ====== Stop Conditions ======
"stop_conditions": { "stop_close_conditions": {
"profit": 1.0, "profit": 2.0,
"loss": -0.5 "loss": -0.5
} }

View File

@ -1,26 +0,0 @@
{
"security_type": "EQUITY",
"data_directory": "./data/equity",
# "datafiles": [
# "20250604.mktdata.ohlcv.db",
# ],
"db_table_name": "md_1min_bars",
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
"price_column": "close",
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
# "fit_method_class": "pt_trading.static_fit.StaticFit",
"exclude_instruments": ["CAN"],
"close_outstanding_positions": false
}

View File

@ -29,8 +29,3 @@ class PairsTradingFitMethod(ABC):
def reset(self) -> None: ... def reset(self) -> None: ...
class PairState(Enum):
INITIAL = 1
OPEN = 2
CLOSED = 3
CLOSED_POSITIONS = 4

View File

@ -3,9 +3,9 @@ from enum import Enum
from typing import Dict, Optional, cast from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import] import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairState, PairsTradingFitMethod from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult from pt_trading.results import BacktestResult
from pt_trading.trading_pair import CointegrationData, TradingPair from pt_trading.trading_pair import CointegrationData, TradingPair, PairState
NanoPerMin = 1e9 NanoPerMin = 1e9
@ -72,7 +72,7 @@ class SlidingFit(PairsTradingFitMethod):
curr_predicted_row_idx += 1 curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result) self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}") print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades() return pair.get_trades()
def _create_trading_signals( def _create_trading_signals(
@ -86,24 +86,41 @@ class SlidingFit(PairsTradingFitMethod):
close_threshold = config["dis-equilibrium_close_trshld"] close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(pair.predicted_df_)): for curr_predicted_row_idx in range(len(pair.predicted_df_)):
pred_row = pair.predicted_df_.iloc[curr_predicted_row_idx] pred_row = pair.predicted_df_.iloc[curr_predicted_row_idx]
if pair.user_data_["state"] in [PairState.INITIAL, PairState.CLOSED, PairState.CLOSED_POSITIONS]: scaled_disequilibrium = pred_row["scaled_disequilibrium"]
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold if pair.user_data_["state"] in [PairState.INITIAL, PairState.CLOSE, PairState.CLOSE_POSITION]:
) if scaled_disequilibrium >= open_threshold:
if open_trades is not None: open_trades = self._get_open_trades(
open_trades["status"] = "OPEN" pair, row=pred_row, open_threshold=open_threshold
print(f"OPEN TRADES:\n{open_trades}") )
pair.add_trades(open_trades) if open_trades is not None:
pair.user_data_["state"] = PairState.OPEN open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN: elif pair.user_data_["state"] == PairState.OPEN:
close_trades = self._get_close_trades( if scaled_disequilibrium <= close_threshold:
pair, row=pred_row, close_threshold=close_threshold close_trades = self._get_close_trades(
) pair, row=pred_row, close_threshold=close_threshold
if close_trades is not None: )
close_trades["status"] = "CLOSE" if close_trades is not None:
print(f"CLOSE TRADES:\n{close_trades}") close_trades["status"] = PairState.CLOSE.name
pair.add_trades(close_trades) print(f"CLOSE TRADES:\n{close_trades}")
pair.user_data_["state"] = PairState.CLOSED pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_["stop_close_state"].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions # Outstanding positions
if pair.user_data_["state"] == PairState.OPEN: if pair.user_data_["state"] == PairState.OPEN:
@ -112,16 +129,17 @@ class SlidingFit(PairsTradingFitMethod):
) )
# outstanding positions # outstanding positions
if config["close_outstanding_positions"]: if config["close_outstanding_positions"]:
close_position_trades = self._get_close_position_trades( close_position_trades = self._get_close_trades(
pair=pair, pair=pair,
row=pred_row, row=pred_row,
close_threshold=close_threshold, close_threshold=close_threshold,
) )
if close_position_trades is not None: if close_position_trades is not None:
close_position_trades["status"] = "CLOSE_POSITION" close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}") print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades) pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSED_POSITIONS pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else: else:
if pair.predicted_df_ is not None: if pair.predicted_df_ is not None:
bt_result.handle_outstanding_position( bt_result.handle_outstanding_position(
@ -154,9 +172,6 @@ class SlidingFit(PairsTradingFitMethod):
open_px_a = open_row[f"{colname_a}"] open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"] open_px_b = open_row[f"{colname_b}"]
if open_scaled_disequilibrium < open_threshold:
return None
# creating the trades # creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}") print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0: if open_disequilibrium > 0:
@ -237,8 +252,6 @@ class SlidingFit(PairsTradingFitMethod):
close_side_a = pair.user_data_["close_side_a"] close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"] close_side_b = pair.user_data_["close_side_b"]
if close_scaled_disequilibrium > close_threshold:
return None
trd_signal_tuples = [ trd_signal_tuples = [
( (
close_tstamp, close_tstamp,
@ -276,61 +289,114 @@ class SlidingFit(PairsTradingFitMethod):
"pair": "object" "pair": "object"
}) })
def _get_close_position_trades( # def _get_stop_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float # self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]: # ) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames() # colname_a, colname_b = pair.colnames()
# assert pair.predicted_df_ is not None
# if len(pair.predicted_df_) == 0:
# return None
assert pair.predicted_df_ is not None # stop_close_row = row
if len(pair.predicted_df_) == 0: # stop_close_tstamp = stop_close_row["tstamp"]
return None # stop_close_disequilibrium = stop_close_row["disequilibrium"]
# stop_close_scaled_disequilibrium = stop_close_row["scaled_disequilibrium"]
# stop_close_px_a = stop_close_row[f"{colname_a}"]
# stop_close_px_b = stop_close_row[f"{colname_b}"]
close_position_row = row # stop_close_side_a = pair.user_data_["close_side_a"]
close_position_tstamp = close_position_row["tstamp"] # stop_close_side_b = pair.user_data_["close_side_b"]
close_position_disequilibrium = close_position_row["disequilibrium"]
close_position_scaled_disequilibrium = close_position_row["scaled_disequilibrium"]
close_position_px_a = close_position_row[f"{colname_a}"]
close_position_px_b = close_position_row[f"{colname_b}"]
close_position_side_a = pair.user_data_["close_side_a"] # trd_signal_tuples = [
close_position_side_b = pair.user_data_["close_side_b"] # (
# stop_close_tstamp,
# stop_close_side_a,
# pair.symbol_a_,
# stop_close_px_a,
# stop_close_disequilibrium,
# stop_close_scaled_disequilibrium,
# pair,
# ),
# (
# stop_close_tstamp,
# stop_close_side_b,
# pair.symbol_b_,
# stop_close_px_b,
# stop_close_disequilibrium,
# stop_close_scaled_disequilibrium,
# pair,
# ),
# ]
# df = pd.DataFrame(
# trd_signal_tuples,
# columns=self.TRADES_COLUMNS,
# )
# # Ensure consistent dtypes
# return df.astype({
# "time": "datetime64[ns]",
# "action": "string",
# "symbol": "string",
# "price": "float64",
# "disequilibrium": "float64",
# "scaled_disequilibrium": "float64",
# "pair": "object"
# })
trd_signal_tuples = [ # def _get_close_position_trades(
( # self, pair: TradingPair, row: pd.Series, close_threshold: float
close_position_tstamp, # ) -> Optional[pd.DataFrame]:
close_position_side_a, # colname_a, colname_b = pair.colnames()
pair.symbol_a_,
close_position_px_a,
close_position_disequilibrium,
close_position_scaled_disequilibrium,
pair,
),
(
close_position_tstamp,
close_position_side_b,
pair.symbol_b_,
close_position_px_b,
close_position_disequilibrium,
close_position_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings # assert pair.predicted_df_ is not None
df = pd.DataFrame( # if len(pair.predicted_df_) == 0:
trd_signal_tuples, # return None
columns=self.TRADES_COLUMNS,
) # close_position_row = row
# Ensure consistent dtypes # close_position_tstamp = close_position_row["tstamp"]
return df.astype({ # close_position_disequilibrium = close_position_row["disequilibrium"]
"time": "datetime64[ns]", # close_position_scaled_disequilibrium = close_position_row["scaled_disequilibrium"]
"action": "string", # close_position_px_a = close_position_row[f"{colname_a}"]
"symbol": "string", # close_position_px_b = close_position_row[f"{colname_b}"]
"price": "float64",
"disequilibrium": "float64", # close_position_side_a = pair.user_data_["close_side_a"]
"scaled_disequilibrium": "float64", # close_position_side_b = pair.user_data_["close_side_b"]
"pair": "object"
}) # trd_signal_tuples = [
# (
# close_position_tstamp,
# close_position_side_a,
# pair.symbol_a_,
# close_position_px_a,
# close_position_disequilibrium,
# close_position_scaled_disequilibrium,
# pair,
# ),
# (
# close_position_tstamp,
# close_position_side_b,
# pair.symbol_b_,
# close_position_px_b,
# close_position_disequilibrium,
# close_position_scaled_disequilibrium,
# pair,
# ),
# ]
# # Add tuples to data frame with explicit dtypes to avoid concatenation warnings
# df = pd.DataFrame(
# trd_signal_tuples,
# columns=self.TRADES_COLUMNS,
# )
# # Ensure consistent dtypes
# return df.astype({
# "time": "datetime64[ns]",
# "action": "string",
# "symbol": "string",
# "price": "float64",
# "disequilibrium": "float64",
# "scaled_disequilibrium": "float64",
# "pair": "object"
# })
def reset(self) -> None: def reset(self) -> None:
curr_training_start_idx = 0 curr_training_start_idx = 0

View File

@ -1,9 +1,18 @@
from __future__ import annotations from __future__ import annotations
from enum import Enum
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore import pandas as pd # type:ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
class PairState(Enum):
INITIAL = 1
OPEN = 2
CLOSE = 3
CLOSE_POSITION = 4
CLOSE_STOP_LOSS = 5
CLOSE_STOP_PROFIT = 6
class CointegrationData: class CointegrationData:
EG_PVALUE_THRESHOLD = 0.05 EG_PVALUE_THRESHOLD = 0.05
@ -288,13 +297,6 @@ class TradingPair:
/ self.training_std_ / self.training_std_
) )
# print("*** PREDICTED DF")
# print(predicted_df)
# print("*" * 80)
# print("*** SELF.PREDICTED_DF")
# print(self.predicted_df_)
# print("*" * 80)
predicted_df = predicted_df.reset_index(drop=True) predicted_df = predicted_df.reset_index(drop=True)
if self.predicted_df_ is None: if self.predicted_df_ is None:
self.predicted_df_ = predicted_df self.predicted_df_ = predicted_df
@ -343,6 +345,49 @@ class TradingPair:
curr_training_start_idx += 1 curr_training_start_idx += 1
return result return result
def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
config = self.config_
if ("stop_close_conditions" not in config or config["stop_close_conditions"] is None) :
return False
if "profit" in config["stop_close_conditions"]:
current_return = self._current_return(predicted_row)
#
# print(f"time={predicted_row['tstamp']} current_return={current_return}")
#
if current_return >= config["stop_close_conditions"]["profit"]:
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
return True
if "loss" in config["stop_close_conditions"]:
if current_return <= config["stop_close_conditions"]["loss"]:
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
return True
return False
def on_open_trades(self, trades: pd.DataFrame) -> None:
if "close_trades" in self.user_data_: del self.user_data_["close_trades"]
self.user_data_["open_trades"] = trades
def on_close_trades(self, trades: pd.DataFrame) -> None:
del self.user_data_["open_trades"]
self.user_data_["close_trades"] = trades
def _current_return(self, predicted_row: pd.Series) -> float:
if "open_trades" in self.user_data_:
open_trades = self.user_data_["open_trades"]
if len(open_trades) == 0:
return 0.0
def _stock_return(stock: str) -> float:
stock_open_trades = open_trades[open_trades["symbol"] == stock]
stock_sign = -1 if stock_open_trades["action"].iloc[0] == "SELL" else 1
stock_price = predicted_row[f"{self.price_column_}_{stock}"]
stock_return = stock_sign * (stock_price - stock_open_trades["price"].iloc[0]) / stock_open_trades["price"].iloc[0]
return float(stock_return)
stock_a_return = _stock_return(self.symbol_a_)
stock_b_return = _stock_return(self.symbol_b_)
return (stock_a_return + stock_b_return) * 100.0
return 0.0
def __repr__(self) -> str: def __repr__(self) -> str:
return self.name() return self.name()

File diff suppressed because one or more lines are too long