pairs_trading/lib/pt_trading/trading_pair.py
Oleg Sheynin af0a6f62a9 progress
2025-07-24 21:09:13 +00:00

359 lines
13 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
class PairState(Enum):
    """Symbolic states that can be attached to a trading pair.

    Within this module only ``CLOSE_STOP_LOSS`` and ``CLOSE_STOP_PROFIT`` are
    referenced (``TradingPair.to_stop_close_conditions`` records them under
    ``user_data_["stop_close_state"]``).  The remaining members are presumably
    consumed by strategy code outside this file -- verify against callers.
    """

    INITIAL = 1
    OPEN = 2
    CLOSE = 3
    CLOSE_POSITION = 4
    # Recorded when the configured "loss" threshold is reached.
    CLOSE_STOP_LOSS = 5
    # Recorded when the configured "profit" threshold is reached.
    CLOSE_STOP_PROFIT = 6
class CointegrationData:
    """Result of the Johansen and Engle-Granger cointegration tests for a pair.

    Both tests are executed in the constructor on the two price columns of
    ``pair.training_df_``.  The outcome is stored in plain attributes and can
    be exported as a dictionary with :meth:`to_dict`.
    """

    # The pair is flagged as Engle-Granger cointegrated when the test p-value
    # is strictly below this threshold.
    EG_PVALUE_THRESHOLD = 0.05

    tstamp_: pd.Timestamp  # "tstamp" of the last training row
    pair_: str  # pair.name()
    eg_pvalue_: float  # p-value returned by statsmodels ``coint``
    johansen_lr1_: float  # first trace statistic (``lr1[0]``)
    johansen_cvt_: float  # trace critical value ``cvt[0, 1]``
    eg_is_cointegrated_: bool
    johansen_is_cointegrated_: bool

    def __init__(self, pair: "TradingPair"):
        """Run both cointegration tests on the pair's current training data.

        Args:
            pair: Trading pair whose ``training_df_`` has already been
                populated (e.g. via ``TradingPair.get_datasets``).

        Raises:
            AssertionError: If ``pair.training_df_`` is ``None``.
        """
        # statsmodels is imported lazily, as in the original code, so that
        # merely importing this module does not require it.
        from statsmodels.tsa.stattools import coint  # type: ignore
        from statsmodels.tsa.vector_ar.vecm import coint_johansen  # type: ignore

        training_df = pair.training_df_
        assert training_df is not None, "pair.training_df_ must be set first"

        col1, col2 = pair.colnames()

        # Johansen test: compare the first trace statistic with the critical
        # value in column 1 of ``cvt`` (statsmodels documents the columns as
        # the 90%, 95% and 99% levels).
        prices = training_df[[col1, col2]].reset_index(drop=True)
        johansen = coint_johansen(prices, det_order=0, k_ar_diff=1)
        self.johansen_lr1_ = float(johansen.lr1[0])
        self.johansen_cvt_ = float(johansen.cvt[0, 1])
        self.johansen_is_cointegrated_ = bool(
            self.johansen_lr1_ > self.johansen_cvt_
        )

        # Engle-Granger test: element 1 of the returned tuple is the p-value.
        series1 = training_df[col1].reset_index(drop=True)
        series2 = training_df[col2].reset_index(drop=True)
        self.eg_pvalue_ = float(coint(series1, series2)[1])
        self.eg_is_cointegrated_ = bool(self.eg_pvalue_ < self.EG_PVALUE_THRESHOLD)

        # TradingPair.get_datasets() resets the index of the training frame,
        # so the index label is a row number, not a time.  Take the timestamp
        # from the "tstamp" column and only fall back to the index label when
        # that column is absent.
        if "tstamp" in training_df.columns:
            self.tstamp_ = training_df["tstamp"].iloc[-1]
        else:
            self.tstamp_ = training_df.index[-1]
        self.pair_ = pair.name()

    def to_dict(self) -> Dict[str, Any]:
        """Return all stored test results keyed by column name."""
        return {
            "tstamp": self.tstamp_,
            "pair": self.pair_,
            "eg_pvalue": self.eg_pvalue_,
            "johansen_lr1": self.johansen_lr1_,
            "johansen_cvt": self.johansen_cvt_,
            "eg_is_cointegrated": self.eg_is_cointegrated_,
            "johansen_is_cointegrated": self.johansen_is_cointegrated_,
        }

    def __repr__(self) -> str:
        return (
            f"CointegrationData(tstamp={self.tstamp_}, pair={self.pair_}, "
            f"eg_pvalue={self.eg_pvalue_}, johansen_lr1={self.johansen_lr1_}, "
            f"johansen_cvt={self.johansen_cvt_}, "
            f"eg_is_cointegrated={self.eg_is_cointegrated_}, "
            f"johansen_is_cointegrated={self.johansen_is_cointegrated_})"
        )
