This commit is contained in:
Oleg Sheynin 2025-05-29 21:11:37 -04:00
parent 73b7fa1aaf
commit da6ccf2bfb
4 changed files with 297 additions and 139 deletions

View File

@ -1,95 +0,0 @@
from typing import Any, Dict, List, Optional
# ------------------------ Configuration ------------------------
# Default configuration
CRYPTO_CONFIG: Dict = {
"security_type": "CRYPTO",
# --- Data retrieval
"data_directory": "./data/crypto",
"datafiles": [
# "20250519.mktdata.ohlcv.db",
# "20250520.mktdata.ohlcv.db",
# "20250521.mktdata.ohlcv.db",
# "20250522.mktdata.ohlcv.db",
# "20250523.mktdata.ohlcv.db",
# "20250524.mktdata.ohlcv.db",
"20250525.mktdata.ohlcv.db",
],
"db_table_name": "bnbspot_ohlcv_1min",
# ----- Instruments
"exchange_id": "BNBSPOT",
"instrument_id_pfx": "PAIR-",
"instruments": [
"BTC-USDT",
"BCH-USDT",
"ETH-USDT",
"LTC-USDT",
"XRP-USDT",
"ADA-USDT",
"SOL-USDT",
"DOT-USDT",
],
"trading_hours": {
"begin_session": "00:00:00",
"end_session": "23:59:00",
"timezone": "UTC",
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
# "training_minutes": 120,
"training_minutes": 60,
# ----- Validation
"funding_per_pair": 2000.0, # USD
}
# ========================== EQUITIES
EQT_CONFIG: Dict = {
# --- Data retrieval
"security_type": "EQUITY",
"data_directory": "./data/equity",
"datafiles": [
# "20250508.alpaca_sim_md.db",
# "20250509.alpaca_sim_md.db",
"20250512.alpaca_sim_md.db",
# "20250513.alpaca_sim_md.db",
# "20250514.alpaca_sim_md.db",
# "20250515.alpaca_sim_md.db",
# "20250516.alpaca_sim_md.db",
# "20250519.alpaca_sim_md.db",
# "20250520.alpaca_sim_md.db"
],
"db_table_name": "md_1min_bars",
# ----- Instruments
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"instruments": [
"COIN",
"GBTC",
"HOOD",
"MSTR",
"PYPL",
],
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York",
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0,
}

View File

@ -1,28 +1,112 @@
from abc import ABC, abstractmethod
import sys
from typing import Any, Dict, List, Optional from typing import Any, Dict, List
import pandas as pd import pandas as pd
import numpy as np
# ============= statsmodels =================== from strategies import SlidingFitStrategy, StaticFitStrategy
from statsmodels.tsa.vector_ar.vecm import VECM
from backtest_configs import CRYPTO_CONFIG
from strategies import StaticFitStrategy
from tools.data_loader import load_market_data from tools.data_loader import load_market_data
from tools.trading_pair import TradingPair from tools.trading_pair import TradingPair
from results import BacktestResult from results import BacktestResult
NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max # ------------------------ Configuration ------------------------
UNSET_INT: int = sys.maxsize # Default configuration
CRYPTO_CONFIG: Dict = {
"security_type": "CRYPTO",
# --- Data retrieval
"data_directory": "./data/crypto",
"datafiles": [
"20250519.mktdata.ohlcv.db",
# "20250520.mktdata.ohlcv.db",
# "20250521.mktdata.ohlcv.db",
# "20250522.mktdata.ohlcv.db",
# "20250523.mktdata.ohlcv.db",
# "20250524.mktdata.ohlcv.db",
# "20250525.mktdata.ohlcv.db",
],
"db_table_name": "bnbspot_ohlcv_1min",
# ----- Instruments
"exchange_id": "BNBSPOT",
"instrument_id_pfx": "PAIR-",
"instruments": [
"BTC-USDT",
"BCH-USDT",
"ETH-USDT",
"LTC-USDT",
"XRP-USDT",
"ADA-USDT",
"SOL-USDT",
"DOT-USDT",
],
"trading_hours": {
"begin_session": "00:00:00",
"end_session": "23:59:00",
"timezone": "UTC",
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
# "training_minutes": 120,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0, # USD
}
# ========================== EQUITIES
EQT_CONFIG: Dict = {
# --- Data retrieval
"security_type": "EQUITY",
"data_directory": "./data/equity",
"datafiles": [
"20250508.alpaca_sim_md.db",
# "20250509.alpaca_sim_md.db",
# "20250512.alpaca_sim_md.db",
# "20250513.alpaca_sim_md.db",
# "20250514.alpaca_sim_md.db",
# "20250515.alpaca_sim_md.db",
# "20250516.alpaca_sim_md.db",
# "20250519.alpaca_sim_md.db",
# "20250520.alpaca_sim_md.db"
],
"db_table_name": "md_1min_bars",
# ----- Instruments
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"instruments": [
"COIN",
"GBTC",
"HOOD",
"MSTR",
"PYPL",
],
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York",
},
# ----- Model Settings
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120,
# ----- Validation
"funding_per_pair": 2000.0,
}
CONFIG = CRYPTO_CONFIG # CONFIG = CRYPTO_CONFIG
# CONFIG = EQT_CONFIG CONFIG = EQT_CONFIG
STRATEGY = StaticFitStrategy()
# CONFIG = CRYPTO_CONFIG
# STRATEGY = SlidingFitStrategy()
def run_all_pairs(config: Dict, datafile: str, price_column: str, bt_result: BacktestResult) -> None: def run_all_pairs(config: Dict, datafile: str, price_column: str, bt_result: BacktestResult) -> None:
@ -47,9 +131,8 @@ def run_all_pairs(config: Dict, datafile: str, price_column: str, bt_result: Bac
pairs_trades = [] pairs_trades = []
strategy = StaticFitStrategy()
for pair in _create_pairs(config): for pair in _create_pairs(config):
single_pair_trades = strategy.run_pair(pair=pair, config=CONFIG, bt_result=bt_result) single_pair_trades = STRATEGY.run_pair(pair=pair, config=CONFIG, bt_result=bt_result)
if single_pair_trades is not None and len(single_pair_trades) > 0: if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades) pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating # Check if result_list has any data before concatenating
@ -99,6 +182,7 @@ def main() -> None:
# BacktestResults.print_results_summary(all_results) # BacktestResults.print_results_summary(all_results)
bt_results.calculate_returns(all_results) bt_results.calculate_returns(all_results)
# Print grand totals # Print grand totals
bt_results.print_grand_totals() bt_results.print_grand_totals()
bt_results.print_outstanding_positions() bt_results.print_outstanding_positions()

View File

@ -1,4 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import Enum
import sys import sys
from typing import Dict, Optional from typing import Dict, Optional
@ -199,15 +200,29 @@ class StaticFitStrategy(PairsTradingStrategy):
columns=self.TRADES_COLUMNS, columns=self.TRADES_COLUMNS,
) )
class PairState(Enum):
INITIAL = 1
OPEN = 2
CLOSED = 3
class SlidingFitStrategy(PairsTradingStrategy): class SlidingFitStrategy(PairsTradingStrategy):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.curr_training_start_idx_ = 0 self.curr_training_start_idx_ = 0
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
pair.user_data_['is_position_open'] = False print(f"***{pair}*** STARTING....")
pair.user_data_['state'] = PairState.INITIAL
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS)
pair.user_data_["is_cointegrated"] = False
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_open_trshld"]
training_minutes = config["training_minutes"] training_minutes = config["training_minutes"]
while True: while True:
print(self.curr_training_start_idx_, end='\r')
pair.get_datasets( pair.get_datasets(
training_minutes=training_minutes, training_minutes=training_minutes,
training_start_index=self.curr_training_start_idx_, training_start_index=self.curr_training_start_idx_,
@ -215,26 +230,175 @@ class SlidingFitStrategy(PairsTradingStrategy):
) )
if len(pair.training_df_) < training_minutes: if len(pair.training_df_) < training_minutes:
print(f"{pair}: Not enough training data. Completing the job.") print(f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job.")
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: {self.curr_training_start_idx_} Position is not closed.")
# outstanding positions
# last_row_index = self.curr_training_start_idx_ + training_minutes
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=pair.predicted_df_,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
break break
try: try:
is_cointegrated = pair.train_pair() is_cointegrated = pair.train_pair()
if not is_cointegrated:
print(f"{pair} IS NOT COINTEGRATED")
return None
except Exception as e: except Exception as e:
print(f"{pair}: Training failed: {str(e)}") raise Exception(f"{pair}: Training failed: {str(e)}") from e
return None
if pair.user_data_["is_cointegrated"] != is_cointegrated:
pair.user_data_["is_cointegrated"] = is_cointegrated
if not is_cointegrated:
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions...")
else:
print(f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on")
else:
print('*' * 80)
print(f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED")
print('*' * 80)
if not is_cointegrated:
self.curr_training_start_idx_ += 1
continue
try: try:
pair.predict() pair.predict()
except Exception as e: except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}") raise Exception(f"{pair}: Prediction failed: {str(e)}") from e
return None
if pair.user_data_["state"] == PairState.INITIAL:
open_trades = self._get_open_trades(pair, open_threshold=open_threshold)
if open_trades is not None:
pair.user_data_["trades"] = open_trades
pair.user_data_["state"] = PairState.OPEN
elif pair.user_data_["state"] == PairState.OPEN:
close_trades = self._get_close_trades(pair, close_threshold=close_threshold)
if close_trades is not None:
pair.user_data_["trades"] = pd.concat([pair.user_data_["trades"], close_trades], ignore_index=True)
pair.user_data_["state"] = PairState.CLOSED
break
self.curr_training_start_idx_ += 1
print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
return pair.user_data_["trades"]
def _get_open_trades(self, pair: TradingPair, open_threshold: float) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
predicted_df = pair.predicted_df_
open_row = predicted_df.loc[0]
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
if open_scaled_disequilibrium < open_threshold:
return None
# creating the trades
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
pair,
),
]
return pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
def _get_close_trades(self, pair: TradingPair, close_threshold: float) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.colnames()
close_row = pair.predicted_df_.loc[0]
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
if close_scaled_disequilibrium > close_threshold:
return None
trd_signal_tuples = [
(
close_tstamp,
close_side_a,
pair.symbol_a_,
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
(
close_tstamp,
close_side_b,
pair.symbol_b_,
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame
return pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
pair_trades = self.create_trading_signals(pair=pair, config=config, result=bt_result)
return pair_trades

View File

@ -1,5 +1,5 @@
from typing import List, Optional from typing import Any, Dict, List, Optional
import pandas as pd import pandas as pd
from statsmodels.tsa.vector_ar.vecm import VECM from statsmodels.tsa.vector_ar.vecm import VECM
@ -17,19 +17,22 @@ class TradingPair:
vecm_fit_: Optional[VECM] vecm_fit_: Optional[VECM]
user_data_: Dict[str, Any]
def __init__(self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str): def __init__(self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str):
self.symbol_a_ = symbol_a self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b self.symbol_b_ = symbol_b
self.price_column_ = price_column self.price_column_ = price_column
self.market_data_ = self._transform_dataframe(market_data)[["tstamp"] + self.colnames()] self.market_data_ = self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
self.training_mu_ = None self.training_mu_ = None
self.training_std_ = None self.training_std_ = None
self.training_df_ = None self.training_df_ = None
self.testing_df_ = None self.testing_df_ = None
self.vecm_fit_ = None self.vecm_fit_ = None
self.user_data_ = {}
def _transform_dataframe(self, df: pd.DataFrame): def _transform_dataframe(self, df: pd.DataFrame):
# Select only the columns we need # Select only the columns we need
df_selected = df[["tstamp", "symbol", self.price_column_]] df_selected = df[["tstamp", "symbol", self.price_column_]]
@ -57,7 +60,9 @@ class TradingPair:
return result_df return result_df
def get_datasets(self, training_minutes: int, training_start_index: int = 0, testing_size: Optional[int] = None) -> None: def get_datasets(self, training_minutes: int, training_start_index: int = 0, testing_size: Optional[int] = None) -> None:
self.training_df_ = self.market_data_.iloc[training_start_index:training_minutes - 1, :].copy()
testing_start_index = training_start_index + training_minutes
self.training_df_ = self.market_data_.iloc[training_start_index:testing_start_index, :].copy()
self.training_df_ = self.training_df_.dropna().reset_index(drop=True) self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
testing_start_index = training_start_index + training_minutes testing_start_index = training_start_index + training_minutes
@ -101,7 +106,7 @@ class TradingPair:
return False return False
pass pass
print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80) # print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self.fit_VECM() self.fit_VECM()
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
self.training_mu_ = diseq_series.mean().iloc[0] self.training_mu_ = diseq_series.mean().iloc[0]