2025-06-05 08:48:33 +02:00

170 lines
5.9 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# Import our utility functions
from .backshift import backshift
from .smartMovingStd import smartMovingStd
from .smartsum import smartsum
from .smartmean import smartmean
from .smartstd import smartstd
from .calculateMaxDD import calculateMaxDD
from .data_loader import load_earnings_data, load_stock_data
def _make_synthetic_data(n_days=1000, n_stocks=500):
    """Build reproducible random prices and announcement flags for a demo run.

    Args:
        n_days: Number of rows (days) to generate.
        n_stocks: Number of columns (stocks) to generate.

    Returns:
        Tuple ``(cl, op, earnann)`` of ``(n_days, n_stocks)`` arrays: closing
        prices, opening prices and a boolean announcement mask.
    """
    # Fixed seed and fixed draw order so repeated runs give identical data.
    np.random.seed(42)

    # Closing prices: compounded normally distributed daily returns from 100.
    returns = np.random.normal(0, 0.02, (n_days, n_stocks))
    cl = 100 * np.cumprod(1 + returns, axis=0)

    # Opening prices: previous row's close perturbed by a random gap.
    gap_returns = np.random.normal(0, 0.01, (n_days, n_stocks))
    op = np.roll(cl, 1, axis=0) * (1 + gap_returns)
    op[0, :] = cl[0, :]  # np.roll wraps around, so pin the first open to its close

    # Roughly a 5% chance of an announcement per stock per day.
    earnann = np.random.random((n_days, n_stocks)) < 0.05
    return cl, op, earnann


def _load_market_data():
    """Load real price/announcement data, falling back to synthetic data.

    The fallback is taken when a loader raises ``FileNotFoundError`` or when an
    expected key is missing from the loaded mappings (``KeyError``).

    Returns:
        Tuple ``(cl, op, earnann)``: closing prices, opening prices and the
        earnings-announcement indicator array.
    """
    try:
        print("Loading earnings announcement data...")
        earnings_data = load_earnings_data()
        print("Loading stock data...")
        stock_data = load_stock_data('20120424')
        # 'tday' is not used by the strategy, but the lookup is kept so that
        # a missing key still routes to the synthetic fallback.
        _ = earnings_data['tday']
        earnann = earnings_data['earnann']
        cl = stock_data['cl']
        op = stock_data['op']
    except (FileNotFoundError, KeyError) as e:
        print(f"Could not load real data ({e}), using synthetic data for demonstration...")
        cl, op, earnann = _make_synthetic_data()
        n_days, n_stocks = cl.shape
        print("Generated synthetic data:")
        print(f" {n_days} days, {n_stocks} stocks")
        print(f" Total earnings announcements: {np.sum(earnann)}")
        return cl, op, earnann

    n_days, n_stocks = cl.shape
    print("Loaded real data:")
    print(f" {n_days} days, {n_stocks} stocks")
    print(f" Total earnings announcements: {np.sum(earnann)}")
    return cl, op, earnann


def _generate_positions(cl, op, earnann, lookback, threshold_factor):
    """Return a +1 / -1 / 0 position array derived from announcement-day gaps.

    A cell is long (+1) when its close-to-open return is at least
    ``threshold_factor`` moving standard deviations and an announcement is
    flagged; short (-1) when it is at most minus that threshold and an
    announcement is flagged. Shorts are assigned last, so they win if both
    conditions hold.

    Args:
        cl: Closing prices.
        op: Opening prices.
        earnann: Announcement indicator (boolean or numeric).
        lookback: Window passed to ``smartMovingStd``.
        threshold_factor: Multiple of the moving standard deviation.

    Returns:
        Array shaped like ``cl`` holding the positions.
    """
    prev_close = backshift(1, cl)
    with np.errstate(divide='ignore', invalid='ignore'):
        ret_c2o = (op - prev_close) / prev_close
    ret_c2o = np.nan_to_num(ret_c2o, nan=0)

    std_c2o = smartMovingStd(ret_c2o, lookback)
    threshold = threshold_factor * std_c2o

    # Coerce to a true boolean mask: a 0/1 integer array would otherwise turn
    # the assignments below into integer (row) indexing, and a float array
    # would make `&` raise TypeError. NaN entries compare False.
    has_announcement = np.asarray(earnann) > 0

    positions = np.zeros_like(cl)
    positions[(ret_c2o >= threshold) & has_announcement] = 1
    positions[(ret_c2o <= -threshold) & has_announcement] = -1
    return positions


