Compare commits

...

3 Commits

Author SHA1 Message Date
Oleg Sheynin
2c08b6f1a9 intermarket fix for weekends 2025-07-25 00:47:19 +00:00
Oleg Sheynin
24f1f82d1f fixes to notebook 2025-07-24 22:45:21 +00:00
Oleg Sheynin
af0a6f62a9 progress 2025-07-24 21:09:13 +00:00
10 changed files with 737 additions and 1808 deletions

38
configuration/vecm.cfg Normal file
View File

@ -0,0 +1,38 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"price_column": "close",
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"fit_method_class": "pt_trading.vecm_rolling_fit.VECMRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "9:30:00",
"end_session": "18:30:00",
}
}

View File

@ -0,0 +1,38 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"price_column": "close",
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120,
"fit_method_class": "pt_trading.z-score_rolling_fit.ZScoreRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "9:30:00",
"end_session": "15:30:00",
}
}

View File

@ -31,8 +31,8 @@
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "9:30:00",
"end_session": "22:30:00",
"timezone": "America/New_York"
"end_session": "18:30:00",
}
}

View File

@ -1,3 +1,5 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
@ -21,6 +23,15 @@ class PairsTradingFitMethod(ABC):
"signed_scaled_disequilibrium",
"pair",
]
@staticmethod
def create(config: Dict) -> PairsTradingFitMethod:
import importlib
fit_method_class_name = config.get("fit_method_class", None)
assert fit_method_class_name is not None
module_name, class_name = fit_method_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
fit_method = getattr(module, class_name)()
return cast(PairsTradingFitMethod, fit_method)
@abstractmethod
def run_pair(

View File

@ -121,7 +121,7 @@ def store_config_in_database(
config_file_path: str,
config: Dict,
fit_method_class: str,
datafiles: List[str],
datafiles: List[Tuple[str, str]],
instruments: List[Dict[str, str]],
) -> None:
"""
@ -140,7 +140,7 @@ def store_config_in_database(
config_json = json.dumps(config, indent=2, default=str)
# Convert lists to comma-separated strings for storage
datafiles_str = ", ".join(datafiles)
datafiles_str = ", ".join([f"{datafile}" for _, datafile in datafiles])
instruments_str = ", ".join(
[
f"{inst['symbol']}:{inst['instrument_type']}:{inst['exchange_id']}"
@ -613,7 +613,7 @@ class BacktestResult:
return current_value_a, current_value_b, total_current_value
def store_results_in_database(
self, db_path: str, datafile: str
self, db_path: str, day: str
) -> None:
"""
Store backtest results in the SQLite database.
@ -623,8 +623,7 @@ class BacktestResult:
try:
# Extract date from datafile name (assuming format like 20250528.mktdata.ohlcv.db)
filename = os.path.basename(datafile)
date_str = filename.split(".")[0] # Extract date part
date_str = day
# Convert to proper date format
try:

View File

@ -86,7 +86,12 @@ class TradingPair(ABC):
# predicted_df_: Optional[pd.DataFrame]
def __init__(
self, config: Dict[str, Any], market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
price_column: str,
):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
@ -102,16 +107,20 @@ class TradingPair(ABC):
)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
self.market_data_['tstamp'] = pd.to_datetime(self.market_data_['tstamp'])
self.market_data_ = self.market_data_.sort_values('tstamp')
self.market_data_["tstamp"] = pd.to_datetime(self.market_data_["tstamp"])
self.market_data_ = self.market_data_.sort_values("tstamp")
def get_begin_index(self) -> int:
if "trading_hours" not in self.config_:
return 0
assert "timezone" in self.config_["trading_hours"]
assert "begin_session" in self.config_["trading_hours"]
start_time = pd.to_datetime(self.config_["trading_hours"]["begin_session"]).tz_localize(self.config_["trading_hours"]["timezone"]).time()
mask = self.market_data_['tstamp'].dt.time >= start_time
start_time = (
pd.to_datetime(self.config_["trading_hours"]["begin_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time >= start_time
return int(self.market_data_.index[mask].min())
def get_end_index(self) -> int:
@ -119,8 +128,12 @@ class TradingPair(ABC):
return 0
assert "timezone" in self.config_["trading_hours"]
assert "end_session" in self.config_["trading_hours"]
end_time = pd.to_datetime(self.config_["trading_hours"]["end_session"]).tz_localize(self.config_["trading_hours"]["timezone"]).time()
mask = self.market_data_['tstamp'].dt.time <= end_time
end_time = (
pd.to_datetime(self.config_["trading_hours"]["end_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time <= end_time
return int(self.market_data_.index[mask].max())
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
@ -171,7 +184,7 @@ class TradingPair(ABC):
testing_start_index = training_start_index + training_minutes
self.training_df_ = self.market_data_.iloc[
training_start_index:testing_start_index, : training_minutes
training_start_index:testing_start_index, :training_minutes
].copy()
assert self.training_df_ is not None
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
@ -213,7 +226,11 @@ class TradingPair(ABC):
trades[col] = pd.Timestamp.now()
elif col in ["action", "symbol"]:
trades[col] = ""
elif col in ["price", "disequilibrium", "scaled_disequilibrium"]:
elif col in [
"price",
"disequilibrium",
"scaled_disequilibrium",
]:
trades[col] = 0.0
elif col == "pair":
trades[col] = None
@ -222,13 +239,13 @@ class TradingPair(ABC):
# Concatenate with explicit dtypes to avoid warnings
self.user_data_["trades"] = pd.concat(
[existing_trades, trades],
ignore_index=True,
copy=False
[existing_trades, trades], ignore_index=True, copy=False
)
def get_trades(self) -> pd.DataFrame:
return self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
return (
self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
)
def cointegration_check(self) -> Optional[pd.DataFrame]:
print(f"***{self}*** STARTING....")
@ -237,16 +254,18 @@ class TradingPair(ABC):
curr_training_start_idx = 0
COINTEGRATION_DATA_COLUMNS = {
"tstamp" : "datetime64[ns]",
"pair" : "string",
"eg_pvalue" : "float64",
"johansen_lr1" : "float64",
"johansen_cvt" : "float64",
"eg_is_cointegrated" : "bool",
"johansen_is_cointegrated" : "bool",
"tstamp": "datetime64[ns]",
"pair": "string",
"eg_pvalue": "float64",
"johansen_lr1": "float64",
"johansen_cvt": "float64",
"eg_is_cointegrated": "bool",
"johansen_is_cointegrated": "bool",
}
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
result: pd.DataFrame = pd.DataFrame(columns=[col for col in COINTEGRATION_DATA_COLUMNS.keys()]) #.astype(COINTEGRATION_DATA_COLUMNS)
result: pd.DataFrame = pd.DataFrame(
columns=[col for col in COINTEGRATION_DATA_COLUMNS.keys()]
) # .astype(COINTEGRATION_DATA_COLUMNS)
training_minutes = config["training_minutes"]
while True:
@ -271,7 +290,10 @@ class TradingPair(ABC):
def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
config = self.config_
if ("stop_close_conditions" not in config or config["stop_close_conditions"] is None) :
if (
"stop_close_conditions" not in config
or config["stop_close_conditions"] is None
):
return False
if "profit" in config["stop_close_conditions"]:
current_return = self._current_return(predicted_row)
@ -290,7 +312,8 @@ class TradingPair(ABC):
return False
def on_open_trades(self, trades: pd.DataFrame) -> None:
if "close_trades" in self.user_data_: del self.user_data_["close_trades"]
if "close_trades" in self.user_data_:
del self.user_data_["close_trades"]
self.user_data_["open_trades"] = trades
def on_close_trades(self, trades: pd.DataFrame) -> None:
@ -302,18 +325,23 @@ class TradingPair(ABC):
open_trades = self.user_data_["open_trades"]
if len(open_trades) == 0:
return 0.0
def _single_instrument_return(symbol: str) -> float:
instrument_open_trades = open_trades[open_trades["symbol"] == symbol]
instrument_open_price = instrument_open_trades["price"].iloc[0]
sign = -1 if instrument_open_trades["side"].iloc[0] == "SELL" else 1
instrument_price = predicted_row[f"{self.price_column_}_{symbol}"]
instrument_return = sign * (instrument_price - instrument_open_price) / instrument_open_price
instrument_return = (
sign
* (instrument_price - instrument_open_price)
/ instrument_open_price
)
return float(instrument_return) * 100.0
instrument_a_return = _single_instrument_return(self.symbol_a_)
instrument_b_return = _single_instrument_return(self.symbol_b_)
return (instrument_a_return + instrument_b_return)
return instrument_a_return + instrument_b_return
return 0.0
def __repr__(self) -> str:
@ -328,4 +356,3 @@ class TradingPair(ABC):
# @abstractmethod
# def predicted_df(self) -> Optional[pd.DataFrame]: ...

View File

@ -5,7 +5,7 @@ from typing import Dict, List, cast
import pandas as pd
def load_sqlite_to_dataframe(db_path, query):
def load_sqlite_to_dataframe(db_path:str, query:str) -> pd.DataFrame:
try:
conn = sqlite3.connect(db_path)

File diff suppressed because one or more lines are too long

View File

@ -3,7 +3,7 @@ import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
@ -17,11 +17,13 @@ from pt_trading.results import (
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
DayT = str
DataFileNameT = str
def resolve_datafiles(
config: Dict, date_pattern: str, instruments: List[Dict[str, str]]
) -> List[str]:
resolved_files = []
) -> List[Tuple[DayT, DataFileNameT]]:
resolved_files: List[Tuple[DayT, DataFileNameT]] = []
for inst in instruments:
pattern = date_pattern
inst_type = inst["instrument_type"]
@ -31,12 +33,17 @@ def resolve_datafiles(
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
matched_files = glob.glob(pattern)
resolved_files.extend(matched_files)
for matched_file in matched_files:
import re
match = re.search(r"(\d{8})\.mktdata\.ohlcv\.db$", matched_file)
assert match is not None
day = match.group(1)
resolved_files.append((day, matched_file))
else:
# Handle explicit file path
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
resolved_files.append(pattern)
resolved_files.append((date_pattern, pattern))
return sorted(list(set(resolved_files))) # Remove duplicates and sort
@ -61,7 +68,7 @@ def get_instruments(args: argparse.Namespace, config: Dict) -> List[Dict[str, st
def run_backtest(
config: Dict,
datafile: str,
datafiles: List[str],
price_column: str,
fit_method: PairsTradingFitMethod,
instruments: List[Dict[str, str]],
@ -70,10 +77,14 @@ def run_backtest(
Run backtest for all pairs using the specified instruments.
"""
bt_result: BacktestResult = BacktestResult(config=config)
if len(datafiles) < 2:
print(f"WARNING: insufficient data files: {datafiles}")
return bt_result
pairs_trades = []
pairs = create_pairs(
datafile=datafile,
datafiles=datafiles,
fit_method=fit_method,
price_column=price_column,
config=config,
@ -92,7 +103,6 @@ def run_backtest(
bt_result.collect_single_day_results(pairs_trades)
return bt_result
def main() -> None:
parser = argparse.ArgumentParser(description="Run pairs trading backtest.")
parser.add_argument(
@ -122,11 +132,7 @@ def main() -> None:
config: Dict = load_config(args.config)
# Dynamically instantiate fit method class
fit_method_class_name = config.get("fit_method_class", None)
assert fit_method_class_name is not None
module_name, class_name = fit_method_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
fit_method = getattr(module, class_name)()
fit_method = PairsTradingFitMethod.create(config)
# Resolve data files (CLI takes priority over config)
instruments = get_instruments(args, config)
@ -136,6 +142,7 @@ def main() -> None:
print("No data files found to process.")
return
days = list(set([day for day, _ in datafiles]))
print(f"Found {len(datafiles)} data files to process:")
for df in datafiles:
print(f" - {df}")
@ -158,7 +165,7 @@ def main() -> None:
db_path=args.result_db,
config_file_path=args.config,
config=config,
fit_method_class=fit_method_class_name,
fit_method_class=config["fit_method_class"],
datafiles=datafiles,
instruments=instruments,
)
@ -166,8 +173,9 @@ def main() -> None:
# Process each data file
price_column = config["price_column"]
for datafile in datafiles:
print(f"\n====== Processing {os.path.basename(datafile)} ======")
for day in sorted(days):
md_datafiles = [datafile for md_day, datafile in datafiles if md_day == day]
print(f"\n====== Processing {day} ======")
# Process data for this file
try:
@ -175,14 +183,14 @@ def main() -> None:
bt_results = run_backtest(
config=config,
datafile=datafile,
datafiles=md_datafiles,
price_column=price_column,
fit_method=fit_method,
instruments=instruments,
)
# Store results with file name as key
filename = os.path.basename(datafile)
# Store results with day name as key
filename = os.path.basename(day)
all_results[filename] = {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
@ -198,12 +206,12 @@ def main() -> None:
}
}
)
bt_results.store_results_in_database(args.result_db, datafile)
bt_results.store_results_in_database(db_path=args.result_db, day=day)
print(f"Successfully processed {filename}")
except Exception as err:
print(f"Error processing {datafile}: {str(err)}")
print(f"Error processing {day}: {str(err)}")
import traceback
traceback.print_exc()

View File

@ -2,6 +2,7 @@ import glob
import os
from typing import Dict, List, Optional
import pandas as pd
from pt_trading.fit_method import PairsTradingFitMethod
@ -45,14 +46,14 @@ def resolve_datafiles(config: Dict, cli_datafiles: Optional[str] = None) -> List
def create_pairs(
datafile: str,
datafiles: List[str],
fit_method: PairsTradingFitMethod,
price_column: str,
config: Dict,
instruments: List[Dict[str, str]],
) -> List:
from tools.data_loader import load_market_data
from pt_trading.trading_pair import TradingPair
from tools.data_loader import load_market_data
all_indexes = range(len(instruments))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
@ -62,16 +63,17 @@ def create_pairs(
config_copy = config.copy()
config_copy["instruments"] = instruments
market_data_df = load_market_data(
market_data_df = pd.DataFrame()
for datafile in datafiles:
md_df = load_market_data(
datafile=datafile,
instruments=instruments,
db_table_name=config_copy["market_data_loading"][instruments[0]["instrument_type"]]["db_table_name"],
trading_hours=config_copy["trading_hours"],
)
market_data_df = pd.concat([market_data_df, md_df])
for a_index, b_index in unique_index_pairs:
from research.pt_backtest import TradingPair
pair = fit_method.create_trading_pair(
config=config_copy,
market_data=market_data_df,