2025-07-30 17:08:06 +00:00

529 lines
20 KiB
Python

import os
import sqlite3
from datetime import date, datetime
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from pt_trading.trading_pair import TradingPair
# Recommended replacement adapters and converters for Python 3.12+
# From: https://docs.python.org/3/library/sqlite3.html#sqlite3-adapter-converter-recipes
def adapt_date_iso(val: date) -> str:
    """Render a ``datetime.date`` as its ISO 8601 text form for storage in SQLite."""
    iso_text = val.isoformat()
    return iso_text
def adapt_datetime_iso(val: datetime) -> str:
    """Render a ``datetime.datetime`` as its ISO 8601 text form for storage in SQLite."""
    iso_text = val.isoformat()
    return iso_text
def convert_date(val: bytes) -> date:
    """Decode ISO 8601 bytes read from SQLite and return the calendar date part."""
    text = val.decode()
    parsed = datetime.fromisoformat(text)
    return parsed.date()
def convert_datetime(val: bytes) -> datetime:
    """Decode ISO 8601 bytes read from SQLite into a ``datetime.datetime``."""
    text = val.decode()
    return datetime.fromisoformat(text)
# Hook the ISO 8601 helpers into the sqlite3 module.
# Converters: turn stored bytes back into Python objects for the declared
# types "date" and "datetime".
# NOTE(review): sqlite3 only applies converters on connections opened with
# detect_types set - confirm the readers of this database do so.
sqlite3.register_converter("date", convert_date)
sqlite3.register_converter("datetime", convert_datetime)
# Adapters: turn Python date/datetime parameters into ISO 8601 text.
sqlite3.register_adapter(date, adapt_date_iso)
sqlite3.register_adapter(datetime, adapt_datetime_iso)
def create_result_database(db_path: str) -> None:
    """
    Create the SQLite database and required tables if they don't exist.

    The parent directory of ``db_path`` is created when missing. Three tables
    are ensured: ``pt_bt_results`` (completed trades), ``outstanding_positions``
    (open positions) and ``config`` (configuration JSON kept for reference).

    NOTE: every call also deletes all existing rows from the three tables, so
    the database always starts empty after this function returns.

    Args:
        db_path: Path of the SQLite database file.

    Raises:
        Exception: Any error from the filesystem or sqlite3 is printed and
            then re-raised unchanged.
    """
    # Each CREATE is immediately followed by the DELETE that empties the table.
    statements = (
        # Completed trades
        """
        CREATE TABLE IF NOT EXISTS pt_bt_results (
            date DATE,
            pair TEXT,
            symbol TEXT,
            open_time DATETIME,
            open_side TEXT,
            open_price REAL,
            open_quantity INTEGER,
            open_disequilibrium REAL,
            close_time DATETIME,
            close_side TEXT,
            close_price REAL,
            close_quantity INTEGER,
            close_disequilibrium REAL,
            symbol_return REAL,
            pair_return REAL,
            close_condition TEXT
        )
        """,
        "DELETE FROM pt_bt_results;",
        # Open positions
        """
        CREATE TABLE IF NOT EXISTS outstanding_positions (
            date DATE,
            pair TEXT,
            symbol TEXT,
            position_quantity REAL,
            last_price REAL,
            unrealized_return REAL,
            open_price REAL,
            open_side TEXT
        )
        """,
        "DELETE FROM outstanding_positions;",
        # Configuration JSON stored for reference
        """
        CREATE TABLE IF NOT EXISTS config (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_timestamp DATETIME,
            config_file_path TEXT,
            config_json TEXT,
            datafiles TEXT,
            instruments TEXT
        )
        """,
        "DELETE FROM config;",
    )

    try:
        # Create directory if it doesn't exist
        db_dir = os.path.dirname(db_path)
        if db_dir and not os.path.exists(db_dir):
            os.makedirs(db_dir, exist_ok=True)
            print(f"Created directory: {db_dir}")

        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            for statement in statements:
                cursor.execute(statement)
            conn.commit()
        finally:
            # Always release the connection, even when a statement fails.
            conn.close()
    except Exception as e:
        print(f"Error creating result database: {str(e)}")
        raise
