Compare commits

..

26 Commits

Author SHA1 Message Date
Cryptoval Trading Technologies
a04e8878fb lg_changes 2025-07-25 22:11:49 +00:00
Oleg Sheynin
facf7fb0c6 Using timezone for trading session 2025-07-16 18:17:34 +00:00
Oleg Sheynin
9c34d935bd added close position and trade session 2025-07-16 18:06:33 +00:00
Oleg Sheynin
20f150a6b7 progress 2025-07-16 03:21:14 +00:00
Oleg Sheynin
d46bcb64d6 progress 2025-07-16 03:06:16 +00:00
Oleg Sheynin
26659ede12 fix pair market data 2025-07-16 02:32:16 +00:00
Oleg Sheynin
e9995312a0 minor 2025-07-15 22:26:10 +00:00
Oleg Sheynin
a46c8a7576 minor 2025-07-15 20:36:15 +00:00
Oleg Sheynin
fe2ebbb27f fixed 2025-07-15 20:32:11 +00:00
Oleg Sheynin
ddd9f4adb9 progress 2025-07-15 19:29:26 +00:00
Oleg Sheynin
4bc947cf07 progress 2025-07-15 19:24:18 +00:00
Oleg Sheynin
51944b3a2f progress 2025-07-15 04:14:57 +00:00
Oleg Sheynin
bff1c54b48 fix 2025-07-15 03:57:18 +00:00
Oleg Sheynin
9c91f37bcc progress 2025-07-15 03:37:29 +00:00
Oleg Sheynin
76547e1176 progress 2025-07-15 02:52:10 +00:00
Oleg Sheynin
80cf1b60ef outstanding positions bug fix 2025-07-15 00:10:46 +00:00
Oleg Sheynin
94ffb32f50 progress 2025-07-14 22:42:08 +00:00
Oleg Sheynin
967c01c367 progress 2025-07-14 22:26:52 +00:00
Oleg Sheynin
747ca05b16 progress 2025-07-14 21:56:28 +00:00
Oleg Sheynin
30ae95a808 minor 2025-07-14 19:07:28 +00:00
Oleg Sheynin
bcba183768 cleaned up sliding notebook 2025-07-14 19:05:19 +00:00
Oleg Sheynin
cc0072dcc8 minor change 2025-07-14 05:19:14 +00:00
Oleg Sheynin
35a1cd748e notebook changes 2025-07-14 05:15:47 +00:00
Oleg Sheynin
3b003c7811 progress 2025-07-14 00:41:46 +00:00
Oleg Sheynin
b24285802a sliding fit fix 2025-07-13 22:33:48 +00:00
Oleg Sheynin
48f18f7b4f progress, sliding model - buggy 2025-07-12 03:17:12 +00:00
18 changed files with 12887 additions and 2810 deletions

View File

@ -7,16 +7,6 @@
"db_table_name": "md_1min_bars",
"exchange_id": "BNBSPOT",
"instrument_id_pfx": "PAIR-",
# "instruments": [
# "BTC-USDT",
# "BCH-USDT",
# "ETH-USDT",
# "LTC-USDT",
# "XRP-USDT",
# "ADA-USDT",
# "SOL-USDT",
# "DOT-USDT"
# ],
"trading_hours": {
"begin_session": "00:00:00",
"end_session": "23:59:00",
@ -29,5 +19,13 @@
"dis-equilibrium_close_trshld": 0.5,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.fit_methods.StaticFit"
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
# "fit_method_class": "pt_trading.static_fit.StaticFit",
"close_outstanding_positions": true,
"trading_hours": {
"begin_session": "06:00:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
}
}

View File

@ -2,7 +2,7 @@
"security_type": "EQUITY",
"data_directory": "./data/equity",
"datafiles": [
"202506*.mktdata.ohlcv.db",
"20250618.mktdata.ohlcv.db",
],
"db_table_name": "md_1min_bars",
"exchange_id": "ALPACA",
@ -19,7 +19,9 @@
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.fit_methods.SlidingFit",
"exclude_instruments": ["CAN"]
# "fit_method_class": "pt_trading.sliding_fit.SlidingFit",
"fit_method_class": "pt_trading.static_fit.StaticFit",
"exclude_instruments": ["CAN"],
"close_outstanding_positions": false
}

View File

@ -0,0 +1,26 @@
{
"security_type": "EQUITY",
"data_directory": "./data/equity",
"datafiles": [
"20250602.mktdata.ohlcv.db",
],
"db_table_name": "md_1min_bars",
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.fit_methods.StaticFit",
"exclude_instruments": ["CAN"]
}
# "fit_method_class": "pt_trading.fit_methods.SlidingFit",
# "fit_method_class": "pt_trading.fit_methods.StaticFit",

View File

@ -0,0 +1,27 @@
{
"security_type": "EQUITY",
"data_directory": "./data/equity",
# "datafiles": [
# "20250604.mktdata.ohlcv.db",
# ],
"db_table_name": "md_1min_bars",
"exchange_id": "ALPACA",
"instrument_id_pfx": "STOCK-",
"trading_hours": {
"begin_session": "9:30:00",
"end_session": "16:00:00",
"timezone": "America/New_York"
},
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"dis-equilibrium_open_trshld": 2.0,
"dis-equilibrium_close_trshld": 1.0,
"training_minutes": 120,
"funding_per_pair": 2000.0,
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
# "fit_method_class": "pt_trading.static_fit.StaticFit",
"exclude_instruments": ["CAN"],
"close_outstanding_positions": false
}

115
lg_notes.md Normal file
View File

@ -0,0 +1,115 @@
07.11.2025
pairs_trading/configuration <---- directory for config
equity_lg.cfg <-------- copy of equity.cfg
How to run a Program: TRIANGLEsquare ----> triangle EQUITY backtest
Results are in > results (timestamp table for all runs)
table "...timestamp... .pt_backtest_results.equity.db"
Open the results database with sqlite3:
> sqlite3 '/home/coder/results/20250721_175750.pt_backtest_results.equity.db'
sqlite> .databases
main: /home/coder/results/20250717_180122.pt_backtest_results.equity.db r/w
sqlite> .tables
config outstanding_positions pt_bt_results
sqlite> PRAGMA table_info('pt_bt_results');
0|date|DATE|0||0
1|pair|TEXT|0||0
2|symbol|TEXT|0||0
3|open_time|DATETIME|0||0
4|open_side|TEXT|0||0
5|open_price|REAL|0||0
6|open_quantity|INTEGER|0||0
7|open_disequilibrium|REAL|0||0
8|close_time|DATETIME|0||0
9|close_side|TEXT|0||0
10|close_price|REAL|0||0
11|close_quantity|INTEGER|0||0
12|close_disequilibrium|REAL|0||0
13|symbol_return|REAL|0||0
14|pair_return|REAL|0||0
select count(*) as cnt from pt_bt_results;
8
select * from pt_bt_results;
select
date, close_time, pair, symbol, symbol_return, pair_return
from pt_bt_results ;
select date, sum(symbol_return) as daily_return
from pt_bt_results where date = '2025-06-18' group by date;
.quit
sqlite3 '/home/coder/results/20250717_172435.pt_backtest_results.equity.db'
sqlite> select date, sum(symbol_return) as daily_return
from pt_bt_results group by date;
2025-06-02|1.29845390060828
...
2025-06-18|-43.5084977104115 <========== ????? ==========>
2025-06-20|11.8605547517183
select
date, close_time, pair, symbol, symbol_return, pair_return
from pt_bt_results ;
select date, close_time, pair, symbol, symbol_return, pair_return
from pt_bt_results where date = '2025-06-18';
./scripts/load_equity_pair_intraday.sh -A NVDA -B QQQ -d 20250701 -T ./intraday_md
To inspect exactly which sources, formats, and processing steps the script uses, open it with:
head -n 50 ./scripts/load_equity_pair_intraday.sh
✓ Data file found: /home/coder/pairs_trading/data/crypto/20250605.mktdata.ohlcv.db
sqlite3 '/home/coder/results/20250722_201930.pt_backtest_results.crypto.db'
sqlite3 '/home/coder/results/xxxxxxxx_yyyyyy.pt_backtest_results.pseudo.db'
11111111
=== At your terminal, run these commands:
sqlite3 '/home/coder/results/20250722_201930.pt_backtest_results.crypto.db'
=== Then inside the SQLite prompt:
.mode csv
.headers on
.output results_20250722.csv
SELECT * FROM pt_bt_results;
.output stdout
.quit
cd /home/coder/
# === .mode csv formats output as CSV
# === .headers on includes column names
# === .output results_20250722.csv directs output to that file
# === Run your SELECT query, then revert output with .output stdout
# === Open results_20250722.csv in Excel directly
# ======== Using scp (Secure Copy)
# === On your local machine, open a terminal and run:
scp cvtt@953f6e8df266:/home/coder/results_20250722.csv ~/Downloads/
# ===== Convert the CSV to a pandas DataFrame ====== -->
import pandas as pd
# Replace with the actual path to your CSV file
file_path = '/home/coder/results_20250722.csv'
# Read the CSV file into a DataFrame
df = pd.read_csv(file_path)
# Show the first few rows
print(df.head())

