diff --git a/.gitignore b/.gitignore
index c1436db..999e22d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 # SpecStory explanation file
 __pycache__/
+__OLD__/
 .specstory/
 .history/
 .cursorindexingignore
diff --git a/src/cointegration-03.py b/src/cointegration-03.py
deleted file mode 100644
index 4b88bd0..0000000
--- a/src/cointegration-03.py
+++ /dev/null
@@ -1,521 +0,0 @@
-import datetime
-import sys
-import json
-
-from typing import Any, Dict, List
-
-from cvttpy.trading.instrument import ExchangeInstrument
-
-from cvttpy.tools.timeutils import NanoPerMin
-
-import pandas as pd
-import numpy as np
-
-# ============= statsmodels ===================
-from statsmodels.tsa.vector_ar.vecm import VECM
-
-UNSET_FLOAT: float = sys.float_info.max
-UNSET_INT: int = sys.maxsize
-
-# ------------------------ Configuration ------------------------
-# Default configuration
-CONFIG = {
-    "exchange_id": "ALPACA",
-    "datafiles": [
-        "./data/equity/20250508.alpaca_sim_md.db",
-        # "./data/equity/20250509.alpaca_sim_md.db",
-        # "./data/equity/20250512.alpaca_sim_md.db",
-        # "./data/equity/20250513.alpaca_sim_md.db",
-        # "./data/equity/20250514.alpaca_sim_md.db",
-        # "./data/equity/20250515.alpaca_sim_md.db",
-        # "./data/equity/20250516.alpaca_sim_md.db",
-        # "./data/equity/20250519.alpaca_sim_md.db",
-        # "./data/equity/20250520.alpaca_sim_md.db"
-    ],
-    "instruments": [
-        "COIN",
-        "GBTC",
-        # "HOOD",
-        # "MSTR",
-        # "PYPL",
-    ],
-    "trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"},
-    "price_aggregate": "close",
-    "min_required_points": 30,
-    "zero_threshold": 1e-10,
-    "equilibrium_threshold": 10.0,
-    "training_minutes": 120,
-}
-
-# ====== later ===================
-# # Try to load configuration from file, fall back to defaults if not found
-# CONFIG_FILE = "config.json"
-# try:
-#     with open(CONFIG_FILE, "r") as f:
-#         user_config = json.load(f)
-#     CONFIG.update(user_config)
-#     print(f"Loaded configuration from {CONFIG_FILE}")
-# except (FileNotFoundError, json.JSONDecodeError) as e:
-#     print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}")
-#     # Create a default config file if it doesn't exist
-#     try:
-#         with open(CONFIG_FILE, "w") as f:
-#             json.dump(CONFIG, f, indent=4)
-#         print(f"Created default configuration file: {CONFIG_FILE}")
-#     except Exception as e:
-#         print(f"Warning: Could not create default config file: {str(e)}")
-# ------------------------ Settings ------------------------
-
-TRADES = {}
-
-
-def load_summaries(datafile: str) -> pd.DataFrame:
-    from tools.data_loader import load_sqlite_to_dataframe
-
-    instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]]
-    exchange_id = CONFIG["exchange_id"]
-
-    query = "select tstamp"
-    query += ", tstamp_ns as time_ns"
-    query += ", substr(instrument_id, 7) as symbol"
-    query += ", open"
-    query += ", high"
-    query += ", low"
-    query += ", close"
-    query += ", volume"
-    query += ", num_trades"
-    query += ", vwap"
-
-    query += " from md_1min_bars"
-    query += f" where exchange_id ='{exchange_id}'"
-    query += f" and instrument_id in ({','.join(instrument_ids)})"
-
-    df = load_sqlite_to_dataframe(db_path=datafile, query=query)
-
-    # Trading Hours
-    date_str = df["tstamp"][0][0:10]
-    start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}"
-    end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}"
-
-    # Perform boolean selection
-    df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)]
-    df["tstamp"] = pd.to_datetime(df["tstamp"])
-
-    return df
-
-
-def transform_dataframe(df):
-    # Select only the columns we need
-    px_col = CONFIG["price_aggregate"]
-    selected_columns = ["tstamp", "symbol", px_col]
-    df_selected = df[selected_columns]
-
-    # Start with unique timestamps
-    result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True)
-
-    # For each unique symbol, add a corresponding close price column
-    for symbol in df_selected["symbol"].unique():
-        # Filter rows for this symbol
-        df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
-
-        # Create column name like "close-COIN"
-        price_column = f"{px_col}-{symbol}"
-
-        # Create temporary dataframe with timestamp and price
-        temp_df = pd.DataFrame({
-            "tstamp": df_symbol["tstamp"],
-            price_column: df_symbol[px_col]
-        })
-
-        # Join with our result dataframe
-        result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
-
-    return result_df
-
-
-def process_summaries(summaries_df: pd.DataFrame) -> None:
-
-    result_df = transform_dataframe(summaries_df)
-
-    price_columns = [
-        column
-        for column in result_df.columns
-        if column.startswith(f"{CONFIG['price_aggregate']}-")
-    ]
-
-    # ========================= Split into training and testing datasets =========================
-
-    # Training dataset: first CONFIG[training_minutes] rows
-    training_ts_df = result_df.iloc[:CONFIG["training_minutes"], :].copy().reset_index(drop=True)
-
-    # Testing dataset: remaining rows after training period
-    testing_ts_df = result_df.iloc[CONFIG["training_minutes"]:, :].copy().reset_index(drop=True)
-
-    # Store timestamps for later use
-    testing_timestamps = testing_ts_df["tstamp"].copy()
-
-    # Find the starting indices for A and B
-    all_indexes = range(len(price_columns))
-    unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
-
-    result_columns = [
-        "time",
-        "action",
-        "symbol",
-        "price",
-        "divergence",
-        "pair",
-    ]
-    result_list = []
-    px_col = CONFIG["price_aggregate"]
-    for a_index, b_index in unique_index_pairs:
-        try:
-            # Get the actual variable names
-            col_name_a = price_columns[a_index]
-            col_name_b = price_columns[b_index]
-
-            symbol_a = col_name_a[len(f"{px_col}-") :]
-            symbol_b = col_name_b[len(f"{px_col}-") :].replace(
-                "STOCK-", ""
-            )
-            pair = f"{symbol_a} & {symbol_b}"
-
-            # ===== Training dataset =====
-            training_pair_df = pd.DataFrame(
-                {
-                    f"{col_name_a}": training_ts_df[col_name_a],
-                    f"{col_name_b}": training_ts_df[col_name_b],
-                }
-            ).dropna().reset_index(drop=True)
-
-            # Check if we have enough data points for a meaningful analysis
-            min_required_points = CONFIG[
-                "min_required_points"
-            ]  # Minimum number of points for a reasonable VECM model
-            if len(training_pair_df) < min_required_points:
-                print(
-                    f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
-                )
-                continue
-
-            # Check for non-finite values
-            if not np.isfinite(training_pair_df).all().all():
-                print(f"{pair}: Data contains non-finite values (NaN or inf)")
-                continue
-
-            # ===== Testing dataset =====
-            testing_pair_df = pd.DataFrame(
-                {
-                    f"{col_name_a}": testing_ts_df[col_name_a],
-                    f"{col_name_b}": testing_ts_df[col_name_b],
-                }
-            ).dropna()
-            testing_pair_df = testing_pair_df.reset_index(drop=True)
-
-            testing_pair_df = pd.merge(testing_timestamps.to_frame(), testing_pair_df, left_index=True, right_index=True)
-
-            # Fit the VECM
-            try:
-                vecm_model = VECM(training_pair_df, coint_rank=1)
-                vecm_fit = vecm_model.fit()
-
-                # Check if the model converged properly
-                if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
-                    print(f"{pair}: VECM model failed to converge properly")
-                    continue
-
-                beta = vecm_fit.beta
-
-                # Predict the next step
-                try:
-                    next_values = vecm_fit.predict(steps=len(testing_pair_df))
-                except Exception as e:
-                    print(f"{pair}: Prediction failed: {str(e)}")
-                    continue
-
-                # Convert prediction to a DataFrame for readability
-                predicted_df = pd.DataFrame(next_values, columns=[col_name_a, col_name_b])
-            except Exception as e:
-                print(f"{pair}: VECM model fitting failed: {str(e)}")
-                continue
-
-            predicted_df["equilibrium_term"] = (
-                beta[0] * predicted_df[col_name_a]
-                + beta[1] * predicted_df[col_name_b]
-            )
-
-            pair_result_df = pd.merge(
-                testing_pair_df, predicted_df, left_index=True, right_index=True, suffixes=('', '_pred')
-            ).dropna()
-
-            pair_result_df["testing_eqlbrm_term"] = (
-                beta[0] * pair_result_df[col_name_a]
-                + beta[1] * pair_result_df[col_name_b]
-            )
-
-            pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"])
-
-            # Check if the first value is non-zero to avoid division by zero
-            initial_abs_term = pair_result_df.loc[0, "abs_testing_eqlbrm_term"]
-            if (
-                initial_abs_term < CONFIG["zero_threshold"]
-            ):  # Small threshold to avoid division by very small numbers
-                print(
-                    f"{pair}: Skipping due to near-zero initial equilibrium term: {initial_abs_term}"
-                )
-                continue
-
-            condition = (
-                pair_result_df["abs_testing_eqlbrm_term"]
-                < initial_abs_term / CONFIG["equilibrium_threshold"]
-            )
-            first_row_index = next(
-                (index for index, value in condition.items() if value), None
-            )
-
-            if first_row_index is not None:
-                first_row = pair_result_df.loc[first_row_index]
-                fr_df = first_row.to_frame().T
-
-                # Add safeguard against division by zero
-                if (
-                    abs(beta[1]) < CONFIG["zero_threshold"]
-                ):  # Small threshold to avoid division by very small numbers
-                    print(f"{pair}: Skipping due to near-zero beta[1] value: {beta[1]}")
-                    continue
-
-                if predicted_df.iloc[0, 1] > predicted_df.iloc[0, 0] / abs(beta[1]):
-
-                    my_tuple_a1 = (
-                        pair_result_df["tstamp"].iloc[0],
-                        "BUY",
-                        symbol_a,
-                        pair_result_df[f"{col_name_a}"].iloc[0],
-                        pair_result_df["testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_b1 = (
-                        pair_result_df["tstamp"].iloc[0],
-                        "SELL",
-                        symbol_b,
-                        pair_result_df[f"{col_name_b}"].iloc[0],
-                        pair_result_df["testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_a2 = (
-                        fr_df["tstamp"].iloc[0],
-                        "SELL",
-                        symbol_a,
-                        fr_df[f"{col_name_a}"].iloc[0],
-                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_b2 = (
-                        fr_df["tstamp"].iloc[0],
-                        "BUY",
-                        symbol_b,
-                        fr_df[f"{col_name_b}"].iloc[0],
-                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-
-                else:
-                    my_tuple_a1 = (
-                        pair_result_df["tstamp"].iloc[0],
-                        "SELL",
-                        symbol_a,
-                        pair_result_df[f"testing_{col_name_a}"].iloc[0],
-                        pair_result_df["testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_b1 = (
-                        pair_result_df["tstamp"].iloc[0],
-                        "BUY",
-                        symbol_b,
-                        pair_result_df[f"testing_{col_name_b}"].iloc[0],
-                        pair_result_df["testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_a2 = (
-                        fr_df["tstamp"].iloc[0],
-                        "BUY",
-                        symbol_a,
-                        fr_df[f"testing_{col_name_a}"].iloc[0],
-                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-                    my_tuple_b2 = (
-                        fr_df["tstamp"].iloc[0],
-                        "SELL",
-                        symbol_b,
-                        fr_df[f"testing_{col_name_b}"].iloc[0],
-                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
-                        pair,
-                    )
-
-                # Add tuples to data frame
-                tuple_df = pd.DataFrame(
-                    [my_tuple_a1, my_tuple_b1, my_tuple_a2, my_tuple_b2],
-                    columns=result_columns,
-                )
-                # print(tuple_df)
-
-                result_list.append(tuple_df)
-
-            else:
-                print(f"{pair}: NO SIGNAL FOUND")
-
-        except KeyError:
-            print(
-                f"Error: Column '{price_columns[a_index]}' or '{price_columns[b_index]}' not found."
-            )
-            return []
-
-    # Check if result_list has any data before concatenating
-    if not result_list:
-        print("No trading signals found for any pairs")
-        return None
-
-    result = pd.concat(result_list, ignore_index=True)
-    result["time"] = pd.to_datetime(result["time"])
-    result = result.set_index("time").sort_index()
-
-    collect_single_day_results(result)
-    # print_single_day_results(result)
-
-
-def add_trade(pair, symbol, action, price):
-    # Ensure we always use clean names without STOCK- prefix
-    pair = pair.replace("STOCK-", "")
-    symbol = symbol.replace("STOCK-", "")
-
-    if pair not in TRADES:
-        TRADES[pair] = {symbol: []}
-    if symbol not in TRADES[pair]:
-        TRADES[pair][symbol] = []
-    TRADES[pair][symbol].append((action, price))
-
-
-def collect_single_day_results(result):
-    if result is None:
-        return
-
-    print("\n -------------- Suggested Trades ")
-    print(result)
-
-    for row in result.itertuples():
-        action = row.action
-        symbol = row.symbol
-        price = row.price
-        add_trade(pair=row.pair, action=action, symbol=symbol, price=price)
-
-
-def print_single_day_results(result):
-    for pair, symbols in TRADES.items():
-        print(f"\n--- {pair} ---")
-        for symbol, trades in symbols.items():
-            for side, price in trades:
-                print(f"{symbol} {side} at ${price}")
-
-
-def print_results_suummary(all_results):
-    # Summary of all processed files
-    print("\n====== Summary of All Processed Files ======")
-    for filename, data in all_results.items():
-        trade_count = sum(
-            len(trades)
-            for symbol_trades in data["trades"].values()
-            for trades in symbol_trades.values()
-        )
-        print(f"{filename}: {trade_count} trades")
-
-
-def calculate_returns(all_results):
-    print("\n====== Returns By Day and Pair ======")
-
-    for filename, data in all_results.items():
-        day_return = 0
-        print(f"\n--- {filename} ---")
-
-        # Process each pair
-        for pair, symbols in data["trades"].items():
-            pair_return = 0
-            pair_trades = []
-
-            # Calculate individual symbol returns in the pair
-            for symbol, trades in symbols.items():
-                if len(trades) >= 2:  # Need at least entry and exit
-                    # Get entry and exit trades
-                    entry_action, entry_price = trades[0]
-                    exit_action, exit_price = trades[1]
-
-                    # Calculate return based on action
-                    symbol_return = 0
-                    if entry_action == "BUY" and exit_action == "SELL":
-                        # Long position
-                        symbol_return = (exit_price - entry_price) / entry_price * 100
-                    elif entry_action == "SELL" and exit_action == "BUY":
-                        # Short position
-                        symbol_return = (entry_price - exit_price) / entry_price * 100
-
-                    pair_trades.append(
-                        (
-                            symbol,
-                            entry_action,
-                            entry_price,
-                            exit_action,
-                            exit_price,
-                            symbol_return,
-                        )
-                    )
-                    pair_return += symbol_return
-
-            # Print pair returns
-            if pair_trades:
-                print(f"  {pair}:")
-                for (
-                    symbol,
-                    entry_action,
-                    entry_price,
-                    exit_action,
-                    exit_price,
-                    symbol_return,
-                ) in pair_trades:
-                    print(
-                        f"    {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
-                    )
-                print(f"  Pair Total Return: {pair_return:.2f}%")
-                day_return += pair_return
-
-        # Print day total return
-        if day_return != 0:
-            print(f"  Day Total Return: {day_return:.2f}%")
-
-
-if __name__ == "__main__":
-    # Initialize a dictionary to store all trade results
-    all_results = {}
-
-    # Process each data file
-    for datafile in CONFIG["datafiles"]:
-        print(f"\n====== Processing {datafile} ======")
-
-        # Clear the TRADES global dictionary for the new file
-        TRADES.clear()
-
-        # Process data for this file
-        try:
-            file_results = process_summaries(
-                summaries_df=load_summaries(datafile)
-            )
-
-            # Store results with file name as key
-            filename = datafile.split("/")[-1]
-            all_results[filename] = {"trades": TRADES.copy(), "results": file_results}
-
-            print(f"Successfully processed {filename}")
-        except Exception as e:
-            print(f"Error processing {datafile}: {str(e)}")
-
-    # print_results_suummary(all_results)
-    calculate_returns(all_results)
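
For reference, the core technique the removed script implemented: fit a vector error-correction model (VECM) on a training window, take its cointegration vector `beta`, and track the equilibrium term on held-out bars, exiting once its magnitude shrinks past a threshold. Below is a minimal, self-contained sketch of that idea, not the deleted file itself: the prices are synthetic stand-ins for the SQLite bar data, while the column names, 120-bar training window, `1e-10` zero guard, and `10.0` shrink factor mirror the `CONFIG` defaults above. Only the real `statsmodels` `VECM` API (`fit`, `beta`) is used.

```python
import numpy as np
import pandas as pd
from statsmodels.tsa.vector_ar.vecm import VECM

# Synthetic cointegrated pair: both series share one stochastic trend.
rng = np.random.default_rng(0)
trend = np.cumsum(rng.normal(0, 0.1, size=300))
prices = pd.DataFrame({
    "close-COIN": 100 + trend + rng.normal(0, 0.05, size=300),
    "close-GBTC": 50 + 0.5 * trend + rng.normal(0, 0.05, size=300),
})

# Train on the first 120 bars (CONFIG["training_minutes"]), test on the rest.
train = prices.iloc[:120]
test = prices.iloc[120:].reset_index(drop=True)

fit = VECM(train, coint_rank=1).fit()
beta = fit.beta  # cointegration vector, shape (2, 1)

# Equilibrium term beta' * p_t on held-out prices; the deleted script entered
# at the first test bar and exited once |term| fell below 1/10 of its initial
# value (CONFIG["equilibrium_threshold"]).
eq_term = (test.to_numpy() @ beta)[:, 0]
abs_term = np.abs(eq_term)
if abs_term[0] > 1e-10:  # CONFIG["zero_threshold"] guard against division by ~0
    hits = np.nonzero(abs_term < abs_term[0] / 10.0)[0]
    print("exit bar:", int(hits[0]) if hits.size else "NO SIGNAL FOUND")
```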