def store_config_in_database(
    db_path: str,
    config_file_path: str,
    config: Dict,
    datafiles: List[Tuple[str, str]],
    instruments: List[Dict[str, str]],
) -> None:
    """
    Store configuration information in the database for reference.

    Inserts one row into the ``config`` table holding the current time, the
    config file path, the config serialized as JSON, and comma-separated
    summaries of the data files and instruments.

    This is best-effort: any failure is printed together with a traceback and
    is NOT re-raised.

    Args:
        db_path: SQLite database file; the value "NONE" (any letter case)
            disables storage and the function returns immediately.
        config_file_path: Path of the configuration file, stored as-is.
        config: Configuration mapping; values that JSON cannot encode are
            converted with ``str``.
        datafiles: Pairs whose second element is the data file name to record.
        instruments: Dicts that must provide the keys ``symbol``,
            ``instrument_type`` and ``exchange_id``.
    """
    import json
    import traceback

    if db_path.upper() == "NONE":
        return

    try:
        # Build every value before opening the database.
        config_json = json.dumps(config, indent=2, default=str)
        # Lists are flattened to comma-separated strings for storage.
        datafiles_str = ", ".join(str(datafile) for _, datafile in datafiles)
        instruments_str = ", ".join(
            f"{inst['symbol']}:{inst['instrument_type']}:{inst['exchange_id']}"
            for inst in instruments
        )

        conn = sqlite3.connect(db_path)
        try:
            conn.execute(
                """
                INSERT INTO config (
                    run_timestamp, config_file_path, config_json, datafiles, instruments
                ) VALUES (?, ?, ?, ?, ?)
                """,
                (
                    datetime.now(),
                    config_file_path,
                    config_json,
                    datafiles_str,
                    instruments_str,
                ),
            )
            conn.commit()
        finally:
            # Always release the connection, even when the insert fails.
            conn.close()
        print("Configuration stored in database")
    except Exception as e:
        print(f"Error storing configuration in database: {str(e)}")
        traceback.print_exc()
