Compare commits

...

2 Commits

Author SHA1 Message Date
Oleg Sheynin
e97f76222c progress 2025-12-19 23:06:20 +00:00
Oleg Sheynin
38e1621b2f progress 2025-12-19 23:04:31 +00:00
26 changed files with 387 additions and 10591 deletions

View File

@ -1,9 +0,0 @@
{
"recommendations": [
"ms-python.python",
"ms-python.pylance",
"ms-python.black-formatter",
"ms-python.mypy-type-checker",
"ms-python.isort"
]
}

2
.vscode/launch.json vendored
View File

@ -28,7 +28,7 @@
"program": "${workspaceFolder}/bin/pairs_trader.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
"PYTHONPATH": "${workspaceFolder}/.."
},
"args": [
"--config=${workspaceFolder}/configuration/pairs_trader.cfg",

View File

@ -4,5 +4,7 @@
"path": ".."
}
],
"settings": {}
"settings": {
"workbench.colorTheme": "Noctis Minimus"
}
}

112
.vscode/settings.__OLD__.json vendored Normal file
View File

@ -0,0 +1,112 @@
{
"PythonVersion": "3.12",
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
// ===========================================================
"workbench.activityBar.orientation": "vertical",
// ===========================================================
// "markdown.styles": [
// "/home/oleg/develop/cvtt2/.vscode/light-theme.css"
// ],
"markdown.preview.background": "#ffffff",
"markdown.preview.textEditorTheme": "light",
"markdown-pdf.styles": [
"/home/oleg/develop/cvtt2/.vscode/light-theme.css"
],
"editor.detectIndentation": false,
// Configure editor settings to be overridden for [yaml] language.
"[yaml]": {
"editor.insertSpaces": true,
"editor.tabSize": 4,
},
"pylint.args": [
"--disable=missing-docstring"
, "--disable=invalid-name"
, "--disable=too-few-public-methods"
, "--disable=broad-exception-raised"
, "--disable=broad-exception-caught"
, "--disable=pointless-string-statement"
, "--disable=unused-argument"
, "--disable=line-too-long"
, "--disable=import-outside-toplevel"
, "--disable=fixme"
, "--disable=protected-access"
, "--disable=logging-fstring-interpolation"
],
// ===== TESTING CONFIGURATION =====
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.testing.pytestArgs": [
"-v",
"--tb=short",
"--disable-warnings"
],
"python.testing.envVars": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
},
"python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.testing.pytestPath": "/home/oleg/.pyenv/python3.12-venv/bin/pytest",
"python.testing.promptToConfigure": false,
"python.testing.pytest.enabled": true,
// Python interpreter settings
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3.12",
// Environment variables for Python execution
"python.envFile": "${workspaceFolder}/.vscode/.env",
"python.terminal.activateEnvironment": false,
"python.terminal.activateEnvInCurrentTerminal": false,
// Global environment variables for VS Code Python extension
"terminal.integrated.env.linux": {
"PYTHONPATH": "/home/oleg/develop/:${env:PYTHONPATH}"
},
"pylint.enabled": true,
"github.copilot.enable": false,
"markdown.extension.print.theme": "dark",
"python.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Try enabling regular Python language server alongside CursorPyright
"python.languageServer": "None",
"python.analysis.diagnosticMode": "workspace",
"workbench.colorTheme": "Atom One Dark",
"cursorpyright.analysis.enable": false,
"cursorpyright.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Enable quick fixes for unused imports
"python.analysis.autoImportCompletions": true,
"python.analysis.fixAll": ["source.unusedImports"],
"python.analysis.typeCheckingMode": "basic",
// Enable code actions for CursorPyright
"cursorpyright.analysis.autoImportCompletions": true,
"cursorpyright.analysis.typeCheckingMode": "off",
"cursorpyright.reportUnusedImport": "warning",
"cursorpyright.reportUnusedVariable": "warning",
"cursorpyright.analysis.diagnosticMode": "workspace",
// Force enable code actions
"editor.lightBulb.enabled": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit",
"source.unusedImports": "explicit"
},
// Enable Python-specific code actions
"python.analysis.completeFunctionParens": true,
"python.analysis.addImport.exactMatchOnly": false,
"workbench.tree.indent": 24,
}

120
.vscode/settings.json vendored
View File

@ -1,112 +1,26 @@
{
"PythonVersion": "3.12",
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
// ===========================================================
"workbench.activityBar.orientation": "vertical",
// ===========================================================
// "markdown.styles": [
// "/home/oleg/develop/cvtt2/.vscode/light-theme.css"
// ],
"markdown.preview.background": "#ffffff",
"markdown.preview.textEditorTheme": "light",
"markdown-pdf.styles": [
"/home/oleg/develop/cvtt2/.vscode/light-theme.css"
],
"editor.detectIndentation": false,
// Configure editor settings to be overridden for [yaml] language.
"[yaml]": {
"editor.insertSpaces": true,
"editor.tabSize": 4,
},
"pylint.args": [
"--disable=missing-docstring"
, "--disable=invalid-name"
, "--disable=too-few-public-methods"
, "--disable=broad-exception-raised"
, "--disable=broad-exception-caught"
, "--disable=pointless-string-statement"
, "--disable=unused-argument"
, "--disable=line-too-long"
, "--disable=import-outside-toplevel"
, "--disable=fixme"
, "--disable=protected-access"
, "--disable=logging-fstring-interpolation"
],
// ===== TESTING CONFIGURATION =====
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.testing.pytestArgs": [
"-v",
"--tb=short",
"--disable-warnings"
"unittests"
],
"python.testing.envVars": {
"PYTHONPATH": "${workspaceFolder}/lib:${workspaceFolder}/.."
},
"python.testing.cwd": "${workspaceFolder}",
"python.testing.autoTestDiscoverOnSaveEnabled": true,
"python.testing.pytestPath": "/home/oleg/.pyenv/python3.12-venv/bin/pytest",
"python.testing.promptToConfigure": false,
"python.testing.pytest.enabled": true,
// Python interpreter settings
"python.defaultInterpreterPath": "/home/oleg/.pyenv/python3.12-venv/bin/python3.12",
// Environment variables for Python execution
"python.envFile": "${workspaceFolder}/.vscode/.env",
"python.terminal.activateEnvironment": false,
"python.terminal.activateEnvInCurrentTerminal": false,
// Global environment variables for VS Code Python extension
"terminal.integrated.env.linux": {
"PYTHONPATH": "/home/oleg/develop/:${env:PYTHONPATH}"
},
"pylint.enabled": true,
"github.copilot.enable": false,
"markdown.extension.print.theme": "dark",
"python.defaultInterpreterPath": "/usr/bin/python3",
"python.testing.pytestPath": "python3",
"python.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
"${workspaceFolder}",
"${workspaceFolder}/..",
"${workspaceFolder}/unittests"
],
// Try enabling regular Python language server alongside CursorPyright
"python.languageServer": "None",
"python.analysis.diagnosticMode": "workspace",
"workbench.colorTheme": "Atom One Dark",
"cursorpyright.analysis.enable": false,
"cursorpyright.analysis.extraPaths": [
"${workspaceFolder}/..",
"${workspaceFolder}/lib"
],
// Enable quick fixes for unused imports
"python.analysis.autoImportCompletions": true,
"python.analysis.fixAll": ["source.unusedImports"],
"python.analysis.typeCheckingMode": "basic",
// Enable code actions for CursorPyright
"cursorpyright.analysis.autoImportCompletions": true,
"cursorpyright.analysis.typeCheckingMode": "off",
"cursorpyright.reportUnusedImport": "warning",
"cursorpyright.reportUnusedVariable": "warning",
"cursorpyright.analysis.diagnosticMode": "workspace",
// Force enable code actions
"editor.lightBulb.enabled": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit",
"source.unusedImports": "explicit"
"python.envFile": "${workspaceFolder}/.env",
"python.testing.debugPort": 3000,
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.mypyEnabled": true,
"files.associations": {
"*.py": "python"
},
// Enable Python-specific code actions
"python.analysis.completeFunctionParens": true,
"python.analysis.addImport.exactMatchOnly": false,
"workbench.tree.indent": 24,
}
"python.testing.promptToConfigure": false,
"workbench.colorTheme": "Dracula Theme Soft"
}

View File

