271 lines
10 KiB
Python
271 lines
10 KiB
Python
from typing import Any, Dict, List, Optional
|
|
|
|
import pandas as pd # type:ignore
|
|
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore
|
|
|
|
|
|
class TradingPair:
    """Price history of two symbols plus a VECM fitted on a training window.

    The constructor reshapes long-format market data (one row per
    timestamp/symbol) into a wide frame with one price column per symbol of
    the pair.  ``get_datasets`` then carves out training/testing windows,
    ``train_pair`` fits the VECM and derives the dis-equilibrium statistics,
    and ``predict`` scores the testing window.
    """

    # Wide frame: "tstamp" plus one price column per symbol of the pair.
    market_data_: pd.DataFrame
    symbol_a_: str
    symbol_b_: str
    # Name of the price column in the input data (also the wide-column prefix).
    price_column_: str

    # Mean / standard deviation of the training dis-equilibrium series.
    training_mu_: float
    training_std_: float

    training_df_: pd.DataFrame
    testing_df_: pd.DataFrame

    # Quoted so the class body does not need the name at definition time.
    vecm_fit_: "VECMResults"

    # Free-form per-pair storage; "trades" is the only key used by this class.
    user_data_: Dict[str, Any]

    # Accumulated output of predict(); None until the first call.
    predicted_df_: Optional[pd.DataFrame]

    def __init__(
        self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
    ):
        """Build the wide per-pair price frame from long-format market data.

        Args:
            market_data: Frame with at least the columns ``"tstamp"``,
                ``"symbol"`` and ``price_column``.
            symbol_a: First symbol of the pair.
            symbol_b: Second symbol of the pair.
            price_column: Name of the price column to use.

        Raises:
            KeyError: If a required column is missing or one of the two
                symbols does not occur in ``market_data``.
        """
        self.symbol_a_ = symbol_a
        self.symbol_b_ = symbol_b
        self.price_column_ = price_column
        self.market_data_ = pd.DataFrame(
            self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
        )

        self.user_data_ = {}
        self.predicted_df_ = None

    def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Pivot long-format prices into one ``<price_column>_<symbol>`` column per symbol.

        Rows are kept in order of first appearance of each timestamp.  Only
        rows where one of the pair's own two prices is missing are dropped;
        gaps in unrelated symbols no longer remove rows.
        """
        # Select only the columns we need.
        df_selected = pd.DataFrame(df[["tstamp", "symbol", self.price_column_]])

        # Start with the unique timestamps, then left-join one column per symbol.
        result_df = (
            pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
        )

        for symbol in df_selected["symbol"].unique():
            df_symbol = df_selected[df_selected["symbol"] == symbol]
            temp_df = pd.DataFrame(
                {
                    "tstamp": df_symbol["tstamp"],
                    # Column name like "close_COIN".
                    f"{self.price_column_}_{symbol}": df_symbol[self.price_column_],
                }
            )
            result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")

        # Restrict the NaN filter to the pair's columns so that an irrelevant
        # symbol with missing timestamps does not shrink the dataset.
        return result_df.dropna(subset=self.colnames())

    def get_datasets(
        self,
        training_minutes: int,
        training_start_index: int = 0,
        testing_size: Optional[int] = None,
    ) -> None:
        """Populate ``training_df_`` and ``testing_df_`` from ``market_data_``.

        Args:
            training_minutes: Number of rows in the training window.
            training_start_index: Positional index of the first training row.
            testing_size: Number of rows in the testing window that follows
                the training window; ``None`` takes every remaining row.
        """
        testing_start_index = training_start_index + training_minutes

        # All columns are kept; only the row range is sliced.
        training_rows = self.market_data_.iloc[
            training_start_index:testing_start_index, :
        ].copy()
        self.training_df_ = training_rows.dropna().reset_index(drop=True)

        if testing_size is None:
            testing_rows = self.market_data_.iloc[testing_start_index:, :].copy()
        else:
            testing_rows = self.market_data_.iloc[
                testing_start_index : testing_start_index + testing_size, :
            ].copy()
        self.testing_df_ = testing_rows.dropna().reset_index(drop=True)

    def colnames(self) -> List[str]:
        """Return the two wide-format price column names, symbol A first."""
        return [
            f"{self.price_column_}_{self.symbol_a_}",
            f"{self.price_column_}_{self.symbol_b_}",
        ]

    def fit_VECM(self) -> None:
        """Fit a rank-1 VECM on the training prices and store it in ``vecm_fit_``."""
        assert self.training_df_ is not None
        vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
        vecm_fit = VECM(vecm_df, coint_rank=1).fit()

        assert vecm_fit is not None

        # TODO (URGENT): check beta and alpha.
        # A fit without a usable beta is only reported, not rejected.
        if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
            print(f"{self}: VECM model failed to converge properly")

        self.vecm_fit_ = vecm_fit

    def check_cointegration_johansen(self) -> bool:
        """Return True when the first Johansen trace statistic exceeds ``cvt[0, 1]``."""
        assert self.training_df_ is not None
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        df = self.training_df_[self.colnames()].reset_index(drop=True)
        result = coint_johansen(df, det_order=0, k_ar_diff=1)
        return bool(result.lr1[0] > result.cvt[0, 1])

    def check_cointegration_engle_granger(self) -> bool:
        """Return True when the Engle-Granger test p-value is below 0.05."""
        from statsmodels.tsa.stattools import coint

        col1, col2 = self.colnames()
        assert self.training_df_ is not None
        series1 = self.training_df_[col1].reset_index(drop=True)
        series2 = self.training_df_[col2].reset_index(drop=True)

        # p-value < 0.05 rejects the null hypothesis of no cointegration.
        pvalue = coint(series1, series2)[1]
        return bool(pvalue < 0.05)

    def check_cointegration(self) -> bool:
        """Run both cointegration tests on the training window.

        NOTE: the return value is currently forced to ``True`` by the
        override below, whatever the two tests report.
        """
        is_cointegrated_johansen = self.check_cointegration_johansen()
        is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
        result = is_cointegrated_johansen or is_cointegrated_engle_granger
        return result or True  # TODO: remove this

    def train_pair(self) -> bool:
        """Fit the VECM and add dis-equilibrium columns to ``training_df_``.

        Sets ``training_mu_`` / ``training_std_`` and writes the columns
        ``"dis-equilibrium"`` and ``"scaled_dis-equilibrium"``.

        Returns:
            The result of ``check_cointegration()``.
        """
        result = self.check_cointegration()
        self.fit_VECM()
        assert self.training_df_ is not None and self.vecm_fit_ is not None

        # prices @ beta yields a one-column frame labelled 0.
        diseq_frame = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
        diseq_series = diseq_frame[0]
        self.training_mu_ = float(diseq_series.mean())
        self.training_std_ = float(diseq_series.std())

        self.training_df_["dis-equilibrium"] = diseq_series
        # Normalize the dis-equilibrium with the training statistics.
        self.training_df_["scaled_dis-equilibrium"] = (
            diseq_series - self.training_mu_
        ) / self.training_std_

        return result

    def add_trades(self, trades: pd.DataFrame) -> None:
        """Append ``trades`` to the frame stored under ``user_data_["trades"]``.

        The first non-empty batch is stored as a copy.  Later batches get any
        column they lack (relative to the stored frame) filled with a default
        before concatenation.  The caller's frame is never modified.
        """
        existing_trades = self.user_data_.get("trades")
        if existing_trades is None or len(existing_trades) == 0:
            self.user_data_["trades"] = trades.copy()
            return

        missing = [col for col in existing_trades.columns if col not in trades.columns]
        if missing:
            # Work on a copy so the defaults do not leak into the caller's frame.
            trades = trades.copy()
            for col in missing:
                if col == "time":
                    trades[col] = pd.Timestamp.now()
                elif col in ("action", "symbol"):
                    trades[col] = ""
                elif col in ("price", "disequilibrium", "scaled_disequilibrium"):
                    trades[col] = 0.0
                else:
                    trades[col] = None

        self.user_data_["trades"] = pd.concat(
            [existing_trades, trades],
            ignore_index=True,
        )

    def get_trades(self) -> pd.DataFrame:
        """Return the stored trades, or an empty frame when none were added."""
        if "trades" in self.user_data_:
            return self.user_data_["trades"]
        return pd.DataFrame()

    def predict(self) -> pd.DataFrame:
        """Forecast the testing window and append the scored rows to ``predicted_df_``.

        Forecast prices are joined positionally to ``testing_df_`` with a
        ``"_pred"`` suffix.  ``"disequilibrium"`` is computed from the actual
        (unsuffixed) prices, and ``"scaled_disequilibrium"`` is its absolute
        deviation from ``training_mu_`` in units of ``training_std_``.

        Returns:
            The accumulated ``predicted_df_`` across all calls so far.
        """
        assert self.testing_df_ is not None
        assert self.vecm_fit_ is not None
        predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))

        predicted_df = pd.merge(
            self.testing_df_.reset_index(drop=True),
            pd.DataFrame(
                predicted_prices, columns=pd.Index(self.colnames()), dtype=float
            ),
            left_index=True,
            right_index=True,
            suffixes=("", "_pred"),
        ).dropna()

        predicted_df["disequilibrium"] = (
            predicted_df[self.colnames()] @ self.vecm_fit_.beta
        )
        predicted_df["scaled_disequilibrium"] = (
            abs(predicted_df["disequilibrium"] - self.training_mu_)
            / self.training_std_
        )

        predicted_df = predicted_df.reset_index(drop=True)
        if self.predicted_df_ is None:
            self.predicted_df_ = predicted_df
        else:
            # ignore_index=True already yields a fresh 0..n-1 index.
            self.predicted_df_ = pd.concat(
                [self.predicted_df_, predicted_df], ignore_index=True
            )
        return self.predicted_df_

    def __repr__(self) -> str:
        return self.name()

    def name(self) -> str:
        """Return the display name ``"<symbol_a> & <symbol_b>"``."""
        return f"{self.symbol_a_} & {self.symbol_b_}"
|