import numpy as np import matplotlib.pyplot as plt import warnings warnings.filterwarnings('ignore') # Import our utility functions from .backshift import backshift from .smartsum import smartsum from .smartmean import smartmean from .smartstd import smartstd from .calculateMaxDD import calculateMaxDD from .data_loader import load_stock_data def lag(x): """Simple lag function equivalent to MATLAB's lag""" return backshift(1, x) def main(): """ Python implementation of the kentdaniel.m momentum strategy. This implements a long-short equity momentum strategy. """ print("Kent Daniel Momentum Strategy") print("=" * 40) # Strategy parameters lookback = 252 holddays = 25 topN = 50 print(f"Strategy Parameters:") print(f"Lookback: {lookback} days") print(f"Hold days: {holddays} days") print(f"Top N stocks: {topN}") try: # Try to load real stock data print("Loading stock data...") stock_data = load_stock_data('20120424') tday = stock_data['tday'] cl = stock_data['cl'] op = stock_data['op'] n_days, n_stocks = cl.shape print(f"Loaded real stock data:") print(f" {n_days} days, {n_stocks} stocks") except (FileNotFoundError, KeyError) as e: print(f"Could not load real data ({e}), using synthetic data for demonstration...") # Synthetic data for demonstration np.random.seed(42) n_days = 1000 n_stocks = 500 # Simulate S&P 500 # Generate synthetic stock data tday = np.arange(20070515, 20070515 + n_days) # Create synthetic price data returns = np.random.normal(0, 0.02, (n_days, n_stocks)) # Add some momentum effect for i in range(1, n_days): returns[i] += 0.1 * returns[i-1] # Simple momentum cl = 100 * np.cumprod(1 + returns, axis=0) op = cl * (1 + np.random.normal(0, 0.005, cl.shape)) # Opening prices # Date range for backtesting idx_start = np.where(tday == 20070515)[0] idx_end = np.where(tday == 20071231)[0] if len(np.where(tday == 20071231)[0]) > 0 else [len(tday)-1] if len(idx_start) == 0: idx_start = [lookback] if len(idx_end) == 0: idx_end = [len(tday)-1] idx_start = idx_start[0] idx_end = idx_end[0] print(f"Backtest period: {tday[idx_start]} to {tday[idx_end]}") # Calculate momentum returns ret = (cl - backshift(lookback, cl)) / backshift(lookback, cl) # Initialize position arrays longs = np.zeros_like(ret, dtype=bool) shorts = np.zeros_like(ret, dtype=bool) positions = np.zeros_like(ret) # Generate trading signals print("Generating trading signals...") for t in range(lookback, len(tday)): if t % 100 == 0: print(f" Processing day {t}/{len(tday)}") # Get returns for this day day_returns = ret[t, :] # Find stocks with valid data (not NaN) valid_stocks = ~np.isnan(day_returns) valid_indices = np.where(valid_stocks)[0] if len(valid_indices) < 2 * topN: continue # Sort returns valid_returns = day_returns[valid_indices] sorted_indices = np.argsort(valid_returns) # Select top and bottom performers bottom_indices = valid_indices[sorted_indices[:topN]] # Worst performers (shorts) top_indices = valid_indices[sorted_indices[-topN:]] # Best performers (longs) # Set long and short signals longs[t, top_indices] = True shorts[t, bottom_indices] = True # Build positions over holding period print("Building positions...") for h in range(holddays): long_lag = backshift(h, longs.astype(float)) long_lag = np.nan_to_num(long_lag, nan=0).astype(bool) short_lag = backshift(h, shorts.astype(float)) short_lag = np.nan_to_num(short_lag, nan=0).astype(bool) positions[long_lag] += 1 positions[short_lag] -= 1 # Calculate daily returns print("Calculating returns...") price_changes = cl - lag(cl) lagged_prices = lag(cl) # Avoid division by zero with np.errstate(divide='ignore', invalid='ignore'): stock_returns = price_changes / lagged_prices stock_returns = np.nan_to_num(stock_returns, nan=0) # Calculate portfolio returns lagged_positions = backshift(1, positions) portfolio_returns = lagged_positions * stock_returns # Sum across all stocks and normalize daily_ret = smartsum(portfolio_returns, dim=1) / (2 * topN) / holddays daily_ret = np.nan_to_num(daily_ret, nan=0) # Calculate cumulative returns for the backtest period backtest_returns = daily_ret[idx_start:idx_end+1] cumret = np.cumprod(1 + backtest_returns) - 1 # Plot results plt.figure(figsize=(12, 6)) plt.plot(cumret) plt.title('Kent Daniel Momentum Strategy - Cumulative Returns') plt.xlabel('Days') plt.ylabel('Cumulative Return') plt.grid(True) plt.show() # Performance metrics avg_ann_ret = 252 * smartmean(backtest_returns) ann_volatility = np.sqrt(252) * smartstd(backtest_returns) sharpe_ratio = avg_ann_ret / ann_volatility if ann_volatility != 0 else 0 apr = np.prod(1 + backtest_returns) ** (252 / len(backtest_returns)) - 1 maxDD, maxDDD = calculateMaxDD(cumret) print(f"\nPerformance Metrics:") print(f"Average Annual Return: {avg_ann_ret:7.4f}") print(f"Sharpe Ratio: {sharpe_ratio:4.2f}") print(f"APR: {apr:10.4f}") print(f"Max Drawdown: {maxDD:.6f}") print(f"Max Drawdown Duration: {int(maxDDD)} days") return { 'returns': backtest_returns, 'cumulative_returns': cumret, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': maxDD } if __name__ == "__main__": main()