pairs_trading/research/backtest.py
2026-01-11 18:17:05 +00:00

140 lines
4.8 KiB
Python

from __future__ import annotations
import os
from typing import Any, Dict, List, Tuple
# ---
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
from cvttpy_trading.settings.instruments import Instruments
# ---
from pairs_trading.lib.pt_strategy.results import (
PairResearchResult,
create_result_database,
store_config_in_database,
)
from pairs_trading.lib.pt_strategy.research_strategy import PtResearchStrategy
from pairs_trading.lib.tools.filetools import resolve_datafiles
InstrumentTypeT = str
class Runner(NamedObject):
def __init__(self):
App()
CvttAppConfig()
# App.instance().add_cmdline_arg(
# "--config", type=str, required=True, help="Path to the configuration file."
# )
App.instance().add_cmdline_arg(
"--date_pattern",
type=str,
required=True,
help="Date YYYYMMDD, allows * and ? wildcards",
)
App.instance().add_cmdline_arg(
"--instruments",
type=str,
required=True,
help="Comma-separated list of instrument symbols (e.g., COIN:EQUITY,GBTC:CRYPTO)",
)
App.instance().add_cmdline_arg(
"--result_db",
type=str,
required=True,
help="Path to SQLite database for storing results. Use 'NONE' to disable database output.",
)
App.instance().add_call(stage=App.Stage.Config, func=self._on_config())
App.instance().add_call(stage=App.Stage.Run, func=self.run())
async def _on_config(self) -> None:
# Resolve data files (CLI takes priority over config)
instruments: List[ExchangeInstrument] = self._get_instruments()
datafiles = resolve_datafiles(
config=CvttAppConfig.instance(),
date_pattern=App.instance().get_argument("date_pattern"),
instruments=instruments,
)
days = list(set([day for day, _ in datafiles]))
print(f"Found {len(datafiles)} data files to process:")
for df in datafiles:
print(f" - {df}")
# Create result database if needed
if App.instance().get_argument("result_db").upper() != "NONE":
create_result_database(App.instance().get_argument("result_db"))
# Initialize a dictionary to store all trade results
all_results: Dict[str, Dict[str, Any]] = {}
is_config_stored = False
# Process each data file
results = PairResearchResult(config=CvttAppConfig.instance())
for day in sorted(days):
md_datafiles = [datafile for md_day, datafile in datafiles if md_day == day]
if not all([os.path.exists(datafile) for datafile in md_datafiles]):
print(f"WARNING: insufficient data files: {md_datafiles}")
exit(1)
print(f"\n====== Processing {day} ======")
if not is_config_stored:
store_config_in_database(
db_path=App.instance().get_argument("result_db"),
config_file_path=App.instance().get_argument("config"),
config=CvttAppConfig.instance(),
datafiles=datafiles,
instruments=instruments,
)
is_config_stored = True
CvttAppConfig.instance().set_value("datafiles", md_datafiles)
pt_strategy = PtResearchStrategy(
config=CvttAppConfig.instance(),
instruments=instruments,
)
pt_strategy.run()
results.add_day_results(
day=day,
trades=pt_strategy.day_trades(),
outstanding_positions=pt_strategy.outstanding_positions(),
)
results.analyze_pair_performance()
def _get_instruments(self) -> List[ExchangeInstrument]:
res: List[ExchangeInstrument] = []
for inst in App.instance().get_argument("instruments").split(","):
instrument_type = inst.split(":")[0]
exchange_id = inst.split(":")[1]
instrument_id = inst.split(":")[2]
exch_inst: ExchangeInstrument = Instruments.instance().get_exch_inst(
exch_id=exchange_id, inst_id=instrument_id, src=f"{self.fname()}"
)
exch_inst.user_data_["instrument_type"] = instrument_type
res.append(exch_inst)
return res
async def run(self) -> None:
if App.instance().get_argument("result_db").upper() != "NONE":
print(
f'\nResults stored in database: {App.instance().get_argument("result_db")}'
)
else:
print("No results to display.")
if __name__ == "__main__":
Runner()
App.instance().run()