class TradingPair(ABC):
    """Abstract base class for a pair of instruments traded against each other.

    The class keeps a wide-format price table for the two symbols
    (``market_data_``), slices it into training/testing windows, and tracks
    trades and stop conditions in ``user_data_``.  Concrete subclasses must
    implement :meth:`predict`.
    """

    market_data_: pd.DataFrame  # columns: "tstamp" + colnames(), sorted by time
    symbol_a_: str
    symbol_b_: str
    price_column_: str
    training_mu_: float  # not assigned in this class
    training_std_: float  # not assigned in this class
    training_df_: pd.DataFrame  # set by get_datasets()
    testing_df_: pd.DataFrame  # set by get_datasets()
    user_data_: Dict[str, Any]  # "trades", "open_trades", "close_trades", ...
    config_: Dict[str, Any]
    predicted_df_: Optional[pd.DataFrame]

    def __init__(
        self,
        config: Dict[str, Any],
        market_data: pd.DataFrame,
        symbol_a: str,
        symbol_b: str,
        price_column: str,
    ):
        """Store the pair definition and prepare its market data.

        Args:
            config: Configuration dictionary.  Keys read by this class are
                ``"training_minutes"``, ``"trading_hours"`` and
                ``"stop_close_conditions"``.
            market_data: Long-format frame with at least the columns
                ``"tstamp"``, ``"symbol"`` and ``price_column``.
            symbol_a: First symbol of the pair.
            symbol_b: Second symbol of the pair.
            price_column: Name of the price column in ``market_data``.
        """
        self.config_ = config
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.price_column_ = price_column
        self.user_data_ = {}
        self.predicted_df_ = None
        self.set_market_data(market_data)

    def set_market_data(self, market_data: pd.DataFrame) -> None:
        """Build ``market_data_`` from a long-format price frame.

        The result contains ``"tstamp"`` plus the two price columns of this
        pair, without missing values, sorted by time.
        """
        wide_df = self._transform_dataframe(market_data)
        pair_df = pd.DataFrame(wide_df[["tstamp"] + self.colnames()])
        pair_df = pair_df.dropna().reset_index(drop=True)
        pair_df["tstamp"] = pd.to_datetime(pair_df["tstamp"])
        # Reset the index *after* sorting so that index labels equal row
        # positions: get_begin_index()/get_end_index() return labels while
        # get_datasets() slices by position.
        self.market_data_ = pair_df.sort_values("tstamp").reset_index(drop=True)

    def _session_time(self, key: str) -> Any:
        """Return the wall-clock time configured under ``trading_hours[key]``.

        The value is localized to the configured timezone and then reduced
        with ``.time()``, which yields a time of day without timezone info.
        """
        trading_hours = self.config_["trading_hours"]
        assert "timezone" in trading_hours
        assert key in trading_hours
        return (
            pd.to_datetime(trading_hours[key])
            .tz_localize(trading_hours["timezone"])
            .time()
        )

    def get_begin_index(self) -> int:
        """Return the first row index at or after the session start time.

        Returns 0 when the configuration has no ``"trading_hours"`` section.

        Raises:
            ValueError: If no row satisfies the condition.
        """
        if "trading_hours" not in self.config_:
            return 0
        start_time = self._session_time("begin_session")
        mask = self.market_data_["tstamp"].dt.time >= start_time
        matching = self.market_data_.index[mask]
        if len(matching) == 0:
            raise ValueError(f"{self}: no market data at or after {start_time}")
        return int(matching.min())

    def get_end_index(self) -> int:
        """Return the last row index at or before the session end time.

        Returns 0 when the configuration has no ``"trading_hours"`` section.

        Raises:
            ValueError: If no row satisfies the condition.
        """
        if "trading_hours" not in self.config_:
            # NOTE(review): 0 mirrors get_begin_index(); confirm that callers
            # do not expect the last row index here.
            return 0
        end_time = self._session_time("end_session")
        mask = self.market_data_["tstamp"].dt.time <= end_time
        matching = self.market_data_.index[mask]
        if len(matching) == 0:
            raise ValueError(f"{self}: no market data at or before {end_time}")
        return int(matching.max())

    def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot long-format prices into one column per symbol.

        Args:
            df: Frame with the columns ``"tstamp"``, ``"symbol"`` and
                ``price_column_``.

        Returns:
            A frame with ``"tstamp"`` and one ``"<price_column>_<symbol>"``
            column per symbol found in ``df``; rows containing any missing
            value are dropped.
        """
        long_df: pd.DataFrame = pd.DataFrame(
            df[["tstamp", "symbol", self.price_column_]]
        )

        # Start from the unique timestamps, in order of first appearance.
        result_df: pd.DataFrame = (
            pd.DataFrame(long_df["tstamp"]).drop_duplicates().reset_index(drop=True)
        )

        # Left-join one price column per symbol, e.g. "close_COIN".
        for symbol in long_df["symbol"].unique():
            new_price_column = f"{self.price_column_}_{symbol}"
            symbol_rows = long_df[long_df["symbol"] == symbol]
            symbol_prices = symbol_rows[["tstamp", self.price_column_]].rename(
                columns={self.price_column_: new_price_column}
            )
            result_df = pd.merge(result_df, symbol_prices, on="tstamp", how="left")
            result_df = result_df.reset_index(drop=True)

        # NOTE(review): this dropna() removes every timestamp at which *any*
        # symbol in the input has no price, including symbols that are not
        # part of this pair -- confirm that this is intended.
        return result_df.dropna()

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Slice ``market_data_`` into ``training_df_`` and ``testing_df_``.

        Args:
            training_minutes: Number of rows in the training window.
            training_start_index: Position of the first training row.
            testing_size: Number of rows in the testing window, which starts
                right after the training window.  ``None`` means "all
                remaining rows".

        Both frames keep every column of ``market_data_`` and get a fresh
        0-based index.  Windows reaching past the end of the data are simply
        shorter.
        """
        testing_start_index = training_start_index + training_minutes
        testing_end_index = (
            None if testing_size is None else testing_start_index + testing_size
        )

        # Slice rows only.  The previous implementation also sliced columns
        # with ``:training_minutes``, which dropped price columns for windows
        # shorter than the number of columns.
        training_rows = self.market_data_.iloc[training_start_index:testing_start_index]
        testing_rows = self.market_data_.iloc[testing_start_index:testing_end_index]

        self.training_df_ = training_rows.dropna().reset_index(drop=True)
        self.testing_df_ = testing_rows.dropna().reset_index(drop=True)

    def colnames(self) -> List[str]:
        """Return the two price column names of this pair (A first, then B)."""
        return [
            f"{self.price_column_}_{self.symbol_a_}",
            f"{self.price_column_}_{self.symbol_b_}",
        ]

    def add_trades(self, trades: pd.DataFrame) -> None:
        """Append ``trades`` to the trades stored in ``user_data_["trades"]``.

        Columns present in the stored trades but missing from ``trades`` are
        filled with a default value before concatenation.  The caller's frame
        is never modified.
        """
        existing_trades = self.user_data_.get("trades")
        if existing_trades is None or len(existing_trades) == 0:
            # Nothing stored yet: the new trades become the stored trades.
            self.user_data_["trades"] = trades.copy()
            return

        new_trades = trades.copy()
        for col in existing_trades.columns:
            if col in new_trades.columns:
                continue
            if col == "time":
                new_trades[col] = pd.Timestamp.now()
            elif col in ("action", "symbol"):
                new_trades[col] = ""
            elif col in ("price", "disequilibrium", "scaled_disequilibrium"):
                new_trades[col] = 0.0
            else:
                new_trades[col] = None

        self.user_data_["trades"] = pd.concat(
            [existing_trades, new_trades], ignore_index=True
        )

    def get_trades(self) -> pd.DataFrame:
        """Return the stored trades, or an empty frame when there are none."""
        trades = self.user_data_.get("trades")
        return pd.DataFrame() if trades is None else trades

    def cointegration_check(self) -> Optional[pd.DataFrame]:
        """Run the cointegration tests over a rolling training window.

        The window of ``config_["training_minutes"]`` rows is moved forward
        one row at a time until fewer rows than that remain.  Each step
        overwrites ``training_df_`` and ``testing_df_``.

        Returns:
            A frame with one row of ``CointegrationData.to_dict()`` values per
            window position (empty when the data is shorter than one window).
        """
        print(f"***{self}*** STARTING....")
        training_minutes = self.config_["training_minutes"]
        result_columns = [
            "tstamp",
            "pair",
            "eg_pvalue",
            "johansen_lr1",
            "johansen_cvt",
            "eg_is_cointegrated",
            "johansen_is_cointegrated",
        ]

        # Collect plain dicts and build the frame once at the end; this avoids
        # growing a DataFrame row by row and lets pandas infer column dtypes.
        rows: List[Dict[str, Any]] = []
        curr_training_start_idx = 0
        while True:
            print(curr_training_start_idx, end="\r")
            self.get_datasets(
                training_minutes=training_minutes,
                training_start_index=curr_training_start_idx,
                testing_size=1,
            )
            if len(self.training_df_) < training_minutes:
                print(
                    f"{self}: current offset={curr_training_start_idx}"
                    f" * Training data length={len(self.training_df_)} < {training_minutes}"
                    " * Not enough training data. Completing the job."
                )
                break
            rows.append(CointegrationData(self).to_dict())
            curr_training_start_idx += 1

        return pd.DataFrame(rows, columns=result_columns)

    def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
        """Check the configured stop-profit / stop-loss thresholds.

        The thresholds in ``config_["stop_close_conditions"]`` (keys
        ``"profit"`` and ``"loss"``, both optional) are compared with the
        value of ``_current_return``, i.e. they are expressed in percent.
        When a threshold is reached the matching ``PairState`` is stored in
        ``user_data_["stop_close_state"]``.

        Returns:
            ``True`` when the position should be closed, ``False`` otherwise.
        """
        conditions = self.config_.get("stop_close_conditions")
        if conditions is None:
            return False
        if "profit" not in conditions and "loss" not in conditions:
            return False

        # Computed once so that the loss check also works when no profit
        # threshold is configured.
        current_return = self._current_return(predicted_row)

        if "profit" in conditions and current_return >= conditions["profit"]:
            print(f"STOP PROFIT: {current_return}")
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
            return True
        if "loss" in conditions and current_return <= conditions["loss"]:
            print(f"STOP LOSS: {current_return}")
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
            return True
        return False

    def on_open_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the open position and forget any closing trades."""
        self.user_data_.pop("close_trades", None)
        self.user_data_["open_trades"] = trades

    def on_close_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the closing trades and forget the open position.

        Safe to call when no open position is recorded.
        """
        self.user_data_.pop("open_trades", None)
        self.user_data_["close_trades"] = trades

    def _current_return(self, predicted_row: pd.Series) -> float:
        """Return the combined return of the open position, in percent.

        For each symbol the first matching row of ``user_data_["open_trades"]``
        supplies the entry price and side; the current price is read from
        ``predicted_row``.  The result is the sum of both per-symbol returns,
        or ``0.0`` when no open trades are recorded.
        """
        open_trades = self.user_data_.get("open_trades")
        if open_trades is None or len(open_trades) == 0:
            return 0.0

        def _single_instrument_return(symbol: str) -> float:
            instrument_open_trades = open_trades[open_trades["symbol"] == symbol]
            instrument_open_price = instrument_open_trades["price"].iloc[0]
            # A "SELL" entry gains when the price falls.
            sign = -1 if instrument_open_trades["side"].iloc[0] == "SELL" else 1
            instrument_price = predicted_row[f"{self.price_column_}_{symbol}"]
            instrument_return = (
                sign * (instrument_price - instrument_open_price) / instrument_open_price
            )
            return float(instrument_return) * 100.0

        return _single_instrument_return(self.symbol_a_) + _single_instrument_return(
            self.symbol_b_
        )

    def __repr__(self) -> str:
        return self.name()

    def name(self) -> str:
        """Return the display name of the pair, e.g. ``"A & B"``."""
        return f"{self.symbol_a_} & {self.symbol_b_}"

    @abstractmethod
    def predict(self) -> pd.DataFrame:
        """Produce the prediction frame; implemented by concrete subclasses."""
        ...