Compare commits

..

2 Commits

Author SHA1 Message Date
Oleg Sheynin
e97f76222c progress 2025-12-19 23:06:20 +00:00
Oleg Sheynin
38e1621b2f progress 2025-12-19 23:04:31 +00:00
26 changed files with 387 additions and 10591 deletions

View File

@ -1,9 +0,0 @@
{
"recommendations": [
"ms-python.python",
"ms-python.pylance",
"ms-python.black-formatter",
"ms-python.mypy-type-checker",
"ms-python.isort"
]
}

2
.vscode/launch.json vendored
View File

@ -28,7 +28,7 @@
"program": "${workspaceFolder}/bin/pairs_trader.py", "program": "${workspaceFolder}/bin/pairs_trader.py",
"console": "integratedTerminal", "console": "integratedTerminal",
"env": { "env": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.." "PYTHONPATH": "${workspaceFolder}/.."
}, },
"args": [ "args": [
"--config=${workspaceFolder}/configuration/pairs_trader.cfg", "--config=${workspaceFolder}/configuration/pairs_trader.cfg",

View File

@ -4,5 +4,7 @@
"path": ".." "path": ".."
} }
], ],
"settings": {} "settings": {
"workbench.colorTheme": "Noctis Minimus"
}
} }

112
.vscode/settings.__OLD__.json vendored Normal file
View File

@ -0,0 +1,112 @@
{
"PythonVersion": "3.12",
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
// ===========================================================
"workbench.activityBar.orientation": "vertical",
// ===========================================================
// "markdown.styles": [
// "/home/oleg/develop/cvtt2/.vscode/light-theme.css"
// ],
"markdown.preview.background": "#ffffff",
"markdown.preview.textEditorTheme": "light",
"markdown-pdf.styles": [
"/home/oleg/develop/cvtt2/.vscode/light-theme.css"
],
"editor.detectIndentation": false,
// Configure editor settings to be overridden for [yaml] language.
"[yaml]": {
"editor.insertSpaces": true,
"editor.tabSize": 4,
},
"pylint.args": [
"--disable=missing-docstring"
, "--disable=invalid-name"
, "--disable=too-few-public-methods"
, "--disable=broad-exception-raised"
, "--disable=broad-exception-caught"
, "--disable=pointless-string-statement"
, "--disable=unused-argument"
, "--disable=line-too-long"
, "--disable=import-outside-toplevel"
, "--disable=fixme"
, "--disable=protected-access"
, "--disable=logging-fstring-interpolation"
],
// ===== TESTING CONFIGURATION =====
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.testing.pytestArgs": [
"-v",
"--tb=short",
"--disable-warnings"
],
"python.testing.envVars": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
},
"python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.testing.pytestPath": "/home/oleg/.pyenv/python3.12-venv/bin/pytest",
"python.testing.promptToConfigure": false,
"python.testing.pytest.enabled": true,
// Python interpreter settings
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3.12",
// Environment variables for Python execution
"python.envFile": "${workspaceFolder}/.vscode/.env",
"python.terminal.activateEnvironment": false,
"python.terminal.activateEnvInCurrentTerminal": false,
// Global environment variables for VS Code Python extension
"terminal.integrated.env.linux": {
"PYTHONPATH": "/home/oleg/develop/:${env:PYTHONPATH}"
},
"pylint.enabled": true,
"github.copilot.enable": false,
"markdown.extension.print.theme": "dark",
"python.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Try enabling regular Python language server alongside CursorPyright
"python.languageServer": "None",
"python.analysis.diagnosticMode": "workspace",
"workbench.colorTheme": "Atom One Dark",
"cursorpyright.analysis.enable": false,
"cursorpyright.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Enable quick fixes for unused imports
"python.analysis.autoImportCompletions": true,
"python.analysis.fixAll": ["source.unusedImports"],
"python.analysis.typeCheckingMode": "basic",
// Enable code actions for CursorPyright
"cursorpyright.analysis.autoImportCompletions": true,
"cursorpyright.analysis.typeCheckingMode": "off",
"cursorpyright.reportUnusedImport": "warning",
"cursorpyright.reportUnusedVariable": "warning",
"cursorpyright.analysis.diagnosticMode": "workspace",
// Force enable code actions
"editor.lightBulb.enabled": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit",
"source.unusedImports": "explicit"
},
// Enable Python-specific code actions
"python.analysis.completeFunctionParens": true,
"python.analysis.addImport.exactMatchOnly": false,
"workbench.tree.indent": 24,
}

120
.vscode/settings.json vendored
View File

@ -1,112 +1,26 @@
{ {
"PythonVersion": "3.12",
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
// ===========================================================
"workbench.activityBar.orientation": "vertical",
// ===========================================================
// "markdown.styles": [
// "/home/oleg/develop/cvtt2/.vscode/light-theme.css"
// ],
"markdown.preview.background": "#ffffff",
"markdown.preview.textEditorTheme": "light",
"markdown-pdf.styles": [
"/home/oleg/develop/cvtt2/.vscode/light-theme.css"
],
"editor.detectIndentation": false,
// Configure editor settings to be overridden for [yaml] language.
"[yaml]": {
"editor.insertSpaces": true,
"editor.tabSize": 4,
},
"pylint.args": [
"--disable=missing-docstring"
, "--disable=invalid-name"
, "--disable=too-few-public-methods"
, "--disable=broad-exception-raised"
, "--disable=broad-exception-caught"
, "--disable=pointless-string-statement"
, "--disable=unused-argument"
, "--disable=line-too-long"
, "--disable=import-outside-toplevel"
, "--disable=fixme"
, "--disable=protected-access"
, "--disable=logging-fstring-interpolation"
],
// ===== TESTING CONFIGURATION =====
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true, "python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.pytestArgs": [ "python.testing.pytestArgs": [
"-v", "unittests"
"--tb=short",
"--disable-warnings"
], ],
"python.testing.envVars": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
},
"python.testing.cwd": "${workspaceFolder}", "python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true, "python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.testing.pytestPath": "/home/oleg/.pyenv/python3.12-venv/bin/pytest", "python.defaultInterpreterPath": "/usr/bin/python3",
"python.testing.promptToConfigure": false, "python.testing.pytestPath": "python3",
"python.testing.pytest.enabled": true,
// Python interpreter settings
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3.12",
// Environment variables for Python execution
"python.envFile": "${workspaceFolder}/.vscode/.env",
"python.terminal.activateEnvironment": false,
"python.terminal.activateEnvInCurrentTerminal": false,
// Global environment variables for VS Code Python extension
"terminal.integrated.env.linux": {
"PYTHONPATH": "/home/oleg/develop/:${env:PYTHONPATH}"
},
"pylint.enabled": true,
"github.copilot.enable": false,
"markdown.extension.print.theme": "dark",
"python.analysis.extraPaths": [ "python.analysis.extraPaths": [
"${workspaceFolder}/..", "${workspaceFolder}",
"${workspaceFolder}/lib" "${workspaceFolder}/..",
"${workspaceFolder}/unittests"
], ],
"python.envFile": "${workspaceFolder}/.env",
// Try enabling regular Python language server alongside CursorPyright "python.testing.debugPort": 3000,
"python.languageServer": "None", "python.linting.enabled": true,
"python.analysis.diagnosticMode": "workspace", "python.linting.pylintEnabled": false,
"workbench.colorTheme": "Atom One Dark", "python.linting.mypyEnabled": true,
"cursorpyright.analysis.enable": false, "files.associations": {
"cursorpyright.analysis.extraPaths": [ "*.py": "python"
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Enable quick fixes for unused imports
"python.analysis.autoImportCompletions": true,
"python.analysis.fixAll": ["source.unusedImports"],
"python.analysis.typeCheckingMode": "basic",
// Enable code actions for CursorPyright
"cursorpyright.analysis.autoImportCompletions": true,
"cursorpyright.analysis.typeCheckingMode": "off",
"cursorpyright.reportUnusedImport": "warning",
"cursorpyright.reportUnusedVariable": "warning",
"cursorpyright.analysis.diagnosticMode": "workspace",
// Force enable code actions
"editor.lightBulb.enabled": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit",
"source.unusedImports": "explicit"
}, },
"python.testing.promptToConfigure": false,
// Enable Python-specific code actions "workbench.colorTheme": "Dracula Theme Soft"
"python.analysis.completeFunctionParens": true, }
"python.analysis.addImport.exactMatchOnly": false,
"workbench.tree.indent": 24,
}

View File

