186 lines
5.9 KiB
Python
186 lines
5.9 KiB
Python
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Import our utility functions
|
|
from .backshift import backshift
|
|
from .smartsum import smartsum
|
|
from .smartmean import smartmean
|
|
from .smartstd import smartstd
|
|
from .calculateMaxDD import calculateMaxDD
|
|
from .data_loader import load_stock_data
|
|
|
|
def lag(x):
    """Return ``x`` shifted back by one period.

    Thin convenience wrapper around ``backshift`` with a fixed shift of 1,
    kept so the code reads like the MATLAB original, which calls ``lag``.
    """
    one_period = 1
    return backshift(one_period, x)
|
|
|
|
def _synthetic_prices(n_days=1000, n_stocks=500):
    """Build a reproducible synthetic close-price matrix for demo runs.

    Returns:
        ``(tday, cl)`` where ``tday`` is an array of consecutive integers
        starting at 20070515 (labels only, not real calendar dates) and
        ``cl`` is an ``(n_days, n_stocks)`` array of prices.
    """
    # Reseeds NumPy's global RNG so every demo run produces the same prices.
    np.random.seed(42)
    tday = np.arange(20070515, 20070515 + n_days)

    returns = np.random.normal(0, 0.02, (n_days, n_stocks))
    # Each day inherits 10% of the previous day's *already adjusted* return,
    # so the recursion has to run sequentially rather than vectorized.
    for i in range(1, n_days):
        returns[i] += 0.1 * returns[i - 1]

    cl = 100 * np.cumprod(1 + returns, axis=0)
    return tday, cl


def _load_prices():
    """Return ``(tday, cl)`` from the real data set, or synthetic data.

    Only the trading dates and closing prices are used by the strategy.
    A missing file or a missing key triggers the synthetic fallback.
    """
    try:
        print("Loading stock data...")
        stock_data = load_stock_data('20120424')
        tday = stock_data['tday']
        cl = stock_data['cl']
    except (FileNotFoundError, KeyError) as e:
        print(f"Could not load real data ({e}), using synthetic data for demonstration...")
        return _synthetic_prices()

    n_days, n_stocks = cl.shape
    print("Loaded real stock data:")
    print(f" {n_days} days, {n_stocks} stocks")
    return tday, cl


def _first_index_or(tday, date, fallback):
    """Return the first index where ``tday == date``, or ``fallback`` if absent."""
    hits = np.where(tday == date)[0]
    return hits[0] if len(hits) > 0 else fallback


def _select_signals(ret, n_rows, lookback, topN):
    """Flag the ``topN`` best (long) and worst (short) stocks on each day.

    Rows before ``lookback`` and days with fewer than ``2 * topN`` non-NaN
    lookback returns are left without any signal.
    """
    longs = np.zeros_like(ret, dtype=bool)
    shorts = np.zeros_like(ret, dtype=bool)

    for t in range(lookback, n_rows):
        if t % 100 == 0:
            print(f" Processing day {t}/{n_rows}")

        day_returns = ret[t, :]
        valid_indices = np.where(~np.isnan(day_returns))[0]
        if len(valid_indices) < 2 * topN:
            # Not enough names to fill both legs without overlap.
            continue

        # Ascending order: the head holds the losers, the tail the winners.
        order = np.argsort(day_returns[valid_indices])
        longs[t, valid_indices[order[-topN:]]] = True
        shorts[t, valid_indices[order[:topN]]] = True

    return longs, shorts


def _accumulate_positions(longs, shorts, holddays):
    """Sum the signals of the last ``holddays`` days into net position counts."""
    positions = np.zeros_like(longs, dtype=float)
    # The float views are loop-invariant; convert once instead of per lag.
    longs_f = longs.astype(float)
    shorts_f = shorts.astype(float)

    for h in range(holddays):
        # NOTE(review): h == 0 relies on backshift(0, x) returning x
        # unshifted -- confirm against the backshift implementation.
        long_lag = np.nan_to_num(backshift(h, longs_f), nan=0).astype(bool)
        short_lag = np.nan_to_num(backshift(h, shorts_f), nan=0).astype(bool)

        positions[long_lag] += 1
        positions[short_lag] -= 1

    return positions


