Compare commits
26 Commits
85c9d2ab93
...
a04e8878fb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a04e8878fb | ||
|
|
facf7fb0c6 | ||
|
|
9c34d935bd | ||
|
|
20f150a6b7 | ||
|
|
d46bcb64d6 | ||
|
|
26659ede12 | ||
|
|
e9995312a0 | ||
|
|
a46c8a7576 | ||
|
|
fe2ebbb27f | ||
|
|
ddd9f4adb9 | ||
|
|
4bc947cf07 | ||
|
|
51944b3a2f | ||
|
|
bff1c54b48 | ||
|
|
9c91f37bcc | ||
|
|
76547e1176 | ||
|
|
80cf1b60ef | ||
|
|
94ffb32f50 | ||
|
|
967c01c367 | ||
|
|
747ca05b16 | ||
|
|
30ae95a808 | ||
|
|
bcba183768 | ||
|
|
cc0072dcc8 | ||
|
|
35a1cd748e | ||
|
|
3b003c7811 | ||
|
|
b24285802a | ||
|
|
48f18f7b4f |
@ -7,16 +7,6 @@
|
||||
"db_table_name": "md_1min_bars",
|
||||
"exchange_id": "BNBSPOT",
|
||||
"instrument_id_pfx": "PAIR-",
|
||||
# "instruments": [
|
||||
# "BTC-USDT",
|
||||
# "BCH-USDT",
|
||||
# "ETH-USDT",
|
||||
# "LTC-USDT",
|
||||
# "XRP-USDT",
|
||||
# "ADA-USDT",
|
||||
# "SOL-USDT",
|
||||
# "DOT-USDT"
|
||||
# ],
|
||||
"trading_hours": {
|
||||
"begin_session": "00:00:00",
|
||||
"end_session": "23:59:00",
|
||||
@ -29,5 +19,13 @@
|
||||
"dis-equilibrium_close_trshld": 0.5,
|
||||
"training_minutes": 120,
|
||||
"funding_per_pair": 2000.0,
|
||||
"fit_method_class": "pt_trading.fit_methods.StaticFit"
|
||||
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
|
||||
# "fit_method_class": "pt_trading.static_fit.StaticFit",
|
||||
"close_outstanding_positions": true,
|
||||
"trading_hours": {
|
||||
"begin_session": "06:00:00",
|
||||
"end_session": "16:00:00",
|
||||
"timezone": "America/New_York"
|
||||
}
|
||||
|
||||
}
|
||||
@ -2,7 +2,7 @@
|
||||
"security_type": "EQUITY",
|
||||
"data_directory": "./data/equity",
|
||||
"datafiles": [
|
||||
"202506*.mktdata.ohlcv.db",
|
||||
"20250618.mktdata.ohlcv.db",
|
||||
],
|
||||
"db_table_name": "md_1min_bars",
|
||||
"exchange_id": "ALPACA",
|
||||
@ -19,7 +19,9 @@
|
||||
"dis-equilibrium_close_trshld": 1.0,
|
||||
"training_minutes": 120,
|
||||
"funding_per_pair": 2000.0,
|
||||
"fit_method_class": "pt_trading.fit_methods.SlidingFit",
|
||||
"exclude_instruments": ["CAN"]
|
||||
# "fit_method_class": "pt_trading.sliding_fit.SlidingFit",
|
||||
"fit_method_class": "pt_trading.static_fit.StaticFit",
|
||||
"exclude_instruments": ["CAN"],
|
||||
"close_outstanding_positions": false
|
||||
|
||||
}
|
||||
26
configuration/equity_lg.cfg
Normal file
26
configuration/equity_lg.cfg
Normal file
@ -0,0 +1,26 @@
|
||||
{
|
||||
"security_type": "EQUITY",
|
||||
"data_directory": "./data/equity",
|
||||
"datafiles": [
|
||||
"20250602.mktdata.ohlcv.db",
|
||||
],
|
||||
"db_table_name": "md_1min_bars",
|
||||
"exchange_id": "ALPACA",
|
||||
"instrument_id_pfx": "STOCK-",
|
||||
"trading_hours": {
|
||||
"begin_session": "9:30:00",
|
||||
"end_session": "16:00:00",
|
||||
"timezone": "America/New_York"
|
||||
},
|
||||
"price_column": "close",
|
||||
"min_required_points": 30,
|
||||
"zero_threshold": 1e-10,
|
||||
"dis-equilibrium_open_trshld": 2.0,
|
||||
"dis-equilibrium_close_trshld": 1.0,
|
||||
"training_minutes": 120,
|
||||
"funding_per_pair": 2000.0,
|
||||
"fit_method_class": "pt_trading.fit_methods.StaticFit",
|
||||
"exclude_instruments": ["CAN"]
|
||||
}
|
||||
# "fit_method_class": "pt_trading.fit_methods.SlidingFit",
|
||||
# "fit_method_class": "pt_trading.fit_methods.StaticFit",
|
||||
27
configuration/equity_single.cfg
Normal file
27
configuration/equity_single.cfg
Normal file
@ -0,0 +1,27 @@
|
||||
{
|
||||
"security_type": "EQUITY",
|
||||
"data_directory": "./data/equity",
|
||||
# "datafiles": [
|
||||
# "20250604.mktdata.ohlcv.db",
|
||||
# ],
|
||||
"db_table_name": "md_1min_bars",
|
||||
"exchange_id": "ALPACA",
|
||||
"instrument_id_pfx": "STOCK-",
|
||||
"trading_hours": {
|
||||
"begin_session": "9:30:00",
|
||||
"end_session": "16:00:00",
|
||||
"timezone": "America/New_York"
|
||||
},
|
||||
"price_column": "close",
|
||||
"min_required_points": 30,
|
||||
"zero_threshold": 1e-10,
|
||||
"dis-equilibrium_open_trshld": 2.0,
|
||||
"dis-equilibrium_close_trshld": 1.0,
|
||||
"training_minutes": 120,
|
||||
"funding_per_pair": 2000.0,
|
||||
"fit_method_class": "pt_trading.sliding_fit.SlidingFit",
|
||||
# "fit_method_class": "pt_trading.static_fit.StaticFit",
|
||||
"exclude_instruments": ["CAN"],
|
||||
"close_outstanding_positions": false
|
||||
|
||||
}
|
||||
115
lg_notes.md
Normal file
115
lg_notes.md
Normal file
@ -0,0 +1,115 @@
|
||||
07.11.2025
|
||||
pairs_trading/configuration <---- directory for config
|
||||
equity_lg.cfg <-------- copy of equity.cfg
|
||||
How to run a Program: TRIANGLEsquare ----> triangle EQUITY backtest
|
||||
Results are in the `results` directory (a timestamped database file for each run)
|
||||
table "...timestamp... .pt_backtest_results.equity.db"
|
||||
Open the results database with sqlite3:
|
||||
> sqlite3 '/home/coder/results/20250721_175750.pt_backtest_results.equity.db'
|
||||
|
||||
sqlite> .databases
|
||||
main: /home/coder/results/20250717_180122.pt_backtest_results.equity.db r/w
|
||||
sqlite> .tables
|
||||
config outstanding_positions pt_bt_results
|
||||
|
||||
sqlite> PRAGMA table_info('pt_bt_results');
|
||||
0|date|DATE|0||0
|
||||
1|pair|TEXT|0||0
|
||||
2|symbol|TEXT|0||0
|
||||
3|open_time|DATETIME|0||0
|
||||
4|open_side|TEXT|0||0
|
||||
5|open_price|REAL|0||0
|
||||
6|open_quantity|INTEGER|0||0
|
||||
7|open_disequilibrium|REAL|0||0
|
||||
8|close_time|DATETIME|0||0
|
||||
9|close_side|TEXT|0||0
|
||||
10|close_price|REAL|0||0
|
||||
11|close_quantity|INTEGER|0||0
|
||||
12|close_disequilibrium|REAL|0||0
|
||||
13|symbol_return|REAL|0||0
|
||||
14|pair_return|REAL|0||0
|
||||
|
||||
select count(*) as cnt from pt_bt_results;
|
||||
8
|
||||
|
||||
select * from pt_bt_results;
|
||||
|
||||
select
|
||||
date, close_time, pair, symbol, symbol_return, pair_return
|
||||
from pt_bt_results ;
|
||||
|
||||
select date, sum(symbol_return) as daily_return
|
||||
from pt_bt_results where date = '2025-06-18' group by date;
|
||||
|
||||
.quit
|
||||
|
||||
sqlite3 '/home/coder/results/20250717_172435.pt_backtest_results.equity.db'
|
||||
|
||||
sqlite> select date, sum(symbol_return) as daily_return
|
||||
from pt_bt_results group by date;
|
||||
|
||||
2025-06-02|1.29845390060828
|
||||
...
|
||||
2025-06-18|-43.5084977104115 <========== ????? ==========>
|
||||
2025-06-20|11.8605547517183
|
||||
|
||||
|
||||
select
|
||||
date, close_time, pair, symbol, symbol_return, pair_return
|
||||
from pt_bt_results ;
|
||||
|
||||
select date, close_time, pair, symbol, symbol_return, pair_return
|
||||
from pt_bt_results where date = '2025-06-18';
|
||||
|
||||
|
||||
./scripts/load_equity_pair_intraday.sh -A NVDA -B QQQ -d 20250701 -T ./intraday_md
|
||||
|
||||
To inspect exactly which sources, formats, and processing steps the script uses, open it with:
|
||||
head -n 50 ./scripts/load_equity_pair_intraday.sh
|
||||
|
||||
|
||||
|
||||
✓ Data file found: /home/coder/pairs_trading/data/crypto/20250605.mktdata.ohlcv.db
|
||||
|
||||
sqlite3 '/home/coder/results/20250722_201930.pt_backtest_results.crypto.db'
|
||||
|
||||
sqlite3 '/home/coder/results/xxxxxxxx_yyyyyy.pt_backtest_results.pseudo.db'
|
||||
|
||||
11111111
|
||||
=== At your terminal, run these commands:
|
||||
sqlite3 '/home/coder/results/20250722_201930.pt_backtest_results.crypto.db'
|
||||
=== Then inside the SQLite prompt:
|
||||
.mode csv
|
||||
.headers on
|
||||
.output results_20250722.csv
|
||||
SELECT * FROM pt_bt_results;
|
||||
.output stdout
|
||||
.quit
|
||||
|
||||
cd /home/coder/
|
||||
|
||||
# === `.mode csv` formats output as CSV
|
||||
# === `.headers on` includes column names
|
||||
# === `.output my_table.csv` directs output to that file
|
||||
# === Run your SELECT query, then revert output
|
||||
# === Open my_table.csv in Excel directly
|
||||
|
||||
# ======== Using scp (Secure Copy)
|
||||
# === On your local machine, open a terminal and run:
|
||||
scp cvtt@953f6e8df266:/home/coder/results_20250722.csv ~/Downloads/
|
||||
|
||||
|
||||
# ===== convert CSV to a pandas DataFrame ====== -->
|
||||
import pandas as pd
|
||||
# Replace with the actual path to your CSV file
|
||||
file_path = '/home/coder/results_20250722.csv'
|
||||
# Read the CSV file into a DataFrame
|
||||
df = pd.read_csv(file_path)
|
||||
# Show the first few rows
|
||||
print(df.head())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
36
lib/pt_trading/fit_method.py
Normal file
36
lib/pt_trading/fit_method.py
Normal file
@ -0,0 +1,36 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Dict, Optional, cast
|
||||
|
||||
import pandas as pd # type: ignore[import]
|
||||
from pt_trading.results import BacktestResult
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
|
||||
NanoPerMin = 1e9
|
||||
|
||||
|
||||
class PairsTradingFitMethod(ABC):
|
||||
TRADES_COLUMNS = [
|
||||
"time",
|
||||
"action",
|
||||
"symbol",
|
||||
"price",
|
||||
"disequilibrium",
|
||||
"scaled_disequilibrium",
|
||||
"pair",
|
||||
]
|
||||
|
||||
@abstractmethod
|
||||
def run_pair(
|
||||
self, pair: TradingPair, bt_result: BacktestResult
|
||||
) -> Optional[pd.DataFrame]: ...
|
||||
|
||||
@abstractmethod
|
||||
def reset(self) -> None: ...
|
||||
|
||||
|
||||
class PairState(Enum):
|
||||
INITIAL = 1
|
||||
OPEN = 2
|
||||
CLOSED = 3
|
||||
CLOSED_POSITIONS = 4
|
||||
@ -1,419 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Dict, Optional, cast
|
||||
|
||||
import pandas as pd # type: ignore[import]
|
||||
|
||||
from pt_trading.results import BacktestResult
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
|
||||
NanoPerMin = 1e9
|
||||
|
||||
class PairsTradingFitMethod(ABC):
|
||||
TRADES_COLUMNS = [
|
||||
"time",
|
||||
"action",
|
||||
"symbol",
|
||||
"price",
|
||||
"disequilibrium",
|
||||
"scaled_disequilibrium",
|
||||
"pair",
|
||||
]
|
||||
@abstractmethod
|
||||
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def reset(self):
|
||||
...
|
||||
|
||||
class StaticFit(PairsTradingFitMethod):
|
||||
|
||||
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]: # abstractmethod
|
||||
pair.get_datasets(training_minutes=config["training_minutes"])
|
||||
try:
|
||||
is_cointegrated = pair.train_pair()
|
||||
if not is_cointegrated:
|
||||
print(f"{pair} IS NOT COINTEGRATED")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"{pair}: Training failed: {str(e)}")
|
||||
return None
|
||||
|
||||
try:
|
||||
pair.predict()
|
||||
except Exception as e:
|
||||
print(f"{pair}: Prediction failed: {str(e)}")
|
||||
return None
|
||||
|
||||
pair_trades = self.create_trading_signals(pair=pair, config=config, result=bt_result)
|
||||
|
||||
return pair_trades
|
||||
|
||||
def create_trading_signals(self, pair: TradingPair, config: Dict, result: BacktestResult) -> pd.DataFrame:
|
||||
beta = pair.vecm_fit_.beta # type: ignore
|
||||
colname_a, colname_b = pair.colnames()
|
||||
|
||||
predicted_df = pair.predicted_df_
|
||||
|
||||
open_threshold = config["dis-equilibrium_open_trshld"]
|
||||
close_threshold = config["dis-equilibrium_close_trshld"]
|
||||
|
||||
# Iterate through the testing dataset to find the first trading opportunity
|
||||
open_row_index = None
|
||||
for row_idx in range(len(predicted_df)):
|
||||
curr_disequilibrium = predicted_df["scaled_disequilibrium"][row_idx]
|
||||
|
||||
# Check if current row has sufficient disequilibrium (not near-zero)
|
||||
if curr_disequilibrium >= open_threshold:
|
||||
open_row_index = row_idx
|
||||
break
|
||||
|
||||
# If no row with sufficient disequilibrium found, skip this pair
|
||||
if open_row_index is None:
|
||||
print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
|
||||
return pd.DataFrame()
|
||||
|
||||
# Look for close signal starting from the open position
|
||||
trading_signals_df = (
|
||||
predicted_df["scaled_disequilibrium"][open_row_index:] < close_threshold
|
||||
)
|
||||
|
||||
# Adjust indices to account for the offset from open_row_index
|
||||
close_row_index = None
|
||||
for idx, value in trading_signals_df.items():
|
||||
if value:
|
||||
close_row_index = idx
|
||||
break
|
||||
|
||||
open_row = predicted_df.loc[open_row_index]
|
||||
open_tstamp = open_row["tstamp"]
|
||||
open_disequilibrium = open_row["disequilibrium"]
|
||||
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
|
||||
open_px_a = open_row[f"{colname_a}"]
|
||||
open_px_b = open_row[f"{colname_b}"]
|
||||
|
||||
abs_beta = abs(beta[1])
|
||||
pred_px_b = predicted_df.loc[open_row_index][f"{colname_b}_pred"]
|
||||
pred_px_a = predicted_df.loc[open_row_index][f"{colname_a}_pred"]
|
||||
|
||||
if pred_px_b * abs_beta - pred_px_a > 0:
|
||||
open_side_a = "BUY"
|
||||
open_side_b = "SELL"
|
||||
close_side_a = "SELL"
|
||||
close_side_b = "BUY"
|
||||
else:
|
||||
open_side_b = "BUY"
|
||||
open_side_a = "SELL"
|
||||
close_side_b = "SELL"
|
||||
close_side_a = "BUY"
|
||||
|
||||
# If no close signal found, print position and unrealized PnL
|
||||
if close_row_index is None:
|
||||
|
||||
last_row_index = len(predicted_df) - 1
|
||||
|
||||
# Use the new method from BacktestResult to handle outstanding positions
|
||||
result.handle_outstanding_position(
|
||||
pair=pair,
|
||||
pair_result_df=predicted_df,
|
||||
last_row_index=last_row_index,
|
||||
open_side_a=open_side_a,
|
||||
open_side_b=open_side_b,
|
||||
open_px_a=open_px_a,
|
||||
open_px_b=open_px_b,
|
||||
open_tstamp=open_tstamp,
|
||||
)
|
||||
|
||||
# Return only open trades (no close trades)
|
||||
trd_signal_tuples = [
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_a,
|
||||
pair.symbol_a_,
|
||||
open_px_a,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_b,
|
||||
pair.symbol_b_,
|
||||
open_px_b,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
]
|
||||
else:
|
||||
# Close signal found - create complete trade
|
||||
close_row = predicted_df.loc[close_row_index]
|
||||
close_tstamp = close_row["tstamp"]
|
||||
close_disequilibrium = close_row["disequilibrium"]
|
||||
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
|
||||
close_px_a = close_row[f"{colname_a}"]
|
||||
close_px_b = close_row[f"{colname_b}"]
|
||||
|
||||
print(f"{pair}: Close signal found at index {close_row_index}")
|
||||
|
||||
trd_signal_tuples = [
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_a,
|
||||
pair.symbol_a_,
|
||||
open_px_a,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_b,
|
||||
pair.symbol_b_,
|
||||
open_px_b,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
close_tstamp,
|
||||
close_side_a,
|
||||
pair.symbol_a_,
|
||||
close_px_a,
|
||||
close_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
close_tstamp,
|
||||
close_side_b,
|
||||
pair.symbol_b_,
|
||||
close_px_b,
|
||||
close_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
]
|
||||
|
||||
# Add tuples to data frame
|
||||
return pd.DataFrame(
|
||||
trd_signal_tuples,
|
||||
columns=self.TRADES_COLUMNS, # type: ignore
|
||||
)
|
||||
|
||||
def reset(self) -> None:
|
||||
pass
|
||||
|
||||
class PairState(Enum):
|
||||
INITIAL = 1
|
||||
OPEN = 2
|
||||
CLOSED = 3
|
||||
|
||||
class SlidingFit(PairsTradingFitMethod):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.curr_training_start_idx_ = 0
|
||||
|
||||
def run_pair(self, config: Dict, pair: TradingPair, bt_result: BacktestResult) -> Optional[pd.DataFrame]:
|
||||
print(f"***{pair}*** STARTING....")
|
||||
|
||||
pair.user_data_['state'] = PairState.INITIAL
|
||||
pair.user_data_["trades"] = pd.DataFrame(columns=self.TRADES_COLUMNS) # type: ignore
|
||||
pair.user_data_["is_cointegrated"] = False
|
||||
|
||||
open_threshold = config["dis-equilibrium_open_trshld"]
|
||||
close_threshold = config["dis-equilibrium_open_trshld"]
|
||||
|
||||
training_minutes = config["training_minutes"]
|
||||
while True:
|
||||
print(self.curr_training_start_idx_, end='\r')
|
||||
pair.get_datasets(
|
||||
training_minutes=training_minutes,
|
||||
training_start_index=self.curr_training_start_idx_,
|
||||
testing_size=1
|
||||
)
|
||||
|
||||
if len(pair.training_df_) < training_minutes:
|
||||
print(f"{pair}: {self.curr_training_start_idx_} Not enough training data. Completing the job.")
|
||||
if pair.user_data_["state"] == PairState.OPEN:
|
||||
print(f"{pair}: {self.curr_training_start_idx_} Position is not closed.")
|
||||
# outstanding positions
|
||||
# last_row_index = self.curr_training_start_idx_ + training_minutes
|
||||
|
||||
bt_result.handle_outstanding_position(
|
||||
pair=pair,
|
||||
pair_result_df=pair.predicted_df_,
|
||||
last_row_index=0,
|
||||
open_side_a=pair.user_data_["open_side_a"],
|
||||
open_side_b=pair.user_data_["open_side_b"],
|
||||
open_px_a=pair.user_data_["open_px_a"],
|
||||
open_px_b=pair.user_data_["open_px_b"],
|
||||
open_tstamp=pair.user_data_["open_tstamp"],
|
||||
)
|
||||
break
|
||||
|
||||
try:
|
||||
is_cointegrated = pair.train_pair()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e
|
||||
|
||||
if pair.user_data_["is_cointegrated"] != is_cointegrated:
|
||||
pair.user_data_["is_cointegrated"] = is_cointegrated
|
||||
if not is_cointegrated:
|
||||
if pair.user_data_["state"] == PairState.OPEN:
|
||||
print(f"{pair} {self.curr_training_start_idx_} LOST COINTEGRATION. Consider closing positions...")
|
||||
else:
|
||||
print(f"{pair} {self.curr_training_start_idx_} IS NOT COINTEGRATED. Moving on")
|
||||
else:
|
||||
print('*' * 80)
|
||||
print(f"Pair {pair} ({self.curr_training_start_idx_}) IS COINTEGRATED")
|
||||
print('*' * 80)
|
||||
if not is_cointegrated:
|
||||
self.curr_training_start_idx_ += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
pair.predict()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e
|
||||
|
||||
if pair.user_data_["state"] == PairState.INITIAL:
|
||||
|
||||
open_trades = self._get_open_trades(pair, open_threshold=open_threshold)
|
||||
if open_trades is not None:
|
||||
pair.user_data_["trades"] = open_trades
|
||||
pair.user_data_["state"] = PairState.OPEN
|
||||
elif pair.user_data_["state"] == PairState.OPEN:
|
||||
close_trades = self._get_close_trades(pair, close_threshold=close_threshold)
|
||||
if close_trades is not None:
|
||||
pair.user_data_["trades"] = pd.concat([pair.user_data_["trades"], close_trades], ignore_index=True)
|
||||
pair.user_data_["state"] = PairState.CLOSED
|
||||
break
|
||||
|
||||
self.curr_training_start_idx_ += 1
|
||||
|
||||
print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
|
||||
return pair.user_data_["trades"]
|
||||
|
||||
def _get_open_trades(self, pair: TradingPair, open_threshold: float) -> Optional[pd.DataFrame]:
|
||||
colname_a, colname_b = pair.colnames()
|
||||
|
||||
predicted_df = pair.predicted_df_
|
||||
|
||||
# Check if we have any data to work with
|
||||
if len(predicted_df) == 0:
|
||||
return None
|
||||
|
||||
open_row = predicted_df.iloc[0]
|
||||
open_tstamp = open_row["tstamp"]
|
||||
open_disequilibrium = open_row["disequilibrium"]
|
||||
open_scaled_disequilibrium = open_row["scaled_disequilibrium"]
|
||||
open_px_a = open_row[f"{colname_a}"]
|
||||
open_px_b = open_row[f"{colname_b}"]
|
||||
|
||||
if open_scaled_disequilibrium < open_threshold:
|
||||
return None
|
||||
|
||||
# creating the trades
|
||||
if open_disequilibrium > 0:
|
||||
open_side_a = "SELL"
|
||||
open_side_b = "BUY"
|
||||
close_side_a = "BUY"
|
||||
close_side_b = "SELL"
|
||||
else:
|
||||
open_side_a = "BUY"
|
||||
open_side_b = "SELL"
|
||||
close_side_a = "SELL"
|
||||
close_side_b = "BUY"
|
||||
|
||||
# save closing sides
|
||||
pair.user_data_["open_side_a"] = open_side_a
|
||||
pair.user_data_["open_side_b"] = open_side_b
|
||||
pair.user_data_["open_px_a"] = open_px_a
|
||||
pair.user_data_["open_px_b"] = open_px_b
|
||||
|
||||
pair.user_data_["open_tstamp"] = open_tstamp
|
||||
|
||||
pair.user_data_["close_side_a"] = close_side_a
|
||||
pair.user_data_["close_side_b"] = close_side_b
|
||||
|
||||
|
||||
# create opening trades
|
||||
trd_signal_tuples = [
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_a,
|
||||
pair.symbol_a_,
|
||||
open_px_a,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
open_tstamp,
|
||||
open_side_b,
|
||||
pair.symbol_b_,
|
||||
open_px_b,
|
||||
open_disequilibrium,
|
||||
open_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
]
|
||||
return pd.DataFrame(
|
||||
trd_signal_tuples,
|
||||
columns=self.TRADES_COLUMNS, # type: ignore
|
||||
)
|
||||
|
||||
def _get_close_trades(self, pair: TradingPair, close_threshold: float) -> Optional[pd.DataFrame]:
|
||||
colname_a, colname_b = pair.colnames()
|
||||
|
||||
# Check if we have any data to work with
|
||||
if len(pair.predicted_df_) == 0:
|
||||
return None
|
||||
|
||||
close_row = pair.predicted_df_.iloc[0]
|
||||
close_tstamp = close_row["tstamp"]
|
||||
close_disequilibrium = close_row["disequilibrium"]
|
||||
close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
|
||||
close_px_a = close_row[f"{colname_a}"]
|
||||
close_px_b = close_row[f"{colname_b}"]
|
||||
|
||||
close_side_a = pair.user_data_["close_side_a"]
|
||||
close_side_b = pair.user_data_["close_side_b"]
|
||||
|
||||
if close_scaled_disequilibrium > close_threshold:
|
||||
return None
|
||||
|
||||
trd_signal_tuples = [
|
||||
(
|
||||
close_tstamp,
|
||||
close_side_a,
|
||||
pair.symbol_a_,
|
||||
close_px_a,
|
||||
close_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
(
|
||||
close_tstamp,
|
||||
close_side_b,
|
||||
pair.symbol_b_,
|
||||
close_px_b,
|
||||
close_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
pair,
|
||||
),
|
||||
]
|
||||
|
||||
# Add tuples to data frame
|
||||
return pd.DataFrame(
|
||||
trd_signal_tuples,
|
||||
columns=self.TRADES_COLUMNS, # type: ignore
|
||||
)
|
||||
|
||||
def reset(self):
|
||||
self.curr_training_start_idx_ = 0
|
||||
|
||||
|
||||
|
||||
@ -1,28 +1,30 @@
|
||||
from typing import Any, Dict, List
|
||||
import pandas as pd
|
||||
import sqlite3
|
||||
import os
|
||||
from datetime import datetime, date
|
||||
import sqlite3
|
||||
from datetime import date, datetime
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import pandas as pd
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
|
||||
|
||||
# Recommended replacement adapters and converters for Python 3.12+
|
||||
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
|
||||
def adapt_date_iso(val):
|
||||
def adapt_date_iso(val: date) -> str:
|
||||
"""Adapt datetime.date to ISO 8601 date."""
|
||||
return val.isoformat()
|
||||
|
||||
|
||||
def adapt_datetime_iso(val):
|
||||
def adapt_datetime_iso(val: datetime) -> str:
|
||||
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
|
||||
return val.isoformat()
|
||||
|
||||
|
||||
def convert_date(val):
|
||||
def convert_date(val: bytes) -> date:
|
||||
"""Convert ISO 8601 date to datetime.date object."""
|
||||
return datetime.fromisoformat(val.decode()).date()
|
||||
|
||||
|
||||
def convert_datetime(val):
|
||||
def convert_datetime(val: bytes) -> datetime:
|
||||
"""Convert ISO 8601 datetime to datetime.datetime object."""
|
||||
return datetime.fromisoformat(val.decode())
|
||||
|
||||
@ -39,6 +41,12 @@ def create_result_database(db_path: str) -> None:
|
||||
Create the SQLite database and required tables if they don't exist.
|
||||
"""
|
||||
try:
|
||||
# Create directory if it doesn't exist
|
||||
db_dir = os.path.dirname(db_path)
|
||||
if db_dir and not os.path.exists(db_dir):
|
||||
os.makedirs(db_dir, exist_ok=True)
|
||||
print(f"Created directory: {db_dir}")
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
@ -172,7 +180,7 @@ def store_results_in_database(
|
||||
if db_path.upper() == "NONE":
|
||||
return
|
||||
|
||||
def convert_timestamp(timestamp):
|
||||
def convert_timestamp(timestamp: Any) -> Optional[datetime]:
|
||||
"""Convert pandas Timestamp to Python datetime object for SQLite compatibility."""
|
||||
if timestamp is None:
|
||||
return None
|
||||
@ -423,14 +431,14 @@ class BacktestResult:
|
||||
|
||||
def add_trade(
|
||||
self,
|
||||
pair_nm,
|
||||
symbol,
|
||||
action,
|
||||
price,
|
||||
disequilibrium=None,
|
||||
scaled_disequilibrium=None,
|
||||
timestamp=None,
|
||||
):
|
||||
pair_nm: str,
|
||||
symbol: str,
|
||||
action: str,
|
||||
price: Any,
|
||||
disequilibrium: Optional[float] = None,
|
||||
scaled_disequilibrium: Optional[float] = None,
|
||||
timestamp: Optional[datetime] = None,
|
||||
) -> None:
|
||||
"""Add a trade to the results tracking."""
|
||||
pair_nm = str(pair_nm)
|
||||
|
||||
@ -442,11 +450,11 @@ class BacktestResult:
|
||||
(action, price, disequilibrium, scaled_disequilibrium, timestamp)
|
||||
)
|
||||
|
||||
def add_outstanding_position(self, position: Dict[str, Any]):
|
||||
def add_outstanding_position(self, position: Dict[str, Any]) -> None:
|
||||
"""Add an outstanding position to tracking."""
|
||||
self.outstanding_positions.append(position)
|
||||
|
||||
def add_realized_pnl(self, realized_pnl: float):
|
||||
def add_realized_pnl(self, realized_pnl: float) -> None:
|
||||
"""Add realized PnL to the total."""
|
||||
self.total_realized_pnl += realized_pnl
|
||||
|
||||
@ -462,14 +470,15 @@ class BacktestResult:
|
||||
"""Get all trades."""
|
||||
return self.trades
|
||||
|
||||
def clear_trades(self):
|
||||
def clear_trades(self) -> None:
|
||||
"""Clear all trades (used when processing new files)."""
|
||||
self.trades.clear()
|
||||
|
||||
def collect_single_day_results(self, result):
|
||||
def collect_single_day_results(self, pairs_trades: List[pd.DataFrame]) -> None:
|
||||
"""Collect and process single day trading results."""
|
||||
if result is None:
|
||||
return
|
||||
result = pd.concat(pairs_trades, ignore_index=True)
|
||||
result["time"] = pd.to_datetime(result["time"])
|
||||
result = result.set_index("time").sort_index()
|
||||
|
||||
print("\n -------------- Suggested Trades ")
|
||||
print(result)
|
||||
@ -482,16 +491,16 @@ class BacktestResult:
|
||||
scaled_disequilibrium = getattr(row, "scaled_disequilibrium", None)
|
||||
timestamp = getattr(row, "time", None)
|
||||
self.add_trade(
|
||||
pair_nm=row.pair,
|
||||
action=action,
|
||||
symbol=symbol,
|
||||
price=price,
|
||||
pair_nm=str(row.pair),
|
||||
action=str(action),
|
||||
symbol=str(symbol),
|
||||
price=float(str(price)),
|
||||
disequilibrium=disequilibrium,
|
||||
scaled_disequilibrium=scaled_disequilibrium,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
|
||||
def print_single_day_results(self):
|
||||
def print_single_day_results(self) -> None:
|
||||
"""Print single day results summary."""
|
||||
for pair, symbols in self.trades.items():
|
||||
print(f"\n--- {pair} ---")
|
||||
@ -501,7 +510,7 @@ class BacktestResult:
|
||||
side, price = trade_data[:2]
|
||||
print(f"{symbol} {side} at ${price}")
|
||||
|
||||
def print_results_summary(self, all_results):
|
||||
def print_results_summary(self, all_results: Dict[str, Dict[str, Any]]) -> None:
|
||||
"""Print summary of all processed files."""
|
||||
print("\n====== Summary of All Processed Files ======")
|
||||
for filename, data in all_results.items():
|
||||
@ -512,7 +521,7 @@ class BacktestResult:
|
||||
)
|
||||
print(f"{filename}: {trade_count} trades")
|
||||
|
||||
def calculate_returns(self, all_results: Dict):
|
||||
def calculate_returns(self, all_results: Dict[str, Dict[str, Any]]) -> None:
|
||||
"""Calculate and print returns by day and pair."""
|
||||
print("\n====== Returns By Day and Pair ======")
|
||||
|
||||
@ -520,6 +529,8 @@ class BacktestResult:
|
||||
day_return = 0
|
||||
print(f"\n--- {filename} ---")
|
||||
|
||||
self.outstanding_positions = data["outstanding_positions"]
|
||||
|
||||
# Process each pair
|
||||
for pair, symbols in data["trades"].items():
|
||||
pair_return = 0
|
||||
@ -527,56 +538,62 @@ class BacktestResult:
|
||||
|
||||
# Calculate individual symbol returns in the pair
|
||||
for symbol, trades in symbols.items():
|
||||
if len(trades) >= 2: # Need at least entry and exit
|
||||
# Get entry and exit trades - handle both old and new tuple formats
|
||||
if len(trades[0]) == 2: # Old format: (action, price)
|
||||
entry_action, entry_price = trades[0]
|
||||
exit_action, exit_price = trades[1]
|
||||
open_disequilibrium = None
|
||||
open_scaled_disequilibrium = None
|
||||
close_disequilibrium = None
|
||||
close_scaled_disequilibrium = None
|
||||
else: # New format: (action, price, disequilibrium, scaled_disequilibrium, timestamp)
|
||||
entry_action, entry_price = trades[0][:2]
|
||||
exit_action, exit_price = trades[1][:2]
|
||||
open_disequilibrium = (
|
||||
trades[0][2] if len(trades[0]) > 2 else None
|
||||
)
|
||||
open_scaled_disequilibrium = (
|
||||
trades[0][3] if len(trades[0]) > 3 else None
|
||||
)
|
||||
close_disequilibrium = (
|
||||
trades[1][2] if len(trades[1]) > 2 else None
|
||||
)
|
||||
close_scaled_disequilibrium = (
|
||||
trades[1][3] if len(trades[1]) > 3 else None
|
||||
)
|
||||
if len(trades) == 0:
|
||||
continue
|
||||
|
||||
# Calculate return based on action
|
||||
symbol_return = 0
|
||||
if entry_action == "BUY" and exit_action == "SELL":
|
||||
# Long position
|
||||
symbol_return = (
|
||||
(exit_price - entry_price) / entry_price * 100
|
||||
)
|
||||
elif entry_action == "SELL" and exit_action == "BUY":
|
||||
# Short position
|
||||
symbol_return = (
|
||||
(entry_price - exit_price) / entry_price * 100
|
||||
)
|
||||
symbol_trades = []
|
||||
|
||||
# Process all trades sequentially for this symbol
|
||||
for i, trade in enumerate(trades):
|
||||
# Handle both old and new tuple formats
|
||||
if len(trade) == 2: # Old format: (action, price)
|
||||
action, price = trade
|
||||
disequilibrium = None
|
||||
scaled_disequilibrium = None
|
||||
timestamp = None
|
||||
else: # New format: (action, price, disequilibrium, scaled_disequilibrium, timestamp)
|
||||
action, price = trade[:2]
|
||||
disequilibrium = trade[2] if len(trade) > 2 else None
|
||||
scaled_disequilibrium = trade[3] if len(trade) > 3 else None
|
||||
timestamp = trade[4] if len(trade) > 4 else None
|
||||
|
||||
symbol_trades.append((action, price, disequilibrium, scaled_disequilibrium, timestamp))
|
||||
|
||||
# Calculate returns for all trade combinations
|
||||
for i in range(len(symbol_trades) - 1):
|
||||
trade1 = symbol_trades[i]
|
||||
trade2 = symbol_trades[i + 1]
|
||||
|
||||
action1, price1, diseq1, scaled_diseq1, ts1 = trade1
|
||||
action2, price2, diseq2, scaled_diseq2, ts2 = trade2
|
||||
|
||||
# Calculate return based on action combination
|
||||
trade_return = 0
|
||||
if action1 == "BUY" and action2 == "SELL":
|
||||
# Long position
|
||||
trade_return = (price2 - price1) / price1 * 100
|
||||
elif action1 == "SELL" and action2 == "BUY":
|
||||
# Short position
|
||||
trade_return = (price1 - price2) / price1 * 100
|
||||
|
||||
symbol_return += trade_return
|
||||
|
||||
# Store trade details for reporting
|
||||
pair_trades.append(
|
||||
(
|
||||
symbol,
|
||||
entry_action,
|
||||
entry_price,
|
||||
exit_action,
|
||||
exit_price,
|
||||
symbol_return,
|
||||
open_scaled_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
action1,
|
||||
price1,
|
||||
action2,
|
||||
price2,
|
||||
trade_return,
|
||||
scaled_diseq1,
|
||||
scaled_diseq2,
|
||||
i + 1, # Trade sequence number
|
||||
)
|
||||
)
|
||||
|
||||
pair_return += symbol_return
|
||||
|
||||
# Print pair returns with disequilibrium information
|
||||
@ -584,23 +601,24 @@ class BacktestResult:
|
||||
print(f" {pair}:")
|
||||
for (
|
||||
symbol,
|
||||
entry_action,
|
||||
entry_price,
|
||||
exit_action,
|
||||
exit_price,
|
||||
symbol_return,
|
||||
open_scaled_disequilibrium,
|
||||
close_scaled_disequilibrium,
|
||||
action1,
|
||||
price1,
|
||||
action2,
|
||||
price2,
|
||||
trade_return,
|
||||
scaled_diseq1,
|
||||
scaled_diseq2,
|
||||
trade_num,
|
||||
) in pair_trades:
|
||||
disequil_info = ""
|
||||
if (
|
||||
open_scaled_disequilibrium is not None
|
||||
and close_scaled_disequilibrium is not None
|
||||
scaled_diseq1 is not None
|
||||
and scaled_diseq2 is not None
|
||||
):
|
||||
disequil_info = f" | Open Dis-eq: {open_scaled_disequilibrium:.2f}, Close Dis-eq: {close_scaled_disequilibrium:.2f}"
|
||||
disequil_info = f" | Open Dis-eq: {scaled_diseq1:.2f}, Close Dis-eq: {scaled_diseq2:.2f}"
|
||||
|
||||
print(
|
||||
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%{disequil_info}"
|
||||
f" {symbol} (Trade #{trade_num}): {action1} @ ${price1:.2f}, {action2} @ ${price2:.2f}, Return: {trade_return:.2f}%{disequil_info}"
|
||||
)
|
||||
print(f" Pair Total Return: {pair_return:.2f}%")
|
||||
day_return += pair_return
|
||||
@ -610,7 +628,7 @@ class BacktestResult:
|
||||
print(f" Day Total Return: {day_return:.2f}%")
|
||||
self.add_realized_pnl(day_return)
|
||||
|
||||
def print_outstanding_positions(self):
|
||||
def print_outstanding_positions(self) -> None:
|
||||
"""Print all outstanding positions with share quantities and current values."""
|
||||
if not self.get_outstanding_positions():
|
||||
print("\n====== NO OUTSTANDING POSITIONS ======")
|
||||
@ -684,22 +702,22 @@ class BacktestResult:
|
||||
|
||||
print(f"{'TOTAL OUTSTANDING VALUE':<80} ${total_value:<12.2f}")
|
||||
|
||||
def print_grand_totals(self):
|
||||
def print_grand_totals(self) -> None:
    """Print the banner and the total realized PnL (as a percentage) for all pairs."""
    banner = "\n====== GRAND TOTALS ACROSS ALL PAIRS ======"
    print(banner)
    # Queried after the banner is printed, matching the original call order.
    total_realized_pnl = self.get_total_realized_pnl()
    print(f"Total Realized PnL: {total_realized_pnl:.2f}%")
|
||||
|
||||
def handle_outstanding_position(
|
||||
self,
|
||||
pair,
|
||||
pair_result_df,
|
||||
last_row_index,
|
||||
open_side_a,
|
||||
open_side_b,
|
||||
open_px_a,
|
||||
open_px_b,
|
||||
open_tstamp,
|
||||
):
|
||||
pair: TradingPair,
|
||||
pair_result_df: pd.DataFrame,
|
||||
last_row_index: int,
|
||||
open_side_a: str,
|
||||
open_side_b: str,
|
||||
open_px_a: float,
|
||||
open_px_b: float,
|
||||
open_tstamp: datetime,
|
||||
) -> Tuple[float, float, float]:
|
||||
"""
|
||||
Handle calculation and tracking of outstanding positions when no close signal is found.
|
||||
|
||||
@ -727,8 +745,8 @@ class BacktestResult:
|
||||
shares_b = funding_per_position / open_px_b
|
||||
|
||||
# Calculate current position values (shares * current price)
|
||||
current_value_a = shares_a * last_px_a
|
||||
current_value_b = shares_b * last_px_b
|
||||
current_value_a = shares_a * last_px_a * (-1 if open_side_a == "SELL" else 1)
|
||||
current_value_b = shares_b * last_px_b * (-1 if open_side_b == "SELL" else 1)
|
||||
total_current_value = current_value_a + current_value_b
|
||||
|
||||
# Get disequilibrium information
|
||||
|
||||
362
lib/pt_trading/sliding_fit.py
Normal file
362
lib/pt_trading/sliding_fit.py
Normal file
@ -0,0 +1,362 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Dict, Optional, cast
|
||||
|
||||
import pandas as pd # type: ignore[import]
|
||||
from pt_trading.fit_method import PairState, PairsTradingFitMethod
|
||||
from pt_trading.results import BacktestResult
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
|
||||
NanoPerMin = 1e9
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class SlidingFit(PairsTradingFitMethod):
    """Pairs-trading fit method that re-trains on a sliding training window.

    ``run_pair`` trains the pair on ``training_minutes`` rows, predicts the
    single row that follows (``testing_size=1``), moves the window start
    forward by one row and repeats.  Trading signals are then derived in one
    pass over ``pair.predicted_df_``.
    """

    # dtypes enforced on every trades frame so that concatenating frames
    # does not trigger pandas dtype warnings.
    _TRADES_DTYPES: Dict[str, str] = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }

    def __init__(self) -> None:
        super().__init__()

    # ------------------------------------------------------------------ helpers
    def _make_trades_df(self, rows: Optional[list] = None) -> pd.DataFrame:
        """Build a trades frame (empty when ``rows`` is None) with the canonical dtypes."""
        return pd.DataFrame(rows, columns=self.TRADES_COLUMNS).astype(
            self._TRADES_DTYPES
        )

    @staticmethod
    def _two_leg_rows(
        pair: TradingPair,
        tstamp,
        side_a: str,
        side_b: str,
        px_a,
        px_b,
        disequilibrium,
        scaled_disequilibrium,
    ) -> list:
        """Return the two trade tuples (leg A, leg B) in ``TRADES_COLUMNS`` order."""
        return [
            (tstamp, side_a, pair.symbol_a_, px_a, disequilibrium, scaled_disequilibrium, pair),
            (tstamp, side_b, pair.symbol_b_, px_b, disequilibrium, scaled_disequilibrium, pair),
        ]

    def _closing_trades(self, pair: TradingPair, row: pd.Series) -> pd.DataFrame:
        """Build the closing trades for both legs from ``row`` using the saved close sides."""
        colname_a, colname_b = pair.colnames()
        rows = self._two_leg_rows(
            pair,
            row["tstamp"],
            pair.user_data_["close_side_a"],
            pair.user_data_["close_side_b"],
            row[colname_a],
            row[colname_b],
            row["disequilibrium"],
            row["scaled_disequilibrium"],
        )
        return self._make_trades_df(rows)

    # --------------------------------------------------------------- public API
    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:
        """Slide the training window over the pair's data and return its trades.

        Args:
            pair: Trading pair to process; its ``user_data_`` is (re)initialized here.
            bt_result: Collector that receives any position left open.

        Returns:
            The value of ``pair.get_trades()`` after signal generation.

        Raises:
            RuntimeError: If training or prediction raises; the original
                exception is chained as the cause.
        """
        print(f"***{pair}*** STARTING....")
        config = pair.config_

        curr_training_start_idx = pair.get_begin_index()
        end_index = pair.get_end_index()

        pair.user_data_["state"] = PairState.INITIAL
        # Start from an empty, correctly typed trades frame.
        pair.user_data_["trades"] = self._make_trades_df()
        pair.user_data_["is_cointegrated"] = False

        training_minutes = config["training_minutes"]
        while True:
            print(curr_training_start_idx, end="\r")
            pair.get_datasets(
                training_minutes=training_minutes,
                training_start_index=curr_training_start_idx,
                testing_size=1,
            )

            if len(pair.training_df_) < training_minutes:
                print(
                    f"{pair}: current offset={curr_training_start_idx}"
                    f" * Training data length={len(pair.training_df_)} < {training_minutes}"
                    " * Not enough training data. Completing the job."
                )
                break

            try:
                # ================================ TRAINING ================================
                is_cointegrated = pair.train_pair()
            except Exception as e:
                raise RuntimeError(f"{pair}: Training failed: {str(e)}") from e

            # Only report when the cointegration status flips.
            if pair.user_data_["is_cointegrated"] != is_cointegrated:
                pair.user_data_["is_cointegrated"] = is_cointegrated
                if not is_cointegrated:
                    if pair.user_data_["state"] == PairState.OPEN:
                        print(
                            f"{pair} {curr_training_start_idx} LOST COINTEGRATION. Consider closing positions..."
                        )
                    else:
                        print(
                            f"{pair} {curr_training_start_idx} IS NOT COINTEGRATED. Moving on"
                        )
                else:
                    print("*" * 80)
                    print(
                        f"Pair {pair} ({curr_training_start_idx}) IS COINTEGRATED"
                    )
                    print("*" * 80)

            if not is_cointegrated:
                # NOTE(review): this path does not apply the end_index bound
                # used below; confirm whether that is intended.
                curr_training_start_idx += 1
                continue

            try:
                # ================================ PREDICTION ================================
                pair.predict()
            except Exception as e:
                raise RuntimeError(f"{pair}: Prediction failed: {str(e)}") from e

            curr_training_start_idx += 1
            if curr_training_start_idx > end_index:
                break

        self._create_trading_signals(pair, config, bt_result)
        print(f"***{pair}*** FINISHED ... {len(pair.user_data_['trades'])}")
        return pair.get_trades()

    def _create_trading_signals(
        self, pair: TradingPair, config: Dict, bt_result: BacktestResult
    ) -> None:
        """Walk ``pair.predicted_df_`` row by row and record open/close trades.

        While the pair is not in a position, a row whose scaled disequilibrium
        reaches the open threshold opens one; while it is in a position, a row
        at or below the close threshold closes it.  A position still open
        after the last row is either force-closed on that row (when
        ``config["close_outstanding_positions"]`` is truthy) or reported to
        ``bt_result`` as outstanding.
        """
        if pair.predicted_df_ is None:
            print(f"{pair.market_data_.iloc[0]['tstamp']} {pair}: No predicted data")
            return

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]
        flat_states = [PairState.INITIAL, PairState.CLOSED, PairState.CLOSED_POSITIONS]

        for curr_predicted_row_idx in range(len(pair.predicted_df_)):
            pred_row = pair.predicted_df_.iloc[curr_predicted_row_idx]
            if pair.user_data_["state"] in flat_states:
                open_trades = self._get_open_trades(
                    pair, row=pred_row, open_threshold=open_threshold
                )
                if open_trades is not None:
                    open_trades["status"] = "OPEN"
                    print(f"OPEN TRADES:\n{open_trades}")
                    pair.add_trades(open_trades)
                    pair.user_data_["state"] = PairState.OPEN
            elif pair.user_data_["state"] == PairState.OPEN:
                close_trades = self._get_close_trades(
                    pair, row=pred_row, close_threshold=close_threshold
                )
                if close_trades is not None:
                    close_trades["status"] = "CLOSE"
                    print(f"CLOSE TRADES:\n{close_trades}")
                    pair.add_trades(close_trades)
                    pair.user_data_["state"] = PairState.CLOSED

        # Outstanding positions
        if pair.user_data_["state"] != PairState.OPEN:
            return

        print(
            f"{pair}: *** Position is NOT CLOSED. ***"
        )
        if config["close_outstanding_positions"]:
            # Force-close on the last predicted row (explicit instead of
            # relying on the loop variable leaking out of the for-loop).
            close_position_trades = self._get_close_position_trades(
                pair=pair,
                row=pair.predicted_df_.iloc[-1],
                close_threshold=close_threshold,
            )
            if close_position_trades is not None:
                close_position_trades["status"] = "CLOSE_POSITION"
                print(f"CLOSE_POSITION TRADES:\n{close_position_trades}")
                pair.add_trades(close_position_trades)
                pair.user_data_["state"] = PairState.CLOSED_POSITIONS
        else:
            bt_result.handle_outstanding_position(
                pair=pair,
                pair_result_df=pair.predicted_df_,
                # Fix: value the position at the LAST predicted row (was
                # hard-coded to 0, i.e. the first row), as StaticFit does.
                last_row_index=len(pair.predicted_df_) - 1,
                open_side_a=pair.user_data_["open_side_a"],
                open_side_b=pair.user_data_["open_side_b"],
                open_px_a=pair.user_data_["open_px_a"],
                open_px_b=pair.user_data_["open_px_b"],
                open_tstamp=pair.user_data_["open_tstamp"],
            )

    def _get_open_trades(
        self, pair: TradingPair, row: pd.Series, open_threshold: float
    ) -> Optional[pd.DataFrame]:
        """Return the two opening trades for ``row``, or None if no position is opened.

        A position is opened when the row's ``scaled_disequilibrium`` is at
        least ``open_threshold``.  Leg A is sold (and B bought) when the raw
        ``disequilibrium`` is positive, otherwise the reverse.  The chosen
        sides, open prices and timestamp are stored in ``pair.user_data_``
        for the later close.
        """
        colname_a, colname_b = pair.colnames()

        assert pair.predicted_df_ is not None
        # Check if we have any data to work with
        if len(pair.predicted_df_) == 0:
            return None

        open_tstamp = row["tstamp"]
        open_disequilibrium = row["disequilibrium"]
        open_scaled_disequilibrium = row["scaled_disequilibrium"]
        open_px_a = row[colname_a]
        open_px_b = row[colname_b]

        if open_scaled_disequilibrium < open_threshold:
            return None

        # Fix: the original nested double quotes inside a double-quoted
        # f-string, which is a SyntaxError before Python 3.12.  The printed
        # text is unchanged (open_tstamp is row["tstamp"]).
        print(f"OPEN_TRADES: {open_tstamp} {open_scaled_disequilibrium=}")
        if open_disequilibrium > 0:
            open_side_a, open_side_b = "SELL", "BUY"
        else:
            open_side_a, open_side_b = "BUY", "SELL"
        # Closing simply reverses each leg.
        close_side_a, close_side_b = open_side_b, open_side_a

        # Remember the position so it can be closed / reported later.
        pair.user_data_["open_side_a"] = open_side_a
        pair.user_data_["open_side_b"] = open_side_b
        pair.user_data_["open_px_a"] = open_px_a
        pair.user_data_["open_px_b"] = open_px_b
        pair.user_data_["open_tstamp"] = open_tstamp
        pair.user_data_["close_side_a"] = close_side_a
        pair.user_data_["close_side_b"] = close_side_b

        rows = self._two_leg_rows(
            pair,
            open_tstamp,
            open_side_a,
            open_side_b,
            open_px_a,
            open_px_b,
            open_disequilibrium,
            open_scaled_disequilibrium,
        )
        return self._make_trades_df(rows)

    def _get_close_trades(
        self, pair: TradingPair, row: pd.Series, close_threshold: float
    ) -> Optional[pd.DataFrame]:
        """Return the two closing trades for ``row``, or None if it is not a close signal.

        The position is closed when the row's ``scaled_disequilibrium`` is
        not greater than ``close_threshold``.
        """
        assert pair.predicted_df_ is not None
        if len(pair.predicted_df_) == 0:
            return None
        if row["scaled_disequilibrium"] > close_threshold:
            return None
        return self._closing_trades(pair, row)

    def _get_close_position_trades(
        self, pair: TradingPair, row: pd.Series, close_threshold: float
    ) -> Optional[pd.DataFrame]:
        """Return closing trades for ``row`` unconditionally (forced close).

        ``close_threshold`` is accepted for signature parity with
        ``_get_close_trades`` but is not consulted.  Returns None only when
        there are no predicted rows.
        """
        assert pair.predicted_df_ is not None
        if len(pair.predicted_df_) == 0:
            return None
        return self._closing_trades(pair, row)

    def reset(self) -> None:
        """No-op: this class keeps no per-run state on the instance.

        The original body assigned ``curr_training_start_idx = 0`` to a
        local variable, which had no effect.
        """
        return None