def convert_timestamp(timestamp: Any) -> Optional[datetime]:
    """
    Convert a timestamp-like value to a Python datetime for SQLite compatibility.

    Supported inputs:
        * ``None`` or ``pd.NaT``  -> ``None``
        * ``pd.Timestamp``        -> ``Timestamp.to_pydatetime()``
        * ``datetime``            -> returned unchanged
        * ``date``                -> midnight of that date
        * ``str``                 -> parsed as ``"%Y-%m-%d %H:%M:%S"``; any
          other string is handed to ``datetime.fromisoformat``
        * ``int``                 -> ``datetime.fromtimestamp`` (local time)

    Raises:
        ValueError: If the type is not supported or a string cannot be parsed.
    """
    # pd.NaT is a datetime subclass, so it must be filtered out before the
    # isinstance checks below or it would be passed through as-is.
    if timestamp is None or timestamp is pd.NaT:
        return None
    if isinstance(timestamp, pd.Timestamp):
        return timestamp.to_pydatetime()
    if isinstance(timestamp, datetime):
        return timestamp
    if isinstance(timestamp, date):
        return datetime.combine(timestamp, datetime.min.time())
    if isinstance(timestamp, str):
        try:
            return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Accept other ISO 8601 spellings ("T" separator, fractional
            # seconds, date only); still raises ValueError when unparsable.
            return datetime.fromisoformat(timestamp)
    if isinstance(timestamp, int):
        return datetime.fromtimestamp(timestamp)
    raise ValueError(f"Unsupported timestamp type: {type(timestamp)}")
# Key type of the per-day dictionaries held by PairResearchResult.
# NOTE(review): the expected day-string format is not visible here - confirm with callers.
DayT = str
# Presumably one trade record as a plain dict; in this view it is referenced
# only from commented-out code.
TradeT = Dict[str, Any]
# One outstanding (open) position record as a plain dict.
OutstandingPositionT = Dict[str, Any]
class PairResearchResult:
    """
    Class to handle pair research results for a single pair across multiple days.

    Simplified version of BacktestResult focused on single pair analysis.

    Typical use: call ``add_day_results`` once per day, then
    ``analyze_pair_performance`` (or ``calculate_returns`` followed by the
    individual ``print_*`` / ``get_return_summary`` methods).

    Attributes:
        config_: Configuration dict; only ``funding_per_pair`` is read here.
        trades_: Per-day trade DataFrames, keyed by day.
        outstanding_positions_: Per-day lists of open-position dicts.
        symbol_roundtrip_trades_: Per-day lists of round-trip records built by
            ``extract_roundtrip_trades``; each pair trade contributes two
            consecutive records (symbol A, then symbol B).
        total_realized_pnl: Sum of all per-symbol percentage returns.
    """

    trades_: "Dict[DayT, pd.DataFrame]"
    outstanding_positions_: "Dict[DayT, List[OutstandingPositionT]]"
    symbol_roundtrip_trades_: Dict[str, List[Dict[str, Any]]]

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config_ = config
        self.trades_ = {}
        self.outstanding_positions_ = {}
        self.total_realized_pnl = 0.0
        self.symbol_roundtrip_trades_ = {}

    def add_day_results(
        self,
        day: "DayT",
        trades: pd.DataFrame,
        outstanding_positions: List[Dict[str, Any]],
    ) -> None:
        """Record one day's trades and open positions, replacing any earlier entry for ``day``."""
        assert isinstance(trades, pd.DataFrame)
        self.trades_[day] = trades
        self.outstanding_positions_[day] = outstanding_positions

    def outstanding_positions(self) -> "List[OutstandingPositionT]":
        """Get all outstanding positions across all days as a flat list."""
        res: List[Dict[str, Any]] = []
        for day_positions in self.outstanding_positions_.values():
            res.extend(day_positions)
        return res

    def calculate_returns(self) -> None:
        """Calculate and store total returns for the single pair across all days."""
        self.extract_roundtrip_trades()
        self.total_realized_pnl = 0.0
        for day_trades in self.symbol_roundtrip_trades_.values():
            for trade in day_trades:
                self.total_realized_pnl += trade["symbol_return"]

    @staticmethod
    def _symbol_return(
        open_side: str, open_px: float, close_side: str, close_px: float
    ) -> float:
        """Percentage return of one leg; 0.0 when the sides are not BUY/SELL opposites."""
        if open_side == "BUY" and close_side == "SELL":
            return (close_px - open_px) / open_px * 100
        if open_side == "SELL" and close_side == "BUY":
            return (open_px - close_px) / open_px * 100
        return 0.0

    @staticmethod
    def _roundtrip_record(
        open_trade: Any,
        close_trade: Any,
        symbol_return: float,
        pair_return: float,
        funding_per_position: float,
    ) -> Dict[str, Any]:
        """Build the round-trip record for one symbol from its open and close rows."""
        return {
            "symbol": open_trade["symbol"],
            "open_side": open_trade["side"],
            "open_price": open_trade["price"],
            "open_time": open_trade["time"],
            "close_side": close_trade["side"],
            "close_price": close_trade["price"],
            "close_time": close_trade["time"],
            "symbol_return": symbol_return,
            "pair_return": pair_return,
            "shares": funding_per_position / open_trade["price"],
            "close_condition": close_trade.get("status", "UNKNOWN"),
            "open_disequilibrium": open_trade.get("disequilibrium"),
            "close_disequilibrium": close_trade.get("disequilibrium"),
        }

    def extract_roundtrip_trades(self) -> None:
        """
        Extract round-trip trades by day, grouping open/close pairs for each symbol.

        Rebuilds ``self.symbol_roundtrip_trades_`` from scratch (day -> list of
        completed round-trip records); nothing is returned. Days that yield no
        round-trips are omitted.

        Rows of each day's DataFrame are consumed in their existing order (no
        sorting is done here) in groups of four: open A, open B, close A,
        close B. A trailing incomplete group is ignored, and a group whose
        ``action`` values do not match that pattern is skipped.
        """
        # The funding is split equally between the two legs of the pair.
        funding_per_position = self.config_.get("funding_per_pair", 10000) / 2

        # Build into a fresh dict so results from an earlier call (e.g. for a
        # day whose trades were replaced since) cannot linger.
        roundtrips_by_day: Dict[str, List[Dict[str, Any]]] = {}

        for day, day_trades in self.trades_.items():
            day_roundtrips: List[Dict[str, Any]] = []

            # Stopping at len - 3 drops any trailing group of fewer than 4 rows.
            for idx in range(0, len(day_trades) - 3, 4):
                open_a = day_trades.iloc[idx]
                open_b = day_trades.iloc[idx + 1]
                close_a = day_trades.iloc[idx + 2]
                close_b = day_trades.iloc[idx + 3]

                # Validate trade sequence
                if not (open_a["action"] == "OPEN" and close_a["action"] == "CLOSE"):
                    continue
                if not (open_b["action"] == "OPEN" and close_b["action"] == "CLOSE"):
                    continue

                symbol_a_return = self._symbol_return(
                    open_a["side"], open_a["price"], close_a["side"], close_a["price"]
                )
                symbol_b_return = self._symbol_return(
                    open_b["side"], open_b["price"], close_b["side"], close_b["price"]
                )
                pair_return = symbol_a_return + symbol_b_return

                day_roundtrips.append(
                    self._roundtrip_record(
                        open_a, close_a, symbol_a_return, pair_return, funding_per_position
                    )
                )
                day_roundtrips.append(
                    self._roundtrip_record(
                        open_b, close_b, symbol_b_return, pair_return, funding_per_position
                    )
                )

            if day_roundtrips:
                roundtrips_by_day[day] = day_roundtrips

        self.symbol_roundtrip_trades_ = roundtrips_by_day

    @staticmethod
    def _format_leg(trade: Dict[str, Any]) -> str:
        """Format the one-line summary of a single symbol round-trip."""
        return (
            f" {trade['symbol']}: {trade['open_side']} @ ${trade['open_price']:.2f} -> "
            f"{trade['close_side']} @ ${trade['close_price']:.2f} | "
            f"Return: {trade['symbol_return']:+.2f}% | Shares: {trade['shares']:.2f}"
        )

    def print_returns_by_day(self) -> None:
        """
        Print detailed return information for each day, grouped by day.
        Shows individual symbol round-trips and daily totals.
        """
        print("\n====== PAIR RESEARCH RETURNS BY DAY ======")
        total_return_all_days = 0.0

        for day, day_trades in sorted(self.symbol_roundtrip_trades_.items()):
            print(f"\n--- {day} ---")
            day_total_return = 0.0
            pair_returns = []

            # Every 2 consecutive records form a pair; an unpaired trailing
            # record is ignored.
            for trade_a, trade_b in zip(day_trades[::2], day_trades[1::2]):
                print(f" {trade_a['open_time'].time()}-{trade_a['close_time'].time()}")
                print(self._format_leg(trade_a))
                print(self._format_leg(trade_b))

                # Show disequilibrium info if available
                open_diseq = trade_a.get("open_disequilibrium")
                if open_diseq is not None:
                    close_diseq = trade_a.get("close_disequilibrium")
                    # A missing close value must not break the report.
                    close_text = "n/a" if close_diseq is None else f"{close_diseq:.4f}"
                    print(f" Disequilibrium: Open: {open_diseq:.4f}, Close: {close_text}")

                pair_return = trade_a["pair_return"]
                print(
                    f" Pair Return: {pair_return:+.2f}% | "
                    f"Close Condition: {trade_a['close_condition']}"
                )
                print()
                pair_returns.append(pair_return)
                day_total_return += pair_return

            print(f" Day Total Return: {day_total_return:+.2f}% ({len(pair_returns)} pairs)")
            total_return_all_days += day_total_return

        print("\n====== TOTAL RETURN ACROSS ALL DAYS ======")
        print(f"Total Return: {total_return_all_days:+.2f}%")
        day_count = len(self.symbol_roundtrip_trades_)
        print(f"Total Days: {day_count}")
        if day_count > 0:
            print(f"Average Daily Return: {total_return_all_days / day_count:+.2f}%")

    def get_return_summary(self) -> Dict[str, Any]:
        """
        Get a summary of returns across all days.

        Returns a dictionary with the keys ``total_return``, ``total_days``,
        ``total_pairs``, ``average_daily_return``, ``best_day``, ``worst_day``
        and ``daily_returns``. ``best_day`` / ``worst_day`` are
        ``(day, {"return", "pairs"})`` tuples, or ``None`` when there are no
        round-trip trades.
        """
        if not self.symbol_roundtrip_trades_:
            return {
                "total_return": 0.0,
                "total_days": 0,
                "total_pairs": 0,
                "average_daily_return": 0.0,
                "best_day": None,
                "worst_day": None,
                "daily_returns": {},
            }

        daily_returns: Dict[str, Dict[str, Any]] = {}
        total_return = 0.0
        total_pairs = 0

        for day, day_trades in self.symbol_roundtrip_trades_.items():
            day_return = 0.0
            for trade in day_trades:
                day_return += trade["symbol_return"]
            day_pairs = len(day_trades) // 2  # Each pair has 2 symbol trades

            daily_returns[day] = {"return": day_return, "pairs": day_pairs}
            total_return += day_return
            total_pairs += day_pairs

        # daily_returns is non-empty here because of the early return above.
        def _day_return(item: Tuple[str, Dict[str, Any]]) -> float:
            return item[1]["return"]

        return {
            "total_return": total_return,
            "total_days": len(daily_returns),
            "total_pairs": total_pairs,
            "average_daily_return": total_return / len(daily_returns),
            "best_day": max(daily_returns.items(), key=_day_return),
            "worst_day": min(daily_returns.items(), key=_day_return),
            "daily_returns": daily_returns,
        }

    def print_grand_totals(self) -> None:
        """Print grand totals for the single pair analysis."""
        summary = self.get_return_summary()

        print("\n====== PAIR RESEARCH GRAND TOTALS ======")
        print(f"Total Return: {summary['total_return']:+.2f}%")
        print(f"Total Days Traded: {summary['total_days']}")
        print(f"Total Pair Trades: {summary['total_pairs']}")

        if summary["total_days"] > 0:
            print(f"Average Daily Return: {summary['average_daily_return']:+.2f}%")
            if summary["best_day"]:
                best_day, best_data = summary["best_day"]
                print(f"Best Day: {best_day} ({best_data['return']:+.2f}%)")
            if summary["worst_day"]:
                worst_day, worst_data = summary["worst_day"]
                print(f"Worst Day: {worst_day} ({worst_data['return']:+.2f}%)")

        # Update the total_realized_pnl for backward compatibility
        self.total_realized_pnl = summary["total_return"]

    def analyze_pair_performance(self) -> None:
        """
        Main method to perform comprehensive pair research analysis.
        Extracts round-trip trades, calculates returns, groups by day, and prints results.
        """
        print(f"\n{'='*60}")
        print("PAIR RESEARCH PERFORMANCE ANALYSIS")
        print(f"{'='*60}")

        self.calculate_returns()
        self.print_returns_by_day()
        self.print_outstanding_positions()
        self.print_grand_totals()
        self._print_additional_metrics()

    def _print_additional_metrics(self) -> None:
        """Print additional performance metrics (win rate, averages, daily range)."""
        summary = self.get_return_summary()
        if summary["total_days"] == 0:
            return

        print("\n====== ADDITIONAL METRICS ======")

        # Win rate: share of days with a strictly positive return
        winning_days = sum(
            1 for day_data in summary["daily_returns"].values() if day_data["return"] > 0
        )
        win_rate = (winning_days / summary["total_days"]) * 100
        print(f"Winning Days: {winning_days}/{summary['total_days']} ({win_rate:.1f}%)")

        if summary["total_pairs"] > 0:
            # Each pair has 2 symbol trades, so total symbol trades = total_pairs * 2
            total_symbol_trades = summary["total_pairs"] * 2
            avg_symbol_return = summary["total_return"] / total_symbol_trades
            print(f"Average Symbol Return: {avg_symbol_return:+.2f}%")

            # Divide by 2 since we sum both symbols.
            # NOTE(review): this makes the value equal to the average symbol
            # return printed above - confirm that is the intended definition.
            avg_pair_return = summary["total_return"] / summary["total_pairs"] / 2
            print(f"Average Pair Return: {avg_pair_return:+.2f}%")

        # Show daily return distribution
        daily_returns_list = [data["return"] for data in summary["daily_returns"].values()]
        if daily_returns_list:
            print(
                f"Daily Return Range: {min(daily_returns_list):+.2f}% "
                f"to {max(daily_returns_list):+.2f}%"
            )

    def print_outstanding_positions(self) -> None:
        """
        Print outstanding positions for the single pair.

        Each position dict must provide ``symbol``, ``open_side``, ``shares``,
        ``open_px`` and ``last_px``; ``last_value`` is optional and counts as 0.0
        when absent.
        """
        all_positions = self.outstanding_positions()
        if not all_positions:
            print("\n====== NO OUTSTANDING POSITIONS ======")
            return

        print("\n====== OUTSTANDING POSITIONS ======")
        print(
            f"{'Symbol':<10} {'Side':<4} {'Shares':<10} {'Open $':<8} "
            f"{'Current $':<10} {'Value $':<12}"
        )
        print("-" * 70)

        total_value = 0.0
        for pos in all_positions:
            current_value = pos.get("last_value", 0.0)
            print(
                f"{pos['symbol']:<10} {pos['open_side']:<4} {pos['shares']:<10.2f} "
                f"{pos['open_px']:<8.2f} {pos['last_px']:<10.2f} {current_value:<12.2f}"
            )
            total_value += current_value

        print("-" * 70)
        print(f"{'TOTAL VALUE':<60} ${total_value:<12.2f}")

    def get_total_realized_pnl(self) -> float:
        """Get total realized PnL."""
        return self.total_realized_pnl