def main():
    """Run the Python port of the ``kentdaniel.m`` momentum backtest.

    Ranks stocks by their ``lookback``-day return, goes long the ``topN``
    winners and short the ``topN`` losers, and holds each day's picks for
    ``holddays`` days. Progress and performance metrics are printed and the
    cumulative return curve is shown with matplotlib.

    Returns:
        dict with keys:
            ``'returns'``: daily portfolio returns over the backtest window,
            ``'cumulative_returns'``: compounded return curve over that window,
            ``'sharpe_ratio'``: annualized mean return over annualized volatility
            (0 when the volatility is exactly 0),
            ``'max_drawdown'``: first value returned by ``calculateMaxDD``.

    Raises:
        ValueError: if the backtest window contains no trading days.
    """
    print("Kent Daniel Momentum Strategy")
    print("=" * 40)

    # Strategy parameters
    lookback = 252
    holddays = 25
    topN = 50

    print("Strategy Parameters:")
    print(f"Lookback: {lookback} days")
    print(f"Hold days: {holddays} days")
    print(f"Top N stocks: {topN}")

    tday, cl = _load_prices()

    # Backtest window: fall back to the first day with a full lookback and to
    # the last available day when the target dates are not in the calendar.
    idx_start = _first_index_or(tday, 20070515, lookback)
    idx_end = _first_index_or(tday, 20071231, len(tday) - 1)
    if idx_start > idx_end:
        # Fail early with a clear message; an empty window would otherwise
        # surface later as an index or division-by-zero error.
        raise ValueError(
            f"Empty backtest window (start index {idx_start} > end index "
            f"{idx_end}); need more than {lookback} days of data"
        )

    print(f"Backtest period: {tday[idx_start]} to {tday[idx_end]}")

    # Lookback-period momentum used for ranking
    lagged_cl = backshift(lookback, cl)
    ret = (cl - lagged_cl) / lagged_cl

    print("Generating trading signals...")
    longs, shorts = _select_signals(ret, len(tday), lookback, topN)

    print("Building positions...")
    positions = _accumulate_positions(longs, shorts, holddays)

    print("Calculating returns...")
    prev_cl = lag(cl)
    with np.errstate(divide='ignore', invalid='ignore'):
        stock_returns = (cl - prev_cl) / prev_cl

    # Missing data and zero previous closes contribute nothing. The infinities
    # must be zeroed explicitly: by default nan_to_num maps them to the
    # largest finite float, which would swamp the portfolio sum.
    stock_returns = np.nan_to_num(stock_returns, nan=0.0, posinf=0.0, neginf=0.0)

    # Yesterday's positions earn today's returns (avoids look-ahead).
    portfolio_returns = backshift(1, positions) * stock_returns

    # Sum across all stocks and normalize by the number of open slots
    daily_ret = smartsum(portfolio_returns, dim=1) / (2 * topN) / holddays
    daily_ret = np.nan_to_num(daily_ret, nan=0)

    # Restrict to the backtest period (end index inclusive)
    backtest_returns = daily_ret[idx_start:idx_end + 1]
    cumret = np.cumprod(1 + backtest_returns) - 1

    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(cumret)
    plt.title('Kent Daniel Momentum Strategy - Cumulative Returns')
    plt.xlabel('Days')
    plt.ylabel('Cumulative Return')
    plt.grid(True)
    plt.show()

    # Performance metrics, annualized with 252 trading days per year
    avg_ann_ret = 252 * smartmean(backtest_returns)
    ann_volatility = np.sqrt(252) * smartstd(backtest_returns)
    sharpe_ratio = avg_ann_ret / ann_volatility if ann_volatility != 0 else 0
    apr = np.prod(1 + backtest_returns) ** (252 / len(backtest_returns)) - 1

    maxDD, maxDDD = calculateMaxDD(cumret)

    print("\nPerformance Metrics:")
    print(f"Average Annual Return: {avg_ann_ret:7.4f}")
    print(f"Sharpe Ratio: {sharpe_ratio:4.2f}")
    print(f"APR: {apr:10.4f}")
    print(f"Max Drawdown: {maxDD:.6f}")
    print(f"Max Drawdown Duration: {int(maxDDD)} days")

    return {
        'returns': backtest_returns,
        'cumulative_returns': cumret,
        'sharpe_ratio': sharpe_ratio,
        'max_drawdown': maxDD,
    }
|
|
|
|
# Run the backtest only when executed as a program, never on import.
# NOTE(review): the module uses package-relative imports, so it presumably
# has to be launched as `python -m <package>.<module>` -- confirm.
if __name__ == "__main__":
    main()