@ -1,181 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "-------- Z-Score (OLS) --------",
},
{
"name": "CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
"--result_db=${workspaceFolder}/research/results/crypto/%T.z-score.ADA-SOL.20250602.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=COIN:EQUITY:ALPACA,MSTR:EQUITY:ALPACA",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/equity/%T.z-score.COIN-MSTR.20250602.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY-CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=COIN:EQUITY:ALPACA,BTC-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/intermarket/%T.z-score.COIN-BTC.20250601.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- VECM --------",
},
{
"name": "CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/crypto/%T.vecm.ADA-SOL.20250602.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=COIN:EQUITY:ALPACA,MSTR:EQUITY:ALPACA",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/equity/%T.vecm.COIN-MSTR.20250602.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY-CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=COIN:EQUITY:ALPACA,BTC-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/intermarket/%T.vecm.COIN-BTC.20250601.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- New ZSCORE --------",
},
{
"name": "New CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/backtest_new.py",
"args": [
"--config=${workspaceFolder}/configuration/new_zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/crypto/%T.new_zscore.ADA-SOL.2025060-.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "New CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/backtest_new.py",
"args": [
"--config=${workspaceFolder}/configuration/new_vecm.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
"--result_db=${workspaceFolder}/research/results/crypto/%T.vecm.ADA-SOL.20250605.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- Viz Test --------",
},
{
"name": "Viz Test",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/viz_test.py",
"args": [
"--config=${workspaceFolder}/configuration/new_zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
}
]
}

View File

@ -1,44 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close", # "vwap"
"execution_price": {
"column": "vwap",
"shift": 1,
},
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.vecm_rolling_fit.VECMRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "7:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,43 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close",
# "execution_price": {
# "column": "vwap",
# "shift": 1,
# },
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.z-score_rolling_fit.ZScoreRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "7:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,43 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close",
"execution_price": {
"column": "vwap",
"shift": 1,
},
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.z-score_rolling_fit.ZScoreRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "9:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,304 +0,0 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import PairState, TradingPair
NanoPerMin = 1e9
class ExpandingWindowFit(PairsTradingFitMethod):
"""
N O T E:
=========
- This class remains to be abstract
- The following methods are to be implemented in the subclass:
- create_trading_pair()
=========
"""
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....")
config = pair.config_
start_idx = pair.get_begin_index()
end_index = pair.get_end_index()
pair.user_data_["state"] = PairState.INITIAL
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype(
{
"time": "datetime64[ns]",
"symbol": "string",
"side": "string",
"action": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object",
}
)
training_minutes = config["training_minutes"]
while training_minutes + 1 < end_index:
pair.get_datasets(
training_minutes=training_minutes,
training_start_index=start_idx,
testing_size=1,
)
# ================================ PREDICTION ================================
try:
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(
f"{pair}: TrainingPrediction failed: {str(e)}"
) from e
training_minutes += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [
PairState.INITIAL,
PairState.CLOSE,
PairState.CLOSE_POSITION,
PairState.CLOSE_STOP_LOSS,
PairState.CLOSE_STOP_PROFIT,
]:
if scaled_disequilibrium >= open_threshold:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN:
if scaled_disequilibrium <= close_threshold:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = PairState.CLOSE.name
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_[
"stop_close_state"
].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: *** Position is NOT CLOSED. ***")
# outstanding positions
if config["close_outstanding_positions"]:
close_position_row = pd.Series(pair.market_data_.iloc[-2])
close_position_row["disequilibrium"] = 0.0
close_position_row["scaled_disequilibrium"] = 0.0
close_position_row["signed_scaled_disequilibrium"] = 0.0
close_position_trades = self._get_close_trades(
pair=pair, row=close_position_row, close_threshold=close_threshold
)
if close_position_trades is not None:
close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = open_row["signed_scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
# creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
pair.symbol_a_,
open_side_a,
"OPEN",
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
open_tstamp,
pair.symbol_b_,
open_side_b,
"OPEN",
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Create DataFrame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = close_row["signed_scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
trd_signal_tuples = [
(
close_tstamp,
pair.symbol_a_,
close_side_a,
"CLOSE",
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
close_tstamp,
pair.symbol_b_,
close_side_b,
"CLOSE",
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def reset(self) -> None:
pass

View File

@ -1,52 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9
class PairsTradingFitMethod(ABC):
TRADES_COLUMNS = [
"time",
"symbol",
"side",
"action",
"price",
"disequilibrium",
"scaled_disequilibrium",
"signed_scaled_disequilibrium",
"pair",
]
@staticmethod
def create(config: Dict) -> PairsTradingFitMethod:
import importlib
fit_method_class_name = config.get("fit_method_class", None)
assert fit_method_class_name is not None
module_name, class_name = fit_method_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
fit_method = getattr(module, class_name)()
return cast(PairsTradingFitMethod, fit_method)
@abstractmethod
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]: ...
@abstractmethod
def reset(self) -> None: ...
@abstractmethod
def create_trading_pair(
self,
config: Dict,
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
) -> TradingPair: ...

View File

@ -1,751 +0,0 @@
import os
import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from pt_trading.trading_pair import TradingPair
# Recommended replacement adapters and converters for Python 3.12+
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
def adapt_date_iso(val: date) -> str:
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()
def adapt_datetime_iso(val: datetime) -> str:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()
def convert_date(val: bytes) -> date:
"""Convert ISO 8601 date to datetime.date object."""
return datetime.fromisoformat(val.decode()).date()
def convert_datetime(val: bytes) -> datetime:
"""Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.fromisoformat(val.decode())
# Register the adapters and converters
sqlite3.register_adapter(date, adapt_date_iso)
sqlite3.register_adapter(datetime, adapt_datetime_iso)
sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
def create_result_database(db_path: str) -> None:
"""
Create the SQLite database and required tables if they don't exist.
"""
try:
# Create directory if it doesn't exist
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
print(f"Created directory: {db_dir}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create the pt_bt_results table for completed trades
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS pt_bt_results (
date DATE,
pair TEXT,
symbol TEXT,
open_time DATETIME,
open_side TEXT,
open_price REAL,
open_quantity INTEGER,
open_disequilibrium REAL,
close_time DATETIME,
close_side TEXT,
close_price REAL,
close_quantity INTEGER,
close_disequilibrium REAL,
symbol_return REAL,
pair_return REAL,
close_condition TEXT
)
"""
)
cursor.execute("DELETE FROM pt_bt_results;")
# Create the outstanding_positions table for open positions
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS outstanding_positions (
date DATE,
pair TEXT,
symbol TEXT,
position_quantity REAL,
last_price REAL,
unrealized_return REAL,
open_price REAL,
open_side TEXT
)
"""
)
cursor.execute("DELETE FROM outstanding_positions;")
# Create the config table for storing configuration JSON for reference
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS config (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_timestamp DATETIME,
config_file_path TEXT,
config_json TEXT,
fit_method_class TEXT,
datafiles TEXT,
instruments TEXT
)
"""
)
cursor.execute("DELETE FROM config;")
conn.commit()
conn.close()
except Exception as e:
print(f"Error creating result database: {str(e)}")
raise
def store_config_in_database(
db_path: str,
config_file_path: str,
config: Dict,
fit_method_class: str,
datafiles: List[Tuple[str, str]],
instruments: List[Dict[str, str]],
) -> None:
"""
Store configuration information in the database for reference.
"""
import json
if db_path.upper() == "NONE":
return
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Convert config to JSON string
config_json = json.dumps(config, indent=2, default=str)
# Convert lists to comma-separated strings for storage
datafiles_str = ", ".join([f"{datafile}" for _, datafile in datafiles])
instruments_str = ", ".join(
[
f"{inst['symbol']}:{inst['instrument_type']}:{inst['exchange_id']}"
for inst in instruments
]
)
# Insert configuration record
cursor.execute(
"""
INSERT INTO config (
run_timestamp, config_file_path, config_json, fit_method_class, datafiles, instruments
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
datetime.now(),
config_file_path,
config_json,
fit_method_class,
datafiles_str,
instruments_str,
),
)
conn.commit()
conn.close()
print(f"Configuration stored in database")
except Exception as e:
print(f"Error storing configuration in database: {str(e)}")
import traceback
traceback.print_exc()
def convert_timestamp(timestamp: Any) -> Optional[datetime]:
"""Convert pandas Timestamp to Python datetime object for SQLite compatibility."""
if timestamp is None:
return None
if isinstance(timestamp, pd.Timestamp):
return timestamp.to_pydatetime()
elif isinstance(timestamp, datetime):
return timestamp
elif isinstance(timestamp, date):
return datetime.combine(timestamp, datetime.min.time())
elif isinstance(timestamp, str):
return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
elif isinstance(timestamp, int):
return datetime.fromtimestamp(timestamp)
else:
raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
class PairResarchResult:
pair_: TradingPair
trades_: Dict[str, Dict[str, Any]]
outstanding_positions_: List[Dict[str, Any]]
def __init__(self, config: Dict[str, Any], pair: TradingPair, trades: Dict[str, Dict[str, Any]], outstanding_positions: List[Dict[str, Any]]):
self.config = config
self.pair_ = pair
self.trades_ = trades
self.outstanding_positions_ = outstanding_positions
class BacktestResult:
"""
Class to handle backtest results, trades tracking, PnL calculations, and reporting.
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.trades: Dict[str, Dict[str, Any]] = {}
self.total_realized_pnl = 0.0
self.outstanding_positions: List[Dict[str, Any]] = []
self.symbol_roundtrip_trades_: Dict[str, List[Dict[str, Any]]] = {}
def add_trade(
self,
pair_nm: str,
symbol: str,
side: str,
action: str,
price: Any,
disequilibrium: Optional[float] = None,
scaled_disequilibrium: Optional[float] = None,
timestamp: Optional[datetime] = None,
status: Optional[str] = None,
) -> None:
"""Add a trade to the results tracking."""
pair_nm = str(pair_nm)
if pair_nm not in self.trades:
self.trades[pair_nm] = {symbol: []}
if symbol not in self.trades[pair_nm]:
self.trades[pair_nm][symbol] = []
self.trades[pair_nm][symbol].append(
{
"symbol": symbol,
"side": side,
"action": action,
"price": price,
"disequilibrium": disequilibrium,
"scaled_disequilibrium": scaled_disequilibrium,
"timestamp": timestamp,
"status": status,
}
)
def add_outstanding_position(self, position: Dict[str, Any]) -> None:
"""Add an outstanding position to tracking."""
self.outstanding_positions.append(position)
def add_realized_pnl(self, realized_pnl: float) -> None:
"""Add realized PnL to the total."""
self.total_realized_pnl += realized_pnl
def get_total_realized_pnl(self) -> float:
"""Get total realized PnL."""
return self.total_realized_pnl
def get_outstanding_positions(self) -> List[Dict[str, Any]]:
"""Get all outstanding positions."""
return self.outstanding_positions
def get_trades(self) -> Dict[str, Dict[str, Any]]:
"""Get all trades."""
return self.trades
def clear_trades(self) -> None:
"""Clear all trades (used when processing new files)."""
self.trades.clear()
def collect_single_day_results(self, pairs_trades: List[pd.DataFrame]) -> None:
"""Collect and process single day trading results."""
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
side = row.side
action = row.action
symbol = row.symbol
price = row.price
disequilibrium = getattr(row, "disequilibrium", None)
scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None)
if hasattr(row, "time"):
timestamp = getattr(row, "time")
else:
timestamp = convert_timestamp(row.Index)
status = row.status
self.add_trade(
pair_nm=str(row.pair),
symbol=str(symbol),
side=str(side),
action=str(action),
price=float(str(price)),
disequilibrium=disequilibrium,
scaled_disequilibrium=scaled_disequilibrium,
timestamp=timestamp,
status=str(status) if status is not None else "?",
)
def print_single_day_results(self) -> None:
"""Print single day results summary."""
for pair, symbols in self.trades.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for trade_data in trades:
if len(trade_data) >= 2:
side, price = trade_data[:2]
print(f"{symbol} {side} at ${price}")
def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Print summary of all processed files."""
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Calculate and print returns by day and pair."""
def _symbol_return(trade1_side: str, trade1_px: float, trade2_side: str, trade2_px: float) -> float:
if trade1_side == "BUY" and trade2_side == "SELL":
return (trade2_px - trade1_px) / trade1_px * 100
elif trade1_side == "SELL" and trade2_side == "BUY":
return (trade1_px - trade2_px) / trade1_px * 100
else:
return 0
print("\n====== Returns By Day and Pair ======")
trades = []
for filename, data in all_results.items():
pairs = list(data["trades"].keys())
for pair in pairs:
self.symbol_roundtrip_trades_[pair] = []
trades_dict = data["trades"][pair]
for symbol in trades_dict.keys():
trades.extend(trades_dict[symbol])
trades = sorted(trades, key=lambda x: (x["timestamp"], x["symbol"]))
print(f"\n--- {filename} ---")
self.outstanding_positions = data["outstanding_positions"]
day_return = 0.0
for idx in range(0, len(trades), 4):
symbol_a = trades[idx]["symbol"]
trade_a_1 = trades[idx]
trade_a_2 = trades[idx + 2]
symbol_b = trades[idx + 1]["symbol"]
trade_b_1 = trades[idx + 1]
trade_b_2 = trades[idx + 3]
symbol_return = 0
assert (
trade_a_1["timestamp"] < trade_a_2["timestamp"]
), f"Trade 1: {trade_a_1['timestamp']} is not less than Trade 2: {trade_a_2['timestamp']}"
assert (
trade_a_1["action"] == "OPEN" and trade_a_2["action"] == "CLOSE"
), f"Trade 1: {trade_a_1['action']} and Trade 2: {trade_a_2['action']} are the same"
# Calculate return based on action combination
trade_return = 0
symbol_a_return = _symbol_return(trade_a_1["side"], trade_a_1["price"], trade_a_2["side"], trade_a_2["price"])
symbol_b_return = _symbol_return(trade_b_1["side"], trade_b_1["price"], trade_b_2["side"], trade_b_2["price"])
pair_return = symbol_a_return + symbol_b_return
self.symbol_roundtrip_trades_[pair].append(
{
"symbol": symbol_a,
"open_side": trade_a_1["side"],
"open_action": trade_a_1["action"],
"open_price": trade_a_1["price"],
"close_side": trade_a_2["side"],
"close_action": trade_a_2["action"],
"close_price": trade_a_2["price"],
"symbol_return": symbol_a_return,
"open_disequilibrium": trade_a_1["disequilibrium"],
"open_scaled_disequilibrium": trade_a_1["scaled_disequilibrium"],
"close_disequilibrium": trade_a_2["disequilibrium"],
"close_scaled_disequilibrium": trade_a_2["scaled_disequilibrium"],
"open_time": trade_a_1["timestamp"],
"close_time": trade_a_2["timestamp"],
"shares": self.config["funding_per_pair"] / 2 / trade_a_1["price"],
"is_completed": True,
"close_condition": trade_a_2["status"],
"pair_return": pair_return
}
)
self.symbol_roundtrip_trades_[pair].append(
{
"symbol": symbol_b,
"open_side": trade_b_1["side"],
"open_action": trade_b_1["action"],
"open_price": trade_b_1["price"],
"close_side": trade_b_2["side"],
"close_action": trade_b_2["action"],
"close_price": trade_b_2["price"],
"symbol_return": symbol_b_return,
"open_disequilibrium": trade_b_1["disequilibrium"],
"open_scaled_disequilibrium": trade_b_1["scaled_disequilibrium"],
"close_disequilibrium": trade_b_2["disequilibrium"],
"close_scaled_disequilibrium": trade_b_2["scaled_disequilibrium"],
"open_time": trade_b_1["timestamp"],
"close_time": trade_b_2["timestamp"],
"shares": self.config["funding_per_pair"] / 2 / trade_b_1["price"],
"is_completed": True,
"close_condition": trade_b_2["status"],
"pair_return": pair_return
}
)
# Print pair returns with disequilibrium information
day_return = 0.0
if pair in self.symbol_roundtrip_trades_:
print(f"{pair}:")
pair_return = 0.0
for trd in self.symbol_roundtrip_trades_[pair]:
disequil_info = ""
if (
trd["open_scaled_disequilibrium"] is not None
and trd["open_scaled_disequilibrium"] is not None
):
disequil_info = f" | Open Dis-eq: {trd['open_scaled_disequilibrium']:.2f},"
f" Close Dis-eq: {trd['open_scaled_disequilibrium']:.2f}"
print(
f" {trd['open_time'].time()}-{trd['close_time'].time()} {trd['symbol']}: "
f" {trd['open_side']} @ ${trd['open_price']:.2f},"
f" {trd["close_side"]} @ ${trd["close_price"]:.2f},"
f" Return: {trd['symbol_return']:.2f}%{disequil_info}"
)
pair_return += trd["symbol_return"]
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return and add to global realized PnL
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
self.add_realized_pnl(day_return)
def print_outstanding_positions(self) -> None:
"""Print all outstanding positions with share quantities and current values."""
if not self.get_outstanding_positions():
print("\n====== NO OUTSTANDING POSITIONS ======")
return
print(f"\n====== OUTSTANDING POSITIONS ======")
print(
f"{'Pair':<15}"
f" {'Symbol':<10}"
f" {'Side':<4}"
f" {'Shares':<10}"
f" {'Open $':<8}"
f" {'Current $':<10}"
f" {'Value $':<12}"
f" {'Disequilibrium':<15}"
)
print("-" * 100)
total_value = 0.0
for pos in self.get_outstanding_positions():
# Print position A
print(
f"{pos['pair']:<15}"
f" {pos['symbol_a']:<10}"
f" {pos['side_a']:<4}"
f" {pos['shares_a']:<10.2f}"
f" {pos['open_px_a']:<8.2f}"
f" {pos['current_px_a']:<10.2f}"
f" {pos['current_value_a']:<12.2f}"
f" {'':<15}"
)
# Print position B
print(
f"{'':<15}"
f" {pos['symbol_b']:<10}"
f" {pos['side_b']:<4}"
f" {pos['shares_b']:<10.2f}"
f" {pos['open_px_b']:<8.2f}"
f" {pos['current_px_b']:<10.2f}"
f" {pos['current_value_b']:<12.2f}"
)
# Print pair totals with disequilibrium info
print(
f"{'':<15}"
f" {'PAIR TOTAL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" {pos['total_current_value']:<12.2f}"
)
# Print disequilibrium details
print(
f"{'':<15}"
f" {'DISEQUIL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" Raw: {pos['current_disequilibrium']:<6.4f}"
f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}"
)
print("-" * 100)
total_value += pos["total_current_value"]
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
def print_grand_totals(self) -> None:
"""Print grand totals across all pairs."""
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%")
def handle_outstanding_position(
self,
pair: TradingPair,
pair_result_df: pd.DataFrame,
last_row_index: int,
open_side_a: str,
open_side_b: str,
open_px_a: float,
open_px_b: float,
open_tstamp: datetime,
) -> Tuple[float, float, float]:
"""
Handle calculation and tracking of outstanding positions when no close signal is found.
Args:
pair: TradingPair object
pair_result_df: DataFrame with pair results
last_row_index: Index of the last row in the data
open_side_a, open_side_b: Trading sides for symbols A and B
open_px_a, open_px_b: Opening prices for symbols A and B
open_tstamp: Opening timestamp
"""
if pair_result_df is None or pair_result_df.empty:
return 0, 0, 0
last_row = pair_result_df.loc[last_row_index]
last_tstamp = last_row["tstamp"]
colname_a, colname_b = pair.exec_prices_colnames()
last_px_a = last_row[colname_a]
last_px_b = last_row[colname_b]
# Calculate share quantities based on funding per pair
# Split funding equally between the two positions
funding_per_position = self.config["funding_per_pair"] / 2
shares_a = funding_per_position / open_px_a
shares_b = funding_per_position / open_px_b
# Calculate current position values (shares * current price)
current_value_a = shares_a * last_px_a * (-1 if open_side_a == "SELL" else 1)
current_value_b = shares_b * last_px_b * (-1 if open_side_b == "SELL" else 1)
total_current_value = current_value_a + current_value_b
# Get disequilibrium information
current_disequilibrium = last_row["disequilibrium"]
current_scaled_disequilibrium = last_row["scaled_disequilibrium"]
# Store outstanding positions
self.add_outstanding_position(
{
"pair": str(pair),
"symbol_a": pair.symbol_a_,
"symbol_b": pair.symbol_b_,
"side_a": open_side_a,
"side_b": open_side_b,
"shares_a": shares_a,
"shares_b": shares_b,
"open_px_a": open_px_a,
"open_px_b": open_px_b,
"current_px_a": last_px_a,
"current_px_b": last_px_b,
"current_value_a": current_value_a,
"current_value_b": current_value_b,
"total_current_value": total_current_value,
"open_time": open_tstamp,
"last_time": last_tstamp,
"current_abs_term": current_scaled_disequilibrium,
"current_disequilibrium": current_disequilibrium,
"current_scaled_disequilibrium": current_scaled_disequilibrium,
}
)
# Print position details
print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session")
print(f" Open: {open_tstamp} | Last: {last_tstamp}")
print(
f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Value: ${current_value_a:.2f}"
)
print(
f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Value: ${current_value_b:.2f}"
)
print(f" Total Value: ${total_current_value:.2f}")
print(
f" Disequilibrium: {current_disequilibrium:.4f} | Scaled: {current_scaled_disequilibrium:.4f}"
)
return current_value_a, current_value_b, total_current_value
def store_results_in_database(
self, db_path: str, day: str
) -> None:
"""
Store backtest results in the SQLite database.
"""
if db_path.upper() == "NONE":
return
try:
# Extract date from datafile name (assuming format like 20250528.mktdata.ohlcv.db)
date_str = day
# Convert to proper date format
try:
date_obj = datetime.strptime(date_str, "%Y%m%d").date()
except ValueError:
# If date parsing fails, use current date
date_obj = datetime.now().date()
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Process each trade from bt_result
trades = self.get_trades()
for pair_name, _ in trades.items():
# Second pass: insert completed trade records into database
for trade_pair in sorted(self.symbol_roundtrip_trades_[pair_name], key=lambda x: x["open_time"]):
# Only store completed trades in pt_bt_results table
cursor.execute(
"""
INSERT INTO pt_bt_results (
date, pair, symbol, open_time, open_side, open_price,
open_quantity, open_disequilibrium, close_time, close_side,
close_price, close_quantity, close_disequilibrium,
symbol_return, pair_return, close_condition
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pair_name,
trade_pair["symbol"],
trade_pair["open_time"],
trade_pair["open_side"],
trade_pair["open_price"],
trade_pair["shares"],
trade_pair["open_scaled_disequilibrium"],
trade_pair["close_time"],
trade_pair["close_side"],
trade_pair["close_price"],
trade_pair["shares"],
trade_pair["close_scaled_disequilibrium"],
trade_pair["symbol_return"],
trade_pair["pair_return"],
trade_pair["close_condition"]
),
)
# Store outstanding positions in separate table
outstanding_positions = self.get_outstanding_positions()
for pos in outstanding_positions:
# Calculate position quantity (negative for SELL positions)
position_qty_a = (
pos["shares_a"] if pos["side_a"] == "BUY" else -pos["shares_a"]
)
position_qty_b = (
pos["shares_b"] if pos["side_b"] == "BUY" else -pos["shares_b"]
)
# Calculate unrealized returns
# For symbol A: (current_price - open_price) / open_price * 100 * position_direction
unrealized_return_a = (
(pos["current_px_a"] - pos["open_px_a"]) / pos["open_px_a"] * 100
) * (1 if pos["side_a"] == "BUY" else -1)
unrealized_return_b = (
(pos["current_px_b"] - pos["open_px_b"]) / pos["open_px_b"] * 100
) * (1 if pos["side_b"] == "BUY" else -1)
# Store outstanding position for symbol A
cursor.execute(
"""
INSERT INTO outstanding_positions (
date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pos["pair"],
pos["symbol_a"],
position_qty_a,
pos["current_px_a"],
unrealized_return_a,
pos["open_px_a"],
pos["side_a"],
),
)
# Store outstanding position for symbol B
cursor.execute(
"""
INSERT INTO outstanding_positions (
date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pos["pair"],
pos["symbol_b"],
position_qty_b,
pos["current_px_b"],
unrealized_return_b,
pos["open_px_b"],
pos["side_b"],
),
)
conn.commit()
conn.close()
except Exception as e:
print(f"Error storing results in database: {str(e)}")
import traceback
traceback.print_exc()

View File

@ -1,319 +0,0 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import PairState, TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class RollingFit(PairsTradingFitMethod):
"""
N O T E:
=========
- This class remains to be abstract
- The following methods are to be implemented in the subclass:
- create_trading_pair()
=========
"""
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....")
config = pair.config_
curr_training_start_idx = pair.get_begin_index()
end_index = pair.get_end_index()
pair.user_data_["state"] = PairState.INITIAL
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype(
{
"time": "datetime64[ns]",
"symbol": "string",
"side": "string",
"action": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object",
}
)
training_minutes = config["training_minutes"]
curr_predicted_row_idx = 0
while True:
print(curr_training_start_idx, end="\r")
pair.get_datasets(
training_minutes=training_minutes,
training_start_index=curr_training_start_idx,
testing_size=1,
)
if len(pair.training_df_) < training_minutes:
print(
f"{pair}: current offset={curr_training_start_idx}"
f" * Training data length={len(pair.training_df_)} < {training_minutes}"
" * Not enough training data. Completing the job."
)
break
try:
# ================================ PREDICTION ================================
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(
f"{pair}: TrainingPrediction failed: {str(e)}"
) from e
# break
curr_training_start_idx += 1
if curr_training_start_idx > end_index:
break
curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [
PairState.INITIAL,
PairState.CLOSE,
PairState.CLOSE_POSITION,
PairState.CLOSE_STOP_LOSS,
PairState.CLOSE_STOP_PROFIT,
]:
if scaled_disequilibrium >= open_threshold:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN:
if scaled_disequilibrium <= close_threshold:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = PairState.CLOSE.name
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_[
"stop_close_state"
].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: *** Position is NOT CLOSED. ***")
# outstanding positions
if config["close_outstanding_positions"]:
close_position_row = pd.Series(pair.market_data_.iloc[-2])
close_position_row["disequilibrium"] = 0.0
close_position_row["scaled_disequilibrium"] = 0.0
close_position_row["signed_scaled_disequilibrium"] = 0.0
close_position_trades = self._get_close_trades(
pair=pair, row=close_position_row, close_threshold=close_threshold
)
if close_position_trades is not None:
close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = open_row["signed_scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
# creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
pair.symbol_a_,
open_side_a,
"OPEN",
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
open_tstamp,
pair.symbol_b_,
open_side_b,
"OPEN",
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Create DataFrame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = close_row["signed_scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
trd_signal_tuples = [
(
close_tstamp,
pair.symbol_a_,
close_side_a,
"CLOSE",
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
close_tstamp,
pair.symbol_b_,
close_side_b,
"CLOSE",
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def reset(self) -> None:
pass

View File

@ -1,380 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
class PairState(Enum):
INITIAL = 1
OPEN = 2
CLOSE = 3
CLOSE_POSITION = 4
CLOSE_STOP_LOSS = 5
CLOSE_STOP_PROFIT = 6
class CointegrationData:
EG_PVALUE_THRESHOLD = 0.05
tstamp_: pd.Timestamp
pair_: str
eg_pvalue_: float
johansen_lr1_: float
johansen_cvt_: float
eg_is_cointegrated_: bool
johansen_is_cointegrated_: bool
def __init__(self, pair: TradingPair):
training_df = pair.training_df_
assert training_df is not None
from statsmodels.tsa.vector_ar.vecm import coint_johansen
df = training_df[pair.colnames()].reset_index(drop=True)
# Run Johansen cointegration test
result = coint_johansen(df, det_order=0, k_ar_diff=1)
self.johansen_lr1_ = result.lr1[0]
self.johansen_cvt_ = result.cvt[0, 1]
self.johansen_is_cointegrated_ = self.johansen_lr1_ > self.johansen_cvt_
# Run Engle-Granger cointegration test
from statsmodels.tsa.stattools import coint # type: ignore
col1, col2 = pair.colnames()
assert training_df is not None
series1 = training_df[col1].reset_index(drop=True)
series2 = training_df[col2].reset_index(drop=True)
self.eg_pvalue_ = float(coint(series1, series2)[1])
self.eg_is_cointegrated_ = bool(self.eg_pvalue_ < self.EG_PVALUE_THRESHOLD)
self.tstamp_ = training_df.index[-1]
self.pair_ = pair.name()
def to_dict(self) -> Dict[str, Any]:
return {
"tstamp": self.tstamp_,
"pair": self.pair_,
"eg_pvalue": self.eg_pvalue_,
"johansen_lr1": self.johansen_lr1_,
"johansen_cvt": self.johansen_cvt_,
"eg_is_cointegrated": self.eg_is_cointegrated_,
"johansen_is_cointegrated": self.johansen_is_cointegrated_,
}
def __repr__(self) -> str:
return f"CointegrationData(tstamp={self.tstamp_}, pair={self.pair_}, eg_pvalue={self.eg_pvalue_}, johansen_lr1={self.johansen_lr1_}, johansen_cvt={self.johansen_cvt_}, eg_is_cointegrated={self.eg_is_cointegrated_}, johansen_is_cointegrated={self.johansen_is_cointegrated_})"
class TradingPair(ABC):
market_data_: pd.DataFrame
symbol_a_: str
symbol_b_: str
stat_model_price_: str
training_mu_: float
training_std_: float
training_df_: pd.DataFrame
testing_df_: pd.DataFrame
user_data_: Dict[str, Any]
# predicted_df_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.stat_model_price_ = config["stat_model_price"]
self.user_data_ = {}
self.predicted_df_ = None
self.config_ = config
self._set_market_data(market_data)
def _set_market_data(self, market_data: pd.DataFrame) -> None:
self.market_data_ = pd.DataFrame(
self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
self.market_data_["tstamp"] = pd.to_datetime(self.market_data_["tstamp"])
self.market_data_ = self.market_data_.sort_values("tstamp")
self._set_execution_price_data()
pass
def _set_execution_price_data(self) -> None:
if "execution_price" not in self.config_:
self.market_data_[f"exec_price_{self.symbol_a_}"] = self.market_data_[f"{self.stat_model_price_}_{self.symbol_a_}"]
self.market_data_[f"exec_price_{self.symbol_b_}"] = self.market_data_[f"{self.stat_model_price_}_{self.symbol_b_}"]
return
execution_price_column = self.config_["execution_price"]["column"]
execution_price_shift = self.config_["execution_price"]["shift"]
self.market_data_[f"exec_price_{self.symbol_a_}"] = self.market_data_[f"{execution_price_column}_{self.symbol_a_}"].shift(-execution_price_shift)
self.market_data_[f"exec_price_{self.symbol_b_}"] = self.market_data_[f"{execution_price_column}_{self.symbol_b_}"].shift(-execution_price_shift)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
def get_begin_index(self) -> int:
if "trading_hours" not in self.config_:
return 0
assert "timezone" in self.config_["trading_hours"]
assert "begin_session" in self.config_["trading_hours"]
start_time = (
pd.to_datetime(self.config_["trading_hours"]["begin_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time >= start_time
return int(self.market_data_.index[mask].min())
def get_end_index(self) -> int:
if "trading_hours" not in self.config_:
return 0
assert "timezone" in self.config_["trading_hours"]
assert "end_session" in self.config_["trading_hours"]
end_time = (
pd.to_datetime(self.config_["trading_hours"]["end_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time <= end_time
return int(self.market_data_.index[mask].max())
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
# Select only the columns we need
df_selected: pd.DataFrame = pd.DataFrame(
df[["tstamp", "symbol", self.stat_model_price_]]
)
# Start with unique timestamps
result_df: pd.DataFrame = (
pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
)
# For each unique symbol, add a corresponding close price column
symbols = df_selected["symbol"].unique()
for symbol in symbols:
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(
drop=True
)
# Create column name like "close-COIN"
new_price_column = f"{self.stat_model_price_}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame(
{
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[self.stat_model_price_],
}
)
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
result_df = result_df.reset_index(
drop=True
) # do not dropna() since irrelevant symbol would affect dataset
return result_df.dropna()
def get_datasets(
self,
training_minutes: int,
training_start_index: int = 0,
testing_size: Optional[int] = None,
) -> None:
testing_start_index = training_start_index + training_minutes
self.training_df_ = self.market_data_.iloc[
training_start_index:testing_start_index, :training_minutes
].copy()
assert self.training_df_ is not None
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
testing_start_index = training_start_index + training_minutes
if testing_size is None:
self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy()
else:
self.testing_df_ = self.market_data_.iloc[
testing_start_index : testing_start_index + testing_size, :
].copy()
assert self.testing_df_ is not None
self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)
def colnames(self) -> List[str]:
return [
f"{self.stat_model_price_}_{self.symbol_a_}",
f"{self.stat_model_price_}_{self.symbol_b_}",
]
def exec_prices_colnames(self) -> List[str]:
return [
f"exec_price_{self.symbol_a_}",
f"exec_price_{self.symbol_b_}",
]
def add_trades(self, trades: pd.DataFrame) -> None:
if self.user_data_["trades"] is None or len(self.user_data_["trades"]) == 0:
# If trades is empty or None, just assign the new trades directly
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns and dtypes before concatenation
existing_trades = self.user_data_["trades"]
# If existing trades is empty, just assign the new trades
if len(existing_trades) == 0:
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns
if set(existing_trades.columns) != set(trades.columns):
# Add missing columns to trades with appropriate default values
for col in existing_trades.columns:
if col not in trades.columns:
if col == "time":
trades[col] = pd.Timestamp.now()
elif col in ["action", "symbol"]:
trades[col] = ""
elif col in [
"price",
"disequilibrium",
"scaled_disequilibrium",
]:
trades[col] = 0.0
elif col == "pair":
trades[col] = None
else:
trades[col] = None
# Concatenate with explicit dtypes to avoid warnings
self.user_data_["trades"] = pd.concat(
[existing_trades, trades], ignore_index=True, copy=False
)
def get_trades(self) -> pd.DataFrame:
return (
self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
)
def cointegration_check(self) -> Optional[pd.DataFrame]:
print(f"***{self}*** STARTING....")
config = self.config_
curr_training_start_idx = 0
COINTEGRATION_DATA_COLUMNS = {
"tstamp": "datetime64[ns]",
"pair": "string",
"eg_pvalue": "float64",
"johansen_lr1": "float64",
"johansen_cvt": "float64",
"eg_is_cointegrated": "bool",
"johansen_is_cointegrated": "bool",
}
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
result: pd.DataFrame = pd.DataFrame(
columns=[col for col in COINTEGRATION_DATA_COLUMNS.keys()]
) # .astype(COINTEGRATION_DATA_COLUMNS)
training_minutes = config["training_minutes"]
while True:
print(curr_training_start_idx, end="\r")
self.get_datasets(
training_minutes=training_minutes,
training_start_index=curr_training_start_idx,
testing_size=1,
)
if len(self.training_df_) < training_minutes:
print(
f"{self}: current offset={curr_training_start_idx}"
f" * Training data length={len(self.training_df_)} < {training_minutes}"
" * Not enough training data. Completing the job."
)
break
new_row = pd.Series(CointegrationData(self).to_dict())
result.loc[len(result)] = new_row
curr_training_start_idx += 1
return result
def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
config = self.config_
if (
"stop_close_conditions" not in config
or config["stop_close_conditions"] is None
):
return False
if "profit" in config["stop_close_conditions"]:
current_return = self._current_return(predicted_row)
#
# print(f"time={predicted_row['tstamp']} current_return={current_return}")
#
if current_return >= config["stop_close_conditions"]["profit"]:
print(f"STOP PROFIT: {current_return}")
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
return True
if "loss" in config["stop_close_conditions"]:
if current_return <= config["stop_close_conditions"]["loss"]:
print(f"STOP LOSS: {current_return}")
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
return True
return False
def on_open_trades(self, trades: pd.DataFrame) -> None:
if "close_trades" in self.user_data_:
del self.user_data_["close_trades"]
self.user_data_["open_trades"] = trades
def on_close_trades(self, trades: pd.DataFrame) -> None:
del self.user_data_["open_trades"]
self.user_data_["close_trades"] = trades
def _current_return(self, predicted_row: pd.Series) -> float:
if "open_trades" in self.user_data_:
open_trades = self.user_data_["open_trades"]
if len(open_trades) == 0:
return 0.0
def _single_instrument_return(symbol: str) -> float:
instrument_open_trades = open_trades[open_trades["symbol"] == symbol]
instrument_open_price = instrument_open_trades["price"].iloc[0]
sign = -1 if instrument_open_trades["side"].iloc[0] == "SELL" else 1
instrument_price = predicted_row[f"{self.stat_model_price_}_{symbol}"]
instrument_return = (
sign
* (instrument_price - instrument_open_price)
/ instrument_open_price
)
return float(instrument_return) * 100.0
instrument_a_return = _single_instrument_return(self.symbol_a_)
instrument_b_return = _single_instrument_return(self.symbol_b_)
return instrument_a_return + instrument_b_return
return 0.0
def __repr__(self) -> str:
return self.name()
def name(self) -> str:
return f"{self.symbol_a_} & {self.symbol_b_}"
# return f"{self.symbol_a_} & {self.symbol_b_}"
@abstractmethod
def predict(self) -> pd.DataFrame: ...
# @abstractmethod
# def predicted_df(self) -> Optional[pd.DataFrame]: ...

View File

@ -1,122 +0,0 @@
from typing import Any, Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class VECMTradingPair(TradingPair):
vecm_fit_: Optional[VECMResults]
pair_predict_result_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
super().__init__(config, market_data, symbol_a, symbol_b)
self.vecm_fit_ = None
self.pair_predict_result_ = None
def _train_pair(self) -> None:
self._fit_VECM()
assert self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
# print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std())
self.training_df_["dis-equilibrium"] = (
self.training_df_[self.colnames()] @ self.vecm_fit_.beta
)
# Normalize the dis-equilibrium
self.training_df_["scaled_dis-equilibrium"] = (
diseq_series - self.training_mu_
) / self.training_std_
def _fit_VECM(self) -> None:
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
assert vecm_fit is not None
# URGENT check beta and alpha
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
self.vecm_fit_ = vecm_fit
pass
def predict(self) -> pd.DataFrame:
self._train_pair()
assert self.testing_df_ is not None
assert self.vecm_fit_ is not None
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
)
predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True),
pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
),
left_index=True,
right_index=True,
suffixes=("", "_pred"),
).dropna()
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
predicted_df["signed_scaled_disequilibrium"] = (
predicted_df["disequilibrium"] - self.training_mu_
) / self.training_std_
predicted_df["scaled_disequilibrium"] = abs(
predicted_df["signed_scaled_disequilibrium"]
)
predicted_df = predicted_df.reset_index(drop=True)
if self.pair_predict_result_ is None:
self.pair_predict_result_ = predicted_df
else:
self.pair_predict_result_ = pd.concat(
[self.pair_predict_result_, predicted_df], ignore_index=True
)
# Reset index to ensure proper indexing
self.pair_predict_result_ = self.pair_predict_result_.reset_index(drop=True)
return self.pair_predict_result_
class VECMRollingFit(RollingFit):
def __init__(self) -> None:
super().__init__()
def create_trading_pair(
self,
config: Dict,
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
) -> TradingPair:
return VECMTradingPair(
config=config,
market_data=market_data,
symbol_a=symbol_a,
symbol_b=symbol_b,
)

View File

@ -1,85 +0,0 @@
from typing import Any, Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
import statsmodels.api as sm
NanoPerMin = 1e9
class ZScoreTradingPair(TradingPair):
zscore_model_: Optional[sm.regression.linear_model.RegressionResultsWrapper]
pair_predict_result_: Optional[pd.DataFrame]
zscore_df_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
super().__init__(config, market_data, symbol_a, symbol_b)
self.zscore_model_ = None
self.pair_predict_result_ = None
self.zscore_df_ = None
def _fit_zscore(self) -> None:
assert self.training_df_ is not None
symbol_a_px_series = self.training_df_[self.colnames()].iloc[:, 0]
symbol_b_px_series = self.training_df_[self.colnames()].iloc[:, 1]
symbol_a_px_series, symbol_b_px_series = symbol_a_px_series.align(
symbol_b_px_series, axis=0
)
X = sm.add_constant(symbol_b_px_series)
self.zscore_model_ = sm.OLS(symbol_a_px_series, X).fit()
assert self.zscore_model_ is not None
hedge_ratio = self.zscore_model_.params.iloc[1]
# Calculate spread and Z-score
spread = symbol_a_px_series - hedge_ratio * symbol_b_px_series
self.zscore_df_ = (spread - spread.mean()) / spread.std()
def predict(self) -> pd.DataFrame:
self._fit_zscore()
assert self.zscore_df_ is not None
self.training_df_["dis-equilibrium"] = self.zscore_df_
self.training_df_["scaled_dis-equilibrium"] = abs(self.zscore_df_)
assert self.testing_df_ is not None
assert self.zscore_df_ is not None
predicted_df = self.testing_df_
predicted_df["disequilibrium"] = self.zscore_df_
predicted_df["signed_scaled_disequilibrium"] = self.zscore_df_
predicted_df["scaled_disequilibrium"] = abs(self.zscore_df_)
predicted_df = predicted_df.reset_index(drop=True)
if self.pair_predict_result_ is None:
self.pair_predict_result_ = predicted_df
else:
self.pair_predict_result_ = pd.concat(
[self.pair_predict_result_, predicted_df], ignore_index=True
)
# Reset index to ensure proper indexing
self.pair_predict_result_ = self.pair_predict_result_.reset_index(drop=True)
return self.pair_predict_result_.dropna()
class ZScoreRollingFit(RollingFit):
def __init__(self) -> None:
super().__init__()
def create_trading_pair(
self, config: Dict, market_data: pd.DataFrame, symbol_a: str, symbol_b: str
) -> TradingPair:
return ZScoreTradingPair(
config=config,
market_data=market_data,
symbol_a=symbol_a,
symbol_b=symbol_b,
)

View File

@ -1,126 +0,0 @@
import argparse
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional
import pandas as pd
from tools.config import expand_filename, load_config
from tools.data_loader import get_available_instruments_from_db
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
store_results_in_database,
)
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
from research.research_tools import create_pairs, resolve_datafiles
def main() -> None:
parser = argparse.ArgumentParser(description="Run pairs trading backtest.")
parser.add_argument(
"--config", type=str, required=True, help="Path to the configuration file."
)
parser.add_argument(
"--datafile",
type=str,
required=False,
help="Market data file to process.",
)
parser.add_argument(
"--instruments",
type=str,
required=False,
help="Comma-separated list of instrument symbols (e.g., COIN,GBTC). If not provided, auto-detects from database.",
)
args = parser.parse_args()
config: Dict = load_config(args.config)
# Resolve data files (CLI takes priority over config)
datafile = resolve_datafiles(config, args.datafile)[0]
if not datafile:
print("No data files found to process.")
return
print(f"Found {datafile} data files to process:")
# # Create result database if needed
# if args.result_db.upper() != "NONE":
# args.result_db = expand_filename(args.result_db)
# create_result_database(args.result_db)
# # Initialize a dictionary to store all trade results
# all_results: Dict[str, Dict[str, Any]] = {}
# # Store configuration in database for reference
# if args.result_db.upper() != "NONE":
# # Get list of all instruments for storage
# all_instruments = []
# for datafile in datafiles:
# if args.instruments:
# file_instruments = [
# inst.strip() for inst in args.instruments.split(",")
# ]
# else:
# file_instruments = get_available_instruments_from_db(datafile, config)
# all_instruments.extend(file_instruments)
# # Remove duplicates while preserving order
# unique_instruments = list(dict.fromkeys(all_instruments))
# store_config_in_database(
# db_path=args.result_db,
# config_file_path=args.config,
# config=config,
# fit_method_class=fit_method_class_name,
# datafiles=datafiles,
# instruments=unique_instruments,
# )
# Process each data file
stat_model_price = config["stat_model_price"]
print(f"\n====== Processing {os.path.basename(datafile)} ======")
# Determine instruments to use
if args.instruments:
# Use CLI-specified instruments
instruments = [inst.strip() for inst in args.instruments.split(",")]
print(f"Using CLI-specified instruments: {instruments}")
else:
# Auto-detect instruments from database
instruments = get_available_instruments_from_db(datafile, config)
print(f"Auto-detected instruments: {instruments}")
if not instruments:
print(f"No instruments found in {datafile}...")
return
# Process data for this file
try:
cointegration_data: pd.DataFrame = pd.DataFrame()
for pair in create_pairs(datafile, stat_model_price, config, instruments):
cointegration_data = pd.concat([cointegration_data, pair.cointegration_check()])
pd.set_option('display.width', 400)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(f"cointegration_data:\n{cointegration_data}")
except Exception as err:
print(f"Error processing {datafile}: {str(err)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@ -1,232 +0,0 @@
import argparse
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from research.research_tools import create_pairs
from tools.config import expand_filename, load_config
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
)
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
DayT = str
DataFileNameT = str
def resolve_datafiles(
config: Dict, date_pattern: str, instruments: List[Dict[str, str]]
) -> List[Tuple[DayT, DataFileNameT]]:
resolved_files: List[Tuple[DayT, DataFileNameT]] = []
for inst in instruments:
pattern = date_pattern
inst_type = inst["instrument_type"]
data_dir = config["market_data_loading"][inst_type]["data_directory"]
if "*" in pattern or "?" in pattern:
# Handle wildcards
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
matched_files = glob.glob(pattern)
for matched_file in matched_files:
import re
match = re.search(r"(\d{8})\.mktdata\.ohlcv\.db$", matched_file)
assert match is not None
day = match.group(1)
resolved_files.append((day, matched_file))
else:
# Handle explicit file path
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
resolved_files.append((date_pattern, pattern))
return sorted(list(set(resolved_files))) # Remove duplicates and sort
def get_instruments(args: argparse.Namespace, config: Dict) -> List[Dict[str, str]]:
instruments = [
{
"symbol": inst.split(":")[0],
"instrument_type": inst.split(":")[1],
"exchange_id": inst.split(":")[2],
"instrument_id_pfx": config["market_data_loading"][inst.split(":")[1]][
"instrument_id_pfx"
],
"db_table_name": config["market_data_loading"][inst.split(":")[1]][
"db_table_name"
],
}
for inst in args.instruments.split(",")
]
return instruments
def run_backtest(
config: Dict,
datafiles: List[str],
fit_method: PairsTradingFitMethod,
instruments: List[Dict[str, str]],
) -> BacktestResult:
"""
Run backtest for all pairs using the specified instruments.
"""
bt_result: BacktestResult = BacktestResult(config=config)
# if len(datafiles) < 2:
# print(f"WARNING: insufficient data files: {datafiles}")
# return bt_result
if not all([os.path.exists(datafile) for datafile in datafiles]):
print(f"WARNING: data file {datafiles} does not exist")
return bt_result
pairs_trades = []
pairs = create_pairs(
datafiles=datafiles,
fit_method=fit_method,
config=config,
instruments=instruments,
)
for pair in pairs:
single_pair_trades = fit_method.run_pair(pair=pair, bt_result=bt_result)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
print(f"pairs_trades:\n{pairs_trades}")
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return bt_result
bt_result.collect_single_day_results(pairs_trades)
return bt_result
def main() -> None:
parser = argparse.ArgumentParser(description="Run pairs trading backtest.")
parser.add_argument(
"--config", type=str, required=True, help="Path to the configuration file."
)
parser.add_argument(
"--date_pattern",
type=str,
required=True,
help="Date YYYYMMDD, allows * and ? wildcards",
)
parser.add_argument(
"--instruments",
type=str,
required=True,
help="Comma-separated list of instrument symbols (e.g., COIN:EQUITY,GBTC:CRYPTO)",
)
parser.add_argument(
"--result_db",
type=str,
required=True,
help="Path to SQLite database for storing results. Use 'NONE' to disable database output.",
)
args = parser.parse_args()
config: Dict = load_config(args.config)
# Dynamically instantiate fit method class
fit_method = PairsTradingFitMethod.create(config)
# Resolve data files (CLI takes priority over config)
instruments = get_instruments(args, config)
datafiles = resolve_datafiles(config, args.date_pattern, instruments)
days = list(set([day for day, _ in datafiles]))
print(f"Found {len(datafiles)} data files to process:")
for df in datafiles:
print(f" - {df}")
# Create result database if needed
if args.result_db.upper() != "NONE":
args.result_db = expand_filename(args.result_db)
create_result_database(args.result_db)
# Initialize a dictionary to store all trade results
all_results: Dict[str, Dict[str, Any]] = {}
is_config_stored = False
# Process each data file
for day in sorted(days):
md_datafiles = [datafile for md_day, datafile in datafiles if md_day == day]
if not all([os.path.exists(datafile) for datafile in md_datafiles]):
print(f"WARNING: insufficient data files: {md_datafiles}")
continue
print(f"\n====== Processing {day} ======")
if not is_config_stored:
store_config_in_database(
db_path=args.result_db,
config_file_path=args.config,
config=config,
fit_method_class=config["fit_method_class"],
datafiles=datafiles,
instruments=instruments,
)
is_config_stored = True
# Process data for this file
try:
fit_method.reset()
bt_results = run_backtest(
config=config,
datafiles=md_datafiles,
fit_method=fit_method,
instruments=instruments,
)
if bt_results.trades is None or len(bt_results.trades) == 0:
print(f"No trades found for {day}")
continue
# Store results with day name as key
filename = os.path.basename(day)
all_results[filename] = {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
}
# Store results in database
if args.result_db.upper() != "NONE":
bt_results.calculate_returns(
{
filename: {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
}
}
)
bt_results.store_results_in_database(db_path=args.result_db, day=day)
print(f"Successfully processed {filename}")
except Exception as err:
print(f"Error processing {day}: {str(err)}")
import traceback
traceback.print_exc()
# Calculate and print results using a new BacktestResult instance for aggregation
if all_results:
aggregate_bt_results = BacktestResult(config=config)
aggregate_bt_results.calculate_returns(all_results)
aggregate_bt_results.print_grand_totals()
aggregate_bt_results.print_outstanding_positions()
if args.result_db.upper() != "NONE":
print(f"\nResults stored in database: {args.result_db}")
else:
print("No results to display.")
if __name__ == "__main__":
main()

View File

@ -1,101 +0,0 @@
import argparse
import asyncio
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional
import hjson
import pandas as pd
from tools.data_loader import get_available_instruments_from_db, load_market_data
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
store_results_in_database,
)
from pt_trading.fit_methods import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
def run_strategy(
config: Dict,
datafile: str,
fit_method: PairsTradingFitMethod,
instruments: List[str],
) -> BacktestResult:
"""
Run backtest for all pairs using the specified instruments.
"""
bt_result: BacktestResult = BacktestResult(config=config)
def _create_pairs(config: Dict, instruments: List[str]) -> List[TradingPair]:
nonlocal datafile
all_indexes = range(len(instruments))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs = []
# Update config to use the specified instruments
config_copy = config.copy()
config_copy["instruments"] = instruments
market_data_df = load_market_data(
datafile=datafile,
exchange_id=config_copy["exchange_id"],
instruments=config_copy["instruments"],
instrument_id_pfx=config_copy["instrument_id_pfx"],
db_table_name=config_copy["db_table_name"],
trading_hours=config_copy["trading_hours"],
)
for a_index, b_index in unique_index_pairs:
pair = fit_method.create_trading_pair(
market_data=market_data_df,
symbol_a=instruments[a_index],
symbol_b=instruments[b_index],
)
pairs.append(pair)
return pairs
pairs_trades = []
for pair in _create_pairs(config, instruments):
single_pair_trades = fit_method.run_pair(
pair=pair, config=config, bt_result=bt_result
)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return bt_result
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
bt_result.collect_single_day_results(result)
return bt_result
def main() -> None:
# Load config
# Subscribe to CVTT market data
# On snapshot (with historical data) - create trading strategy with market data dateframe
async def on_message(message_type: MessageTypeT, subscr_id: SubscriptionIdT, message: Dict, instrument_id: str) -> None:
print(f"{message_type=} {subscr_id=} {instrument_id}")
if message_type == "md_aggregate":
aggr = message.get("md_aggregate", [])
print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}")
elif message_type == "historical_md_aggregate":
for aggr in message.get("historical_data", []):
print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}")
else:
print(f"Unknown message type: {message_type}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -4,13 +4,13 @@ from functools import partial
from typing import Dict, List from typing import Dict, List
from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.tools.config import CvttAppConfig from cvttpy_tools.config import CvttAppConfig
from cvttpy_tools.tools.logger import Log from cvttpy_tools.logger import Log
from pt_strategy.live.live_strategy import PtLiveStrategy from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy
from pt_strategy.live.pricer_md_client import PtMktDataClient from pairs_trading.lib.pt_strategy.live.pricer_md_client import PtMktDataClient
from pt_strategy.live.ti_sender import TradingInstructionsSender from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender
# import sys # import sys
# print("PYTHONPATH directories:") # print("PYTHONPATH directories:")
@ -18,11 +18,6 @@ from pt_strategy.live.ti_sender import TradingInstructionsSender
# print(path) # print(path)
# from cvtt_client.mkt_data import (CvttPricerWebSockClient,
# CvttPricesSubscription, MessageTypeT,
# SubscriptionIdT)
class PairTradingRunner(NamedObject): class PairTradingRunner(NamedObject):
config_: CvttAppConfig config_: CvttAppConfig
instruments_: List[JsonDictT] instruments_: List[JsonDictT]
@ -69,11 +64,17 @@ class PairTradingRunner(NamedObject):
assert len(self.instruments_) == 2, "Only two instruments are supported" assert len(self.instruments_) == 2, "Only two instruments are supported"
Log.info(f"{self.fname()} Instruments: {self.instruments_}") Log.info(f"{self.fname()} Instruments: {self.instruments_}")
# ------- CREATE TI (trading instructions) CLIENT ------- # # ------- CREATE TI (trading instructions) CLIENT -------
# ti_config = self.config_.get_subconfig("ti_config", {})
# self.ti_sender_ = TradingInstructionsSender(config=ti_config)
# Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# ------- CREATE CVTT CLIENT -------
ti_config = self.config_.get_subconfig("ti_config", {}) ti_config = self.config_.get_subconfig("ti_config", {})
self.ti_sender_ = TradingInstructionsSender(config=ti_config) self.ti_sender_ = TradingInstructionsSender(config=ti_config)
Log.info(f"{self.fname()} TI client created: {self.ti_sender_}") Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# ------- CREATE STRATEGY ------- # ------- CREATE STRATEGY -------
strategy_config = self.config_.get_value("strategy_config", {}) strategy_config = self.config_.get_value("strategy_config", {})
self.live_strategy_ = PtLiveStrategy( self.live_strategy_ = PtLiveStrategy(
@ -83,21 +84,18 @@ class PairTradingRunner(NamedObject):
) )
Log.info(f"{self.fname()} Strategy created: {self.live_strategy_}") Log.info(f"{self.fname()} Strategy created: {self.live_strategy_}")
# ------- CREATE PRICER CLIENT ------- # # ------- CREATE PRICER CLIENT -------
pricer_config = self.config_.get_subconfig("pricer_config", {}) # pricer_config = self.config_.get_subconfig("pricer_config", {})
self.pricer_client_ = PtMktDataClient( # self.pricer_client_ = PtMktDataClient(
live_strategy=self.live_strategy_, # live_strategy=self.live_strategy_,
pricer_config=pricer_config # pricer_config=pricer_config
) # )
Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}") # Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}")
async def run(self) -> None: async def run(self) -> None:
Log.info(f"{self.fname()} ...") Log.info(f"{self.fname()} ...")
pass pass
if __name__ == "__main__": if __name__ == "__main__":
App() App()
CvttAppConfig() CvttAppConfig()

213
lib/client/cvtt_client.py Normal file
View File

@ -0,0 +1,213 @@
from __future__ import annotations
from typing import Dict, Any, List, Optional
import time
import requests
from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject):
session_: requests.Session
base_url_: str
def __init__(self, base_url: str) -> None:
self.base_url_ = base_url
self.session_ = requests.Session()
def is_ready(self) -> bool:
"""Checks if the server is up and responding"""
url = f"{self.base_url_}/ping"
try:
response = self.session_.get(url)
response.raise_for_status()
return True
except requests.exceptions.RequestException:
return False
def send_post(self, endpoint: str, post_body: Dict) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(
method="POST",
url=url,
json=post_body,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
def send_get(self, endpoint: str) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(method="GET", url=url)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
class MdSummary(HistMdBar):
def __init__(
self,
ts_ns: int,
open: float,
high: float,
low: float,
close: float,
volume: float,
vwap: float,
num_trades: int,
):
super().__init__(ts=ts_ns)
self.open_ = open
self.high_ = high
self.low_ = low
self.close_ = close
self.volume_ = volume
self.vwap_ = vwap
self.num_trades_ = num_trades
@classmethod
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
res: List[MdSummary] = []
jresp = response.json()
hist_data = jresp.get("historical_data", [])
for hd in hist_data:
res.append(
MdSummary(
ts_ns=hd["time_ns"],
open=hd["open"],
high=hd["high"],
low=hd["low"],
close=hd["close"],
volume=hd["volume"],
vwap=hd["vwap"],
num_trades=hd["num_trades"],
)
)
return res
class MdSummaryCollector(NamedObject):
sender_: RESTSender
exch_acct_: str
instrument_id_: str
interval_sec_: int
history_depth_sec_: int
history_: List[MdSummary]
timer_: Optional[Timer]
def __init__(
self,
sender: RESTSender,
exch_acct: str,
instrument_id: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = sender
self.exch_acct_ = exch_acct
self.instrument_id_ = instrument_id
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.history_depth_sec_ = []
self.timer_ = None
def rqst_data(self) -> Dict[str, Any]:
return {
"exch_acct": self.exch_acct_,
"instrument_id": self.instrument_id_,
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
def get_history(self) -> List[MdSummary]:
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=self.rqst_data()
)
return MdSummary.from_REST_response(response=response)
def get_last(self) -> Optional[MdSummary]:
rqst_data = self.rqst_data()
rqst_data["history_depth_sec"] = self.interval_sec_
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=rqst_data
)
res = MdSummary.from_REST_response(response=response)
return None if len(res) == 0 else res[-1]
async def start(self) -> None:
if self.timer_:
Log.error(f"{self.fname()}: Timer is already started")
return
self.history_ = self.get_history()
self.timer_ = Timer(
start_in_sec=self.interval_sec_,
is_periodic=True,
period_interval=self.interval_sec_,
func=self._load_new,
)
async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last()
if not last:
# URGENT logging
return
if last.ts_ns_ <= self.history_[-1].ts_ns_:
# URGENT logging
return
self.history_.append(last)
# URGENT implement notification
def stop(self) -> None:
if self.timer_:
self.timer_.cancel()
self.timer_ = None
class CvttRESTClient(NamedObject):
config_: Config
sender_: RESTSender
def __init__(self, config: Config) -> None:
self.config_ = config
base_url = self.config_.get_value("cvtt_base_url", default="")
assert base_url
self.sender_ = RESTSender(base_url=base_url)
if __name__ == "__main__":
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
cvtt_client = CvttRESTClient(config)
mdsc = MdSummaryCollector(
sender=cvtt_client.sender_,
exch_acct="COINBASE_AT",
instrument_id="PAIR-BTC-USD",
interval_sec=60,
history_depth_sec=24 * 3600,
)
hist = mdsc.get_history()
last = mdsc.get_last()
pass

View File

@ -8,8 +8,8 @@ from functools import partial
from typing import Callable, Coroutine, Dict, Optional from typing import Callable, Coroutine, Dict, Optional
import websockets import websockets
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.logger import Log
from websockets.asyncio.client import ClientConnection from websockets.asyncio.client import ClientConnection
MessageTypeT = str MessageTypeT = str

View File

@ -2,11 +2,16 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from enum import Enum
import pandas as pd import pandas as pd
# ---
from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.base import NamedObject # ---
from cvttpy_tools.tools.logger import Log from cvttpy_trading.trading.instrument import ExchangeInstrument
# ---
from pt_strategy.live.ti_sender import TradingInstructionsSender from pt_strategy.live.ti_sender import TradingInstructionsSender
from pt_strategy.model_data_policy import ModelDataPolicy from pt_strategy.model_data_policy import ModelDataPolicy
from pt_strategy.pt_market_data import RealTimeMarketData from pt_strategy.pt_market_data import RealTimeMarketData

View File

@ -6,11 +6,11 @@ from typing import Dict, List
from cvtt_client.mkt_data import (CvttPricerWebSockClient, from cvtt_client.mkt_data import (CvttPricerWebSockClient,
CvttPricesSubscription, MessageTypeT, CvttPricesSubscription, MessageTypeT,
SubscriptionIdT) SubscriptionIdT)
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.app import App
from cvttpy_tools.tools.base import NamedObject
from cvttpy_tools.tools.config import Config
from cvttpy_tools.tools.logger import Log
from pt_strategy.live.live_strategy import PtLiveStrategy from pt_strategy.live.live_strategy import PtLiveStrategy
from pt_strategy.trading_pair import TradingPair from pt_strategy.trading_pair import TradingPair

View File

@ -3,13 +3,13 @@ from enum import Enum
from typing import Tuple from typing import Tuple
# import aiohttp # import aiohttp
from cvttpy_tools.tools.app import App from cvttpy_tools.app import App
from cvttpy_tools.tools.base import NamedObject from cvttpy_tools.base import NamedObject
from cvttpy_tools.tools.config import Config from cvttpy_tools.config import Config
from cvttpy_tools.tools.logger import Log from cvttpy_tools.logger import Log
from cvttpy_tools.tools.timer import Timer from cvttpy_tools.timer import Timer
from cvttpy_tools.tools.timeutils import NanoPerSec from cvttpy_tools.timeutils import NanoPerSec
from cvttpy_tools.tools.web.rest_client import REST_RequestProcessor from cvttpy_tools.web.rest_client import REST_RequestProcessor
class TradingInstructionsSender(NamedObject): class TradingInstructionsSender(NamedObject):