def main():
    """
    Python implementation of the pead.m post-earnings announcement drift strategy.

    This strategy trades stocks around earnings announcements based on gap
    direction: it enters at the open in the direction of the gap and measures
    the open-to-close return. It prints the parameters, performance metrics
    and trading statistics, and plots the cumulative return curve.

    Returns:
        dict with keys ``'returns'`` (daily portfolio returns),
        ``'cumulative_returns'``, ``'sharpe_ratio'``, ``'max_drawdown'``,
        ``'total_positions'`` and ``'apr'``.
    """
    print("Post-Earnings Announcement Drift (PEAD) Strategy")
    print("=" * 50)

    # Strategy parameters
    lookback = 90
    threshold_factor = 0.5  # 0.5 standard deviations
    num_stocks = 30  # Portfolio size normalization
    print("Strategy Parameters:")
    print(f"Lookback period: {lookback} days")
    print(f"Threshold factor: {threshold_factor} std devs")
    print(f"Portfolio normalization: {num_stocks} stocks")

    cl, op, earnann = _load_market_data()

    print("Generating trading signals...")
    positions = _generate_positions(cl, op, earnann, lookback, threshold_factor)

    # Open-to-close return of each held position (expecting drift to continue).
    with np.errstate(divide='ignore', invalid='ignore'):
        stock_returns = positions * (cl - op) / op
    stock_returns = np.nan_to_num(stock_returns, nan=0)

    # Portfolio return: sum across stocks, normalized by the portfolio size.
    daily_ret = smartsum(stock_returns, dim=1) / num_stocks
    daily_ret = np.nan_to_num(daily_ret, nan=0)
    cumret = np.cumprod(1 + daily_ret) - 1

    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(cumret)
    plt.title('Post-Earnings Announcement Drift Strategy - Cumulative Returns')
    plt.xlabel('Days')
    plt.ylabel('Cumulative Return')
    plt.grid(True)
    plt.show()

    # Performance metrics (252 is used as the annualization factor).
    avg_ann_ret = 252 * smartmean(daily_ret)
    ann_volatility = np.sqrt(252) * smartstd(daily_ret)
    sharpe_ratio = avg_ann_ret / ann_volatility if ann_volatility != 0 else 0
    apr = np.prod(1 + daily_ret) ** (252 / len(daily_ret)) - 1
    maxDD, maxDDD = calculateMaxDD(cumret)
    print("\nPerformance Metrics:")
    print(f"Average Annual Return: {avg_ann_ret:7.4f}")
    print(f"Sharpe Ratio: {sharpe_ratio:4.2f}")
    print(f"APR: {apr:10.4f}")
    print(f"Max Drawdown: {maxDD:.6f}")
    print(f"Max Drawdown Duration: {int(maxDDD)} days")

    # Trading statistics
    total_positions = np.sum(np.abs(positions))
    long_positions = np.sum(positions > 0)
    short_positions = np.sum(positions < 0)
    print("\nTrading Statistics:")
    print(f"Total positions: {total_positions}")
    print(f"Long positions: {long_positions}")
    print(f"Short positions: {short_positions}")

    # Win rate over the days whose portfolio return is non-zero.
    non_zero_returns = daily_ret[daily_ret != 0]
    if len(non_zero_returns) > 0:
        win_rate = np.sum(non_zero_returns > 0) / len(non_zero_returns)
        print(f"Win rate: {win_rate:.2%}")

    # Average return per position taken.
    if total_positions > 0:
        avg_return_per_position = np.sum(stock_returns) / total_positions
        print(f"Average return per position: {avg_return_per_position:.4f}")

    return {
        'returns': daily_ret,
        'cumulative_returns': cumret,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': maxDD,
        'total_positions': total_positions,
        'apr': apr,
    }
# NOTE: this module uses package-relative imports (e.g. ``from .backshift
# import backshift``), so it must be executed as part of its package
# (``python -m <package>.<module>``) rather than as a standalone script.
if __name__ == "__main__":
    main()