397 lines
16 KiB
Python
397 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
from enum import Enum
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import pandas as pd # type:ignore
|
|
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
|
|
|
|
class PairState(Enum):
    """Enumerates the states a trading pair position can be tagged with.

    Within this file only the two stop states are assigned explicitly
    (``CLOSE_STOP_PROFIT`` / ``CLOSE_STOP_LOSS`` by
    ``TradingPair.to_stop_close_conditions``); the remaining members are
    presumably driven by the strategy code that consumes this module.
    """

    # Position life cycle (semantics inferred from the names - confirm with callers).
    INITIAL = 1
    OPEN = 2
    CLOSE = 3
    CLOSE_POSITION = 4

    # Recorded when a configured stop threshold on the current return is hit.
    CLOSE_STOP_LOSS = 5
    CLOSE_STOP_PROFIT = 6
|
|
|
|
class CointegrationData:
    """Cointegration test results for one pair's current training window.

    On construction the Johansen test and the Engle-Granger test are run on
    the two price columns of ``pair.training_df_`` and the resulting
    statistics and verdicts are stored on the instance.
    """

    # Engle-Granger p-values strictly below this mark the pair as cointegrated.
    EG_PVALUE_THRESHOLD = 0.05

    tstamp_: pd.Timestamp  # timestamp of the last training row
    pair_: str  # pair.name()
    eg_pvalue_: float  # Engle-Granger p-value
    johansen_lr1_: float  # first element of the Johansen ``lr1`` statistic
    johansen_cvt_: float  # ``cvt[0, 1]`` critical value it is compared against
    eg_is_cointegrated_: bool
    johansen_is_cointegrated_: bool

    def __init__(self, pair: "TradingPair"):
        """Run both cointegration tests on ``pair``'s training data.

        Args:
            pair: Trading pair whose ``training_df_`` has already been
                populated (e.g. via ``get_datasets``).

        Raises:
            AssertionError: If ``pair.training_df_`` is ``None``.
        """
        # Function-scope imports are kept from the original implementation.
        from statsmodels.tsa.stattools import coint  # type: ignore
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        training_df = pair.training_df_
        assert training_df is not None

        col1, col2 = pair.colnames()
        prices = training_df[[col1, col2]].reset_index(drop=True)

        # Johansen test: compare the first lr1 statistic with its critical
        # value in column 1 of the cvt table.
        johansen = coint_johansen(prices, det_order=0, k_ar_diff=1)
        self.johansen_lr1_ = float(johansen.lr1[0])
        self.johansen_cvt_ = float(johansen.cvt[0, 1])
        self.johansen_is_cointegrated_ = bool(
            self.johansen_lr1_ > self.johansen_cvt_
        )

        # Engle-Granger test: element 1 of the result is the p-value.
        self.eg_pvalue_ = float(coint(prices[col1], prices[col2])[1])
        self.eg_is_cointegrated_ = bool(
            self.eg_pvalue_ < self.EG_PVALUE_THRESHOLD
        )

        self.tstamp_ = self._last_timestamp(training_df)
        self.pair_ = pair.name()

    @staticmethod
    def _last_timestamp(training_df: pd.DataFrame) -> Any:
        """Return the timestamp of the last training row.

        ``TradingPair.get_datasets`` resets the frame's index, so the index
        holds row positions rather than timestamps; the ``tstamp`` column is
        therefore preferred. The index is only used as a fallback for frames
        that carry no such column.
        """
        if "tstamp" in training_df.columns:
            return training_df["tstamp"].iloc[-1]
        return training_df.index[-1]

    def to_dict(self) -> Dict[str, Any]:
        """Return the stored results keyed by field name (without the ``_`` suffix)."""
        return {
            "tstamp": self.tstamp_,
            "pair": self.pair_,
            "eg_pvalue": self.eg_pvalue_,
            "johansen_lr1": self.johansen_lr1_,
            "johansen_cvt": self.johansen_cvt_,
            "eg_is_cointegrated": self.eg_is_cointegrated_,
            "johansen_is_cointegrated": self.johansen_is_cointegrated_,
        }

    def __repr__(self) -> str:
        """Render every field as ``name=value`` in ``to_dict`` order."""
        fields = ", ".join(f"{key}={value}" for key, value in self.to_dict().items())
        return f"CointegrationData({fields})"