View File

@ -0,0 +1,36 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9  # NOTE(review): 1e9 is nanoseconds per *second*; per minute would be 6e10 - confirm intended value/name
class PairsTradingFitMethod(ABC):
    """Abstract interface that every pairs-trading fit strategy implements.

    Concrete strategies must provide ``run_pair`` and ``reset``.
    """

    # Column names used when building trades DataFrames.
    TRADES_COLUMNS = [
        "time", "action", "symbol", "price",
        "disequilibrium", "scaled_disequilibrium", "pair",
    ]

    @abstractmethod
    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Process one trading pair.

        Args:
            pair: The trading pair to process.
            bt_result: Backtest result collector passed through to the strategy.

        Returns:
            A DataFrame of trades, or ``None``.
        """

    @abstractmethod
    def reset(self) -> None:
        """Reset the strategy's internal state (implementation-defined)."""
class PairState(Enum):
    """State of a trading pair while trading signals are being generated."""

    INITIAL = 1  # set at the start of a run, before any trade
    OPEN = 2  # opening trades have been recorded
    CLOSED = 3  # closing trades have been recorded
    CLOSED_POSITIONS = 4  # an outstanding position was explicitly closed

View File

@ -1,419 +0,0 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9  # NOTE(review): 1e9 is nanoseconds per *second*; per minute would be 6e10 - confirm intended value/name
class PairsTradingFitMethod(ABC):
    """Abstract interface that every pairs-trading fit strategy implements.

    Concrete strategies must provide ``run_pair`` and ``reset``.
    """

    # Column names used when building trades DataFrames.
    TRADES_COLUMNS = [
        "time", "action", "symbol", "price",
        "disequilibrium", "scaled_disequilibrium", "pair",
    ]

    @abstractmethod
    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
        """Process one trading pair.

        Args:
            config: Strategy configuration dictionary.
            pair: The trading pair to process.
            bt_result: Backtest result collector passed through to the strategy.

        Returns:
            A DataFrame of trades, or ``None``.
        """

    @abstractmethod
    def reset(self):
        """Reset the strategy's internal state (implementation-defined)."""
class StaticFit(PairsTradingFitMethod):
    """Fit method that trains once per pair and trades on that single fit."""

    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:  # abstractmethod
        """Train, predict and generate trades for ``pair``.

        Returns ``None`` when the pair is not cointegrated or when training
        or prediction raises; otherwise returns the trades DataFrame produced
        by ``create_trading_signals``.
        """
        pair.get_datasets(training_minutes=config["training_minutes"])

        try:
            cointegrated = pair.train_pair()
            if not cointegrated:
                print(f"{pair} IS NOT COINTEGRATED")
                return None
        except Exception as e:
            print(f"{pair}: Training failed: {str(e)}")
            return None

        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(pair=pair, config=config, result=bt_result)

    def create_trading_signals(self, pair: TradingPair, config: Dict, result: BacktestResult) -> pd.DataFrame:
        """Build the open (and, if found, close) trades for ``pair``.

        The first row whose scaled disequilibrium reaches the open threshold
        opens the position; the first row from there on whose scaled
        disequilibrium drops below the close threshold closes it. When no
        close row exists the position is reported through
        ``result.handle_outstanding_position`` and only the opening trades
        are returned. An empty DataFrame is returned when no row qualifies
        as an open.
        """
        beta = pair.vecm_fit_.beta  # type: ignore
        col_a, col_b = pair.colnames()
        predictions = pair.predicted_df_

        open_limit = config["dis-equilibrium_open_trshld"]
        close_limit = config["dis-equilibrium_close_trshld"]

        # First row with sufficient disequilibrium to open a position.
        open_idx = next(
            (
                i
                for i in range(len(predictions))
                if predictions["scaled_disequilibrium"][i] >= open_limit
            ),
            None,
        )
        if open_idx is None:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            return pd.DataFrame()

        # First row at or after the open whose disequilibrium is below the
        # close threshold (index label of that row, or None).
        close_mask = predictions["scaled_disequilibrium"][open_idx:] < close_limit
        close_idx = next((label for label, hit in close_mask.items() if hit), None)

        open_row = predictions.loc[open_idx]
        open_tstamp = open_row["tstamp"]
        open_diseq = open_row["disequilibrium"]
        open_scaled_diseq = open_row["scaled_disequilibrium"]
        open_px_a = open_row[f"{col_a}"]
        open_px_b = open_row[f"{col_b}"]

        abs_beta = abs(beta[1])
        pred_px_b = predictions.loc[open_idx][f"{col_b}_pred"]
        pred_px_a = predictions.loc[open_idx][f"{col_a}_pred"]

        # Trade direction follows the sign of the predicted spread.
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "SELL", "BUY"
        # Closing a leg is always the opposite side of its open.
        close_side_a, close_side_b = open_side_b, open_side_a

        if close_idx is None:
            # No close signal: hand the still-open position to the result tracker.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predictions,
                last_row_index=len(predictions) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=open_px_a,
                open_px_b=open_px_b,
                open_tstamp=open_tstamp,
            )
            close_legs = []
        else:
            close_row = predictions.loc[close_idx]
            close_tstamp = close_row["tstamp"]
            close_diseq = close_row["disequilibrium"]
            close_scaled_diseq = close_row["scaled_disequilibrium"]
            close_px_a = close_row[f"{col_a}"]
            close_px_b = close_row[f"{col_b}"]

            print(f"{pair}: Close signal found at index {close_idx}")

            close_legs = [
                (close_tstamp, close_side_a, pair.symbol_a_, close_px_a,
                 close_diseq, close_scaled_diseq, pair),
                (close_tstamp, close_side_b, pair.symbol_b_, close_px_b,
                 close_diseq, close_scaled_diseq, pair),
            ]

        open_legs = [
            (open_tstamp, open_side_a, pair.symbol_a_, open_px_a,
             open_diseq, open_scaled_diseq, pair),
            (open_tstamp, open_side_b, pair.symbol_b_, open_px_b,
             open_diseq, open_scaled_diseq, pair),
        ]

        return pd.DataFrame(
            open_legs + close_legs,
            columns=self.TRADES_COLUMNS,  # type: ignore
        )

    def reset(self) -> None:
        """No state to reset for the static fit."""
        pass
class PairState(Enum):
    """State of a trading pair while the sliding fit is running."""

    INITIAL = 1  # set at the start of a run, before any trade
    OPEN = 2  # opening trades have been recorded
    CLOSED = 3  # closing trades have been recorded
