# Cointegration Analysis of Two Stocks

## --- Cell 1: Input symbols ---

In [1]:

symbol_a = "AAPL"
symbol_b = "MSFT"

API_KEY = "hV11XYhaase6vxw2qmhh"

# --- Cell 2: Imports ---

In [2]:

import yfinance as yf
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
import plotly.graph_objects as go
import plotly.subplots as sp
import quandl



## --- Cell 3: Load Data ---

In [3]:


quandl.ApiConfig.api_key = API_KEY

def get_nasdaq_close(symbol):
    try:
        data = quandl.get(f"EOD/{symbol}", start_date="2023-01-01")
        return data['Adj_Close']
    except Exception as e:
        print(f"Error downloading {symbol} from Nasdaq Data Link (EOD):", e)
        return None

# Daily (from Nasdaq Data Link)
daily_a = get_nasdaq_close(symbol_a)
daily_b = get_nasdaq_close(symbol_b)

# Filter to the last 180 days of overlap
daily = pd.concat([daily_a, daily_b], axis=1).dropna().last("180D")
daily.columns = [symbol_a, symbol_b]


intraday = pd.concat([intraday_a, intraday_b], axis=1).dropna()
intraday.columns = [symbol_a, symbol_b]



Error downloading AAPL from Nasdaq Data Link (EOD): (Status 403) Something went wrong. Please try again. If you continue to have problems, please contact us at connect@quandl.com.
Error downloading MSFT from Nasdaq Data Link (EOD): (Status 403) Something went wrong. Please try again. If you continue to have problems, please contact us at connect@quandl.com.


ValueError: All objects passed were None

In [None]:
# --- Cell 4: Cointegration Step 1 (Daily) ---
# Engle-Granger (ADF)
def engle_granger(a, b):
    model = sm.OLS(a, sm.add_constant(b)).fit()
    spread = model.resid
    adf_stat, pvalue, _ = sm.tsa.adfuller(spread)
    return pvalue

# Johansen Test
def johansen_test(df):
    result = coint_johansen(df, det_order=0, k_ar_diff=1)
    trace_stat = result.lr1
    crit_vals = result.cvt[:, 1]  # 5% level
    return trace_stat[0] > crit_vals[0]

# Rolling Validation (30-day window)
rolling_pvalues = []
rolling_johansen = []
window = 30
for i in range(window, len(daily)):
    a_slice = daily[symbol_a].iloc[i - window:i]
    b_slice = daily[symbol_b].iloc[i - window:i]
    df_slice = pd.concat([a_slice, b_slice], axis=1)
    pval = engle_granger(a_slice, b_slice)
    rolling_pvalues.append(pval)
    rolling_johansen.append(johansen_test(df_slice))

mean_pval = np.mean(rolling_pvalues)
passed_johansen_ratio = np.mean(rolling_johansen)

print("\nDaily Cointegration Results (Rolling 30-day):")
print(f"Average Engle-Granger ADF p-value: {mean_pval:.4f}")
print(f"Johansen test passed in {passed_johansen_ratio * 100:.1f}% of windows")



In [None]:
# --- Cell 5: Cointegration Step 2 (Intraday) ---
# Rolling hedge ratio (60-min window)
window_hr = 60
rolling_hedge_ratios = []
for i in range(window_hr, len(intraday)):
    X = intraday[symbol_b].iloc[i - window_hr:i]
    y = intraday[symbol_a].iloc[i - window_hr:i]
    model = sm.OLS(y, sm.add_constant(X)).fit()
    rolling_hedge_ratios.append(model.params[1])

# Extend hedge ratio to full length
rolling_hedge_ratios = pd.Series(rolling_hedge_ratios, index=intraday.index[window_hr:])
hedge_ratio_full = rolling_hedge_ratios.reindex(intraday.index, method='ffill').fillna(method='bfill')

# Spread and Z-score
spread = intraday[symbol_a] - hedge_ratio_full * intraday[symbol_b]
zscore = (spread - spread.rolling(60).mean()) / spread.rolling(60).std()

# --- Cell 6: Trading Signals ---
entry_threshold = 2
exit_threshold = 0.5
transaction_cost = 0.0005  # 5 basis points per leg

signals = pd.DataFrame(index=intraday.index)
signals['zscore'] = zscore
signals['position'] = 0
signals.loc[signals['zscore'] > entry_threshold, 'position'] = -1  # short A, long B
signals.loc[signals['zscore'] < -entry_threshold, 'position'] = 1  # long A, short B
signals['position'] = signals['position'].where(~signals['zscore'].between(-exit_threshold, exit_threshold), 0)
signals['position'] = signals['position'].ffill().fillna(0)

# PnL simulation
returns_a = intraday[symbol_a].pct_change().fillna(0)
returns_b = intraday[symbol_b].pct_change().fillna(0)
hedge_ratio_series = hedge_ratio_full.shift(1)

# Gross PnL
signals['pnl'] = signals['position'].shift(1) * (returns_a - hedge_ratio_series * returns_b)

# Transaction cost estimation
signals['trades'] = signals['position'].diff().abs()
signals['costs'] = signals['trades'] * transaction_cost * (1 + hedge_ratio_series)
signals['net_pnl'] = signals['pnl'] - signals['costs']
signals['cum_pnl'] = signals['net_pnl'].cumsum()



In [None]:
# --- Cell 7: Export Results ---
signals.to_csv("signals_and_pnl.csv")
print("Exported: signals_and_pnl.csv")



In [None]:
# --- Cell 8: Visualization ---
fig = sp.make_subplots(rows=3, cols=1, shared_xaxes=True,
                       subplot_titles=("Price Series (1-min)", "Spread Z-Score", "Cumulative PnL (Net)",))

# Price series
fig.add_trace(go.Scatter(x=intraday.index, y=intraday[symbol_a], name=symbol_a), row=1, col=1)
fig.add_trace(go.Scatter(x=intraday.index, y=intraday[symbol_b], name=symbol_b), row=1, col=1)

# Z-score
fig.add_trace(go.Scatter(x=signals.index, y=signals['zscore'], name="Z-Score"), row=2, col=1)
fig.add_hline(y=entry_threshold, line_dash="dot", row=2, col=1)
fig.add_hline(y=-entry_threshold, line_dash="dot", row=2, col=1)
fig.add_hline(y=0, line_dash="dash", row=2, col=1)

# PnL
fig.add_trace(go.Scatter(x=signals.index, y=signals['cum_pnl'], name="Cumulative PnL (Net)"), row=3, col=1)

fig.update_layout(title=f"Cointegration Strategy: {symbol_a} vs {symbol_b} (1-Minute Data)", height=950)
fig.show()