|
|
|
|
|
|
class TradingPair:
    """A pair of instruments with its market data, train/test windows and VECM fit.

    The pair is built from a long-format market data frame (one row per
    timestamp and symbol), which is pivoted into one price column per
    instrument. Training/testing windows are cut from that frame, a VECM is
    fitted on the training window, and the resulting disequilibrium series is
    used for prediction and stop conditions.
    """

    market_data_: pd.DataFrame  # wide frame: "tstamp" plus one price column per leg
    symbol_a_: str
    symbol_b_: str
    price_column_: str  # name of the price column in the long-format input

    training_mu_: float  # mean of the training disequilibrium
    training_std_: float  # standard deviation of the training disequilibrium

    training_df_: pd.DataFrame
    testing_df_: pd.DataFrame

    vecm_fit_: VECMResults

    user_data_: Dict[str, Any]  # scratch storage: "trades", "open_trades", ...

    predicted_df_: Optional[pd.DataFrame]  # accumulated output of predict()

    config_: Dict[str, Any]

    def __init__(
        self,
        config: Dict[str, Any],
        market_data: pd.DataFrame,
        symbol_a: str,
        symbol_b: str,
        price_column: str,
    ):
        """Create the pair and prepare its wide market data frame.

        Args:
            config: Strategy configuration; keys read in this class are
                ``trading_hours``, ``training_minutes`` and
                ``stop_close_conditions``.
            market_data: Long-format frame with ``tstamp``, ``symbol`` and
                ``price_column`` columns.
            symbol_a: First instrument of the pair.
            symbol_b: Second instrument of the pair.
            price_column: Name of the price column to use.
        """
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.price_column_ = price_column
        self.config_ = config
        self.user_data_ = {}
        self.predicted_df_ = None
        self.set_market_data(market_data)

    def set_market_data(self, market_data: pd.DataFrame) -> None:
        """Pivot ``market_data`` and store the pair's rows sorted by timestamp.

        Rows lacking a price for either leg are dropped. The index is reset
        after sorting so that index labels equal row positions; the session
        index helpers return labels while ``get_datasets`` slices by position.

        Raises:
            KeyError: If either leg's price column is missing after the pivot
                (i.e. the symbol does not occur in ``market_data``).
        """
        wide = self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
        clean = pd.DataFrame(wide).dropna().reset_index(drop=True)
        clean["tstamp"] = pd.to_datetime(clean["tstamp"])
        self.market_data_ = clean.sort_values("tstamp").reset_index(drop=True)

    def _session_time(self, key: str) -> Any:
        """Return the time of day configured under ``trading_hours[key]``.

        NOTE(review): the timezone is applied only while parsing and is then
        discarded by ``.time()``, so the comparison made by the callers is on
        the wall-clock time of ``tstamp`` as stored - confirm this is intended.
        """
        hours = self.config_["trading_hours"]
        assert "timezone" in hours
        assert key in hours
        return pd.to_datetime(hours[key]).tz_localize(hours["timezone"]).time()

    def get_begin_index(self) -> int:
        """Return the smallest index whose time of day is at/after ``begin_session``.

        Returns 0 when no ``trading_hours`` are configured.

        Raises:
            ValueError: If no row satisfies the condition.
        """
        if "trading_hours" not in self.config_:
            return 0
        start_time = self._session_time("begin_session")
        matching = self.market_data_.index[
            self.market_data_["tstamp"].dt.time >= start_time
        ]
        if len(matching) == 0:
            raise ValueError(f"{self}: no market data at or after begin_session")
        return int(matching.min())

    def get_end_index(self) -> int:
        """Return the largest index whose time of day is at/before ``end_session``.

        Returns 0 when no ``trading_hours`` are configured.
        NOTE(review): 0 as an *end* index looks like a sentinel; kept as-is
        because callers outside this file may rely on it - confirm.

        Raises:
            ValueError: If no row satisfies the condition.
        """
        if "trading_hours" not in self.config_:
            return 0
        end_time = self._session_time("end_session")
        matching = self.market_data_.index[
            self.market_data_["tstamp"].dt.time <= end_time
        ]
        if len(matching) == 0:
            raise ValueError(f"{self}: no market data at or before end_session")
        return int(matching.max())

    def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot long-format prices into one column per leg of this pair.

        Only the pair's own symbols are pivoted, so a missing price for an
        unrelated symbol no longer removes a timestamp from the result.

        Args:
            df: Frame with ``tstamp``, ``symbol`` and the price column.

        Returns:
            Frame with ``tstamp`` and a ``<price_column>_<symbol>`` column for
            each leg present in ``df``, restricted to rows where all of those
            columns have a value.
        """
        selected = df[["tstamp", "symbol", self.price_column_]]

        # Start from every distinct timestamp seen in the input.
        wide = selected[["tstamp"]].drop_duplicates().reset_index(drop=True)

        legs = (self.symbol_a_, self.symbol_b_)
        # Keep the order of appearance in the data; skip symbols that are
        # absent so the caller's column selection reports them via KeyError.
        symbols = [sym for sym in selected["symbol"].unique() if sym in legs]

        for symbol in symbols:
            rows = selected[selected["symbol"] == symbol].reset_index(drop=True)
            # Column name looks like "close_COIN".
            leg_prices = pd.DataFrame(
                {
                    "tstamp": rows["tstamp"],
                    f"{self.price_column_}_{symbol}": rows[self.price_column_],
                }
            )
            wide = pd.merge(wide, leg_prices, on="tstamp", how="left")

        return wide.dropna().reset_index(drop=True)

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Cut the training and testing windows out of ``market_data_``.

        Training covers ``training_minutes`` rows starting at
        ``training_start_index``; testing starts right after it and covers
        ``testing_size`` rows, or everything that is left when ``None``.
        Results are stored in ``training_df_`` and ``testing_df_`` with all
        columns retained and a fresh 0-based index.
        """
        testing_start_index = training_start_index + training_minutes
        testing_stop_index = (
            None if testing_size is None else testing_start_index + testing_size
        )

        training = self.market_data_.iloc[
            training_start_index:testing_start_index, :
        ].copy()
        self.training_df_ = training.dropna().reset_index(drop=True)

        testing = self.market_data_.iloc[
            testing_start_index:testing_stop_index, :
        ].copy()
        self.testing_df_ = testing.dropna().reset_index(drop=True)

    def colnames(self) -> List[str]:
        """Return the wide-frame price column names for leg A and leg B."""
        return [
            f"{self.price_column_}_{self.symbol_a_}",
            f"{self.price_column_}_{self.symbol_b_}",
        ]

    def fit_VECM(self) -> None:
        """Fit a rank-1 VECM on the training prices and store it in ``vecm_fit_``."""
        assert self.training_df_ is not None
        vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
        vecm_fit = VECM(vecm_df, coint_rank=1).fit()
        assert vecm_fit is not None

        # TODO: validate beta and alpha (flagged as urgent by the original author).
        if getattr(vecm_fit, "beta", None) is None:
            print(f"{self}: VECM model failed to converge properly")

        self.vecm_fit_ = vecm_fit

    def train_pair(self) -> None:
        """Fit the VECM and add disequilibrium columns to ``training_df_``.

        Sets ``training_mu_`` / ``training_std_`` from the training
        disequilibrium (prices multiplied by the fitted ``beta``) and writes
        the ``dis-equilibrium`` and ``scaled_dis-equilibrium`` columns.
        """
        self.fit_VECM()
        assert self.training_df_ is not None and self.vecm_fit_ is not None

        # prices @ beta yields a single-column frame; take that column.
        diseq = (self.training_df_[self.colnames()] @ self.vecm_fit_.beta).iloc[:, 0]
        self.training_mu_ = float(diseq.mean())
        self.training_std_ = float(diseq.std())

        self.training_df_["dis-equilibrium"] = diseq
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq - self.training_mu_
        ) / self.training_std_

    @staticmethod
    def _default_trade_value(column: str) -> Any:
        """Return the filler used for a trade column missing from new trades."""
        if column == "time":
            return pd.Timestamp.now()
        if column in ("action", "symbol"):
            return ""
        if column in ("price", "disequilibrium", "scaled_disequilibrium"):
            return 0.0
        return None

    def add_trades(self, trades: pd.DataFrame) -> None:
        """Append ``trades`` to the trades stored in ``user_data_["trades"]``.

        Works on a copy, so the caller's frame is never modified. Columns that
        exist in the stored trades but not in ``trades`` are filled with
        defaults before concatenation.
        """
        existing = self.user_data_.get("trades")
        if existing is None or len(existing) == 0:
            self.user_data_["trades"] = trades.copy()
            return

        incoming = trades.copy()
        for column in existing.columns:
            if column not in incoming.columns:
                incoming[column] = self._default_trade_value(column)

        self.user_data_["trades"] = pd.concat(
            [existing, incoming], ignore_index=True
        )

    def get_trades(self) -> pd.DataFrame:
        """Return the stored trades, or an empty frame when none were added."""
        if "trades" in self.user_data_:
            return self.user_data_["trades"]
        return pd.DataFrame()

    def predict(self) -> pd.DataFrame:
        """Forecast the testing window and append it to ``predicted_df_``.

        The forecast columns are joined to the testing rows with a ``_pred``
        suffix. ``disequilibrium`` is computed from the un-suffixed (observed)
        price columns and ``scaled_disequilibrium`` is its absolute distance
        from ``training_mu_`` in units of ``training_std_``.

        Returns:
            The accumulated ``predicted_df_`` across all calls so far.
        """
        assert self.testing_df_ is not None
        assert self.vecm_fit_ is not None

        predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
        forecast = pd.DataFrame(
            predicted_prices, columns=pd.Index(self.colnames()), dtype=float
        )

        predicted_df = pd.merge(
            self.testing_df_.reset_index(drop=True),
            forecast,
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()

        predicted_df["disequilibrium"] = (
            predicted_df[self.colnames()] @ self.vecm_fit_.beta
        ).iloc[:, 0]
        predicted_df["scaled_disequilibrium"] = (
            abs(predicted_df["disequilibrium"] - self.training_mu_)
            / self.training_std_
        )
        predicted_df = predicted_df.reset_index(drop=True)

        if self.predicted_df_ is None:
            self.predicted_df_ = predicted_df
        else:
            self.predicted_df_ = pd.concat(
                [self.predicted_df_, predicted_df], ignore_index=True
            )
        self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
        return self.predicted_df_

    def cointegration_check(self) -> Optional[pd.DataFrame]:
        """Run the cointegration tests on a rolling training window.

        The window of ``config["training_minutes"]`` rows is advanced one row
        at a time until fewer rows than that remain; each step contributes one
        ``CointegrationData`` row to the returned frame.
        """
        print(f"***{self}*** STARTING....")
        config = self.config_

        curr_training_start_idx = 0

        # Intended dtypes of the result columns (only the names are applied).
        COINTEGRATION_DATA_COLUMNS = {
            "tstamp": "datetime64[ns]",
            "pair": "string",
            "eg_pvalue": "float64",
            "johansen_lr1": "float64",
            "johansen_cvt": "float64",
            "eg_is_cointegrated": "bool",
            "johansen_is_cointegrated": "bool",
        }
        result: pd.DataFrame = pd.DataFrame(columns=list(COINTEGRATION_DATA_COLUMNS))

        training_minutes = config["training_minutes"]
        while True:
            # Progress indicator, overwritten in place on the terminal.
            print(curr_training_start_idx, end="\r")
            self.get_datasets(
                training_minutes=training_minutes,
                training_start_index=curr_training_start_idx,
                testing_size=1,
            )

            if len(self.training_df_) < training_minutes:
                print(
                    f"{self}: current offset={curr_training_start_idx}"
                    f" * Training data length={len(self.training_df_)} < {training_minutes}"
                    " * Not enough training data. Completing the job."
                )
                break

            result.loc[len(result)] = pd.Series(CointegrationData(self).to_dict())
            curr_training_start_idx += 1
        return result

    def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
        """Tell whether a configured stop-profit or stop-loss level is reached.

        The current return is evaluated once and compared with whichever of
        ``stop_close_conditions["profit"]`` / ``["loss"]`` is configured. On a
        hit the matching ``PairState`` is stored in
        ``user_data_["stop_close_state"]``.

        Returns:
            ``True`` when a stop level was hit, ``False`` otherwise (including
            when no stop conditions are configured).
        """
        conditions = self.config_.get("stop_close_conditions")
        if not conditions:
            return False
        if "profit" not in conditions and "loss" not in conditions:
            return False

        # Computed up front so a loss-only configuration works as well.
        current_return = self._current_return(predicted_row)

        if "profit" in conditions and current_return >= conditions["profit"]:
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
            return True
        if "loss" in conditions and current_return <= conditions["loss"]:
            self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
            return True
        return False

    def on_open_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the open trades and forget any close trades."""
        self.user_data_.pop("close_trades", None)
        self.user_data_["open_trades"] = trades

    def on_close_trades(self, trades: pd.DataFrame) -> None:
        """Record ``trades`` as the close trades and forget any open trades."""
        self.user_data_.pop("open_trades", None)
        self.user_data_["close_trades"] = trades

    def _instrument_return(
        self, open_trades: pd.DataFrame, predicted_row: pd.Series, symbol: str
    ) -> float:
        """Return the signed fractional return of one leg since its first open trade."""
        legs = open_trades[open_trades["symbol"] == symbol]
        entry_price = legs["price"].iloc[0]
        # A leg opened with SELL gains when the price falls.
        direction = -1 if legs["action"].iloc[0] == "SELL" else 1
        current_price = predicted_row[f"{self.price_column_}_{symbol}"]
        return float(direction * (current_price - entry_price) / entry_price)

    def _current_return(self, predicted_row: pd.Series) -> float:
        """Return the combined return of both legs in percent.

        Yields 0.0 when there are no open trades recorded.
        """
        open_trades = self.user_data_.get("open_trades")
        if open_trades is None or len(open_trades) == 0:
            return 0.0

        leg_a = self._instrument_return(open_trades, predicted_row, self.symbol_a_)
        leg_b = self._instrument_return(open_trades, predicted_row, self.symbol_b_)
        return (leg_a + leg_b) * 100.0

    def __repr__(self) -> str:
        """Use the pair name as the representation."""
        return self.name()

    def name(self) -> str:
        """Return the pair name in the form ``"<symbol_a> & <symbol_b>"``."""
        return f"{self.symbol_a_} & {self.symbol_b_}"
|