170 lines
5.9 KiB
Python
170 lines
5.9 KiB
Python
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Import our utility functions
|
|
from .backshift import backshift
|
|
from .smartMovingStd import smartMovingStd
|
|
from .smartsum import smartsum
|
|
from .smartmean import smartmean
|
|
from .smartstd import smartstd
|
|
from .calculateMaxDD import calculateMaxDD
|
|
from .data_loader import load_earnings_data, load_stock_data
|
|
|
|
def main():
|
|
"""
|
|
Python implementation of the pead.m post-earnings announcement drift strategy.
|
|
This strategy trades stocks around earnings announcements based on gap direction.
|
|
"""
|
|
print("Post-Earnings Announcement Drift (PEAD) Strategy")
|
|
print("=" * 50)
|
|
|
|
# Strategy parameters
|
|
lookback = 90
|
|
threshold_factor = 0.5 # 0.5 standard deviations
|
|
num_stocks = 30 # Portfolio size normalization
|
|
|
|
print(f"Strategy Parameters:")
|
|
print(f"Lookback period: {lookback} days")
|
|
print(f"Threshold factor: {threshold_factor} std devs")
|
|
print(f"Portfolio normalization: {num_stocks} stocks")
|
|
|
|
try:
|
|
# Try to load real earnings and stock data
|
|
print("Loading earnings announcement data...")
|
|
earnings_data = load_earnings_data()
|
|
|
|
print("Loading stock data...")
|
|
stock_data = load_stock_data('20120424')
|
|
|
|
tday = earnings_data['tday']
|
|
earnann = earnings_data['earnann']
|
|
cl = stock_data['cl']
|
|
op = stock_data['op']
|
|
|
|
n_days, n_stocks = cl.shape
|
|
print(f"Loaded real data:")
|
|
print(f" {n_days} days, {n_stocks} stocks")
|
|
print(f" Total earnings announcements: {np.sum(earnann)}")
|
|
|
|
except (FileNotFoundError, KeyError) as e:
|
|
print(f"Could not load real data ({e}), using synthetic data for demonstration...")
|
|
|
|
# Synthetic data for demonstration
|
|
np.random.seed(42)
|
|
n_days = 1000
|
|
n_stocks = 500
|
|
|
|
# Generate synthetic stock data
|
|
tday = np.arange(20090102, 20090102 + n_days)
|
|
|
|
# Create synthetic price data
|
|
returns = np.random.normal(0, 0.02, (n_days, n_stocks))
|
|
cl = 100 * np.cumprod(1 + returns, axis=0)
|
|
|
|
# Create opening prices with gaps
|
|
gap_returns = np.random.normal(0, 0.01, (n_days, n_stocks))
|
|
op = np.roll(cl, 1, axis=0) * (1 + gap_returns)
|
|
op[0, :] = cl[0, :] # First day opening = closing
|
|
|
|
# Create synthetic earnings announcement data
|
|
# Random earnings announcements (about 5% chance per stock per day)
|
|
earnann = np.random.random((n_days, n_stocks)) < 0.05
|
|
|
|
print(f"Generated synthetic data:")
|
|
print(f" {n_days} days, {n_stocks} stocks")
|
|
print(f" Total earnings announcements: {np.sum(earnann)}")
|
|
|
|
# Calculate close-to-open returns
|
|
ret_c2o = (op - backshift(1, cl)) / backshift(1, cl)
|
|
ret_c2o = np.nan_to_num(ret_c2o, nan=0)
|
|
|
|
# Calculate moving standard deviation of C2O returns
|
|
std_c2o = smartMovingStd(ret_c2o, lookback)
|
|
|
|
# Initialize positions
|
|
positions = np.zeros_like(cl)
|
|
|
|
# Generate trading signals
|
|
print("Generating trading signals...")
|
|
|
|
# Long signal: positive gap >= threshold AND earnings announcement
|
|
longs = (ret_c2o >= threshold_factor * std_c2o) & earnann
|
|
|
|
# Short signal: negative gap <= -threshold AND earnings announcement
|
|
shorts = (ret_c2o <= -threshold_factor * std_c2o) & earnann
|
|
|
|
# Set positions
|
|
positions[longs] = 1
|
|
positions[shorts] = -1
|
|
|
|
# Calculate returns (open to close, expecting drift to continue)
|
|
with np.errstate(divide='ignore', invalid='ignore'):
|
|
stock_returns = positions * (cl - op) / op
|
|
|
|
stock_returns = np.nan_to_num(stock_returns, nan=0)
|
|
|
|
# Calculate portfolio returns (sum across stocks, normalize by portfolio size)
|
|
daily_ret = smartsum(stock_returns, dim=1) / num_stocks
|
|
daily_ret = np.nan_to_num(daily_ret, nan=0)
|
|
|
|
# Calculate cumulative returns
|
|
cumret = np.cumprod(1 + daily_ret) - 1
|
|
|
|
# Plot results
|
|
plt.figure(figsize=(12, 6))
|
|
plt.plot(cumret)
|
|
plt.title('Post-Earnings Announcement Drift Strategy - Cumulative Returns')
|
|
plt.xlabel('Days')
|
|
plt.ylabel('Cumulative Return')
|
|
plt.grid(True)
|
|
plt.show()
|
|
|
|
# Performance metrics
|
|
avg_ann_ret = 252 * smartmean(daily_ret)
|
|
ann_volatility = np.sqrt(252) * smartstd(daily_ret)
|
|
sharpe_ratio = avg_ann_ret / ann_volatility if ann_volatility != 0 else 0
|
|
apr = np.prod(1 + daily_ret) ** (252 / len(daily_ret)) - 1
|
|
|
|
maxDD, maxDDD = calculateMaxDD(cumret)
|
|
|
|
print(f"\nPerformance Metrics:")
|
|
print(f"Average Annual Return: {avg_ann_ret:7.4f}")
|
|
print(f"Sharpe Ratio: {sharpe_ratio:4.2f}")
|
|
print(f"APR: {apr:10.4f}")
|
|
print(f"Max Drawdown: {maxDD:.6f}")
|
|
print(f"Max Drawdown Duration: {int(maxDDD)} days")
|
|
|
|
# Trading statistics
|
|
total_positions = np.sum(np.abs(positions))
|
|
long_positions = np.sum(positions > 0)
|
|
short_positions = np.sum(positions < 0)
|
|
|
|
print(f"\nTrading Statistics:")
|
|
print(f"Total positions: {total_positions}")
|
|
print(f"Long positions: {long_positions}")
|
|
print(f"Short positions: {short_positions}")
|
|
|
|
# Calculate win rate for non-zero returns
|
|
non_zero_returns = daily_ret[daily_ret != 0]
|
|
if len(non_zero_returns) > 0:
|
|
win_rate = np.sum(non_zero_returns > 0) / len(non_zero_returns)
|
|
print(f"Win rate: {win_rate:.2%}")
|
|
|
|
# Average return per trade
|
|
if total_positions > 0:
|
|
avg_return_per_position = np.sum(stock_returns) / total_positions
|
|
print(f"Average return per position: {avg_return_per_position:.4f}")
|
|
|
|
return {
|
|
'returns': daily_ret,
|
|
'cumulative_returns': cumret,
|
|
'sharpe_ratio': sharpe_ratio,
|
|
'max_drawdown': maxDD,
|
|
'total_positions': total_positions,
|
|
'apr': apr
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
main() |