import numpy as np import matplotlib.pyplot as plt import warnings warnings.filterwarnings('ignore') # Import our utility functions from .backshift import backshift from .smartMovingStd import smartMovingStd from .smartsum import smartsum from .smartmean import smartmean from .smartstd import smartstd from .calculateMaxDD import calculateMaxDD from .data_loader import load_earnings_data, load_stock_data def main(): """ Python implementation of the pead.m post-earnings announcement drift strategy. This strategy trades stocks around earnings announcements based on gap direction. """ print("Post-Earnings Announcement Drift (PEAD) Strategy") print("=" * 50) # Strategy parameters lookback = 90 threshold_factor = 0.5 # 0.5 standard deviations num_stocks = 30 # Portfolio size normalization print(f"Strategy Parameters:") print(f"Lookback period: {lookback} days") print(f"Threshold factor: {threshold_factor} std devs") print(f"Portfolio normalization: {num_stocks} stocks") try: # Try to load real earnings and stock data print("Loading earnings announcement data...") earnings_data = load_earnings_data() print("Loading stock data...") stock_data = load_stock_data('20120424') tday = earnings_data['tday'] earnann = earnings_data['earnann'] cl = stock_data['cl'] op = stock_data['op'] n_days, n_stocks = cl.shape print(f"Loaded real data:") print(f" {n_days} days, {n_stocks} stocks") print(f" Total earnings announcements: {np.sum(earnann)}") except (FileNotFoundError, KeyError) as e: print(f"Could not load real data ({e}), using synthetic data for demonstration...") # Synthetic data for demonstration np.random.seed(42) n_days = 1000 n_stocks = 500 # Generate synthetic stock data tday = np.arange(20090102, 20090102 + n_days) # Create synthetic price data returns = np.random.normal(0, 0.02, (n_days, n_stocks)) cl = 100 * np.cumprod(1 + returns, axis=0) # Create opening prices with gaps gap_returns = np.random.normal(0, 0.01, (n_days, n_stocks)) op = np.roll(cl, 1, axis=0) * (1 + gap_returns) op[0, :] = cl[0, :] # First day opening = closing # Create synthetic earnings announcement data # Random earnings announcements (about 5% chance per stock per day) earnann = np.random.random((n_days, n_stocks)) < 0.05 print(f"Generated synthetic data:") print(f" {n_days} days, {n_stocks} stocks") print(f" Total earnings announcements: {np.sum(earnann)}") # Calculate close-to-open returns ret_c2o = (op - backshift(1, cl)) / backshift(1, cl) ret_c2o = np.nan_to_num(ret_c2o, nan=0) # Calculate moving standard deviation of C2O returns std_c2o = smartMovingStd(ret_c2o, lookback) # Initialize positions positions = np.zeros_like(cl) # Generate trading signals print("Generating trading signals...") # Long signal: positive gap >= threshold AND earnings announcement longs = (ret_c2o >= threshold_factor * std_c2o) & earnann # Short signal: negative gap <= -threshold AND earnings announcement shorts = (ret_c2o <= -threshold_factor * std_c2o) & earnann # Set positions positions[longs] = 1 positions[shorts] = -1 # Calculate returns (open to close, expecting drift to continue) with np.errstate(divide='ignore', invalid='ignore'): stock_returns = positions * (cl - op) / op stock_returns = np.nan_to_num(stock_returns, nan=0) # Calculate portfolio returns (sum across stocks, normalize by portfolio size) daily_ret = smartsum(stock_returns, dim=1) / num_stocks daily_ret = np.nan_to_num(daily_ret, nan=0) # Calculate cumulative returns cumret = np.cumprod(1 + daily_ret) - 1 # Plot results plt.figure(figsize=(12, 6)) plt.plot(cumret) plt.title('Post-Earnings Announcement Drift Strategy - Cumulative Returns') plt.xlabel('Days') plt.ylabel('Cumulative Return') plt.grid(True) plt.show() # Performance metrics avg_ann_ret = 252 * smartmean(daily_ret) ann_volatility = np.sqrt(252) * smartstd(daily_ret) sharpe_ratio = avg_ann_ret / ann_volatility if ann_volatility != 0 else 0 apr = np.prod(1 + daily_ret) ** (252 / len(daily_ret)) - 1 maxDD, maxDDD = calculateMaxDD(cumret) print(f"\nPerformance Metrics:") print(f"Average Annual Return: {avg_ann_ret:7.4f}") print(f"Sharpe Ratio: {sharpe_ratio:4.2f}") print(f"APR: {apr:10.4f}") print(f"Max Drawdown: {maxDD:.6f}") print(f"Max Drawdown Duration: {int(maxDDD)} days") # Trading statistics total_positions = np.sum(np.abs(positions)) long_positions = np.sum(positions > 0) short_positions = np.sum(positions < 0) print(f"\nTrading Statistics:") print(f"Total positions: {total_positions}") print(f"Long positions: {long_positions}") print(f"Short positions: {short_positions}") # Calculate win rate for non-zero returns non_zero_returns = daily_ret[daily_ret != 0] if len(non_zero_returns) > 0: win_rate = np.sum(non_zero_returns > 0) / len(non_zero_returns) print(f"Win rate: {win_rate:.2%}") # Average return per trade if total_positions > 0: avg_return_per_position = np.sum(stock_returns) / total_positions print(f"Average return per position: {avg_return_per_position:.4f}") return { 'returns': daily_ret, 'cumulative_returns': cumret, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': maxDD, 'total_positions': total_positions, 'apr': apr } if __name__ == "__main__": main()