|
||||
220
lib/pt_trading/static_fit.py
Normal file
220
lib/pt_trading/static_fit.py
Normal file
@ -0,0 +1,220 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Dict, Optional, cast
|
||||
|
||||
import pandas as pd # type: ignore[import]
|
||||
from pt_trading.results import BacktestResult
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
from pt_trading.fit_method import PairsTradingFitMethod
|
||||
|
||||
NanoPerMin = 1e9
|
||||
|
||||
|
||||
|
||||
class StaticFit(PairsTradingFitMethod):
    """Fit method that trains the pair once and emits at most one round trip.

    The pair is trained on the first ``training_minutes`` rows, predictions
    are produced for the testing set, and the first open signal (plus the
    first subsequent close signal, if any) is converted into trades.
    """

    # dtypes enforced on every trades frame returned by this class.
    _TRADES_DTYPES: Dict[str, str] = {
        "time": "datetime64[ns]",
        "action": "string",
        "symbol": "string",
        "price": "float64",
        "disequilibrium": "float64",
        "scaled_disequilibrium": "float64",
        "pair": "object",
    }

    def _make_trades_df(self, rows: Optional[list] = None) -> pd.DataFrame:
        """Build a trades frame (empty when ``rows`` is None) with the canonical dtypes."""
        return pd.DataFrame(rows, columns=self.TRADES_COLUMNS).astype(
            self._TRADES_DTYPES
        )

    def run_pair(
        self, pair: TradingPair, bt_result: BacktestResult
    ) -> Optional[pd.DataFrame]:  # abstractmethod
        """Train, predict and generate trades for ``pair``.

        Returns:
            The trades frame, or None when the pair is not cointegrated or
            when training/prediction raised (the error is printed, not
            propagated).
        """
        config = pair.config_
        pair.get_datasets(training_minutes=config["training_minutes"])
        try:
            is_cointegrated = pair.train_pair()
        except Exception as e:
            print(f"{pair}: Training failed: {str(e)}")
            return None
        if not is_cointegrated:
            print(f"{pair} IS NOT COINTEGRATED")
            return None

        try:
            pair.predict()
        except Exception as e:
            print(f"{pair}: Prediction failed: {str(e)}")
            return None

        return self.create_trading_signals(
            pair=pair, config=config, result=bt_result
        )

    def create_trading_signals(
        self, pair: TradingPair, config: Dict, result: BacktestResult
    ) -> pd.DataFrame:
        """Turn the pair's predictions into open (and, if found, close) trades.

        Args:
            pair: Trained pair with ``vecm_fit_`` and ``predicted_df_`` set.
            config: Must provide ``dis-equilibrium_open_trshld`` and
                ``dis-equilibrium_close_trshld``.
            result: Receives the position via ``handle_outstanding_position``
                when no close signal is found.

        Returns:
            A trades frame with the canonical columns and dtypes.  It is
            empty when there are no predictions or no open signal, has two
            rows when the position stays open, and four rows for a full
            open/close round trip.
        """
        beta = pair.vecm_fit_.beta  # type: ignore
        colname_a, colname_b = pair.colnames()

        predicted_df = pair.predicted_df_
        if predicted_df is None:
            return self._make_trades_df()

        open_threshold = config["dis-equilibrium_open_trshld"]
        close_threshold = config["dis-equilibrium_close_trshld"]
        scaled = predicted_df["scaled_disequilibrium"]

        # First row whose scaled disequilibrium reaches the open threshold.
        open_row_index = next(
            (
                row_idx
                for row_idx in range(len(predicted_df))
                if scaled[row_idx] >= open_threshold
            ),
            None,
        )
        if open_row_index is None:
            print(f"{pair}: Insufficient disequilibrium in testing dataset. Skipping.")
            # Fix: return the same typed, column-bearing empty frame as the
            # "no predictions" path instead of a schema-less pd.DataFrame().
            return self._make_trades_df()

        # First row at/after the open row that drops below the close threshold.
        close_mask = scaled[open_row_index:] < close_threshold
        close_row_index = next(
            (idx for idx, is_close in close_mask.items() if is_close), None
        )

        open_row = predicted_df.loc[open_row_index]
        open_px_a = predicted_df.at[open_row_index, colname_a]
        open_px_b = predicted_df.at[open_row_index, colname_b]
        open_tstamp = predicted_df.at[open_row_index, "tstamp"]
        open_disequilibrium = open_row["disequilibrium"]
        open_scaled_disequilibrium = open_row["scaled_disequilibrium"]

        abs_beta = abs(beta[1])
        pred_px_b = open_row[f"{colname_b}_pred"]
        pred_px_a = open_row[f"{colname_a}_pred"]

        # Buy A / sell B when beta-weighted predicted B exceeds predicted A,
        # otherwise the reverse; closing reverses each leg.
        if pred_px_b * abs_beta - pred_px_a > 0:
            open_side_a, open_side_b = "BUY", "SELL"
        else:
            open_side_a, open_side_b = "SELL", "BUY"
        close_side_a, close_side_b = open_side_b, open_side_a

        trd_signal_tuples = [
            (
                open_tstamp,
                open_side_a,
                pair.symbol_a_,
                open_px_a,
                open_disequilibrium,
                open_scaled_disequilibrium,
                pair,
            ),
            (
                open_tstamp,
                open_side_b,
                pair.symbol_b_,
                open_px_b,
                open_disequilibrium,
                open_scaled_disequilibrium,
                pair,
            ),
        ]

        if close_row_index is None:
            # No close signal: report the outstanding position, valued at the
            # last predicted row; only the open trades are returned.
            result.handle_outstanding_position(
                pair=pair,
                pair_result_df=predicted_df,
                last_row_index=len(predicted_df) - 1,
                open_side_a=open_side_a,
                open_side_b=open_side_b,
                open_px_a=float(open_px_a),
                open_px_b=float(open_px_b),
                open_tstamp=pd.Timestamp(open_tstamp),
            )
        else:
            # Close signal found - append the closing legs.
            close_row = predicted_df.loc[close_row_index]
            close_tstamp = close_row["tstamp"]
            close_disequilibrium = close_row["disequilibrium"]
            close_scaled_disequilibrium = close_row["scaled_disequilibrium"]
            close_px_a = close_row[colname_a]
            close_px_b = close_row[colname_b]

            print(f"{pair}: Close signal found at index {close_row_index}")

            trd_signal_tuples += [
                (
                    close_tstamp,
                    close_side_a,
                    pair.symbol_a_,
                    close_px_a,
                    close_disequilibrium,
                    close_scaled_disequilibrium,
                    pair,
                ),
                (
                    close_tstamp,
                    close_side_b,
                    pair.symbol_b_,
                    close_px_b,
                    close_disequilibrium,
                    close_scaled_disequilibrium,
                    pair,
                ),
            ]

        return self._make_trades_df(trd_signal_tuples)

    def reset(self) -> None:
        """No-op: this class keeps no state between pairs."""
        pass
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import pandas as pd # type:ignore
|
||||
from statsmodels.tsa.vector_ar.vecm import VECM, VECMResults # type:ignore
|
||||
|
||||
@ -19,18 +20,45 @@ class TradingPair:
|
||||
|
||||
user_data_: Dict[str, Any]
|
||||
|
||||
predicted_df_: Optional[pd.DataFrame]
|
||||
|
||||
def __init__(
|
||||
self, market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
|
||||
self, config: Dict[str, Any], market_data: pd.DataFrame, symbol_a: str, symbol_b: str, price_column: str
|
||||
):
|
||||
self.symbol_a_ = symbol_a
|
||||
self.symbol_b_ = symbol_b
|
||||
self.price_column_ = price_column
|
||||
self.set_market_data(market_data)
|
||||
self.user_data_ = {}
|
||||
self.predicted_df_ = None
|
||||
self.config_ = config
|
||||
|
||||
def set_market_data(self, market_data: pd.DataFrame) -> None:
|
||||
self.market_data_ = pd.DataFrame(
|
||||
self._transform_dataframe(market_data)[["tstamp"] + self.colnames()]
|
||||
)
|
||||
|
||||
self.market_data_ = self.market_data_.dropna().reset_index(drop=True)
|
||||
self.market_data_['tstamp'] = pd.to_datetime(self.market_data_['tstamp'])
|
||||
self.market_data_ = self.market_data_.sort_values('tstamp')
|
||||
|
||||
self.user_data_ = {}
|
||||
def get_begin_index(self) -> int:
|
||||
if "trading_hours" not in self.config_:
|
||||
return 0
|
||||
assert "timezone" in self.config_["trading_hours"]
|
||||
assert "begin_session" in self.config_["trading_hours"]
|
||||
start_time = pd.to_datetime(self.config_["trading_hours"]["begin_session"]).tz_localize(self.config_["trading_hours"]["timezone"]).time()
|
||||
mask = self.market_data_['tstamp'].dt.time >= start_time
|
||||
return int(self.market_data_.index[mask].min())
|
||||
|
||||
def get_end_index(self) -> int:
|
||||
if "trading_hours" not in self.config_:
|
||||
return 0
|
||||
assert "timezone" in self.config_["trading_hours"]
|
||||
assert "end_session" in self.config_["trading_hours"]
|
||||
end_time = pd.to_datetime(self.config_["trading_hours"]["end_session"]).tz_localize(self.config_["trading_hours"]["timezone"]).time()
|
||||
mask = self.market_data_['tstamp'].dt.time <= end_time
|
||||
return int(self.market_data_.index[mask].max())
|
||||
|
||||
def _transform_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
# Select only the columns we need
|
||||
@ -69,7 +97,7 @@ class TradingPair:
|
||||
drop=True
|
||||
) # do not dropna() since irrelevant symbol would affect dataset
|
||||
|
||||
return result_df
|
||||
return result_df.dropna()
|
||||
|
||||
def get_datasets(
|
||||
self,
|
||||
@ -80,7 +108,7 @@ class TradingPair:
|
||||
|
||||
testing_start_index = training_start_index + training_minutes
|
||||
self.training_df_ = self.market_data_.iloc[
|
||||
training_start_index:testing_start_index, :
|
||||
training_start_index:testing_start_index, : training_minutes
|
||||
].copy()
|
||||
assert self.training_df_ is not None
|
||||
self.training_df_ = self.training_df_.dropna().reset_index(drop=True)
|
||||
@ -101,7 +129,7 @@ class TradingPair:
|
||||
f"{self.price_column_}_{self.symbol_b_}",
|
||||
]
|
||||
|
||||
def fit_VECM(self):
|
||||
def fit_VECM(self) -> None:
|
||||
assert self.training_df_ is not None
|
||||
vecm_df = self.training_df_[self.colnames()].reset_index(drop=True)
|
||||
vecm_model = VECM(vecm_df, coint_rank=1)
|
||||
@ -120,20 +148,20 @@ class TradingPair:
|
||||
# print(f"{self}: {self.vecm_fit_.summary()}")
|
||||
pass
|
||||
|
||||
def check_cointegration_johansen(self):
|
||||
def check_cointegration_johansen(self) -> bool:
|
||||
assert self.training_df_ is not None
|
||||
from statsmodels.tsa.vector_ar.vecm import coint_johansen
|
||||
|
||||
df = self.training_df_[self.colnames()].reset_index(drop=True)
|
||||
result = coint_johansen(df, det_order=0, k_ar_diff=1)
|
||||
print(
|
||||
f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
|
||||
)
|
||||
is_cointegrated = result.lr1[0] > result.cvt[0, 1]
|
||||
# print(
|
||||
# f"{self}: lr1={result.lr1[0]} > cvt={result.cvt[0, 1]}? {result.lr1[0] > result.cvt[0, 1]}"
|
||||
# )
|
||||
is_cointegrated: bool = bool(result.lr1[0] > result.cvt[0, 1])
|
||||
|
||||
return is_cointegrated
|
||||
|
||||
def check_cointegration_engle_granger(self):
|
||||
def check_cointegration_engle_granger(self) -> bool:
|
||||
from statsmodels.tsa.stattools import coint
|
||||
|
||||
col1, col2 = self.colnames()
|
||||
@ -144,22 +172,23 @@ class TradingPair:
|
||||
# Run Engle-Granger cointegration test
|
||||
pvalue = coint(series1, series2)[1]
|
||||
# Define cointegration if p-value < 0.05 (i.e., reject null of no cointegration)
|
||||
is_cointegrated = pvalue < 0.05
|
||||
print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
|
||||
is_cointegrated: bool = bool(pvalue < 0.05)
|
||||
# print(f"{self}: is_cointegrated={is_cointegrated} pvalue={pvalue}")
|
||||
return is_cointegrated
|
||||
|
||||
def train_pair(self) -> bool:
|
||||
def check_cointegration(self) -> bool:
|
||||
is_cointegrated_johansen = self.check_cointegration_johansen()
|
||||
is_cointegrated_engle_granger = self.check_cointegration_engle_granger()
|
||||
if not is_cointegrated_johansen and not is_cointegrated_engle_granger:
|
||||
return False
|
||||
pass
|
||||
result = is_cointegrated_johansen or is_cointegrated_engle_granger
|
||||
return result or True # TODO: remove this
|
||||
|
||||
def train_pair(self) -> bool:
|
||||
result = self.check_cointegration()
|
||||
# print('*' * 80 + '\n' + f"**************** {self} IS COINTEGRATED ****************\n" + '*' * 80)
|
||||
self.fit_VECM()
|
||||
assert self.training_df_ is not None and self.vecm_fit_ is not None
|
||||
diseq_series = self.training_df_[self.colnames()] @ self.vecm_fit_.beta
|
||||
print(diseq_series.shape)
|
||||
# print(diseq_series.shape)
|
||||
self.training_mu_ = float(diseq_series[0].mean())
|
||||
self.training_std_ = float(diseq_series[0].std())
|
||||
|
||||
@ -171,7 +200,45 @@ class TradingPair:
|
||||
diseq_series - self.training_mu_
|
||||
) / self.training_std_
|
||||
|
||||
return True
|
||||
return result
|
||||
|
||||
def add_trades(self, trades: pd.DataFrame) -> None:
|
||||
if self.user_data_["trades"] is None or len(self.user_data_["trades"]) == 0:
|
||||
# If trades is empty or None, just assign the new trades directly
|
||||
self.user_data_["trades"] = trades.copy()
|
||||
else:
|
||||
# Ensure both DataFrames have the same columns and dtypes before concatenation
|
||||
existing_trades = self.user_data_["trades"]
|
||||
|
||||
# If existing trades is empty, just assign the new trades
|
||||
if len(existing_trades) == 0:
|
||||
self.user_data_["trades"] = trades.copy()
|
||||
else:
|
||||
# Ensure both DataFrames have the same columns
|
||||
if set(existing_trades.columns) != set(trades.columns):
|
||||
# Add missing columns to trades with appropriate default values
|
||||
for col in existing_trades.columns:
|
||||
if col not in trades.columns:
|
||||
if col == "time":
|
||||
trades[col] = pd.Timestamp.now()
|
||||
elif col in ["action", "symbol"]:
|
||||
trades[col] = ""
|
||||
elif col in ["price", "disequilibrium", "scaled_disequilibrium"]:
|
||||
trades[col] = 0.0
|
||||
elif col == "pair":
|
||||
trades[col] = None
|
||||
else:
|
||||
trades[col] = None
|
||||
|
||||
# Concatenate with explicit dtypes to avoid warnings
|
||||
self.user_data_["trades"] = pd.concat(
|
||||
[existing_trades, trades],
|
||||
ignore_index=True,
|
||||
copy=False
|
||||
)
|
||||
|
||||
def get_trades(self) -> pd.DataFrame:
|
||||
return self.user_data_["trades"] if "trades" in self.user_data_ else pd.DataFrame()
|
||||
|
||||
def predict(self) -> pd.DataFrame:
|
||||
assert self.testing_df_ is not None
|
||||
@ -179,9 +246,12 @@ class TradingPair:
|
||||
predicted_prices = self.vecm_fit_.predict(steps=len(self.testing_df_))
|
||||
|
||||
# Convert prediction to a DataFrame for readability
|
||||
# predicted_df =
|
||||
predicted_df = pd.DataFrame(
|
||||
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
|
||||
)
|
||||
|
||||
self.predicted_df_ = pd.merge(
|
||||
|
||||
predicted_df = pd.merge(
|
||||
self.testing_df_.reset_index(drop=True),
|
||||
pd.DataFrame(
|
||||
predicted_prices, columns=pd.Index(self.colnames()), dtype=float
|
||||
@ -191,18 +261,34 @@ class TradingPair:
|
||||
suffixes=("", "_pred"),
|
||||
).dropna()
|
||||
|
||||
self.predicted_df_["disequilibrium"] = (
|
||||
self.predicted_df_[self.colnames()] @ self.vecm_fit_.beta
|
||||
predicted_df["disequilibrium"] = (
|
||||
predicted_df[self.colnames()] @ self.vecm_fit_.beta
|
||||
)
|
||||
|
||||
self.predicted_df_["scaled_disequilibrium"] = (
|
||||
abs(self.predicted_df_["disequilibrium"] - self.training_mu_)
|
||||
predicted_df["scaled_disequilibrium"] = (
|
||||
abs(predicted_df["disequilibrium"] - self.training_mu_)
|
||||
/ self.training_std_
|
||||
)
|
||||
|
||||
# print("*** PREDICTED DF")
|
||||
# print(predicted_df)
|
||||
# print("*" * 80)
|
||||
# print("*** SELF.PREDICTED_DF")
|
||||
# print(self.predicted_df_)
|
||||
# print("*" * 80)
|
||||
|
||||
predicted_df = predicted_df.reset_index(drop=True)
|
||||
if self.predicted_df_ is None:
|
||||
self.predicted_df_ = predicted_df
|
||||
else:
|
||||
self.predicted_df_ = pd.concat([self.predicted_df_, predicted_df], ignore_index=True)
|
||||
# Reset index to ensure proper indexing
|
||||
self.predicted_df_ = self.predicted_df_.reset_index()
|
||||
self.predicted_df_ = self.predicted_df_.reset_index(drop=True)
|
||||
return self.predicted_df_
|
||||
|
||||
def __repr__(self) -> str:
    """Return the result of ``name()`` as this object's representation."""
    return self.name()
|
||||
|
||||
def name(self) -> str:
|
||||
return f"{self.symbol_a_} & {self.symbol_b_}"
|
||||
# return f"{self.symbol_a_} & {self.symbol_b_}"
|
||||
|
||||
@ -24,11 +24,14 @@ hjson>=3.0.2
|
||||
html5lib>=1.1
|
||||
httplib2>=0.20.2
|
||||
idna>=3.3
|
||||
ipython>=8.18.1
|
||||
ipywidgets>=8.1.1
|
||||
ifaddr>=0.1.7
|
||||
IMDbPY>=2021.4.18
|
||||
ipykernel>=6.29.5
|
||||
jeepney>=0.7.1
|
||||
jsonschema>=3.2.0
|
||||
jupyter>=1.0.0
|
||||
keyring>=23.5.0
|
||||
launchpadlib>=1.10.16
|
||||
lazr.restfulclient>=0.14.4
|
||||
@ -42,14 +45,18 @@ more-itertools>=8.10.0
|
||||
multidict>=6.0.4
|
||||
mypy>=0.942
|
||||
mypy-extensions>=0.4.3
|
||||
nbformat>=5.10.2
|
||||
netaddr>=0.8.0
|
||||
######### netifaces>=0.11.0
|
||||
numpy>=1.26.4,<2.3.0
|
||||
oauthlib>=3.2.0
|
||||
packaging>=23.1
|
||||
pandas>=2.2.3
|
||||
pathspec>=0.11.1
|
||||
pexpect>=4.8.0
|
||||
Pillow>=9.0.1
|
||||
platformdirs>=3.2.0
|
||||
plotly>=5.19.0
|
||||
protobuf>=3.12.4
|
||||
psutil>=5.9.0
|
||||
ptyprocess>=0.7.0
|
||||
|
||||
4433
research/notebooks/__DEPRECATED__/pt_pair_backtest.ipynb
Normal file
4433
research/notebooks/__DEPRECATED__/pt_pair_backtest.ipynb
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -15,7 +15,7 @@ from pt_trading.results import (
|
||||
store_config_in_database,
|
||||
store_results_in_database,
|
||||
)
|
||||
from pt_trading.fit_methods import PairsTradingFitMethod
|
||||
from pt_trading.fit_method import PairsTradingFitMethod
|
||||
from pt_trading.trading_pair import TradingPair
|
||||
|
||||
|
||||
@ -84,6 +84,7 @@ def run_backtest(
|
||||
|
||||
for a_index, b_index in unique_index_pairs:
|
||||
pair = TradingPair(
|
||||
config=config_copy,
|
||||
market_data=market_data_df,
|
||||
symbol_a=instruments[a_index],
|
||||
symbol_b=instruments[b_index],
|
||||
@ -95,21 +96,17 @@ def run_backtest(
|
||||
pairs_trades = []
|
||||
for pair in _create_pairs(config, instruments):
|
||||
single_pair_trades = fit_method.run_pair(
|
||||
pair=pair, config=config, bt_result=bt_result
|
||||
pair=pair, bt_result=bt_result
|
||||
)
|
||||
if single_pair_trades is not None and len(single_pair_trades) > 0:
|
||||
pairs_trades.append(single_pair_trades)
|
||||
|
||||
print(f"pairs_trades: {pairs_trades}")
|
||||
# Check if result_list has any data before concatenating
|
||||
if len(pairs_trades) == 0:
|
||||
print("No trading signals found for any pairs")
|
||||
return bt_result
|
||||
|
||||
result = pd.concat(pairs_trades, ignore_index=True)
|
||||
result["time"] = pd.to_datetime(result["time"])
|
||||
result = result.set_index("time").sort_index()
|
||||
|
||||
bt_result.collect_single_day_results(result)
|
||||
bt_result.collect_single_day_results(pairs_trades)
|
||||
return bt_result
|
||||
|
||||
|
||||
@ -226,7 +223,10 @@ def main() -> None:
|
||||
|
||||
# Store results with file name as key
|
||||
filename = os.path.basename(datafile)
|
||||
all_results[filename] = {"trades": bt_results.trades.copy()}
|
||||
all_results[filename] = {
|
||||
"trades": bt_results.trades.copy(),
|
||||
"outstanding_positions": bt_results.outstanding_positions.copy(),
|
||||
}
|
||||
|
||||
# Store results in database
|
||||
if args.result_db.upper() != "NONE":
|
||||
|
||||
BIN
researchresults/equity/20250714_003409.equity_results.db
Normal file
BIN
researchresults/equity/20250714_003409.equity_results.db
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user