class SlidingFit(PairsTradingFitMethod):
    """Fit method that re-trains the pair on a window advanced one row per step.

    Every step requests ``training_minutes`` rows of training data plus one
    testing row, trains, predicts, and then looks for an open signal (while
    no position is open) or a close signal (while one is open).
    ``curr_training_start_idx_`` keeps its value between ``run_pair`` calls
    until ``reset`` is called.
    """

    def __init__(self) -> None:
        super().__init__()
        # Start offset of the current training window.
        self.curr_training_start_idx_ = 0

    def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
        """Slide the training window over ``pair`` and collect its trades.

        Args:
            config: Strategy configuration; reads the open/close
                disequilibrium thresholds and ``training_minutes``.
            pair: Trading pair to process; per-run state is kept in
                ``pair.user_data_``.
            bt_result: Receives the outstanding position if the data runs out
                while a position is still open.

        Returns:
            The trades DataFrame stored in ``pair.user_data_["trades"]``.

        Raises:
            RuntimeError: If training or prediction raises.
        """
        print(f"***{pair}*** STARTING....")

        pair.user_data_['state'] = PairState.INITIAL
        pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS)  # type: ignore
        pair.user_data_["is_cointegrated"] = False

        open_threshold = config["dis-equilibrium_open_trshld"]
        # Fixed: this previously read the *open* threshold key, so positions
        # were closed against the open threshold instead of the close one.
        close_threshold = config["dis-equilibrium_close_trshld"]
        training_minutes = config["training_minutes"]

        while True:
            print(self.curr_training_start_idx_, end='\r')
            pair.get_datasets(
                training_minutes=training_minutes,
                training_start_index=self.curr_training_start_idx_,
                testing_size=1
            )

            if len(pair.training_df_) < training_minutes:
                print(f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job.")
                if pair.user_data_["state"] == PairState.OPEN:
                    print(f"{pair}: {self.curr_training_start_idx_} Position is not closed.")
                    # Report the still-open position using the last prediction.
                    bt_result.handle_outstanding_position(
                        pair=pair,
                        pair_result_df=pair.predicted_df_,
                        last_row_index=0,
                        open_side_a=pair.user_data_["open_side_a"],
                        open_side_b=pair.user_data_["open_side_b"],
                        open_px_a=pair.user_data_["open_px_a"],
                        open_px_b=pair.user_data_["open_px_b"],
                        open_tstamp=pair.user_data_["open_tstamp"],
                    )
                break

            try:
                is_cointegrated = pair.train_pair()
            except Exception as e:
                raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e

            # Only log when the cointegration status flips.
            if pair.user_data_["is_cointegrated"] != is_cointegrated:
                pair.user_data_["is_cointegrated"] = is_cointegrated
                if not is_cointegrated:
                    if pair.user_data_["state"] == PairState.OPEN:
                        print(f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions...")
                    else:
                        print(f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on")
                else:
                    print('*' * 80)
                    print(f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED")
                    print('*' * 80)

            if not is_cointegrated:
                self.curr_training_start_idx_ += 1
                continue

            try:
                pair.predict()
            except Exception as e:
                raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e

            if pair.user_data_["state"] == PairState.INITIAL:
                open_trades = self._get_open_trades(pair, open_threshold=open_threshold)
                if open_trades is not None:
                    pair.user_data_["trades"] = open_trades
                    pair.user_data_["state"] = PairState.OPEN
            elif pair.user_data_["state"] == PairState.OPEN:
                close_trades = self._get_close_trades(pair, close_threshold=close_threshold)
                if close_trades is not None:
                    pair.user_data_["trades"] = pd.concat([pair.user_data_["trades"], close_trades], ignore_index=True)
                    pair.user_data_["state"] = PairState.CLOSED
                    break

            self.curr_training_start_idx_ += 1

        print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
        return pair.user_data_["trades"]

    def _build_trades(
        self,
        pair: TradingPair,
        tstamp,
        side_a: str,
        side_b: str,
        px_a,
        px_b,
        disequilibrium,
        scaled_disequilibrium,
    ) -> pd.DataFrame:
        """Return a two-row trades DataFrame: one leg per symbol of ``pair``."""
        legs = [
            (tstamp, side_a, pair.symbol_a_, px_a, disequilibrium, scaled_disequilibrium, pair),
            (tstamp, side_b, pair.symbol_b_, px_b, disequilibrium, scaled_disequilibrium, pair),
        ]
        return pd.DataFrame(
            legs,
            columns=self.TRADES_COLUMNS,  # type: ignore
        )

    def _get_open_trades(self, pair: TradingPair, open_threshold: float) -> Optional[pd.DataFrame]:
        """Return opening trades from the first predicted row, or ``None``.

        ``None`` is returned when there is no prediction or the row's scaled
        disequilibrium is below ``open_threshold``. On success the chosen
        sides, prices and timestamp are stored in ``pair.user_data_`` for the
        later close / outstanding-position handling.
        """
        colname_a, colname_b = pair.colnames()
        predicted_df = pair.predicted_df_

        if len(predicted_df) == 0:
            return None

        open_row = predicted_df.iloc[0]
        open_tstamp = open_row["tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
        open_px_a = open_row[f"{colname_a}"]
        open_px_b = open_row[f"{colname_b}"]

        if open_scaled_disequilibrium < open_threshold:
            return None

        # Direction follows the sign of the (unscaled) disequilibrium.
        if open_disequilibrium > 0:
            open_side_a, open_side_b = "SELL", "BUY"
        else:
            open_side_a, open_side_b = "BUY", "SELL"

        # Remember the position so it can be closed or reported later;
        # each closing side is the opposite of its opening side.
        pair.user_data_["open_side_a"] = open_side_a
        pair.user_data_["open_side_b"] = open_side_b
        pair.user_data_["open_px_a"] = open_px_a
        pair.user_data_["open_px_b"] = open_px_b
        pair.user_data_["open_tstamp"] = open_tstamp
        pair.user_data_["close_side_a"] = open_side_b
        pair.user_data_["close_side_b"] = open_side_a

        return self._build_trades(
            pair,
            open_tstamp,
            open_side_a,
            open_side_b,
            open_px_a,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
        )

    def _get_close_trades(self, pair: TradingPair, close_threshold: float) -> Optional[pd.DataFrame]:
        """Return closing trades from the first predicted row, or ``None``.

        ``None`` is returned when there is no prediction or the row's scaled
        disequilibrium is still above ``close_threshold``.
        """
        colname_a, colname_b = pair.colnames()

        if len(pair.predicted_df_) == 0:
            return None

        close_row = pair.predicted_df_.iloc[0]
        close_tstamp = close_row["tstamp"]
        close_disequilibrium = close_row["disequilibrium"]
        close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
        close_px_a = close_row[f"{colname_a}"]
        close_px_b = close_row[f"{colname_b}"]

        close_side_a = pair.user_data_["close_side_a"]
        close_side_b = pair.user_data_["close_side_b"]

        if close_scaled_disequilibrium > close_threshold:
            return None

        return self._build_trades(
            pair,
            close_tstamp,
            close_side_a,
            close_side_b,
            close_px_a,
            close_px_b,
            close_disequilibrium,
            close_scaled_disequilibrium,
        )

    def reset(self) -> None:
        """Rewind the training window to the start of the data."""
        self.curr_training_start_idx_ = 0

View File

@ -1,28 +1,30 @@
from typing import Any, Dict, List
import pandas as pd
import sqlite3
import os
from datetime import datetime, date
import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from pt_trading.trading_pair import TradingPair
# Recommended replacement adapters and converters for Python 3.12+
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
def adapt_date_iso(val):
def adapt_date_iso(val: date) -> str:
"""Adapt datetime.date to ISO 8601 date."""
return val.isoformat()
def adapt_datetime_iso(val):
def adapt_datetime_iso(val: datetime) -> str:
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()
def convert_date(val):
def convert_date(val: bytes) -> date:
"""Convert ISO 8601 date to datetime.date object."""
return datetime.fromisoformat(val.decode()).date()
def convert_datetime(val):
def convert_datetime(val: bytes) -> datetime:
"""Convert ISO 8601 datetime to datetime.datetime object."""
return datetime.fromisoformat(val.decode())
@ -39,6 +41,12 @@ def create_result_database(db_path: str) -> None:
Create the SQLite database and required tables if they don't exist.
"""
try:
# Create directory if it doesn't exist
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
print(f"Created directory: {db_dir}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
@ -172,7 +180,7 @@ def store_results_in_database(
if db_path.upper() == "NONE":
return
def convert_timestamp(timestamp):
def convert_timestamp(timestamp: Any) -> Optional[datetime]:
"""Convert pandas Timestamp to Python datetime object for SQLite compatibility."""
if timestamp is None:
return None
@ -423,14 +431,14 @@ class BacktestResult:
def add_trade(
self,
pair_nm,
symbol,
action,
price,
disequilibrium=None,
scaled_disequilibrium=None,
timestamp=None,
):
pair_nm: str,
symbol: str,
action: str,
price: Any,
disequilibrium: Optional[float] = None,
scaled_disequilibrium: Optional[float] = None,
timestamp: Optional[datetime] = None,
) -> None:
"""Add a trade to the results tracking."""
pair_nm = str(pair_nm)
@ -442,11 +450,11 @@ class BacktestResult:
(action, price, disequilibrium, scaled_disequilibrium, timestamp)
)
def add_outstanding_position(self, position: Dict[str, Any]):
def add_outstanding_position(self, position: Dict[str, Any]) -> None:
"""Add an outstanding position to tracking."""
self.outstanding_positions.append(position)
def add_realized_pnl(self, realized_pnl: float):
def add_realized_pnl(self, realized_pnl: float) -> None:
"""Add realized PnL to the total."""
self.total_realized_pnl += realized_pnl
@ -462,14 +470,15 @@ class BacktestResult:
"""Get all trades."""
return self.trades
def clear_trades(self):
def clear_trades(self) -> None:
"""Clear all trades (used when processing new files)."""
self.trades.clear()
def collect_single_day_results(self, result):
def collect_single_day_results(self, pairs_trades: List[pd.DataFrame]) -> None:
"""Collect and process single day trading results."""
if result is None:
return
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
print("\n -------------- Suggested Trades ")
print(result)
@ -482,16 +491,16 @@ class BacktestResult:
scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None)
timestamp = getattr(row, "time", None)
self.add_trade(
pair_nm=row.pair,
action=action,
symbol=symbol,
price=price,
pair_nm=str(row.pair),
action=str(action),
symbol=str(symbol),
price=float(str(price)),
disequilibrium=disequilibrium,
scaled_disequilibrium=scaled_disequilibrium,
timestamp=timestamp,
)
def print_single_day_results(self):
def print_single_day_results(self) -> None:
"""Print single day results summary."""
for pair, symbols in self.trades.items():
print(f"\n--- {pair} ---")
@ -501,7 +510,7 @@ class BacktestResult:
side, price = trade_data[:2]
print(f"{symbol} {side} at ${price}")
def print_results_summary(self, all_results):
def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Print summary of all processed files."""
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
@ -512,7 +521,7 @@ class BacktestResult:
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(self, all_results: Dict):
def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None:
"""Calculate and print returns by day and pair."""
print("\n====== Returns By Day and Pair ======")
@ -520,6 +529,8 @@ class BacktestResult:
day_return = 0
print(f"\n--- {filename} ---")
self.outstanding_positions = data["outstanding_positions"]
# Process each pair
for pair, symbols in data["trades"].items():
pair_return = 0
@ -527,80 +538,87 @@ class BacktestResult:
# Calculate individual symbol returns in the pair
for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit
# Get entry and exit trades - handle both old and new tuple formats
if len(trades[0]) == 2: # Old format: (action, price)
entry_action, entry_price = trades[0]
exit_action, exit_price = trades[1]
open_disequilibrium = None
open_scaled_disequilibrium = None
close_disequilibrium = None
close_scaled_disequilibrium = None
if len(trades) == 0:
continue
symbol_return = 0
symbol_trades = []
# Process all trades sequentially for this symbol
for i, trade in enumerate(trades):
# Handle both old and new tuple formats
if len(trade) == 2: # Old format: (action, price)
action, price = trade
disequilibrium = None
scaled_disequilibrium = None
timestamp = None
else: # New format: (action, price, disequilibrium, scaled_disequilibrium, timestamp)
entry_action, entry_price = trades[0][:2]
exit_action, exit_price = trades[1][:2]
open_disequilibrium = (
trades[0][2] if len(trades[0]) > 2 else None
)
open_scaled_disequilibrium = (
trades[0][3] if len(trades[0]) > 3 else None
)
close_disequilibrium = (
trades[1][2] if len(trades[1]) > 2 else None
)
close_scaled_disequilibrium = (
trades[1][3] if len(trades[1]) > 3 else None
)
action, price = trade[:2]
disequilibrium = trade[2] if len(trade) > 2 else None
scaled_disequilibrium = trade[3] if len(trade) > 3 else None
timestamp = trade[4] if len(trade) > 4 else None
# Calculate return based on action
symbol_return = 0
if entry_action == "BUY" and exit_action == "SELL":
symbol_trades.append((action, price, disequilibrium, scaled_disequilibrium, timestamp))
# Calculate returns for all trade combinations
for i in range(len(symbol_trades) - 1):
trade1 = symbol_trades[i]
trade2 = symbol_trades[i + 1]
action1, price1, diseq1, scaled_diseq1, ts1 = trade1
action2, price2, diseq2, scaled_diseq2, ts2 = trade2
# Calculate return based on action combination
trade_return = 0
if action1 == "BUY" and action2 == "SELL":
# Long position
symbol_return = (
(exit_price - entry_price) / entry_price * 100
)
elif entry_action == "SELL" and exit_action == "BUY":
trade_return = (price2 - price1) / price1 * 100
elif action1 == "SELL" and action2 == "BUY":
# Short position
symbol_return = (
(entry_price - exit_price) / entry_price * 100
)
trade_return = (price1 - price2) / price1 * 100
symbol_return += trade_return
# Store trade details for reporting
pair_trades.append(
(
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
open_scaled_disequilibrium,
close_scaled_disequilibrium,
action1,
price1,
action2,
price2,
trade_return,
scaled_diseq1,
scaled_diseq2,
i + 1, # Trade sequence number
)
)
pair_return += symbol_return
pair_return += symbol_return
# Print pair returns with disequilibrium information
if pair_trades:
print(f" {pair}:")
for (
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
open_scaled_disequilibrium,
close_scaled_disequilibrium,
action1,
price1,
action2,
price2,
trade_return,
scaled_diseq1,
scaled_diseq2,
trade_num,
) in pair_trades:
disequil_info = ""
if (
open_scaled_disequilibrium is not None
and close_scaled_disequilibrium is not None
scaled_diseq1 is not None
and scaled_diseq2 is not None
):
disequil_info = f" | Open Dis-eq: {open_scaled_disequilibrium:.2f}, Close Dis-eq: {close_scaled_disequilibrium:.2f}"
disequil_info = f" | Open Dis-eq: {scaled_diseq1:.2f}, Close Dis-eq: {scaled_diseq2:.2f}"
print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%{disequil_info}"
f" {symbol} (Trade #{trade_num}): {action1} @ ${price1:.2f}, {action2} @ ${price2:.2f}, Return: {trade_return:.2f}%{disequil_info}"
)
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
@ -610,7 +628,7 @@ class BacktestResult:
print(f" Day Total Return: {day_return:.2f}%")
self.add_realized_pnl(day_return)
def print_outstanding_positions(self):
def print_outstanding_positions(self) -> None:
"""Print all outstanding positions with share quantities and current values."""
if not self.get_outstanding_positions():
print("\n====== NO OUTSTANDING POSITIONS ======")
@ -684,22 +702,22 @@ class BacktestResult:
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
def print_grand_totals(self):
def print_grand_totals(self) -> None:
"""Print grand totals across all pairs."""
print(f"\n====== GRAND TOTALS ACROSS ALL PAIRS ======")
print(f"Total Realized PnL: {self.get_total_realized_pnl():.2f}%")
def handle_outstanding_position(
self,
pair,
pair_result_df,
last_row_index,
open_side_a,
open_side_b,
open_px_a,
open_px_b,
open_tstamp,
):
pair: TradingPair,
pair_result_df: pd.DataFrame,
last_row_index: int,
open_side_a: str,
open_side_b: str,
open_px_a: float,
open_px_b: float,
open_tstamp: datetime,
) -> Tuple[float, float, float]:
"""
Handle calculation and tracking of outstanding positions when no close signal is found.
@ -727,8 +745,8 @@ class BacktestResult:
shares_b = funding_per_position / open_px_b
# Calculate current position values (shares * current price)
current_value_a = shares_a * last_px_a
current_value_b = shares_b * last_px_b
current_value_a = shares_a * last_px_a * (-1 if open_side_a == "SELL" else 1)
current_value_b = shares_b * last_px_b * (-1 if open_side_b == "SELL" else 1)
total_current_value = current_value_a + current_value_b
# Get disequilibrium information

View File

@ -0,0 +1,362 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.fit_method import PairState, PairsTradingFitMethod
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
NanoPerMin = 1e9  # NOTE(review): 1e9 is nanoseconds per *second*; per minute would be 6e10 - confirm intended value/name
class SlidingFit(PairsTradingFitMethod):
def __init__(self) -> None:
    """Initialise the fit method; only the base-class initialiser runs."""
    super().__init__()
def run_pair(
    self, pair: TradingPair, bt_result: BacktestResult
) -> Optional[pd.DataFrame]:
    """Slide the training window over ``pair`` and then build its trades.

    The window starts at ``pair.get_begin_index()`` and advances one row per
    step. Each step trains on ``training_minutes`` rows with a single
    testing row and, when the pair is cointegrated, runs a prediction.
    Trading signals are generated once, after the loop.

    Args:
        pair: Trading pair to process; configuration is read from
            ``pair.config_`` and per-run state is kept in ``pair.user_data_``.
        bt_result: Passed on to ``_create_trading_signals``.

    Returns:
        Whatever ``pair.get_trades()`` returns after signal generation.

    Raises:
        RuntimeError: If training or prediction raises.
    """
    print(f"***{pair}*** STARTING....")
    config = pair.config_

    curr_training_start_idx = pair.get_begin_index()
    end_index = pair.get_end_index()

    pair.user_data_["state"] = PairState.INITIAL
    # Initialize trades DataFrame with proper dtypes to avoid concatenation warnings
    pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS).astype({
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object"
    })
    pair.user_data_["is_cointegrated"] = False

    training_minutes = config["training_minutes"]

    while True:
        print(curr_training_start_idx, end="\r")
        pair.get_datasets(
            training_minutes=training_minutes,
            training_start_index=curr_training_start_idx,
            testing_size=1,
        )

        if len(pair.training_df_) < training_minutes:
            print(
                f"{pair}: current offset={curr_training_start_idx}"
                f" * Training data length={len(pair.training_df_)} < {training_minutes}"
                " * Not enough training data. Completing the job."
            )
            break

        try:
            # ================================ TRAINING ================================
            is_cointegrated = pair.train_pair()
        except Exception as e:
            raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e

        # Only log when the cointegration status flips.
        if pair.user_data_["is_cointegrated"] != is_cointegrated:
            pair.user_data_["is_cointegrated"] = is_cointegrated
            if not is_cointegrated:
                if pair.user_data_["state"] == PairState.OPEN:
                    print(
                        f"{pair} {curr_training_start_idx} LOST COINTEGRATION. Consider closing positions..."
                    )
                else:
                    print(
                        f"{pair} {curr_training_start_idx} IS NOT COINTEGRATED. Moving on"
                    )
            else:
                print("*" * 80)
                print(
                    f"Pair {pair} ({curr_training_start_idx}) IS COINTEGRATED"
                )
                print("*" * 80)

        if not is_cointegrated:
            # NOTE(review): this path advances the window without checking
            # end_index, so it relies on the training-length check above to
            # stop - confirm that is intended.
            curr_training_start_idx += 1
            continue

        try:
            # ================================ PREDICTION ================================
            pair.predict()
        except Exception as e:
            raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e

        curr_training_start_idx += 1
        if curr_training_start_idx > end_index:
            break

    self._create_trading_signals(pair, config, bt_result)
    print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
    return pair.get_trades()
def _create_trading_signals(
    self, pair: TradingPair, config: Dict, bt_result: BacktestResult
) -> None:
    """Walk the accumulated predictions and record open/close trades on *pair*.

    The pair's ``user_data_["state"]`` drives a small state machine:

    * while flat (INITIAL / CLOSED / CLOSED_POSITIONS) each predicted row is
      offered to ``_get_open_trades``; a non-None result is tagged
      ``status="OPEN"``, stored via ``pair.add_trades`` and the state becomes OPEN;
    * while OPEN each row is offered to ``_get_close_trades``; a non-None
      result is tagged ``status="CLOSE"``, stored, and the state becomes CLOSED.

    If the pair is still OPEN after the last row, the position is either
    force-closed on the last predicted row (``config["close_outstanding_positions"]``
    truthy) or reported through ``bt_result.handle_outstanding_position``.

    Args:
        pair: Trading pair holding ``predicted_df_`` and ``user_data_``.
        config: Must contain ``dis-equilibrium_open_trshld``,
            ``dis-equilibrium_close_trshld`` and ``close_outstanding_positions``.
        bt_result: Collector that receives positions left open.
    """
    predicted_df = pair.predicted_df_
    if predicted_df is None:
        print(f"{pair.market_data_.iloc[0]['tstamp']} {pair}: No predicted data")
        return

    open_threshold = config["dis-equilibrium_open_trshld"]
    close_threshold = config["dis-equilibrium_close_trshld"]
    flat_states = (PairState.INITIAL, PairState.CLOSED, PairState.CLOSED_POSITIONS)

    for curr_predicted_row_idx in range(len(predicted_df)):
        pred_row = predicted_df.iloc[curr_predicted_row_idx]
        state = pair.user_data_["state"]

        if state in flat_states:
            open_trades = self._get_open_trades(
                pair, row=pred_row, open_threshold=open_threshold
            )
            if open_trades is not None:
                open_trades["status"] = "OPEN"
                print(f"OPEN TRADES:\n{open_trades}")
                pair.add_trades(open_trades)
                pair.user_data_["state"] = PairState.OPEN
        elif state == PairState.OPEN:
            close_trades = self._get_close_trades(
                pair, row=pred_row, close_threshold=close_threshold
            )
            if close_trades is not None:
                close_trades["status"] = "CLOSE"
                print(f"CLOSE TRADES:\n{close_trades}")
                pair.add_trades(close_trades)
                pair.user_data_["state"] = PairState.CLOSED

    # Outstanding positions
    if pair.user_data_["state"] != PairState.OPEN:
        return
    print(
        f"{pair}: *** Position is NOT CLOSED. ***"
    )
    if len(predicted_df) == 0:
        # No row to price the position on (the state was OPEN before this
        # call); previously this path hit an unbound loop variable.
        return

    # Use the last predicted row explicitly instead of relying on the loop
    # variable surviving the loop.
    last_row_index = len(predicted_df) - 1
    last_row = predicted_df.iloc[last_row_index]

    if config["close_outstanding_positions"]:
        close_position_trades = self._get_close_position_trades(
            pair=pair,
            row=last_row,
            close_threshold=close_threshold,
        )
        if close_position_trades is not None:
            close_position_trades["status"] = "CLOSE_POSITION"
            print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
            pair.add_trades(close_position_trades)
            pair.user_data_["state"] = PairState.CLOSED_POSITIONS
    else:
        # predicted_df_ accumulates one row per sliding step, so the
        # position must be reported against its LAST row (as StaticFit does),
        # not row 0.
        bt_result.handle_outstanding_position(
            pair=pair,
            pair_result_df=predicted_df,
            last_row_index=last_row_index,
            open_side_a=pair.user_data_["open_side_a"],
            open_side_b=pair.user_data_["open_side_b"],
            open_px_a=pair.user_data_["open_px_a"],
            open_px_b=pair.user_data_["open_px_b"],
            open_tstamp=pair.user_data_["open_tstamp"],
        )
def _get_open_trades(
    self, pair: TradingPair, row: pd.Series, open_threshold: float
) -> Optional[pd.DataFrame]:
    """Build the two opening trades for *pair* if *row* crosses the open threshold.

    Args:
        pair: Trading pair; its ``user_data_`` receives the open/close sides,
            the open prices and the open timestamp as a side effect.
        row: One predicted row with ``tstamp``, ``disequilibrium``,
            ``scaled_disequilibrium`` and the pair's two price columns.
        open_threshold: Minimum scaled disequilibrium required to open.

    Returns:
        A two-row frame (leg A, leg B) with ``self.TRADES_COLUMNS`` columns,
        or ``None`` when ``pair.predicted_df_`` is empty or the row's scaled
        disequilibrium is below *open_threshold*.
    """
    colname_a, colname_b = pair.colnames()

    assert pair.predicted_df_ is not None
    # Check if we have any data to work with
    if len(pair.predicted_df_) == 0:
        return None

    open_tstamp = row["tstamp"]
    open_disequilibrium = row["disequilibrium"]
    open_scaled_disequilibrium = row["scaled_disequilibrium"]
    open_px_a = row[colname_a]
    open_px_b = row[colname_b]

    if open_scaled_disequilibrium < open_threshold:
        return None

    # creating the trades
    # (no same-type quotes nested inside the f-string: that is a SyntaxError
    # on Python < 3.12)
    print(f"OPEN_TRADES: {open_tstamp} {open_scaled_disequilibrium=}")

    # Positive raw disequilibrium: sell leg A / buy leg B; otherwise the reverse.
    if open_disequilibrium > 0:
        open_side_a, open_side_b = "SELL", "BUY"
    else:
        open_side_a, open_side_b = "BUY", "SELL"
    # Closing a leg is the opposite action of opening it.
    close_side_a, close_side_b = open_side_b, open_side_a

    # Remember how the position was opened so it can be closed/reported later.
    pair.user_data_["open_side_a"] = open_side_a
    pair.user_data_["open_side_b"] = open_side_b
    pair.user_data_["open_px_a"] = open_px_a
    pair.user_data_["open_px_b"] = open_px_b
    pair.user_data_["open_tstamp"] = open_tstamp
    pair.user_data_["close_side_a"] = close_side_a
    pair.user_data_["close_side_b"] = close_side_b

    # create opening trades
    trd_signal_tuples = [
        (
            open_tstamp,
            side,
            symbol,
            price,
            open_disequilibrium,
            open_scaled_disequilibrium,
            pair,
        )
        for side, symbol, price in (
            (open_side_a, pair.symbol_a_, open_px_a),
            (open_side_b, pair.symbol_b_, open_px_b),
        )
    ]

    df = pd.DataFrame(trd_signal_tuples, columns=self.TRADES_COLUMNS)
    # Ensure consistent dtypes to avoid concatenation warnings
    return df.astype({
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    })
def _get_close_trades(
    self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
    """Build the two closing trades for *pair* once *row* is back under the close threshold.

    The closing sides are read from ``pair.user_data_["close_side_a"]`` and
    ``["close_side_b"]``.

    Returns:
        A two-row frame (leg A, leg B) with ``self.TRADES_COLUMNS`` columns,
        or ``None`` when ``pair.predicted_df_`` is empty or the row's scaled
        disequilibrium is still above *close_threshold*.
    """
    col_a, col_b = pair.colnames()

    assert pair.predicted_df_ is not None
    if not len(pair.predicted_df_):
        return None

    tstamp = row["tstamp"]
    diseq = row["disequilibrium"]
    scaled_diseq = row["scaled_disequilibrium"]
    px_a = row[f"{col_a}"]
    px_b = row[f"{col_b}"]

    side_a = pair.user_data_["close_side_a"]
    side_b = pair.user_data_["close_side_b"]

    # Still too far from equilibrium: keep the position open.
    if scaled_diseq > close_threshold:
        return None

    legs = (
        (side_a, pair.symbol_a_, px_a),
        (side_b, pair.symbol_b_, px_b),
    )
    records = [
        (tstamp, side, symbol, price, diseq, scaled_diseq, pair)
        for side, symbol, price in legs
    ]

    # Explicit dtypes keep later concatenation with the trade log warning-free.
    column_dtypes = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }
    return pd.DataFrame(records, columns=self.TRADES_COLUMNS).astype(column_dtypes)
def _get_close_position_trades(
    self, pair: TradingPair, row: pd.Series, close_threshold: float
) -> Optional[pd.DataFrame]:
    """Build the two trades that unconditionally close *pair*'s position at *row*.

    Unlike ``_get_close_trades`` no threshold test is applied;
    *close_threshold* is accepted but not consulted. The closing sides come
    from ``pair.user_data_["close_side_a"]`` / ``["close_side_b"]``.

    Returns:
        A two-row frame (leg A, leg B) with ``self.TRADES_COLUMNS`` columns,
        or ``None`` when ``pair.predicted_df_`` is empty.
    """
    col_a, col_b = pair.colnames()

    assert pair.predicted_df_ is not None
    if not len(pair.predicted_df_):
        return None

    tstamp = row["tstamp"]
    diseq = row["disequilibrium"]
    scaled_diseq = row["scaled_disequilibrium"]
    px_a = row[f"{col_a}"]
    px_b = row[f"{col_b}"]

    side_a = pair.user_data_["close_side_a"]
    side_b = pair.user_data_["close_side_b"]

    legs = (
        (side_a, pair.symbol_a_, px_a),
        (side_b, pair.symbol_b_, px_b),
    )
    records = [
        (tstamp, side, symbol, price, diseq, scaled_diseq, pair)
        for side, symbol, price in legs
    ]

    # Explicit dtypes keep later concatenation with the trade log warning-free.
    column_dtypes = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }
    return pd.DataFrame(records, columns=self.TRADES_COLUMNS).astype(column_dtypes)
def reset(self) -> None:
    """Reset hook of the fit method; as written it changes no instance state."""
    # NOTE(review): this binds a function-local name only, so nothing on
    # `self` is modified. Presumably an instance attribute was intended;
    # confirm against the part of the class that is not visible here.
    curr_training_start_idx = 0
    return None

View File

@ -0,0 +1,220 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional, cast
import pandas as pd # type: ignore[import]
from pt_trading.results import BacktestResult
from pt_trading.trading_pair import TradingPair
from pt_trading.fit_method import PairsTradingFitMethod
# NOTE(review): 1e9 is the number of nanoseconds in one *second*; one minute
# is 60e9 ns. Either the name or the value looks off; confirm before use.
NanoPerMin = 1e9
class StaticFit(PairsTradingFitMethod):
    """Fit method that trains a pair once and derives at most one round trip.

    ``run_pair`` trains on the first ``config["training_minutes"]`` rows,
    predicts the remaining rows, and ``create_trading_signals`` then looks for
    the first row whose scaled disequilibrium reaches the open threshold and
    the first later row that drops below the close threshold.
    """

    # Column dtypes enforced on every trades frame returned by this class so
    # that later concatenation does not emit dtype warnings.
    _TRADES_DTYPES = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }

    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:  # abstractmethod
        """Train, predict and generate trades for one pair.

        Returns:
            The trades frame from ``create_trading_signals``, or ``None`` when
            the pair is not cointegrated or training/prediction raised (the
            failure is printed, not propagated).
        """
        config = pair.config_
        pair.get_datasets(training_minutes=config["training_minutes"])

        try:
            is_cointegrated = pair.train_pair()
        except Exception as e:
            print(f"{pair}: Training failed: {str(e)}")
            return None
        if not is_cointegrated:
            print(f"{pair} IS NOT COINTEGRATED")
            return None

        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(
            pair=pair, config=config, result=bt_result
        )

    def _empty_trades(self) -> pd.DataFrame:
        """Return an empty trades frame with the standard columns and dtypes."""
        return pd.DataFrame(columns=self.TRADES_COLUMNS).astype(self._TRADES_DTYPES)

    @staticmethod
    def _leg_tuples(pair, tstamp, side_a, px_a, side_b, px_b, diseq, scaled_diseq):
        """Return the (leg A, leg B) trade tuples for one timestamp."""
        return [
            (tstamp, side_a, pair.symbol_a_, px_a, diseq, scaled_diseq, pair),
            (tstamp, side_b, pair.symbol_b_, px_b, diseq, scaled_diseq, pair),
        ]

    def create_trading_signals(
        self, pair: TradingPair, config: Dict, result: BacktestResult
    ) -> pd.DataFrame:
        """Derive the open (and, if found, close) trades from the predictions.

        Args:
            pair: Trained pair with ``vecm_fit_`` and ``predicted_df_`` set.
            config: Must contain ``dis-equilibrium_open_trshld`` and
                ``dis-equilibrium_close_trshld``.
            result: Receives the position via ``handle_outstanding_position``
                when no close signal is found.

        Returns:
            A trades frame with ``self.TRADES_COLUMNS``: two rows (open only),
            four rows (open + close), or zero rows when there are no
            predictions or no row reaches the open threshold.
        """
        beta = pair.vecm_fit_.beta  # type: ignore
        colname_a, colname_b = pair.colnames()

        predicted_df = pair.predicted_df_
        if predicted_df is None:
            return self._empty_trades()

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]
        scaled = predicted_df["scaled_disequilibrium"]

        # First row whose scaled disequilibrium reaches the open threshold.
        open_row_index = next(
            (idx for idx in range(len(predicted_df)) if scaled[idx] >= open_threshold),
            None,
        )
        if open_row_index is None:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            # Same typed empty frame as the "no predictions" path, so callers
            # always see the standard columns.
            return self._empty_trades()

        # First row at or after the open row that falls below the close threshold.
        close_signals = scaled[open_row_index:] < close_threshold
        close_row_index = next(
            (idx for idx, is_close in close_signals.items() if is_close), None
        )

        open_row = predicted_df.loc[open_row_index]
        open_px_a = predicted_df.at[open_row_index, f"{colname_a}"]
        open_px_b = predicted_df.at[open_row_index, f"{colname_b}"]
        open_tstamp = predicted_df.at[open_row_index, "tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]

        abs_beta = abs(beta[1])
        pred_px_b = open_row[f"{colname_b}_pred"]
        pred_px_a = open_row[f"{colname_a}_pred"]

        # Sides are chosen from the predicted prices at the open row.
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "SELL", "BUY"
        # Closing a leg is the opposite action of opening it.
        close_side_a, close_side_b = open_side_b, open_side_a

        trd_signal_tuples = self._leg_tuples(
            pair,
            open_tstamp,
            open_side_a,
            open_px_a,
            open_side_b,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
        )

        if close_row_index is None:
            # No close signal: report the still-open position and return only
            # the opening trades.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=float(open_px_a),
                open_px_b=float(open_px_b),
                open_tstamp=pd.Timestamp(open_tstamp),
            )
        else:
            # Close signal found - create complete trade
            close_row = predicted_df.loc[close_row_index]
            print(f"{pair}: Close signal found at index {close_row_index}")
            trd_signal_tuples += self._leg_tuples(
                pair,
                close_row["tstamp"],
                close_side_a,
                close_row[f"{colname_a}"],
                close_side_b,
                close_row[f"{colname_b}"],
                close_row["disequilibrium"],
                close_row["scaled_disequilibrium"],
            )

        df = pd.DataFrame(trd_signal_tuples, columns=self.TRADES_COLUMNS)
        # Ensure consistent dtypes
        return df.astype(self._TRADES_DTYPES)

    def reset(self) -> None:
        """No per-run state to reset for the static fit."""
        pass

View File

@ -1,4 +1,5 @@
from typing import Any, Dict, List, Optional
import pandas as pd # type:ignore
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore
@ -19,18 +20,45 @@ class TradingPair:
user_data_: Dict[str, Any]
predicted_df_: Optional[pd.DataFrame]
def __init__(
self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
self, config: Dict[str, Any], market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.price_column_ = price_column
self.set_market_data(market_data)
self.user_data_ = {}
self.predicted_df_ = None
self.config_ = config
def set_market_data(self, market_data: pd.DataFrame) -> None:
self.market_data_ = pd.DataFrame(
self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
)
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
self.market_data_['tstamp'] = pd.to_datetime(self.market_data_['tstamp'])
self.market_data_ = self.market_data_.sort_values('tstamp')
self.user_data_ = {}
def get_begin_index(self) -> int:
    """Return the smallest ``market_data_`` index label inside the trading session.

    A row qualifies when the time-of-day of its ``tstamp`` is at or after
    ``config_["trading_hours"]["begin_session"]``. Without a
    ``trading_hours`` section the session is unrestricted and 0 is returned.

    Raises:
        AssertionError: ``trading_hours`` lacks ``timezone`` or ``begin_session``.
        ValueError: no row is at or after the session start.
    """
    if "trading_hours" not in self.config_:
        return 0
    trading_hours = self.config_["trading_hours"]
    assert "timezone" in trading_hours, "trading_hours.timezone is required"
    assert "begin_session" in trading_hours, "trading_hours.begin_session is required"

    # NOTE(review): .time() yields a naive wall-clock time, so the configured
    # timezone does not shift the comparison against `tstamp`; confirm that
    # `tstamp` is already expressed in that timezone.
    start_time = (
        pd.to_datetime(trading_hours["begin_session"])
        .tz_localize(trading_hours["timezone"])
        .time()
    )
    mask = self.market_data_["tstamp"].dt.time >= start_time
    if not mask.any():
        # Previously surfaced as an opaque int(nan) conversion error.
        raise ValueError(
            f"no market data at or after begin_session={trading_hours['begin_session']}"
        )
    return int(self.market_data_.index[mask].min())
def get_end_index(self) -> int:
    """Return the largest ``market_data_`` index label inside the trading session.

    A row qualifies when the time-of-day of its ``tstamp`` is at or before
    ``config_["trading_hours"]["end_session"]``. Without a ``trading_hours``
    section the session is unrestricted, so the last index label is returned
    (0 for empty market data).

    Raises:
        AssertionError: ``trading_hours`` lacks ``timezone`` or ``end_session``.
        ValueError: no row is at or before the session end.
    """
    if "trading_hours" not in self.config_:
        # An unrestricted session ends at the last row. Returning 0 here made
        # any `idx > end_index` loop guard stop after its first step.
        if len(self.market_data_) == 0:
            return 0
        return int(self.market_data_.index.max())
    trading_hours = self.config_["trading_hours"]
    assert "timezone" in trading_hours, "trading_hours.timezone is required"
    assert "end_session" in trading_hours, "trading_hours.end_session is required"

    # NOTE(review): .time() yields a naive wall-clock time, so the configured
    # timezone does not shift the comparison against `tstamp`; confirm that
    # `tstamp` is already expressed in that timezone.
    end_time = (
        pd.to_datetime(trading_hours["end_session"])
        .tz_localize(trading_hours["timezone"])
        .time()
    )
    mask = self.market_data_["tstamp"].dt.time <= end_time
    if not mask.any():
        # Previously surfaced as an opaque int(nan) conversion error.
        raise ValueError(
            f"no market data at or before end_session={trading_hours['end_session']}"
        )
    return int(self.market_data_.index[mask].max())
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
# Select only the columns we need
@ -69,7 +97,7 @@ class TradingPair:
drop=True
) # do not dropna() since irrelevant symbol would affect dataset
return result_df
return result_df.dropna()
def get_datasets(
self,
@ -80,7 +108,7 @@ class TradingPair:
testing_start_index = training_start_index + training_minutes
self.training_df_ = self.market_data_.iloc[
training_start_index:testing_start_index, :
training_start_index:testing_start_index, : training_minutes
].copy()
assert self.training_df_ is not None
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
@ -101,7 +129,7 @@ class TradingPair:
f"{self.price_column_}_{self.symbol_b_}",
]
def fit_VECM(self):
def fit_VECM(self) -> None:
assert self.training_df_ is not None
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
vecm_model = VECM(vecm_df, coint_rank=1)
@ -120,20 +148,20 @@ class TradingPair:
# print(f"{self}: {self.vecm_fit_.summary()}")
pass
def check_cointegration_johansen(self):
def check_cointegration_johansen(self) -> bool:
assert self.training_df_ is not None
from statsmodels.tsa.vector_ar.vecm import coint_johansen
df = self.training_df_[self.colnames()].reset_index(drop=True)
result = coint_johansen(df, det_order=0, k_ar_diff=1)
print(
f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
)
is_cointegrated = result.lr1[0] > result.cvt[0, 1]
# print(
# f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
# )
is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1])
return is_cointegrated
def check_cointegration_engle_granger(self):
def check_cointegration_engle_granger(self) -> bool:
from statsmodels.tsa.stattools import coint
col1, col2 = self.colnames()
@ -144,22 +172,23 @@ class TradingPair:
# Run Engle-Granger cointegration test
pvalue = coint(series1, series2)[1]
# Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration)
is_cointegrated = pvalue < 0.05
print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
is_cointegrated: bool = bool(pvalue < 0.05)
# print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
return is_cointegrated
def train_pair(self) -> bool:
def check_cointegration(self) -> bool:
is_cointegrated_johansen = self.check_cointegration_johansen()
is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
if not is_cointegrated_johansen and not is_cointegrated_engle_granger:
return False
pass
result = is_cointegrated_johansen or is_cointegrated_engle_granger
return result or True # TODO: remove this
def train_pair(self) -> bool:
result = self.check_cointegration()
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
self.fit_VECM()
assert self.training_df_ is not None and self.vecm_fit_ is not None
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
print(diseq_series.shape)
# print(diseq_series.shape)
self.training_mu_ = float(diseq_series[0].mean())
self.training_std_ = float(diseq_series[0].std())
@ -171,7 +200,45 @@ class TradingPair:
diseq_series - self.training_mu_
) / self.training_std_
return True
return result
def add_trades(self, trades: pd.DataFrame) -> None:
    """Append *trades* to the trade log kept in ``user_data_["trades"]``.

    If no log exists yet (key missing, ``None`` or empty frame) the log
    becomes a copy of *trades*. Otherwise columns present in the log but
    missing from *trades* are filled with placeholder values on a private
    copy (the caller's frame is left untouched) and the frames are
    concatenated with a fresh 0..n-1 index.

    Args:
        trades: New trade rows to record.
    """
    # .get(): a never-initialised log is treated like an empty one, matching
    # how get_trades() tolerates a missing "trades" key.
    existing_trades = self.user_data_.get("trades")
    if existing_trades is None or len(existing_trades) == 0:
        self.user_data_["trades"] = trades.copy()
        return

    missing_cols = [
        col for col in existing_trades.columns if col not in trades.columns
    ]
    if missing_cols:
        trades = trades.copy()  # never add columns to the caller's frame
        for col in missing_cols:
            if col == "time":
                trades[col] = pd.Timestamp.now()
            elif col in ("action", "symbol"):
                trades[col] = ""
            elif col in ("price", "disequilibrium", "scaled_disequilibrium"):
                trades[col] = 0.0
            else:
                trades[col] = None

    # `copy=False` dropped: the keyword is deprecated in recent pandas and
    # does not affect the result.
    self.user_data_["trades"] = pd.concat(
        [existing_trades, trades], ignore_index=True
    )
def get_trades(self) -> pd.DataFrame:
    """Return the stored trade log, or a new empty frame if none was stored."""
    if "trades" not in self.user_data_:
        return pd.DataFrame()
    return self.user_data_["trades"]
def predict(self) -> pd.DataFrame:
assert self.testing_df_ is not None
@ -179,9 +246,12 @@ class TradingPair:
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
# Convert prediction to a DataFrame for readability
# predicted_df =
predicted_df = pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
)
self.predicted_df_ = pd.merge(
predicted_df = pd.merge(
self.testing_df_.reset_index(drop=True),
pd.DataFrame(
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
@ -191,18 +261,34 @@ class TradingPair:
suffixes=("", "_pred"),
).dropna()
self.predicted_df_["disequilibrium"] = (
self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta
predicted_df["disequilibrium"] = (
predicted_df[self.colnames()] @ self.vecm_fit_.beta
)
self.predicted_df_["scaled_disequilibrium"] = (
abs(self.predicted_df_["disequilibrium"] - self.training_mu_)
predicted_df["scaled_disequilibrium"] = (
abs(predicted_df["disequilibrium"] - self.training_mu_)
/ self.training_std_
)
# print("*** PREDICTED DF")
# print(predicted_df)
# print("*" * 80)
# print("*** SELF.PREDICTED_DF")
# print(self.predicted_df_)
# print("*" * 80)
predicted_df = predicted_df.reset_index(drop=True)
if self.predicted_df_ is None:
self.predicted_df_ = predicted_df
else:
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True)
# Reset index to ensure proper indexing
self.predicted_df_ = self.predicted_df_.reset_index()
self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
return self.predicted_df_
def __repr__(self) -> str:
return self.name()
def name(self) -> str:
return f"{self.symbol_a_} & {self.symbol_b_}"
# return f"{self.symbol_a_} & {self.symbol_b_}"

View File

@ -24,11 +24,14 @@ hjson>=3.0.2
html5lib>=1.1
httplib2>=0.20.2
idna>=3.3
ipython>=8.18.1
ipywidgets>=8.1.1
ifaddr>=0.1.7
IMDbPY>=2021.4.18
ipykernel>=6.29.5
jeepney>=0.7.1
jsonschema>=3.2.0
jupyter>=1.0.0
keyring>=23.5.0
launchpadlib>=1.10.16
lazr.restfulclient>=0.14.4
@ -42,14 +45,18 @@ more-itertools>=8.10.0
multidict>=6.0.4
mypy>=0.942
mypy-extensions>=0.4.3
nbformat>=5.10.2
netaddr>=0.8.0
######### netifaces>=0.11.0
numpy>=1.26.4,<2.3.0
oauthlib>=3.2.0
packaging>=23.1
pandas>=2.2.3
pathspec>=0.11.1
pexpect>=4.8.0
Pillow>=9.0.1
platformdirs>=3.2.0
plotly>=5.19.0
protobuf>=3.12.4
psutil>=5.9.0
ptyprocess>=0.7.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -15,7 +15,7 @@ from pt_trading.results import (
store_config_in_database,
store_results_in_database,
)
from pt_trading.fit_methods import PairsTradingFitMethod
from pt_trading.fit_method import PairsTradingFitMethod
from pt_trading.trading_pair import TradingPair
@ -84,6 +84,7 @@ def run_backtest(
for a_index, b_index in unique_index_pairs:
pair = TradingPair(
config=config_copy,
market_data=market_data_df,
symbol_a=instruments[a_index],
symbol_b=instruments[b_index],
@ -95,21 +96,17 @@ def run_backtest(
pairs_trades = []
for pair in _create_pairs(config, instruments):
single_pair_trades = fit_method.run_pair(
pair=pair, config=config, bt_result=bt_result
pair=pair, bt_result=bt_result
)
if single_pair_trades is not None and len(single_pair_trades) > 0:
pairs_trades.append(single_pair_trades)
print(f"pairs_trades: {pairs_trades}")
# Check if result_list has any data before concatenating
if len(pairs_trades) == 0:
print("No trading signals found for any pairs")
return bt_result
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
bt_result.collect_single_day_results(result)
bt_result.collect_single_day_results(pairs_trades)
return bt_result
@ -226,7 +223,10 @@ def main() -> None:
# Store results with file name as key
filename = os.path.basename(datafile)
all_results[filename] = {"trades": bt_results.trades.copy()}
all_results[filename] = {
"trades": bt_results.trades.copy(),
"outstanding_positions": bt_results.outstanding_positions.copy(),
}
# Store results in database
if args.result_db.upper() != "NONE":