@ -1,181 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "-------- Z-Score (OLS) --------",
},
{
"name": "CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
"--result_db=${workspaceFolder}/research/results/crypto/%T.z-score.ADA-SOL.20250602.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=COIN:EQUITY:ALPACA,MSTR:EQUITY:ALPACA",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/equity/%T.z-score.COIN-MSTR.20250602.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY-CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/zscore.cfg",
"--instruments=COIN:EQUITY:ALPACA,BTC-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/intermarket/%T.z-score.COIN-BTC.20250601.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- VECM --------",
},
{
"name": "CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/crypto/%T.vecm.ADA-SOL.20250602.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=COIN:EQUITY:ALPACA,MSTR:EQUITY:ALPACA",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/equity/%T.vecm.COIN-MSTR.20250602.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "EQUITY-CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "research/pt_backtest.py",
"args": [
"--config=${workspaceFolder}/configuration/vecm.cfg",
"--instruments=COIN:EQUITY:ALPACA,BTC-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/intermarket/%T.vecm.COIN-BTC.20250601.equity_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- New ZSCORE --------",
},
{
"name": "New CRYPTO z-score",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/backtest_new.py",
"args": [
"--config=${workspaceFolder}/configuration/new_zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=2025060*",
"--result_db=${workspaceFolder}/research/results/crypto/%T.new_zscore.ADA-SOL.2025060-.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "New CRYPTO vecm",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/backtest_new.py",
"args": [
"--config=${workspaceFolder}/configuration/new_vecm.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
"--result_db=${workspaceFolder}/research/results/crypto/%T.vecm.ADA-SOL.20250605.crypto_results.db",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
},
{
"name": "-------- Viz Test --------",
},
{
"name": "Viz Test",
"type": "debugpy",
"request": "launch",
"python": "/home/oleg/.pyenv/python3.12-venv/bin/python",
"program": "${workspaceFolder}/research/viz_test.py",
"args": [
"--config=${workspaceFolder}/configuration/new_zscore.cfg",
"--instruments=ADA-USDT:CRYPTO:BNBSPOT,SOL-USDT:CRYPTO:BNBSPOT",
"--date_pattern=20250605",
],
"env": {
"PYTHONPATH": "${workspaceFolder}/lib"
},
"console": "integratedTerminal"
}
]
}

View File

@ -1,44 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close", # "vwap"
"execution_price": {
"column": "vwap",
"shift": 1,
},
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.vecm_rolling_fit.VECMRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "7:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,43 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close",
# "execution_price": {
# "column": "vwap",
# "shift": 1,
# },
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.z-score_rolling_fit.ZScoreRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "7:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,43 +0,0 @@
{
"market_data_loading": {
"CRYPTO": {
"data_directory": "./data/crypto",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "PAIR-",
},
"EQUITY": {
"data_directory": "./data/equity",
"db_table_name": "md_1min_bars",
"instrument_id_pfx": "STOCK-",
}
},
# ====== Funding ======
"funding_per_pair": 2000.0,
# ====== Trading Parameters ======
"stat_model_price": "close",
"execution_price": {
"column": "vwap",
"shift": 1,
},
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120, # TODO Remove this
"training_size": 120,
"fit_method_class": "pt_trading.z-score_rolling_fit.ZScoreRollingFit",
# ====== Stop Conditions ======
"stop_close_conditions": {
"profit": 2.0,
"loss": -0.5
}
# ====== End of Session Closeout ======
"close_outstanding_positions": true,
# "close_outstanding_positions": false,
"trading_hours": {
"timezone": "America/New_York",
"begin_session": "9:30:00",
"end_session": "18:30:00",
}
}

View File

@ -1,304 +0,0 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import PairState, TradingPair
NanoPerMin = 1e9
class ExpandingWindowFit(PairsTradingFitMethod):
"""
N O T E:
=========
- This class remains to be abstract
- The following methods are to be implemented in the subclass:
- create_trading_pair()
=========
"""
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....")
config = pair.config_
start_idx = pair.get_begin_index()
end_index = pair.get_end_index()
pair.user_data_["state"] = PairState.INITIAL
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype(
{
"time": "datetime64[ns]",
"symbol": "string",
"side": "string",
"action": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object",
}
)
training_minutes = config["training_minutes"]
while training_minutes + 1 < end_index:
pair.get_datasets(
training_minutes=training_minutes,
training_start_index=start_idx,
testing_size=1,
)
# ================================ PREDICTION ================================
try:
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(
f"{pair}: TrainingPrediction failed: {str(e)}"
) from e
training_minutes += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [
PairState.INITIAL,
PairState.CLOSE,
PairState.CLOSE_POSITION,
PairState.CLOSE_STOP_LOSS,
PairState.CLOSE_STOP_PROFIT,
]:
if scaled_disequilibrium >= open_threshold:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN:
if scaled_disequilibrium <= close_threshold:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = PairState.CLOSE.name
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_[
"stop_close_state"
].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: *** Position is NOT CLOSED. ***")
# outstanding positions
if config["close_outstanding_positions"]:
close_position_row = pd.Series(pair.market_data_.iloc[-2])
close_position_row["disequilibrium"] = 0.0
close_position_row["scaled_disequilibrium"] = 0.0
close_position_row["signed_scaled_disequilibrium"] = 0.0
close_position_trades = self._get_close_trades(
pair=pair, row=close_position_row, close_threshold=close_threshold
)
if close_position_trades is not None:
close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = open_row["signed_scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
# creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
pair.symbol_a_,
open_side_a,
"OPEN",
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
open_tstamp,
pair.symbol_b_,
open_side_b,
"OPEN",
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Create DataFrame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = close_row["signed_scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
trd_signal_tuples = [
(
close_tstamp,
pair.symbol_a_,
close_side_a,
"CLOSE",
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
close_tstamp,
pair.symbol_b_,
close_side_b,
"CLOSE",
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def reset(self) -> None:
pass

View File

@ -1,52 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9
class PairsTradingFitMethod(ABC):
TRADES_COLUMNS = [
"time",
"symbol",
"side",
"action",
"price",
"disequilibrium",
"scaled_disequilibrium",
"signed_scaled_disequilibrium",
"pair",
]
@staticmethod
def create(config: Dict) -> PairsTradingFitMethod:
import importlib
fit_method_class_name = config.get("fit_method_class", None)
assert fit_method_class_name is not None
module_name, class_name = fit_method_class_name.rsplit(".", 1)
module = importlib.import_module(module_name)
fit_method = getattr(module, class_name)()
return cast(PairsTradingFitMethod, fit_method)
@abstractmethod
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]: ...
@abstractmethod
def reset(self) -> None: ...
@abstractmethod
def create_trading_pair(
self,
config: Dict,
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
) -> TradingPair: ...

View File

@ -1,751 +0,0 @@
import os
import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from pt_trading.trading_pair import TradingPair
# Recommended replacement adapters and converters for Python 3.12+
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
def adapt_date_iso(val: date) -> str:
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()
def adapt_datetime_iso(val: datetime) -> str:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()
def convert_date(val: bytes) -> date:
"""Convert ISO 8601 date to datetime.date object."""
return datetime.fromisoformat(val.decode()).date()
def convert_datetime(val: bytes) -> datetime:
"""Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.fromisoformat(val.decode())
# Register the adapters and converters
sqlite3.register_adapter(date, adapt_date_iso)
sqlite3.register_adapter(datetime, adapt_datetime_iso)
sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
def create_result_database(db_path: str) -> None:
"""
Create the SQLite database and required tables if they don't exist.
"""
try:
# Create directory if it doesn't exist
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
print(f"Created directory: {db_dir}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create the pt_bt_results table for completed trades
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS pt_bt_results (
date DATE,
pair TEXT,
symbol TEXT,
open_time DATETIME,
open_side TEXT,
open_price REAL,
open_quantity INTEGER,
open_disequilibrium REAL,
close_time DATETIME,
close_side TEXT,
close_price REAL,
close_quantity INTEGER,
close_disequilibrium REAL,
symbol_return REAL,
pair_return REAL,
close_condition TEXT
)
"""
)
cursor.execute("DELETE FROM pt_bt_results;")
# Create the outstanding_positions table for open positions
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS outstanding_positions (
date DATE,
pair TEXT,
symbol TEXT,
position_quantity REAL,
last_price REAL,
unrealized_return REAL,
open_price REAL,
open_side TEXT
)
"""
)
cursor.execute("DELETE FROM outstanding_positions;")
# Create the config table for storing configuration JSON for reference
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS config (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_timestamp DATETIME,
config_file_path TEXT,
config_json TEXT,
fit_method_class TEXT,
datafiles TEXT,
instruments TEXT
)
"""
)
cursor.execute("DELETE FROM config;")
conn.commit()
conn.close()
except Exception as e:
print(f"Error creating result database: {str(e)}")
raise
def store_config_in_database(
db_path: str,
config_file_path: str,
config: Dict,
fit_method_class: str,
datafiles: List[Tuple[str, str]],
instruments: List[Dict[str, str]],
) -> None:
"""
Store configuration information in the database for reference.
"""
import json
if db_path.upper() == "NONE":
return
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Convert config to JSON string
config_json = json.dumps(config, indent=2, default=str)
# Convert lists to comma-separated strings for storage
datafiles_str = ", ".join([f"{datafile}" for _, datafile in datafiles])
instruments_str = ", ".join(
[
f"{inst['symbol']}:{inst['instrument_type']}:{inst['exchange_id']}"
for inst in instruments
]
)
# Insert configuration record
cursor.execute(
"""
INSERT INTO config (
run_timestamp, config_file_path, config_json, fit_method_class, datafiles, instruments
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
datetime.now(),
config_file_path,
config_json,
fit_method_class,
datafiles_str,
instruments_str,
),
)
conn.commit()
conn.close()
print(f"Configuration stored in database")
except Exception as e:
print(f"Error storing configuration in database: {str(e)}")
import traceback
traceback.print_exc()
def convert_timestamp(timestamp: Any) -> Optional[datetime]:
"""Convert pandas Timestamp to Python datetime object for SQLite compatibility."""
if timestamp is None:
return None
if isinstance(timestamp, pd.Timestamp):
return timestamp.to_pydatetime()
elif isinstance(timestamp, datetime):
return timestamp
elif isinstance(timestamp, date):
return datetime.combine(timestamp, datetime.min.time())
elif isinstance(timestamp, str):
return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
elif isinstance(timestamp, int):
return datetime.fromtimestamp(timestamp)
else:
raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
class PairResarchResult:
pair_: TradingPair
trades_: Dict[str, Dict[str, Any]]
outstanding_positions_: List[Dict[str, Any]]
def __init__(self, config: Dict[str, Any], pair: TradingPair, trades: Dict[str, Dict[str, Any]], outstanding_positions: List[Dict[str, Any]]):
self.config = config
self.pair_ = pair
self.trades_ = trades
self.outstanding_positions_ = outstanding_positions
class BacktestResult:
"""
Class to handle backtest results, trades tracking, PnL calculations, and reporting.
"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.trades: Dict[str, Dict[str, Any]] = {}
self.total_realized_pnl = 0.0
self.outstanding_positions: List[Dict[str, Any]] = []
self.symbol_roundtrip_trades_: Dict[str, List[Dict[str, Any]]] = {}
def add_trade(
self,
pair_nm: str,
symbol: str,
side: str,
action: str,
price: Any,
disequilibrium: Optional[float] = None,
scaled_disequilibrium: Optional[float] = None,
timestamp: Optional[datetime] = None,
status: Optional[str] = None,
) -> None:
"""Add a trade to the results tracking."""
pair_nm = str(pair_nm)
if pair_nm not in self.trades:
self.trades[pair_nm] = {symbol: []}
if symbol not in self.trades[pair_nm]:
self.trades[pair_nm][symbol] = []
self.trades[pair_nm][symbol].append(
{
"symbol": symbol,
"side": side,
"action": action,
"price": price,
"disequilibrium": disequilibrium,
"scaled_disequilibrium": scaled_disequilibrium,
"timestamp": timestamp,
"status": status,
}
)
def add_outstanding_position(self, position: Dict[str, Any]) -> None:
"""Add an outstanding position to tracking."""
self.outstanding_positions.append(position)
def add_realized_pnl(self, realized_pnl: float) -> None:
"""Add realized PnL to the total."""
self.total_realized_pnl += realized_pnl
def get_total_realized_pnl(self) -> float:
"""Get total realized PnL."""
return self.total_realized_pnl
def get_outstanding_positions(self) -> List[Dict[str, Any]]:
"""Get all outstanding positions."""
return self.outstanding_positions
def get_trades(self) -> Dict[str, Dict[str, Any]]:
"""Get all trades."""
return self.trades
def clear_trades(self) -> None:
"""Clear all trades (used when processing new files)."""
self.trades.clear()
def collect_single_day_results(self, pairs_trades: List[pd.DataFrame]) -> None:
"""Collect and process single day trading results."""
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
side = row.side
action = row.action
symbol = row.symbol
price = row.price
disequilibrium = getattr(row, "disequilibrium", None)
scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None)
if hasattr(row, "time"):
timestamp = getattr(row, "time")
else:
timestamp = convert_timestamp(row.Index)
status = row.status
self.add_trade(
pair_nm=str(row.pair),
symbol=str(symbol),
side=str(side),
action=str(action),
price=float(str(price)),
disequilibrium=disequilibrium,
scaled_disequilibrium=scaled_disequilibrium,
timestamp=timestamp,
status=str(status) if status is not None else "?",
)
def print_single_day_results(self) -> None:
"""Print single day results summary."""
for pair, symbols in self.trades.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for trade_data in trades:
if len(trade_data) >= 2:
side, price = trade_data[:2]
print(f"{symbol} {side} at ${price}")
def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Print summary of all processed files."""
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Calculate and print returns by day and pair."""
def _symbol_return(trade1_side: str, trade1_px: float, trade2_side: str, trade2_px: float) -> float:
if trade1_side == "BUY" and trade2_side == "SELL":
return (trade2_px - trade1_px) / trade1_px * 100
elif trade1_side == "SELL" and trade2_side == "BUY":
return (trade1_px - trade2_px) / trade1_px * 100
else:
return 0
print("\n====== Returns By Day and Pair ======")
trades = []
for filename, data in all_results.items():
pairs = list(data["trades"].keys())
for pair in pairs:
self.symbol_roundtrip_trades_[pair] = []
trades_dict = data["trades"][pair]
for symbol in trades_dict.keys():
trades.extend(trades_dict[symbol])
trades = sorted(trades, key=lambda x: (x["timestamp"], x["symbol"]))
print(f"\n--- {filename} ---")
self.outstanding_positions = data["outstanding_positions"]
day_return = 0.0
for idx in range(0, len(trades), 4):
symbol_a = trades[idx]["symbol"]
trade_a_1 = trades[idx]
trade_a_2 = trades[idx + 2]
symbol_b = trades[idx + 1]["symbol"]
trade_b_1 = trades[idx + 1]
trade_b_2 = trades[idx + 3]
symbol_return = 0
assert (
trade_a_1["timestamp"] < trade_a_2["timestamp"]
), f"Trade 1: {trade_a_1['timestamp']} is not less than Trade 2: {trade_a_2['timestamp']}"
assert (
trade_a_1["action"] == "OPEN" and trade_a_2["action"] == "CLOSE"
), f"Trade 1: {trade_a_1['action']} and Trade 2: {trade_a_2['action']} are the same"
# Calculate return based on action combination
trade_return = 0
symbol_a_return = _symbol_return(trade_a_1["side"], trade_a_1["price"], trade_a_2["side"], trade_a_2["price"])
symbol_b_return = _symbol_return(trade_b_1["side"], trade_b_1["price"], trade_b_2["side"], trade_b_2["price"])
pair_return = symbol_a_return + symbol_b_return
self.symbol_roundtrip_trades_[pair].append(
{
"symbol": symbol_a,
"open_side": trade_a_1["side"],
"open_action": trade_a_1["action"],
"open_price": trade_a_1["price"],
"close_side": trade_a_2["side"],
"close_action": trade_a_2["action"],
"close_price": trade_a_2["price"],
"symbol_return": symbol_a_return,
"open_disequilibrium": trade_a_1["disequilibrium"],
"open_scaled_disequilibrium": trade_a_1["scaled_disequilibrium"],
"close_disequilibrium": trade_a_2["disequilibrium"],
"close_scaled_disequilibrium": trade_a_2["scaled_disequilibrium"],
"open_time": trade_a_1["timestamp"],
"close_time": trade_a_2["timestamp"],
"shares": self.config["funding_per_pair"] / 2 / trade_a_1["price"],
"is_completed": True,
"close_condition": trade_a_2["status"],
"pair_return": pair_return
}
)
self.symbol_roundtrip_trades_[pair].append(
{
"symbol": symbol_b,
"open_side": trade_b_1["side"],
"open_action": trade_b_1["action"],
"open_price": trade_b_1["price"],
"close_side": trade_b_2["side"],
"close_action": trade_b_2["action"],
"close_price": trade_b_2["price"],
"symbol_return": symbol_b_return,
"open_disequilibrium": trade_b_1["disequilibrium"],
"open_scaled_disequilibrium": trade_b_1["scaled_disequilibrium"],
"close_disequilibrium": trade_b_2["disequilibrium"],
"close_scaled_disequilibrium": trade_b_2["scaled_disequilibrium"],
"open_time": trade_b_1["timestamp"],
"close_time": trade_b_2["timestamp"],
"shares": self.config["funding_per_pair"] / 2 / trade_b_1["price"],
"is_completed": True,
"close_condition": trade_b_2["status"],
"pair_return": pair_return
}
)
# Print pair returns with disequilibrium information
day_return = 0.0
if pair in self.symbol_roundtrip_trades_:
print(f"{pair}:")
pair_return = 0.0
for trd in self.symbol_roundtrip_trades_[pair]:
disequil_info = ""
if (
trd["open_scaled_disequilibrium"] is not None
and trd["open_scaled_disequilibrium"] is not None
):
disequil_info = f" | Open Dis-eq: {trd['open_scaled_disequilibrium']:.2f},"
f" Close Dis-eq: {trd['open_scaled_disequilibrium']:.2f}"
print(
f" {trd['open_time'].time()}-{trd['close_time'].time()} {trd['symbol']}: "
f" {trd['open_side']} @ ${trd['open_price']:.2f},"
f" {trd["close_side"]} @ ${trd["close_price"]:.2f},"
f" Return: {trd['symbol_return']:.2f}%{disequil_info}"
)
pair_return += trd["symbol_return"]
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return and add to global realized PnL
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
self.add_realized_pnl(day_return)
def print_outstanding_positions(self) -> None:
"""Print all outstanding positions with share quantities and current values."""
if not self.get_outstanding_positions():
print("\n====== NO OUTSTANDING POSITIONS ======")
return
print(f"\n====== OUTSTANDING POSITIONS ======")
print(
f"{'Pair':<15}"
f" {'Symbol':<10}"
f" {'Side':<4}"
f" {'Shares':<10}"
f" {'Open $':<8}"
f" {'Current $':<10}"
f" {'Value $':<12}"
f" {'Disequilibrium':<15}"
)
print("-" * 100)
total_value = 0.0
for pos in self.get_outstanding_positions():
# Print position A
print(
f"{pos['pair']:<15}"
f" {pos['symbol_a']:<10}"
f" {pos['side_a']:<4}"
f" {pos['shares_a']:<10.2f}"
f" {pos['open_px_a']:<8.2f}"
f" {pos['current_px_a']:<10.2f}"
f" {pos['current_value_a']:<12.2f}"
f" {'':<15}"
)
# Print position B
print(
f"{'':<15}"
f" {pos['symbol_b']:<10}"
f" {pos['side_b']:<4}"
f" {pos['shares_b']:<10.2f}"
f" {pos['open_px_b']:<8.2f}"
f" {pos['current_px_b']:<10.2f}"
f" {pos['current_value_b']:<12.2f}"
)
# Print pair totals with disequilibrium info
print(
f"{'':<15}"
f" {'PAIR TOTAL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" {pos['total_current_value']:<12.2f}"
)
# Print disequilibrium details
print(
f"{'':<15}"
f" {'DISEQUIL':<10}"
f" {'':<4}"
f" {'':<10}"
f" {'':<8}"
f" {'':<10}"
f" Raw: {pos['current_disequilibrium']:<6.4f}"
f" Scaled: {pos['current_scaled_disequilibrium']:<6.4f}"
)
print("-" * 100)
total_value += pos["total_current_value"]
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
def print_grand_totals(self) -> None:
"""Print grand totals across all pairs."""
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%")
def handle_outstanding_position(
self,
pair: TradingPair,
pair_result_df: pd.DataFrame,
last_row_index: int,
open_side_a: str,
open_side_b: str,
open_px_a: float,
open_px_b: float,
open_tstamp: datetime,
) -> Tuple[float, float, float]:
"""
Handle calculation and tracking of outstanding positions when no close signal is found.
Args:
pair: TradingPair object
pair_result_df: DataFrame with pair results
last_row_index: Index of the last row in the data
open_side_a, open_side_b: Trading sides for symbols A and B
open_px_a, open_px_b: Opening prices for symbols A and B
open_tstamp: Opening timestamp
"""
if pair_result_df is None or pair_result_df.empty:
return 0, 0, 0
last_row = pair_result_df.loc[last_row_index]
last_tstamp = last_row["tstamp"]
colname_a, colname_b = pair.exec_prices_colnames()
last_px_a = last_row[colname_a]
last_px_b = last_row[colname_b]
# Calculate share quantities based on funding per pair
# Split funding equally between the two positions
funding_per_position = self.config["funding_per_pair"] / 2
shares_a = funding_per_position / open_px_a
shares_b = funding_per_position / open_px_b
# Calculate current position values (shares * current price)
current_value_a = shares_a * last_px_a * (-1 if open_side_a == "SELL" else 1)
current_value_b = shares_b * last_px_b * (-1 if open_side_b == "SELL" else 1)
total_current_value = current_value_a + current_value_b
# Get disequilibrium information
current_disequilibrium = last_row["disequilibrium"]
current_scaled_disequilibrium = last_row["scaled_disequilibrium"]
# Store outstanding positions
self.add_outstanding_position(
{
"pair": str(pair),
"symbol_a": pair.symbol_a_,
"symbol_b": pair.symbol_b_,
"side_a": open_side_a,
"side_b": open_side_b,
"shares_a": shares_a,
"shares_b": shares_b,
"open_px_a": open_px_a,
"open_px_b": open_px_b,
"current_px_a": last_px_a,
"current_px_b": last_px_b,
"current_value_a": current_value_a,
"current_value_b": current_value_b,
"total_current_value": total_current_value,
"open_time": open_tstamp,
"last_time": last_tstamp,
"current_abs_term": current_scaled_disequilibrium,
"current_disequilibrium": current_disequilibrium,
"current_scaled_disequilibrium": current_scaled_disequilibrium,
}
)
# Print position details
print(f"{pair}: NO CLOSE SIGNAL FOUND - Position held until end of session")
print(f" Open: {open_tstamp} | Last: {last_tstamp}")
print(
f" {pair.symbol_a_}: {open_side_a} {shares_a:.2f} shares @ ${open_px_a:.2f} -> ${last_px_a:.2f} | Value: ${current_value_a:.2f}"
)
print(
f" {pair.symbol_b_}: {open_side_b} {shares_b:.2f} shares @ ${open_px_b:.2f} -> ${last_px_b:.2f} | Value: ${current_value_b:.2f}"
)
print(f" Total Value: ${total_current_value:.2f}")
print(
f" Disequilibrium: {current_disequilibrium:.4f} | Scaled: {current_scaled_disequilibrium:.4f}"
)
return current_value_a, current_value_b, total_current_value
def store_results_in_database(
self, db_path: str, day: str
) -> None:
"""
Store backtest results in the SQLite database.
"""
if db_path.upper() == "NONE":
return
try:
# Extract date from datafile name (assuming format like 20250528.mktdata.ohlcv.db)
date_str = day
# Convert to proper date format
try:
date_obj = datetime.strptime(date_str, "%Y%m%d").date()
except ValueError:
# If date parsing fails, use current date
date_obj = datetime.now().date()
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Process each trade from bt_result
trades = self.get_trades()
for pair_name, _ in trades.items():
# Second pass: insert completed trade records into database
for trade_pair in sorted(self.symbol_roundtrip_trades_[pair_name], key=lambda x: x["open_time"]):
# Only store completed trades in pt_bt_results table
cursor.execute(
"""
INSERT INTO pt_bt_results (
date, pair, symbol, open_time, open_side, open_price,
open_quantity, open_disequilibrium, close_time, close_side,
close_price, close_quantity, close_disequilibrium,
symbol_return, pair_return, close_condition
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pair_name,
trade_pair["symbol"],
trade_pair["open_time"],
trade_pair["open_side"],
trade_pair["open_price"],
trade_pair["shares"],
trade_pair["open_scaled_disequilibrium"],
trade_pair["close_time"],
trade_pair["close_side"],
trade_pair["close_price"],
trade_pair["shares"],
trade_pair["close_scaled_disequilibrium"],
trade_pair["symbol_return"],
trade_pair["pair_return"],
trade_pair["close_condition"]
),
)
# Store outstanding positions in separate table
outstanding_positions = self.get_outstanding_positions()
for pos in outstanding_positions:
# Calculate position quantity (negative for SELL positions)
position_qty_a = (
pos["shares_a"] if pos["side_a"] == "BUY" else -pos["shares_a"]
)
position_qty_b = (
pos["shares_b"] if pos["side_b"] == "BUY" else -pos["shares_b"]
)
# Calculate unrealized returns
# For symbol A: (current_price - open_price) / open_price * 100 * position_direction
unrealized_return_a = (
(pos["current_px_a"] - pos["open_px_a"]) / pos["open_px_a"] * 100
) * (1 if pos["side_a"] == "BUY" else -1)
unrealized_return_b = (
(pos["current_px_b"] - pos["open_px_b"]) / pos["open_px_b"] * 100
) * (1 if pos["side_b"] == "BUY" else -1)
# Store outstanding position for symbol A
cursor.execute(
"""
INSERT INTO outstanding_positions (
date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pos["pair"],
pos["symbol_a"],
position_qty_a,
pos["current_px_a"],
unrealized_return_a,
pos["open_px_a"],
pos["side_a"],
),
)
# Store outstanding position for symbol B
cursor.execute(
"""
INSERT INTO outstanding_positions (
date, pair, symbol, position_quantity, last_price, unrealized_return, open_price, open_side
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
date_obj,
pos["pair"],
pos["symbol_b"],
position_qty_b,
pos["current_px_b"],
unrealized_return_b,
pos["open_px_b"],
pos["side_b"],
),
)
conn.commit()
conn.close()
except Exception as e:
print(f"Error storing results in database: {str(e)}")
import traceback
traceback.print_exc()

View File

@ -1,319 +0,0 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import PairState, TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class RollingFit(PairsTradingFitMethod):
"""
N O T E:
=========
- This class remains to be abstract
- The following methods are to be implemented in the subclass:
- create_trading_pair()
=========
"""
def __init__(self) -> None:
super().__init__()
def run_pair(
self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
print(f"***{pair}*** STARTING....")
config = pair.config_
curr_training_start_idx = pair.get_begin_index()
end_index = pair.get_end_index()
pair.user_data_["state"] = PairState.INITIAL
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype(
{
"time": "datetime64[ns]",
"symbol": "string",
"side": "string",
"action": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"pair": "object",
}
)
training_minutes = config["training_minutes"]
curr_predicted_row_idx = 0
while True:
print(curr_training_start_idx, end="\r")
pair.get_datasets(
training_minutes=training_minutes,
training_start_index=curr_training_start_idx,
testing_size=1,
)
if len(pair.training_df_) < training_minutes:
print(
f"{pair}: current offset={curr_training_start_idx}"
f" * Training data length={len(pair.training_df_)} < {training_minutes}"
" * Not enough training data. Completing the job."
)
break
try:
# ================================ PREDICTION ================================
self.pair_predict_result_ = pair.predict()
except Exception as e:
raise RuntimeError(
f"{pair}: TrainingPrediction failed: {str(e)}"
) from e
# break
curr_training_start_idx += 1
if curr_training_start_idx > end_index:
break
curr_predicted_row_idx += 1
self._create_trading_signals(pair, config, bt_result)
print(f"***{pair}*** FINISHED *** Num Trades:{len(pair.user_data_['trades'])}")
return pair.get_trades()
def _create_trading_signals(
self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
predicted_df = self.pair_predict_result_
assert predicted_df is not None
open_threshold = config["dis-equilibrium_open_trshld"]
close_threshold = config["dis-equilibrium_close_trshld"]
for curr_predicted_row_idx in range(len(predicted_df)):
pred_row = predicted_df.iloc[curr_predicted_row_idx]
scaled_disequilibrium = pred_row["scaled_disequilibrium"]
if pair.user_data_["state"] in [
PairState.INITIAL,
PairState.CLOSE,
PairState.CLOSE_POSITION,
PairState.CLOSE_STOP_LOSS,
PairState.CLOSE_STOP_PROFIT,
]:
if scaled_disequilibrium >= open_threshold:
open_trades = self._get_open_trades(
pair, row=pred_row, open_threshold=open_threshold
)
if open_trades is not None:
open_trades["status"] = PairState.OPEN.name
print(f"OPEN TRADES:\n{open_trades}")
pair.add_trades(open_trades)
pair.user_data_["state"] = PairState.OPEN
pair.on_open_trades(open_trades)
elif pair.user_data_["state"] == PairState.OPEN:
if scaled_disequilibrium <= close_threshold:
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = PairState.CLOSE.name
print(f"CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = PairState.CLOSE
pair.on_close_trades(close_trades)
elif pair.to_stop_close_conditions(predicted_row=pred_row):
close_trades = self._get_close_trades(
pair, row=pred_row, close_threshold=close_threshold
)
if close_trades is not None:
close_trades["status"] = pair.user_data_[
"stop_close_state"
].name
print(f"STOP CLOSE TRADES:\n{close_trades}")
pair.add_trades(close_trades)
pair.user_data_["state"] = pair.user_data_["stop_close_state"]
pair.on_close_trades(close_trades)
# Outstanding positions
if pair.user_data_["state"] == PairState.OPEN:
print(f"{pair}: *** Position is NOT CLOSED. ***")
# outstanding positions
if config["close_outstanding_positions"]:
close_position_row = pd.Series(pair.market_data_.iloc[-2])
close_position_row["disequilibrium"] = 0.0
close_position_row["scaled_disequilibrium"] = 0.0
close_position_row["signed_scaled_disequilibrium"] = 0.0
close_position_trades = self._get_close_trades(
pair=pair, row=close_position_row, close_threshold=close_threshold
)
if close_position_trades is not None:
close_position_trades["status"] = PairState.CLOSE_POSITION.name
print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
pair.add_trades(close_position_trades)
pair.user_data_["state"] = PairState.CLOSE_POSITION
pair.on_close_trades(close_position_trades)
else:
if predicted_df is not None:
bt_result.handle_outstanding_position(
pair=pair,
pair_result_df=predicted_df,
last_row_index=0,
open_side_a=pair.user_data_["open_side_a"],
open_side_b=pair.user_data_["open_side_b"],
open_px_a=pair.user_data_["open_px_a"],
open_px_b=pair.user_data_["open_px_b"],
open_tstamp=pair.user_data_["open_tstamp"],
)
def _get_open_trades(
self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
open_row = row
open_tstamp = open_row["tstamp"]
open_disequilibrium = open_row["disequilibrium"]
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = open_row["signed_scaled_disequilibrium"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
# creating the trades
print(f"OPEN_TRADES: {row["tstamp"]} {open_scaled_disequilibrium=}")
if open_disequilibrium > 0:
open_side_a = "SELL"
open_side_b = "BUY"
close_side_a = "BUY"
close_side_b = "SELL"
else:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
# save closing sides
pair.user_data_["open_side_a"] = open_side_a
pair.user_data_["open_side_b"] = open_side_b
pair.user_data_["open_px_a"] = open_px_a
pair.user_data_["open_px_b"] = open_px_b
pair.user_data_["open_tstamp"] = open_tstamp
pair.user_data_["close_side_a"] = close_side_a
pair.user_data_["close_side_b"] = close_side_b
# create opening trades
trd_signal_tuples = [
(
open_tstamp,
pair.symbol_a_,
open_side_a,
"OPEN",
open_px_a,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
open_tstamp,
pair.symbol_b_,
open_side_b,
"OPEN",
open_px_b,
open_disequilibrium,
open_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Create DataFrame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def _get_close_trades(
self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
colname_a, colname_b = pair.exec_prices_colnames()
close_row = row
close_tstamp = close_row["tstamp"]
close_disequilibrium = close_row["disequilibrium"]
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
signed_scaled_disequilibrium = close_row["signed_scaled_disequilibrium"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
close_side_a = pair.user_data_["close_side_a"]
close_side_b = pair.user_data_["close_side_b"]
trd_signal_tuples = [
(
close_tstamp,
pair.symbol_a_,
close_side_a,
"CLOSE",
close_px_a,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
(
close_tstamp,
pair.symbol_b_,
close_side_b,
"CLOSE",
close_px_b,
close_disequilibrium,
close_scaled_disequilibrium,
signed_scaled_disequilibrium,
pair,
),
]
# Add tuples to data frame with explicit dtypes to avoid concatenation warnings
df = pd.DataFrame(
trd_signal_tuples,
columns=self.TRADES_COLUMNS,
)
# Ensure consistent dtypes
return df.astype(
{
"time": "datetime64[ns]",
"action": "string",
"symbol": "string",
"price": "float64",
"disequilibrium": "float64",
"scaled_disequilibrium": "float64",
"signed_scaled_disequilibrium": "float64",
"pair": "object",
}
)
def reset(self) -> None:
pass

View File

@ -1,380 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
class PairState(Enum):
INITIAL = 1
OPEN = 2
CLOSE = 3
CLOSE_POSITION = 4
CLOSE_STOP_LOSS = 5
CLOSE_STOP_PROFIT = 6
class CointegrationData:
EG_PVALUE_THRESHOLD = 0.05
tstamp_: pd.Timestamp
pair_: str
eg_pvalue_: float
johansen_lr1_: float
johansen_cvt_: float
eg_is_cointegrated_: bool
johansen_is_cointegrated_: bool
def __init__(self, pair: TradingPair):
training_df = pair.training_df_
assert training_df is not None
from statsmodels.tsa.vector_ar.vecm import coint_johansen
df = training_df[pair.colnames()].reset_index(drop=True)
# Run Johansen cointegration test
result = coint_johansen(df, det_order=0, k_ar_diff=1)
self.johansen_lr1_ = result.lr1[0]
self.johansen_cvt_ = result.cvt[0, 1]
self.johansen_is_cointegrated_ = self.johansen_lr1_ > self.johansen_cvt_
# Run Engle-Granger cointegration test
from statsmodels.tsa.stattools import coint # type: ignore
col1, col2 = pair.colnames()
assert training_df is not None
series1 = training_df[col1].reset_index(drop=True)
series2 = training_df[col2].reset_index(drop=True)
self.eg_pvalue_ = float(coint(series1, series2)[1])
self.eg_is_cointegrated_ = bool(self.eg_pvalue_ < self.EG_PVALUE_THRESHOLD)
self.tstamp_ = training_df.index[-1]
self.pair_ = pair.name()
def to_dict(self) -> Dict[str, Any]:
return {
"tstamp": self.tstamp_,
"pair": self.pair_,
"eg_pvalue": self.eg_pvalue_,
"johansen_lr1": self.johansen_lr1_,
"johansen_cvt": self.johansen_cvt_,
"eg_is_cointegrated": self.eg_is_cointegrated_,
"johansen_is_cointegrated": self.johansen_is_cointegrated_,
}
def __repr__(self) -> str:
return f"CointegrationData(tstamp={self.tstamp_}, pair={self.pair_}, eg_pvalue={self.eg_pvalue_}, johansen_lr1={self.johansen_lr1_}, johansen_cvt={self.johansen_cvt_}, eg_is_cointegrated={self.eg_is_cointegrated_}, johansen_is_cointegrated={self.johansen_is_cointegrated_})"
class TradingPair(ABC):
market_data_: pd.DataFrame
symbol_a_: str
symbol_b_: str
stat_model_price_: str
training_mu_: float
training_std_: float
training_df_: pd.DataFrame
testing_df_: pd.DataFrame
user_data_: Dict[str, Any]
# predicted_df_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.stat_model_price_ = config["stat_model_price"]
self.user_data_ = {}
self.predicted_df_ = None
self.config_ = config
self._set_market_data(market_data)
def _set_market_data(self, market_data: pd.DataFrame) -> None:
self.market_data_ = pd.DataFrame(
self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
self.market_data_["tstamp"] = pd.to_datetime(self.market_data_["tstamp"])
self.market_data_ = self.market_data_.sort_values("tstamp")
self._set_execution_price_data()
pass
def _set_execution_price_data(self) -> None:
if "execution_price" not in self.config_:
self.market_data_[f"exec_price_{self.symbol_a_}"] = self.market_data_[f"{self.stat_model_price_}_{self.symbol_a_}"]
self.market_data_[f"exec_price_{self.symbol_b_}"] = self.market_data_[f"{self.stat_model_price_}_{self.symbol_b_}"]
return
execution_price_column = self.config_["execution_price"]["column"]
execution_price_shift = self.config_["execution_price"]["shift"]
self.market_data_[f"exec_price_{self.symbol_a_}"] = self.market_data_[f"{execution_price_column}_{self.symbol_a_}"].shift(-execution_price_shift)
self.market_data_[f"exec_price_{self.symbol_b_}"] = self.market_data_[f"{execution_price_column}_{self.symbol_b_}"].shift(-execution_price_shift)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
def get_begin_index(self) -> int:
if "trading_hours" not in self.config_:
return 0
assert "timezone" in self.config_["trading_hours"]
assert "begin_session" in self.config_["trading_hours"]
start_time = (
pd.to_datetime(self.config_["trading_hours"]["begin_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time >= start_time
return int(self.market_data_.index[mask].min())
def get_end_index(self) -> int:
if "trading_hours" not in self.config_:
return 0
assert "timezone" in self.config_["trading_hours"]
assert "end_session" in self.config_["trading_hours"]
end_time = (
pd.to_datetime(self.config_["trading_hours"]["end_session"])
.tz_localize(self.config_["trading_hours"]["timezone"])
.time()
)
mask = self.market_data_["tstamp"].dt.time <= end_time
return int(self.market_data_.index[mask].max())
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
# Select only the columns we need
df_selected: pd.DataFrame = pd.DataFrame(
df[["tstamp", "symbol", self.stat_model_price_]]
)
# Start with unique timestamps
result_df: pd.DataFrame = (
pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
)
# For each unique symbol, add a corresponding close price column
symbols = df_selected["symbol"].unique()
for symbol in symbols:
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(
drop=True
)
# Create column name like "close-COIN"
new_price_column = f"{self.stat_model_price_}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame(
{
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[self.stat_model_price_],
}
)
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
result_df = result_df.reset_index(
drop=True
) # do not dropna() since irrelevant symbol would affect dataset
return result_df.dropna()
def get_datasets(
self,
training_minutes: int,
training_start_index: int = 0,
testing_size: Optional[int] = None,
) -> None:
testing_start_index = training_start_index + training_minutes
self.training_df_ = self.market_data_.iloc[
training_start_index:testing_start_index, :training_minutes
].copy()
assert self.training_df_ is not None
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
testing_start_index = training_start_index + training_minutes
if testing_size is None:
self.testing_df_ = self.market_data_.iloc[testing_start_index:, :].copy()
else:
self.testing_df_ = self.market_data_.iloc[
testing_start_index : testing_start_index + testing_size, :
].copy()
assert self.testing_df_ is not None
self.testing_df_ = self.testing_df_.dropna().reset_index(drop=True)
def colnames(self) -> List[str]:
return [
f"{self.stat_model_price_}_{self.symbol_a_}",
f"{self.stat_model_price_}_{self.symbol_b_}",
]
def exec_prices_colnames(self) -> List[str]:
return [
f"exec_price_{self.symbol_a_}",
f"exec_price_{self.symbol_b_}",
]
def add_trades(self, trades: pd.DataFrame) -> None:
if self.user_data_["trades"] is None or len(self.user_data_["trades"]) == 0:
# If trades is empty or None, just assign the new trades directly
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns and dtypes before concatenation
existing_trades = self.user_data_["trades"]
# If existing trades is empty, just assign the new trades
if len(existing_trades) == 0:
self.user_data_["trades"] = trades.copy()
else:
# Ensure both DataFrames have the same columns
if set(existing_trades.columns) != set(trades.columns):
# Add missing columns to trades with appropriate default values
for col in existing_trades.columns:
if col not in trades.columns:
if col == "time":
trades[col] = pd.Timestamp.now()
elif col in ["action", "symbol"]:
trades[col] = ""
elif col in [
"price",
"disequilibrium",
"scaled_disequilibrium",
]:
trades[col] = 0.0
elif col == "pair":
trades[col] = None
else:
trades[col] = None
# Concatenate with explicit dtypes to avoid warnings
self.user_data_["trades"] = pd.concat(
[existing_trades, trades], ignore_index=True, copy=False
)
def get_trades(self) -> pd.DataFrame:
return (
self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
)
def cointegration_check(self) -> Optional[pd.DataFrame]:
print(f"***{self}*** STARTING....")
config = self.config_
curr_training_start_idx = 0
COINTEGRATION_DATA_COLUMNS = {
"tstamp": "datetime64[ns]",
"pair": "string",
"eg_pvalue": "float64",
"johansen_lr1": "float64",
"johansen_cvt": "float64",
"eg_is_cointegrated": "bool",
"johansen_is_cointegrated": "bool",
}
# Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
result: pd.DataFrame = pd.DataFrame(
columns=[col for col in COINTEGRATION_DATA_COLUMNS.keys()]
) # .astype(COINTEGRATION_DATA_COLUMNS)
training_minutes = config["training_minutes"]
while True:
print(curr_training_start_idx, end="\r")
self.get_datasets(
training_minutes=training_minutes,
training_start_index=curr_training_start_idx,
testing_size=1,
)
if len(self.training_df_) < training_minutes:
print(
f"{self}: current offset={curr_training_start_idx}"
f" * Training data length={len(self.training_df_)} < {training_minutes}"
" * Not enough training data. Completing the job."
)
break
new_row = pd.Series(CointegrationData(self).to_dict())
result.loc[len(result)] = new_row
curr_training_start_idx += 1
return result
def to_stop_close_conditions(self, predicted_row: pd.Series) -> bool:
config = self.config_
if (
"stop_close_conditions" not in config
or config["stop_close_conditions"] is None
):
return False
if "profit" in config["stop_close_conditions"]:
current_return = self._current_return(predicted_row)
#
# print(f"time={predicted_row['tstamp']} current_return={current_return}")
#
if current_return >= config["stop_close_conditions"]["profit"]:
print(f"STOP PROFIT: {current_return}")
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_PROFIT
return True
if "loss" in config["stop_close_conditions"]:
if current_return <= config["stop_close_conditions"]["loss"]:
print(f"STOP LOSS: {current_return}")
self.user_data_["stop_close_state"] = PairState.CLOSE_STOP_LOSS
return True
return False
def on_open_trades(self, trades: pd.DataFrame) -> None:
if "close_trades" in self.user_data_:
del self.user_data_["close_trades"]
self.user_data_["open_trades"] = trades
def on_close_trades(self, trades: pd.DataFrame) -> None:
del self.user_data_["open_trades"]
self.user_data_["close_trades"] = trades
def _current_return(self, predicted_row: pd.Series) -> float:
if "open_trades" in self.user_data_:
open_trades = self.user_data_["open_trades"]
if len(open_trades) == 0:
return 0.0
def _single_instrument_return(symbol: str) -> float:
instrument_open_trades = open_trades[open_trades["symbol"] == symbol]
instrument_open_price = instrument_open_trades["price"].iloc[0]
sign = -1 if instrument_open_trades["side"].iloc[0] == "SELL" else 1
instrument_price = predicted_row[f"{self.stat_model_price_}_{symbol}"]
instrument_return = (
sign
* (instrument_price - instrument_open_price)
/ instrument_open_price
)
return float(instrument_return) * 100.0
instrument_a_return = _single_instrument_return(self.symbol_a_)
instrument_b_return = _single_instrument_return(self.symbol_b_)
return instrument_a_return + instrument_b_return
return 0.0
def __repr__(self) -> str:
return self.name()
def name(self) -> str:
return f"{self.symbol_a_} & {self.symbol_b_}"
# return f"{self.symbol_a_} & {self.symbol_b_}"
@abstractmethod
def predict(self) -> pd.DataFrame: ...
# @abstractmethod
# def predicted_df(self) -> Optional[pd.DataFrame]: ...

View File

@ -1,122 +0,0 @@
from typing import Any, Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults
NanoPerMin = 1e9
class VECMTradingPair(TradingPair):
vecm_fit_: Optional[VECMResults]
pair_predict_result_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
super().__init__(config, market_data, symbol_a, symbol_b)
self.vecm_fit_ = None
self.pair_predict_result_ = None
def _train_pair(self) -> None:
self._fit_VECM()
assert self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
# print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std())
self.training_df_["dis-equilibrium"] = (
self.training_df_[self.colnames()] @ self.vecm_fit_.beta
)
# Normalize the dis-equilibrium
self.training_df_["scaled_dis-equilibrium"] = (
diseq_series - self.training_mu_
) / self.training_std_
def _fit_VECM(self) -> None:
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
vecm_fit = vecm_model.fit()
assert vecm_fit is not None
# URGENT check beta and alpha
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{self}: VECM model failed to converge properly")
self.vecm_fit_ = vecm_fit
pass
def predict(self) -> pd.DataFrame:
self._train_pair()
assert self.testing_df_ is not None
assert self.vecm_fit_ is not None
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
)
predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True),
pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
),
left_index=True,
right_index=True,
suffixes=("", "_pred"),
).dropna()
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
predicted_df["signed_scaled_disequilibrium"] = (
predicted_df["disequilibrium"] - self.training_mu_
) / self.training_std_
predicted_df["scaled_disequilibrium"] = abs(
predicted_df["signed_scaled_disequilibrium"]
)
predicted_df = predicted_df.reset_index(drop=True)
if self.pair_predict_result_ is None:
self.pair_predict_result_ = predicted_df
else:
self.pair_predict_result_ = pd.concat(
[self.pair_predict_result_, predicted_df], ignore_index=True
)
# Reset index to ensure proper indexing
self.pair_predict_result_ = self.pair_predict_result_.reset_index(drop=True)
return self.pair_predict_result_
class VECMRollingFit(RollingFit):
def __init__(self) -> None:
super().__init__()
def create_trading_pair(
self,
config: Dict,
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
) -> TradingPair:
return VECMTradingPair(
config=config,
market_data=market_data,
symbol_a=symbol_a,
symbol_b=symbol_b,
)

View File

@ -1,85 +0,0 @@
from typing import Any, Dict, Optional, cast
import pandas as pd
from pt_trading.results import BacktestResult
from pt_trading.rolling_window_fit import RollingFit
from pt_trading.trading_pair import TradingPair
import statsmodels.api as sm
NanoPerMin = 1e9
class ZScoreTradingPair(TradingPair):
zscore_model_: Optional[sm.regression.linear_model.RegressionResultsWrapper]
pair_predict_result_: Optional[pd.DataFrame]
zscore_df_: Optional[pd.DataFrame]
def __init__(
self,
config: Dict[str, Any],
market_data: pd.DataFrame,
symbol_a: str,
symbol_b: str,
):
super().__init__(config, market_data, symbol_a, symbol_b)
self.zscore_model_ = None
self.pair_predict_result_ = None
self.zscore_df_ = None
def _fit_zscore(self) -> None:
assert self.training_df_ is not None
symbol_a_px_series = self.training_df_[self.colnames()].iloc[:, 0]
symbol_b_px_series = self.training_df_[self.colnames()].iloc[:, 1]
symbol_a_px_series, symbol_b_px_series = symbol_a_px_series.align(
symbol_b_px_series, axis=0
)
X = sm.add_constant(symbol_b_px_series)
self.zscore_model_ = sm.OLS(symbol_a_px_series, X).fit()
assert self.zscore_model_ is not None
hedge_ratio = self.zscore_model_.params.iloc[1]
# Calculate spread and Z-score
spread = symbol_a_px_series - hedge_ratio * symbol_b_px_series
self.zscore_df_ = (spread - spread.mean()) / spread.std()
def predict(self) -> pd.DataFrame:
self._fit_zscore()
assert self.zscore_df_ is not None
self.training_df_["dis-equilibrium"] = self.zscore_df_
self.training_df_["scaled_dis-equilibrium"] = abs(self.zscore_df_)
assert self.testing_df_ is not None
assert self.zscore_df_ is not None
predicted_df = self.testing_df_
predicted_df["disequilibrium"] = self.zscore_df_
predicted_df["signed_scaled_disequilibrium"] = self.zscore_df_
predicted_df["scaled_disequilibrium"] = abs(self.zscore_df_)
predicted_df = predicted_df.reset_index(drop=True)
if self.pair_predict_result_ is None:
self.pair_predict_result_ = predicted_df
else:
self.pair_predict_result_ = pd.concat(
[self.pair_predict_result_, predicted_df], ignore_index=True
)
# Reset index to ensure proper indexing
self.pair_predict_result_ = self.pair_predict_result_.reset_index(drop=True)
return self.pair_predict_result_.dropna()
class ZScoreRollingFit(RollingFit):
def __init__(self) -> None:
super().__init__()
def create_trading_pair(
self, config: Dict, market_data: pd.DataFrame, symbol_a: str, symbol_b: str
) -> TradingPair:
return ZScoreTradingPair(
config=config,
market_data=market_data,
symbol_a=symbol_a,
symbol_b=symbol_b,
)

View File

@ -1,126 +0,0 @@
import argparse
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional
import pandas as pd
from tools.config import expand_filename, load_config
from tools.data_loader import get_available_instruments_from_db
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
store_results_in_database,
)
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
from research.research_tools import create_pairs, resolve_datafiles
def main() -> None:
parser = argparse.ArgumentParser(description="Run pairs trading backtest.")
parser.add_argument(
"--config", type=str, required=True, help="Path to the configuration file."
)
parser.add_argument(
"--datafile",
type=str,
required=False,
help="Market data file to process.",
)
parser.add_argument(
"--instruments",
type=str,
required=False,
help="Comma-separated list of instrument symbols (e.g., COIN,GBTC). If not provided, auto-detects from database.",
)
args = parser.parse_args()
config: Dict = load_config(args.config)
# Resolve data files (CLI takes priority over config)
datafile = resolve_datafiles(config, args.datafile)[0]
if not datafile:
print("No data files found to process.")
return
print(f"Found {datafile} data files to process:")
# # Create result database if needed
# if args.result_db.upper() != "NONE":
# args.result_db = expand_filename(args.result_db)
# create_result_database(args.result_db)
# # Initialize a dictionary to store all trade results
# all_results: Dict[str, Dict[str, Any]] = {}
# # Store configuration in database for reference
# if args.result_db.upper() != "NONE":
# # Get list of all instruments for storage
# all_instruments = []
# for datafile in datafiles:
# if args.instruments:
# file_instruments = [
# inst.strip() for inst in args.instruments.split(",")
# ]
# else:
# file_instruments = get_available_instruments_from_db(datafile, config)
# all_instruments.extend(file_instruments)
# # Remove duplicates while preserving order
# unique_instruments = list(dict.fromkeys(all_instruments))
# store_config_in_database(
# db_path=args.result_db,
# config_file_path=args.config,
# config=config,
# fit_method_class=fit_method_class_name,
# datafiles=datafiles,
# instruments=unique_instruments,
# )
# Process each data file
stat_model_price = config["stat_model_price"]
print(f"\n====== Processing {os.path.basename(datafile)} ======")
# Determine instruments to use
if args.instruments:
# Use CLI-specified instruments
instruments = [inst.strip() for inst in args.instruments.split(",")]
print(f"Using CLI-specified instruments: {instruments}")
else:
# Auto-detect instruments from database
instruments = get_available_instruments_from_db(datafile, config)
print(f"Auto-detected instruments: {instruments}")
if not instruments:
print(f"No instruments found in {datafile}...")
return
# Process data for this file
try:
cointegration_data: pd.DataFrame = pd.DataFrame()
for pair in create_pairs(datafile, stat_model_price, config, instruments):
cointegration_data = pd.concat([cointegration_data, pair.cointegration_check()])
pd.set_option('display.width', 400)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(f"cointegration_data:\n{cointegration_data}")
except Exception as err:
print(f"Error processing {datafile}: {str(err)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@ -1,232 +0,0 @@
import argparse
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from research.research_tools import create_pairs
from tools.config import expand_filename, load_config
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
)
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
DayT = str
DataFileNameT = str
def resolve_datafiles(
config: Dict, date_pattern: str, instruments: List[Dict[str, str]]
) -> List[Tuple[DayT, DataFileNameT]]:
resolved_files: List[Tuple[DayT, DataFileNameT]] = []
for inst in instruments:
pattern = date_pattern
inst_type = inst["instrument_type"]
data_dir = config["market_data_loading"][inst_type]["data_directory"]
if "*" in pattern or "?" in pattern:
# Handle wildcards
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
matched_files = glob.glob(pattern)
for matched_file in matched_files:
import re
match = re.search(r"(\d{8})\.mktdata\.ohlcv\.db$", matched_file)
assert match is not None
day = match.group(1)
resolved_files.append((day, matched_file))
else:
# Handle explicit file path
if not os.path.isabs(pattern):
pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db")
resolved_files.append((date_pattern, pattern))
return sorted(list(set(resolved_files))) # Remove duplicates and sort
def get_instruments(args: argparse.Namespace, config: Dict) -> List[Dict[str, str]]:
instruments = [
{
"symbol": inst.split(":")[0],
"instrument_type": inst.split(":")[1],
"exchange_id": inst.split(":")[2],
"instrument_id_pfx": config["market_data_loading"][inst.split(":")[1]][
"instrument_id_pfx"
],
"db_table_name": config["market_data_loading"][inst.split(":")[1]][
"db_table_name"
],
}
for inst in args.instruments.split(",")
]
return instruments
def run_backtest(
config: Dict,
datafiles: List[str],
fit_method: PairsTradingFitMethod,
instruments: List[Dict[str, str]],
) -> BacktestResult:
"""
Run backtest for all pairs using the specified instruments.
"""
bt_result: BacktestResult = BacktestResult(config=config)
# if len(datafiles) < 2:
# print(f"WARNING: insufficient data files: {datafiles}")
# return bt_result
if not all([os.path.exists(datafile) for datafile in datafiles]):
print(f"WARNING: data file {datafiles} does not exist")
return bt_result
pairs_trades = []
pairs = create_pairs(
datafiles=datafiles,
fit_method=fit_method,
config=config,
instruments=instruments,
)
for pair in pairs:
single_pair_trades = fit_method.run_pair(pair=pair, bt_result=bt_result)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
print(f"pairs_trades:\n{pairs_trades}")
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return bt_result
bt_result.collect_single_day_results(pairs_trades)
return bt_result
def main() -> None:
parser = argparse.ArgumentParser(description="Run pairs trading backtest.")
parser.add_argument(
"--config", type=str, required=True, help="Path to the configuration file."
)
parser.add_argument(
"--date_pattern",
type=str,
required=True,
help="Date YYYYMMDD, allows * and ? wildcards",
)
parser.add_argument(
"--instruments",
type=str,
required=True,
help="Comma-separated list of instrument symbols (e.g., COIN:EQUITY,GBTC:CRYPTO)",
)
parser.add_argument(
"--result_db",
type=str,
required=True,
help="Path to SQLite database for storing results. Use 'NONE' to disable database output.",
)
args = parser.parse_args()
config: Dict = load_config(args.config)
# Dynamically instantiate fit method class
fit_method = PairsTradingFitMethod.create(config)
# Resolve data files (CLI takes priority over config)
instruments = get_instruments(args, config)
datafiles = resolve_datafiles(config, args.date_pattern, instruments)
days = list(set([day for day, _ in datafiles]))
print(f"Found {len(datafiles)} data files to process:")
for df in datafiles:
print(f" - {df}")
# Create result database if needed
if args.result_db.upper() != "NONE":
args.result_db = expand_filename(args.result_db)
create_result_database(args.result_db)
# Initialize a dictionary to store all trade results
all_results: Dict[str, Dict[str, Any]] = {}
is_config_stored = False
# Process each data file
for day in sorted(days):
md_datafiles = [datafile for md_day, datafile in datafiles if md_day == day]
if not all([os.path.exists(datafile) for datafile in md_datafiles]):
print(f"WARNING: insufficient data files: {md_datafiles}")
continue
print(f"\n====== Processing {day} ======")
if not is_config_stored:
store_config_in_database(
db_path=args.result_db,
config_file_path=args.config,
config=config,
fit_method_class=config["fit_method_class"],
datafiles=datafiles,
instruments=instruments,
)
is_config_stored = True
# Process data for this file
try:
fit_method.reset()
bt_results = run_backtest(
config=config,
datafiles=md_datafiles,
fit_method=fit_method,
instruments=instruments,
)
if bt_results.trades is None or len(bt_results.trades) == 0:
print(f"No trades found for {day}")
continue
# Store results with day name as key
filename = os.path.basename(day)
all_results[filename] = {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
}
# Store results in database
if args.result_db.upper() != "NONE":
bt_results.calculate_returns(
{
filename: {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
}
}
)
bt_results.store_results_in_database(db_path=args.result_db, day=day)
print(f"Successfully processed {filename}")
except Exception as err:
print(f"Error processing {day}: {str(err)}")
import traceback
traceback.print_exc()
# Calculate and print results using a new BacktestResult instance for aggregation
if all_results:
aggregate_bt_results = BacktestResult(config=config)
aggregate_bt_results.calculate_returns(all_results)
aggregate_bt_results.print_grand_totals()
aggregate_bt_results.print_outstanding_positions()
if args.result_db.upper() != "NONE":
print(f"\nResults stored in database: {args.result_db}")
else:
print("No results to display.")
if __name__ == "__main__":
main()

View File

@ -1,101 +0,0 @@
import argparse
import asyncio
import glob
import importlib
import os
from datetime import date, datetime
from typing import Any, Dict, List, Optional
import hjson
import pandas as pd
from tools.data_loader import get_available_instruments_from_db, load_market_data
from pt_trading.results import (
BacktestResult,
create_result_database,
store_config_in_database,
store_results_in_database,
)
from pt_trading.fit_methods import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
def run_strategy(
config: Dict,
datafile: str,
fit_method: PairsTradingFitMethod,
instruments: List[str],
) -> BacktestResult:
"""
Run backtest for all pairs using the specified instruments.
"""
bt_result: BacktestResult = BacktestResult(config=config)
def _create_pairs(config: Dict, instruments: List[str]) -> List[TradingPair]:
nonlocal datafile
all_indexes = range(len(instruments))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs = []
# Update config to use the specified instruments
config_copy = config.copy()
config_copy["instruments"] = instruments
market_data_df = load_market_data(
datafile=datafile,
exchange_id=config_copy["exchange_id"],
instruments=config_copy["instruments"],
instrument_id_pfx=config_copy["instrument_id_pfx"],
db_table_name=config_copy["db_table_name"],
trading_hours=config_copy["trading_hours"],
)
for a_index, b_index in unique_index_pairs:
pair = fit_method.create_trading_pair(
market_data=market_data_df,
symbol_a=instruments[a_index],
symbol_b=instruments[b_index],
)
pairs.append(pair)
return pairs
pairs_trades = []
for pair in _create_pairs(config, instruments):
single_pair_trades = fit_method.run_pair(
pair=pair, config=config, bt_result=bt_result
)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return bt_result
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
bt_result.collect_single_day_results(result)
return bt_result
def main() -> None:
# Load config
# Subscribe to CVTT market data
# On snapshot (with historical data) - create trading strategy with market data dateframe
async def on_message(message_type: MessageTypeT, subscr_id: SubscriptionIdT, message: Dict, instrument_id: str) -> None:
print(f"{message_type=} {subscr_id=} {instrument_id}")
if message_type == "md_aggregate":
aggr = message.get("md_aggregate", [])
print(f"[{aggr['tstamp'][:19]}] *** RLTM *** {message}")
elif message_type == "historical_md_aggregate":
for aggr in message.get("historical_data", []):
print(f"[{aggr['tstamp'][:19]}] *** HIST *** {aggr}")
else:
print(f"Unknown message type: {message_type}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -4,13 +4,13 @@ from functools import partial
from typing import Dict, List
from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.app import App
from cvttpy_tools.tools.base import NamedObject
from cvttpy_tools.tools.config import CvttAppConfig
from cvttpy_tools.tools.logger import Log
from pt_strategy.live.live_strategy import PtLiveStrategy
from pt_strategy.live.pricer_md_client import PtMktDataClient
from pt_strategy.live.ti_sender import TradingInstructionsSender
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import CvttAppConfig
from cvttpy_tools.logger import Log
from pairs_trading.lib.pt_strategy.live.live_strategy import PtLiveStrategy
from pairs_trading.lib.pt_strategy.live.pricer_md_client import PtMktDataClient
from pairs_trading.lib.pt_strategy.live.ti_sender import TradingInstructionsSender
# import sys
# print("PYTHONPATH directories:")
@ -18,11 +18,6 @@ from pt_strategy.live.ti_sender import TradingInstructionsSender
# print(path)
# from cvtt_client.mkt_data import (CvttPricerWebSockClient,
# CvttPricesSubscription, MessageTypeT,
# SubscriptionIdT)
class PairTradingRunner(NamedObject):
config_: CvttAppConfig
instruments_: List[JsonDictT]
@ -69,11 +64,17 @@ class PairTradingRunner(NamedObject):
assert len(self.instruments_) == 2, "Only two instruments are supported"
Log.info(f"{self.fname()} Instruments: {self.instruments_}")
# ------- CREATE TI (trading instructions) CLIENT -------
# # ------- CREATE TI (trading instructions) CLIENT -------
# ti_config = self.config_.get_subconfig("ti_config", {})
# self.ti_sender_ = TradingInstructionsSender(config=ti_config)
# Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# ------- CREATE CVTT CLIENT -------
ti_config = self.config_.get_subconfig("ti_config", {})
self.ti_sender_ = TradingInstructionsSender(config=ti_config)
Log.info(f"{self.fname()} TI client created: {self.ti_sender_}")
# ------- CREATE STRATEGY -------
strategy_config = self.config_.get_value("strategy_config", {})
self.live_strategy_ = PtLiveStrategy(
@ -83,21 +84,18 @@ class PairTradingRunner(NamedObject):
)
Log.info(f"{self.fname()} Strategy created: {self.live_strategy_}")
# ------- CREATE PRICER CLIENT -------
pricer_config = self.config_.get_subconfig("pricer_config", {})
self.pricer_client_ = PtMktDataClient(
live_strategy=self.live_strategy_,
pricer_config=pricer_config
)
Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}")
# # ------- CREATE PRICER CLIENT -------
# pricer_config = self.config_.get_subconfig("pricer_config", {})
# self.pricer_client_ = PtMktDataClient(
# live_strategy=self.live_strategy_,
# pricer_config=pricer_config
# )
# Log.info(f"{self.fname()} CVTT Pricer client created: {self.pricer_client_}")
async def run(self) -> None:
Log.info(f"{self.fname()} ...")
pass
if __name__ == "__main__":
App()
CvttAppConfig()

213
lib/client/cvtt_client.py Normal file
View File

@ -0,0 +1,213 @@
from __future__ import annotations
from typing import Dict, Any, List, Optional
import time
import requests
from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.config import Config
from cvttpy_tools.timer import Timer
from cvttpy_trading.trading.mkt_data.historical_md import HistMdBar
class RESTSender(NamedObject):
session_: requests.Session
base_url_: str
def __init__(self, base_url: str) -> None:
self.base_url_ = base_url
self.session_ = requests.Session()
def is_ready(self) -> bool:
"""Checks if the server is up and responding"""
url = f"{self.base_url_}/ping"
try:
response = self.session_.get(url)
response.raise_for_status()
return True
except requests.exceptions.RequestException:
return False
def send_post(self, endpoint: str, post_body: Dict) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(
method="POST",
url=url,
json=post_body,
headers={"Content-Type": "application/json"},
)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
def send_get(self, endpoint: str) -> requests.Response:
while not self.is_ready():
print("Waiting for FrontGateway to start...")
time.sleep(5)
url = f"{self.base_url_}/{endpoint}"
try:
return self.session_.request(method="GET", url=url)
except requests.exceptions.RequestException as excpt:
raise ConnectionError(
f"Failed to send status={excpt.response.status_code} {excpt.response.text}" # type: ignore
) from excpt
class MdSummary(HistMdBar):
def __init__(
self,
ts_ns: int,
open: float,
high: float,
low: float,
close: float,
volume: float,
vwap: float,
num_trades: int,
):
super().__init__(ts=ts_ns)
self.open_ = open
self.high_ = high
self.low_ = low
self.close_ = close
self.volume_ = volume
self.vwap_ = vwap
self.num_trades_ = num_trades
@classmethod
def from_REST_response(cls, response: requests.Response) -> List[MdSummary]:
res: List[MdSummary] = []
jresp = response.json()
hist_data = jresp.get("historical_data", [])
for hd in hist_data:
res.append(
MdSummary(
ts_ns=hd["time_ns"],
open=hd["open"],
high=hd["high"],
low=hd["low"],
close=hd["close"],
volume=hd["volume"],
vwap=hd["vwap"],
num_trades=hd["num_trades"],
)
)
return res
class MdSummaryCollector(NamedObject):
sender_: RESTSender
exch_acct_: str
instrument_id_: str
interval_sec_: int
history_depth_sec_: int
history_: List[MdSummary]
timer_: Optional[Timer]
def __init__(
self,
sender: RESTSender,
exch_acct: str,
instrument_id: str,
interval_sec: int,
history_depth_sec: int,
) -> None:
self.sender_ = sender
self.exch_acct_ = exch_acct
self.instrument_id_ = instrument_id
self.interval_sec_ = interval_sec
self.history_depth_sec_ = history_depth_sec
self.history_depth_sec_ = []
self.timer_ = None
def rqst_data(self) -> Dict[str, Any]:
return {
"exch_acct": self.exch_acct_,
"instrument_id": self.instrument_id_,
"interval_sec": self.interval_sec_,
"history_depth_sec": self.history_depth_sec_,
}
def get_history(self) -> List[MdSummary]:
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=self.rqst_data()
)
return MdSummary.from_REST_response(response=response)
def get_last(self) -> Optional[MdSummary]:
rqst_data = self.rqst_data()
rqst_data["history_depth_sec"] = self.interval_sec_
response: requests.Response = self.sender_.send_post(
endpoint="md_summary", post_body=rqst_data
)
res = MdSummary.from_REST_response(response=response)
return None if len(res) == 0 else res[-1]
async def start(self) -> None:
if self.timer_:
Log.error(f"{self.fname()}: Timer is already started")
return
self.history_ = self.get_history()
self.timer_ = Timer(
start_in_sec=self.interval_sec_,
is_periodic=True,
period_interval=self.interval_sec_,
func=self._load_new,
)
async def _load_new(self) -> None:
last: Optional[MdSummary] = self.get_last()
if not last:
# URGENT logging
return
if last.ts_ns_ <= self.history_[-1].ts_ns_:
# URGENT logging
return
self.history_.append(last)
# URGENT implement notification
def stop(self) -> None:
if self.timer_:
self.timer_.cancel()
self.timer_ = None
class CvttRESTClient(NamedObject):
config_: Config
sender_: RESTSender
def __init__(self, config: Config) -> None:
self.config_ = config
base_url = self.config_.get_value("cvtt_base_url", default="")
assert base_url
self.sender_ = RESTSender(base_url=base_url)
if __name__ == "__main__":
config = Config(json_src={"cvtt_base_url": "http://cvtt-tester-01.cvtt.vpn:23456"})
cvtt_client = CvttRESTClient(config)
mdsc = MdSummaryCollector(
sender=cvtt_client.sender_,
exch_acct="COINBASE_AT",
instrument_id="PAIR-BTC-USD",
interval_sec=60,
history_depth_sec=24 * 3600,
)
hist = mdsc.get_history()
last = mdsc.get_last()
pass

View File

@ -8,8 +8,8 @@ from functools import partial
from typing import Callable, Coroutine, Dict, Optional
import websockets
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.logger import Log
from websockets.asyncio.client import ClientConnection
MessageTypeT = str

View File

@ -2,11 +2,16 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from enum import Enum
import pandas as pd
# ---
from cvttpy_tools.base import NamedObject
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.base import NamedObject
from cvttpy_tools.tools.logger import Log
# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument
# ---
from pt_strategy.live.ti_sender import TradingInstructionsSender
from pt_strategy.model_data_policy import ModelDataPolicy
from pt_strategy.pt_market_data import RealTimeMarketData

View File

@ -6,11 +6,11 @@ from typing import Dict, List
from cvtt_client.mkt_data import (CvttPricerWebSockClient,
CvttPricesSubscription, MessageTypeT,
SubscriptionIdT)
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.settings.cvtt_types import JsonDictT
from cvttpy_tools.tools.app import App
from cvttpy_tools.tools.base import NamedObject
from cvttpy_tools.tools.config import Config
from cvttpy_tools.tools.logger import Log
from pt_strategy.live.live_strategy import PtLiveStrategy
from pt_strategy.trading_pair import TradingPair

View File

@ -3,13 +3,13 @@ from enum import Enum
from typing import Tuple
# import aiohttp
from cvttpy_tools.tools.app import App
from cvttpy_tools.tools.base import NamedObject
from cvttpy_tools.tools.config import Config
from cvttpy_tools.tools.logger import Log
from cvttpy_tools.tools.timer import Timer
from cvttpy_tools.tools.timeutils import NanoPerSec
from cvttpy_tools.tools.web.rest_client import REST_RequestProcessor
from cvttpy_tools.app import App
from cvttpy_tools.base import NamedObject
from cvttpy_tools.config import Config
from cvttpy_tools.logger import Log
from cvttpy_tools.timer import Timer
from cvttpy_tools.timeutils import NanoPerSec
from cvttpy_tools.web.rest_client import REST_RequestProcessor
class TradingInstructionsSender(NamedObject):