From 5c6841820bf5eab7dee3dc88bac0181cc21240f8 Mon Sep 17 00:00:00 2001
From: Oleg Sheynin
Date: Sun, 25 May 2025 23:05:17 -0400
Subject: [PATCH] initial

---
 .gitignore               |   7 +
 src/cointegration-03.py  | 521 ++++++++++++++++++++++++++++++++++++++
 src/direct_solution.py   |  43 ++
 src/pivot_example.py     |  77 +++
 src/pt_backtest.py       | 515 +++++++++++++++++++++++++++++++++++++
 src/pt_backtest_slide.py | 523 ++++++++++++++++++++++++++++++++++++++
 src/tools/data_loader.py |  25 +
 7 files changed, 1711 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 src/cointegration-03.py
 create mode 100644 src/direct_solution.py
 create mode 100644 src/pivot_example.py
 create mode 100644 src/pt_backtest.py
 create mode 100644 src/pt_backtest_slide.py
 create mode 100644 src/tools/data_loader.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ee63380
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+# SpecStory explanation file
+.specstory/
+.history/
+.cursorindexingignore
+data
+.vscode/
+cvttpy
diff --git a/src/cointegration-03.py b/src/cointegration-03.py
new file mode 100644
index 0000000..4b88bd0
--- /dev/null
+++ b/src/cointegration-03.py
@@ -0,0 +1,521 @@
+import datetime
+import sys
+import json
+
+from typing import Any, Dict, List
+
+from cvttpy.trading.instrument import ExchangeInstrument
+
+from cvttpy.tools.timeutils import NanoPerMin
+
+import pandas as pd
+import numpy as np
+
+# ============= statsmodels ===================
+from statsmodels.tsa.vector_ar.vecm import VECM
+
+UNSET_FLOAT: float = sys.float_info.max
+UNSET_INT: int = sys.maxsize
+
+# ------------------------ Configuration ------------------------
+# Default configuration
+CONFIG = {
+    "exchange_id": "ALPACA",
+    "datafiles": [
+        "./data/equity/20250508.alpaca_sim_md.db",
+        # "./data/equity/20250509.alpaca_sim_md.db",
+        # "./data/equity/20250512.alpaca_sim_md.db",
+        # "./data/equity/20250513.alpaca_sim_md.db",
+        # "./data/equity/20250514.alpaca_sim_md.db",
+        # "./data/equity/20250515.alpaca_sim_md.db",
+        # "./data/equity/20250516.alpaca_sim_md.db",
+        # "./data/equity/20250519.alpaca_sim_md.db",
+        # "./data/equity/20250520.alpaca_sim_md.db"
+    ],
+    "instruments": [
+        "COIN",
+        "GBTC",
+        # "HOOD",
+        # "MSTR",
+        # "PYPL",
+    ],
+    "trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"},
+    "price_aggregate": "close",
+    "min_required_points": 30,
+    "zero_threshold": 1e-10,
+    "equilibrium_threshold": 
10.0, + "training_minutes": 120, +} + +# ====== later =================== +# # Try to load configuration from file, fall back to defaults if not found +# CONFIG_FILE = "config.json" +# try: +# with open(CONFIG_FILE, "r") as f: +# user_config = json.load(f) +# CONFIG.update(user_config) +# print(f"Loaded configuration from {CONFIG_FILE}") +# except (FileNotFoundError, json.JSONDecodeError) as e: +# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}") +# # Create a default config file if it doesn't exist +# try: +# with open(CONFIG_FILE, "w") as f: +# json.dump(CONFIG, f, indent=4) +# print(f"Created default configuration file: {CONFIG_FILE}") +# except Exception as e: +# print(f"Warning: Could not create default config file: {str(e)}") +# ------------------------ Settings ------------------------ + +TRADES = {} + + + +def load_summaries(datafile: str) -> pd.DataFrame: + from tools.data_loader import load_sqlite_to_dataframe + + instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]] + exchange_id = CONFIG["exchange_id"] + + query = "select tstamp" + query += ", tstamp_ns as time_ns" + query += ", substr(instrument_id, 7) as symbol" + query += ", open" + query += ", high" + query += ", low" + query += ", close" + query += ", volume" + query += ", num_trades" + query += ", vwap" + + query += " from md_1min_bars" + query += f" where exchange_id ='{exchange_id}'" + query += f" and instrument_id in ({','.join(instrument_ids)})" + + df = load_sqlite_to_dataframe(db_path=datafile, query=query) + + # Trading Hours + date_str = df["tstamp"][0][0:10] + start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}" + end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}" + + # Perform boolean selection + df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)] + df["tstamp"] = pd.to_datetime(df["tstamp"]) + + return df + +def transform_dataframe(df): + # Select only the columns we need + px_col = CONFIG["price_aggregate"] + selected_columns = ["tstamp", "symbol", px_col] + df_selected = df[selected_columns] + + # Start with unique timestamps + result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True) + + # For each unique symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + price_column = f"{px_col}-{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + price_column: df_symbol[px_col] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + + return result_df + + +def process_summaries(summaries_df: pd.DataFrame) -> None: + + result_df = transform_dataframe(summaries_df) + + price_columns = [ + column + for column in result_df.columns + if column.startswith(f"{CONFIG['price_aggregate']}-") + ] + + # ========================= Split into training and testing datasets ========================= + + # Training dataset: first CONFIG[training_minutes] rows + training_ts_df = result_df.iloc[:CONFIG["training_minutes"], :].copy().reset_index(drop=True) + + # Testing dataset: remaining rows after training period + testing_ts_df = result_df.iloc[CONFIG["training_minutes"]:, :].copy().reset_index(drop=True) + + # Store timestamps for later use + testing_timestamps = 
testing_ts_df["tstamp"].copy() + + # Find the starting indices for A and B + all_indexes = range(len(price_columns)) + unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j] + + result_columns = [ + "time", + "action", + "symbol", + "price", + "divergence", + "pair", + ] + result_list = [] + px_col = CONFIG["price_aggregate"] + for a_index, b_index in unique_index_pairs: + try: + # Get the actual variable names + col_name_a = price_columns[a_index] + col_name_b = price_columns[b_index] + + symbol_a = col_name_a[len(f"{px_col}-") :] + symbol_b = col_name_b[len(f"{px_col}-") :].replace( + "STOCK-", "" + ) + pair = f"{symbol_a} & {symbol_b}" + + # ===== Training dataset ===== + training_pair_df = pd.DataFrame( + { + f"{col_name_a}": training_ts_df[col_name_a], + f"{col_name_b}": training_ts_df[col_name_b], + } + ).dropna().reset_index(drop=True) + + # Check if we have enough data points for a meaningful analysis + min_required_points = CONFIG[ + "min_required_points" + ] # Minimum number of points for a reasonable VECM model + if len(training_pair_df) < min_required_points: + print( + f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}" + ) + continue + + # Check for non-finite values + if not np.isfinite(training_pair_df).all().all(): + print(f"{pair}: Data contains non-finite values (NaN or inf)") + continue + + # ===== Testing dataset ===== + testing_pair_df = pd.DataFrame( + { + f"{col_name_a}": testing_ts_df[col_name_a], + f"{col_name_b}": testing_ts_df[col_name_b], + } + ).dropna() + testing_pair_df = testing_pair_df.reset_index(drop=True) + + testing_pair_df = pd.merge(testing_timestamps.to_frame(), testing_pair_df , left_index=True, right_index=True) + + # Fit the VECM + try: + vecm_model = VECM(training_pair_df, coint_rank=1) + vecm_fit = vecm_model.fit() + + # Check if the model converged properly + if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None: + print(f"{pair}: VECM model failed to converge properly") + continue + + beta = vecm_fit.beta + + # Predict the next step + try: + next_values = vecm_fit.predict(steps=len(testing_pair_df)) + except Exception as e: + print(f"{pair}: Prediction failed: {str(e)}") + continue + + # Convert prediction to a DataFrame for readability + predicted_df = pd.DataFrame(next_values, columns=[col_name_a, col_name_b]) + except Exception as e: + print(f"{pair}: VECM model fitting failed: {str(e)}") + continue + + predicted_df["equilibrium_term"] = ( + beta[0] * predicted_df[col_name_a] + + beta[1] * predicted_df[col_name_b] + ) + + pair_result_df = pd.merge( + testing_pair_df, predicted_df, left_index=True, right_index=True, suffixes=('', '_pred') + ).dropna() + + pair_result_df["testing_eqlbrm_term"] = ( + beta[0] * pair_result_df[col_name_a] + + beta[1] * pair_result_df[col_name_b] + ) + + pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"]) + + # Check if the first value is non-zero to avoid division by zero + initial_abs_term = pair_result_df.loc[0, "abs_testing_eqlbrm_term"] + if ( + initial_abs_term < CONFIG["zero_threshold"] + ): # Small threshold to avoid division by very small numbers + print( + f"{pair}: Skipping due to near-zero initial equilibrium term: {initial_abs_term}" + ) + continue + + condition = ( + pair_result_df["abs_testing_eqlbrm_term"] + < initial_abs_term / CONFIG["equilibrium_threshold"] + ) + first_row_index = next( + (index for index, value in condition.items() if value), None + ) + + if 
first_row_index is not None:
+                first_row = pair_result_df.loc[first_row_index]
+                fr_df = first_row.to_frame().T
+
+                # Add safeguard against division by zero
+                if (
+                    abs(beta[1]) < CONFIG["zero_threshold"]
+                ):  # Small threshold to avoid division by very small numbers
+                    print(f"{pair}: Skipping due to near-zero beta[1] value: {beta[1]}")
+                    continue
+
+                if predicted_df.iloc[0, 1] > predicted_df.iloc[0, 0] / abs(beta[1]):
+                    # Predicted B is rich relative to A: open long A / short B,
+                    # then unwind both legs at the convergence bar.
+                    my_tuple_a1 = (
+                        pair_result_df["tstamp"].iloc[0],
+                        "BUY",
+                        symbol_a,
+                        pair_result_df[f"{col_name_a}"].iloc[0],
+                        pair_result_df["testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_b1 = (
+                        pair_result_df["tstamp"].iloc[0],
+                        "SELL",
+                        symbol_b,
+                        pair_result_df[f"{col_name_b}"].iloc[0],
+                        pair_result_df["testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_a2 = (
+                        fr_df["tstamp"].iloc[0],
+                        "SELL",
+                        symbol_a,
+                        fr_df[f"{col_name_a}"].iloc[0],
+                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_b2 = (
+                        fr_df["tstamp"].iloc[0],
+                        "BUY",
+                        symbol_b,
+                        fr_df[f"{col_name_b}"].iloc[0],
+                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+
+                else:
+                    # Predicted A is the rich leg: open short A / long B instead.
+                    my_tuple_a1 = (
+                        pair_result_df["tstamp"].iloc[0],
+                        "SELL",
+                        symbol_a,
+                        pair_result_df[f"{col_name_a}"].iloc[0],
+                        pair_result_df["testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_b1 = (
+                        pair_result_df["tstamp"].iloc[0],
+                        "BUY",
+                        symbol_b,
+                        pair_result_df[f"{col_name_b}"].iloc[0],
+                        pair_result_df["testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_a2 = (
+                        fr_df["tstamp"].iloc[0],
+                        "BUY",
+                        symbol_a,
+                        fr_df[f"{col_name_a}"].iloc[0],
+                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+                    my_tuple_b2 = (
+                        fr_df["tstamp"].iloc[0],
+                        "SELL",
+                        symbol_b,
+                        fr_df[f"{col_name_b}"].iloc[0],
+                        fr_df["abs_testing_eqlbrm_term"].iloc[0],
+                        pair,
+                    )
+
+                # Add tuples to data frame
+                tuple_df = pd.DataFrame(
+                    [my_tuple_a1, my_tuple_b1, my_tuple_a2, my_tuple_b2],
+                    columns=result_columns,
+                )
+                # print(tuple_df)
+
+                result_list.append(tuple_df)
+
+            else:
+                print(f"{pair}: NO SIGNAL FOUND")
+
+        except KeyError:
+            print(
+                f"Error: Column '{price_columns[a_index]}' or '{price_columns[b_index]}' not found."
+            )
+            continue
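+    # Exit-rule recap: each pair opens at the first testing bar and closes at
+    # the first bar where |beta[0]*A + beta[1]*B| falls below
+    # 1/CONFIG["equilibrium_threshold"] of its initial absolute value (with the
+    # default threshold of 10.0, an initial term of 2.0 closes once the
+    # absolute term first dips under 0.2), so at most one round trip per day.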
+    # Check if result_list has any data before concatenating
+    if not result_list:
+        print("No trading signals found for any pairs")
+        return None
+
+    result = pd.concat(result_list, ignore_index=True)
+    result["time"] = pd.to_datetime(result["time"])
+    result = result.set_index("time").sort_index()
+
+    collect_single_day_results(result)
+    # print_single_day_results(result)
+
+
+def add_trade(pair, symbol, action, price):
+    # Ensure we always use clean names without STOCK- prefix
+    pair = pair.replace("STOCK-", "")
+    symbol = symbol.replace("STOCK-", "")
+
+    if pair not in TRADES:
+        TRADES[pair] = {symbol: []}
+    if symbol not in TRADES[pair]:
+        TRADES[pair][symbol] = []
+    TRADES[pair][symbol].append((action, price))
+
+
+def collect_single_day_results(result):
+    if result is None:
+        return
+
+    print("\n -------------- Suggested Trades ")
+    print(result)
+
+    for row in result.itertuples():
+        action = row.action
+        symbol = row.symbol
+        price = row.price
+        add_trade(pair=row.pair, action=action, symbol=symbol, price=price)
+
+
+def print_single_day_results(result):
+    for pair, symbols in TRADES.items():
+        print(f"\n--- {pair} ---")
+        for symbol, trades in symbols.items():
+            for side, price in trades:
+                print(f"{symbol} {side} at ${price}")
+
+
+def print_results_suummary(all_results):
+    # Summary of all processed files
+    print("\n====== Summary of All Processed Files ======")
+    for filename, data in all_results.items():
+        trade_count = sum(
+            len(trades)
+            for symbol_trades in data["trades"].values()
+            for trades in symbol_trades.values()
+        )
+        print(f"{filename}: {trade_count} trades")
+
+
+def calculate_returns(all_results):
+    print("\n====== Returns By Day and Pair ======")
+
+    for filename, data in all_results.items():
+        day_return = 0
+        print(f"\n--- {filename} ---")
+
+        # Process each pair
+        for pair, symbols in data["trades"].items():
+            pair_return = 0
+            pair_trades = []
+
+            # Calculate individual symbol returns in the pair
+            for symbol, trades in symbols.items():
+                if len(trades) >= 2:  # Need at least entry and exit
+                    # Get entry and exit trades
+                    entry_action, entry_price = trades[0]
+                    exit_action, exit_price = trades[1]
+
+                    # Calculate return based on action
+                    symbol_return = 0
+                    if entry_action == "BUY" and exit_action == "SELL":
+                        # Long position
+                        symbol_return = (exit_price - entry_price) / entry_price * 100
+                    elif entry_action == "SELL" and exit_action == "BUY":
+                        # Short position
+                        symbol_return = (entry_price - exit_price) / entry_price * 100
+
+                    pair_trades.append(
+                        (
+                            symbol,
+                            entry_action,
+                            entry_price,
+                            exit_action,
+                            exit_price,
+                            symbol_return,
+                        )
+                    )
+                    pair_return += symbol_return
+
+            # Print pair returns
+            if pair_trades:
+                print(f"  {pair}:")
+                for (
+                    symbol,
+                    entry_action,
+                    entry_price,
+                    exit_action,
+                    exit_price,
+                    symbol_return,
+                ) in pair_trades:
+                    print(
+                        f"    {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
+                    )
+                print(f"  Pair Total Return: {pair_return:.2f}%")
+                day_return += pair_return
+
+        # Print day total return
+        if day_return != 0:
+            print(f"  Day Total Return: {day_return:.2f}%")
+
+
+if __name__ == "__main__":
+    # Initialize a dictionary to store all trade results
+    all_results = {}
+
+    # Process each data file
+    for datafile in CONFIG["datafiles"]:
+        print(f"\n====== Processing {datafile} ======")
+
+        # Clear the TRADES global dictionary for the new file
+        TRADES.clear()
+
+        # Process data for this file
+        try:
+            file_results = process_summaries(
summaries_df=load_summaries(datafile) + ) + + # Store results with file name as key + filename = datafile.split("/")[-1] + all_results[filename] = {"trades": TRADES.copy(), "results": file_results} + + print(f"Successfully processed {filename}") + except Exception as e: + print(f"Error processing {datafile}: {str(e)}") + + # print_results_suummary(all_results) + calculate_returns(all_results) diff --git a/src/direct_solution.py b/src/direct_solution.py new file mode 100644 index 0000000..260a22b --- /dev/null +++ b/src/direct_solution.py @@ -0,0 +1,43 @@ +import pandas as pd + +# This example shows how to transform your dataframe to have columns like close-COIN, close-PYPL, etc. + +# Assuming df is your input dataframe with the structure shown in your question +def transform_dataframe(df): + # Select only the columns we need + selected_columns = ["tstamp", "symbol", "close"] + df_selected = df[selected_columns] + + # Start with unique timestamps + result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True) + + # For each unique symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + price_column = f"close-{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + price_column: df_symbol["close"] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + + return result_df + +# Example usage (assuming df is your input dataframe): +# result_df = transform_dataframe(df) +# print(result_df.head()) + +""" +The resulting dataframe will look like: + tstamp close-COIN close-GBTC close-HOOD close-MSTR close-PYPL +0 2025-05-20 14:30:00 262.3650 45.1234 21.567 935.42 72.1611 +1 2025-05-20 14:31:00 262.5850 45.2100 21.589 935.67 72.1611 +... +""" \ No newline at end of file diff --git a/src/pivot_example.py b/src/pivot_example.py new file mode 100644 index 0000000..5554d47 --- /dev/null +++ b/src/pivot_example.py @@ -0,0 +1,77 @@ +import pandas as pd + +# Assuming your dataframe is named 'df' +def pivot_dataframe(df): + # Convert timestamp to datetime if it's not already + df['tstamp'] = pd.to_datetime(df['tstamp']) + + # Create a pivot table with 'tstamp' as index and 'symbol' as columns + # You can choose any aggregation function (mean, first, last, etc.) 
+ # if there are multiple values per timestamp/symbol + pivoted_df = pd.pivot_table( + df, + values=['close', 'open', 'high', 'low', 'volume'], + index='tstamp', + columns='symbol', + aggfunc='first' # Use 'first' if there's only one value per timestamp/symbol + ) + + # Flatten the multi-level column names + pivoted_df.columns = [f"{col[0]}-{col[1]}" for col in pivoted_df.columns] + + # Reset index to make tstamp a column + pivoted_df = pivoted_df.reset_index() + + return pivoted_df + +# Example usage: +# pivoted_df = pivot_dataframe(your_dataframe) +# print(pivoted_df.head()) + +# Alternative approach (similar to your code): +def pivot_alternative(df): + # Create an empty dataframe with just the timestamp + result_df = df['tstamp'].drop_duplicates().reset_index(drop=True) + + # For each symbol, create a separate column for each metric + for symbol in df['symbol'].unique(): + # Filter dataframe for this symbol + df_symbol = df[df['symbol'] == symbol].reset_index(drop=True) + + # Create columns for each price/volume metric + for metric in ['close', 'open', 'high', 'low', 'volume']: + column_name = f"{metric}-{symbol}" + # Create a temporary dataframe with tstamp and the new column + temp_df = pd.DataFrame({ + 'tstamp': df_symbol['tstamp'], + column_name: df_symbol[metric] + }) + + # Merge with the result dataframe + result_df = pd.merge(result_df, temp_df, on='tstamp', how='left') + + return result_df + +# Example using the code similar to what's in your main code: +""" +# Selected columns from original dataframe +selected_columns = ["tstamp", "symbol", "close"] +df_selected = df[selected_columns] + +# Create result dataframe starting with timestamps +result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True) + +# For each symbol, add a column with its close price +for symbol in df_selected["symbol"].unique(): + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + price_column = f"close-{symbol}" + + # Create temp dataframe with tstamp and the price column + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + price_column: df_symbol["close"] + }) + + # Merge with result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") +""" \ No newline at end of file diff --git a/src/pt_backtest.py b/src/pt_backtest.py new file mode 100644 index 0000000..e85d56f --- /dev/null +++ b/src/pt_backtest.py @@ -0,0 +1,515 @@ +import datetime +import sys +import json + +from typing import Any, Dict, List, Tuple, Optional + +import pandas as pd +import numpy as np + +# ============= statsmodels =================== +from statsmodels.tsa.vector_ar.vecm import VECM + +NanoPerMin = 1e9 +UNSET_FLOAT: float = sys.float_info.max +UNSET_INT: int = sys.maxsize + +# ------------------------ Configuration ------------------------ +# Default configuration +CONFIG = { + "exchange_id": "ALPACA", + "data_directory": "./data/equity", + "datafiles": [ + "20250508.alpaca_sim_md.db", + # "20250509.alpaca_sim_md.db", + # "20250512.alpaca_sim_md.db", + # "20250513.alpaca_sim_md.db", + # "20250514.alpaca_sim_md.db", + # "20250515.alpaca_sim_md.db", + # "20250516.alpaca_sim_md.db", + # "20250519.alpaca_sim_md.db", + # "20250520.alpaca_sim_md.db" + ], + "instruments": [ + "COIN", + "GBTC", + "HOOD", + "MSTR", + "PYPL", + ], + "trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"}, + "price_column": "close", + "min_required_points": 30, + "zero_threshold": 1e-10, + "equilibrium_threshold": 10.0, + # "training_minutes": 120, + 
"training_minutes": 180, +} + +# ====== later =================== +# # Try to load configuration from file, fall back to defaults if not found +# CONFIG_FILE = "config.json" +# try: +# with open(CONFIG_FILE, "r") as f: +# user_config = json.load(f) +# CONFIG.update(user_config) +# print(f"Loaded configuration from {CONFIG_FILE}") +# except (FileNotFoundError, json.JSONDecodeError) as e: +# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}") +# # Create a default config file if it doesn't exist +# try: +# with open(CONFIG_FILE, "w") as f: +# json.dump(CONFIG, f, indent=4) +# print(f"Created default configuration file: {CONFIG_FILE}") +# except Exception as e: +# print(f"Warning: Could not create default config file: {str(e)}") +# ------------------------ Settings ------------------------ + +TRADES = {} +class Pair: + symbol_a_: str + symbol_b_: str + + def __init__(self, symbol_a: str, symbol_b: str): + self.symbol_a_ = symbol_a + self.symbol_b_ = symbol_b + + def __repr__(self) ->str: + return f"{self.symbol_a_} & {self.symbol_b_}" + +def load_market_data(datafile: str) -> pd.DataFrame: + from tools.data_loader import load_sqlite_to_dataframe + + instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]] + exchange_id = CONFIG["exchange_id"] + + query = "select tstamp" + query += ", tstamp_ns as time_ns" + query += ", substr(instrument_id, 7) as symbol" + query += ", open" + query += ", high" + query += ", low" + query += ", close" + query += ", volume" + query += ", num_trades" + query += ", vwap" + + query += " from md_1min_bars" + query += f" where exchange_id ='{exchange_id}'" + query += f" and instrument_id in ({','.join(instrument_ids)})" + + df = load_sqlite_to_dataframe(db_path=datafile, query=query) + + # Trading Hours + date_str = df["tstamp"][0][0:10] + start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}" + end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}" + + # Perform boolean selection + df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)] + df["tstamp"] = pd.to_datetime(df["tstamp"]) + + return df + +def transform_dataframe(df: pd.DataFrame, price_column: str): + # Select only the columns we need + df_selected = df[["tstamp", "symbol", price_column]] + + # Start with unique timestamps + result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True) + + # For each unique symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + new_price_column = f"{price_column}_{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + new_price_column: df_symbol[price_column] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset + + return result_df + +def get_datasets(df: pd.DataFrame, training_minutes: int, colname_a: str, colname_b: str) -> Tuple[pd.DataFrame, pd.DataFrame]: + # Training dataset + df = df[["tstamp", colname_a, colname_b]] + df = df.dropna() + + training_df = df.iloc[:training_minutes - 1, :].copy() + training_df.reset_index(drop=True).dropna().reset_index(drop=True) + + # Testing dataset + testing_df = df.iloc[training_minutes:, 
:].copy() + testing_df.reset_index(drop=True).dropna().reset_index(drop=True) + + return (training_df, testing_df) + +def fit_VECM(training_pair_df, colname_a, colname_b): + vecm_model = VECM(training_pair_df[[colname_a, colname_b]].reset_index(drop=True), coint_rank=1) + vecm_fit = vecm_model.fit() + + # Check if the model converged properly + if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None: + print(f"{pair}: VECM model failed to converge properly") + + return vecm_fit + +def create_trading_signals(vecm_fit, testing_pair_df, symbol_a, symbol_b, colname_a, colname_b) -> pd.DataFrame: + result_columns = [ + "time", + "action", + "symbol", + "price", + "divergence", + "pair", + ] + + pair = Pair(symbol_a=symbol_a, symbol_b=symbol_b) + next_values = vecm_fit.predict(steps=len(testing_pair_df)) + + # Convert prediction to a DataFrame for readability + predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b]) + + beta = vecm_fit.beta + + predicted_df["equilibrium_term"] = ( + beta[0] * predicted_df[colname_a] + + beta[1] * predicted_df[colname_b] + ) + + pair_result_df = pd.merge( + testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred') + ).dropna() + + pair_result_df["testing_eqlbrm_term"] = ( + beta[0] * pair_result_df[colname_a] + + beta[1] * pair_result_df[colname_b] + ) + + pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"]) + + # Check if the first value is non-zero to avoid division by zero + pair_result_df = pair_result_df.reset_index() + initial_abs_term = pair_result_df["abs_testing_eqlbrm_term"][0] + if ( + initial_abs_term < CONFIG["zero_threshold"] + ): # Small threshold to avoid division by very small numbers + print( + f"{pair}: Skipping pair due to near-zero initial equilibrium: {initial_abs_term}" + ) + return pd.DataFrame() + + trading_signals_df = ( + pair_result_df["abs_testing_eqlbrm_term"] + < initial_abs_term / CONFIG["equilibrium_threshold"] + ) + close_row_index = next( + (index for index, value in trading_signals_df.items() if value), None + ) + + if close_row_index is None: + print(f"{pair}: NO SIGNAL FOUND") + return pd.DataFrame() + + open_row = pair_result_df.loc[0] + close_row = pair_result_df.loc[close_row_index] + + open_tstamp = open_row["tstamp"] + open_eqlbrm = open_row["testing_eqlbrm_term"] + open_px_a = open_row[f"{colname_a}"] + open_px_b = open_row[f"{colname_b}"] + + close_tstamp = close_row["tstamp"] + close_eqlbrm = close_row["testing_eqlbrm_term"] + close_px_a = close_row[f"{colname_a}"] + close_px_b = close_row[f"{colname_b}"] + + abs_beta = abs(beta[1]) + pred_px_b = pair_result_df.loc[0][f"{colname_b}_pred"] + pred_px_a = pair_result_df.loc[0][f"{colname_a}_pred"] + + if pred_px_b * abs_beta - pred_px_a > 0: + open_side_a = "BUY" + open_side_b = "SELL" + close_side_a = "SELL" + close_side_b = "BUY" + else: + open_side_b = "BUY" + open_side_a = "SELL" + close_side_b = "SELL" + close_side_a = "BUY" + + trd_signal_tuples = [ + ( + open_tstamp, + open_side_a, + symbol_a, + open_px_a, + open_eqlbrm, + pair, + ), + ( + open_tstamp, + open_side_b, + symbol_b, + open_px_b, + open_eqlbrm, + pair, + ), + ( + close_tstamp, + close_side_a, + symbol_a, + close_px_a, + close_eqlbrm, + pair, + ), + ( + close_tstamp, + close_side_b, + symbol_b, + close_px_b, + close_eqlbrm, + pair, + ), + ] + + # Add tuples to data frame + return pd.DataFrame( + trd_signal_tuples, + columns=result_columns, + ) + + +def run_single_pair(market_data: pd.DataFrame, 
price_column: str, symbol_a: str, symbol_b: str) -> Optional[pd.DataFrame]:
+    colname_a = f"{price_column}_{symbol_a}"
+    colname_b = f"{price_column}_{symbol_b}"
+    pair = Pair(symbol_a=symbol_a, symbol_b=symbol_b)
+    training_pair_df, testing_pair_df = get_datasets(
+        df=market_data,
+        training_minutes=CONFIG["training_minutes"],
+        colname_a=colname_a,
+        colname_b=colname_b,
+    )
+
+    # Check if we have enough data points for a meaningful analysis
+    min_required_points = CONFIG[
+        "min_required_points"
+    ]  # Minimum number of points for a reasonable VECM model
+    if len(training_pair_df) < min_required_points:
+        print(
+            f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
+        )
+        return None
+
+    # Check for non-finite values
+    if not np.isfinite(training_pair_df).all().all():
+        print(f"{pair}: Data contains non-finite values (NaN or inf)")
+        return None
+
+    # Fit the VECM
+    try:
+        vecm_fit = fit_VECM(training_pair_df, colname_a=colname_a, colname_b=colname_b)
+    except Exception as e:
+        print(f"{pair}: VECM fitting failed: {str(e)}")
+        return None
+
+    # Add safeguard against division by zero
+    if (
+        abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]
+    ):  # Small threshold to avoid division by very small numbers
+        print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
+        return None
+
+    try:
+        pair_trades = create_trading_signals(
+            vecm_fit=vecm_fit,
+            testing_pair_df=testing_pair_df,
+            symbol_a=symbol_a,
+            symbol_b=symbol_b,
+            colname_a=colname_a,
+            colname_b=colname_b,
+        )
+    except Exception as e:
+        print(f"{pair}: Prediction failed: {str(e)}")
+        return None
+
+    return pair_trades
+
+
+def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None:
+
+    result_df = transform_dataframe(df=summaries_df, price_column=price_column)
+
+    stock_price_columns = [
+        column
+        for column in result_df.columns
+        if column.startswith(f"{price_column}_")
+    ]
+
+    # Build every unique (A, B) column-index pair
+    all_indexes = range(len(stock_price_columns))
+    unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
+
+    pairs_trades = []
+    for a_index, b_index in unique_index_pairs:
+        # Recover the symbol names from the "<price_column>_<symbol>" columns
+        colname_a = stock_price_columns[a_index]
+        colname_b = stock_price_columns[b_index]
+
+        symbol_a = colname_a[len(f"{price_column}_") :]
+        symbol_b = colname_b[len(f"{price_column}_") :]
+
+        single_pair_trades = run_single_pair(
+            market_data=result_df,
+            price_column=price_column,
+            symbol_a=symbol_a,
+            symbol_b=symbol_b,
+        )
+        if single_pair_trades is not None:
+            pairs_trades.append(single_pair_trades)
+
+    # Check if any pair produced signals before concatenating
+    if not pairs_trades:
+        print("No trading signals found for any pairs")
+        return None
+
+    result = pd.concat(pairs_trades, ignore_index=True)
+    result["time"] = pd.to_datetime(result["time"])
+    result = result.set_index("time").sort_index()
+
+    collect_single_day_results(result)
+    # print_single_day_results(result)
+
+
+def add_trade(pair, symbol, action, price):
+    # Ensure we always use clean names without STOCK- prefix
+    pair = str(pair).replace("STOCK-", "")
+    symbol = symbol.replace("STOCK-", "")
+
+    if pair not in TRADES:
+        TRADES[pair] = {symbol: []}
+    if symbol not in TRADES[pair]:
+        TRADES[pair][symbol] = []
+    TRADES[pair][symbol].append((action, price))
+
+def collect_single_day_results(result):
+    if result is None:
+        return
+
+    print("\n -------------- Suggested Trades ")
+    print(result)
+
+    for row in result.itertuples():
+        action = row.action
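+        # Each signal row is one (action, price) leg; add_trade buckets the
+        # legs by pair and symbol so calculate_returns can match entry to exit.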
symbol = row.symbol + price = row.price + add_trade(pair=row.pair, action=action, symbol=symbol, price=price) + +def print_single_day_results(result): + for pair, symbols in TRADES.items(): + print(f"\n--- {pair} ---") + for symbol, trades in symbols.items(): + for side, price in trades: + print(f"{symbol} {side} at ${price}") + +def print_results_suummary(all_results): + # Summary of all processed files + print("\n====== Summary of All Processed Files ======") + for filename, data in all_results.items(): + trade_count = sum( + len(trades) + for symbol_trades in data["trades"].values() + for trades in symbol_trades.values() + ) + print(f"{filename}: {trade_count} trades") + +def calculate_returns(all_results: Dict): + print("\n====== Returns By Day and Pair ======") + + for filename, data in all_results.items(): + day_return = 0 + print(f"\n--- {filename} ---") + + # Process each pair + for pair, symbols in data["trades"].items(): + pair_return = 0 + pair_trades = [] + + # Calculate individual symbol returns in the pair + for symbol, trades in symbols.items(): + if len(trades) >= 2: # Need at least entry and exit + # Get entry and exit trades + entry_action, entry_price = trades[0] + exit_action, exit_price = trades[1] + + # Calculate return based on action + symbol_return = 0 + if entry_action == "BUY" and exit_action == "SELL": + # Long position + symbol_return = (exit_price - entry_price) / entry_price * 100 + elif entry_action == "SELL" and exit_action == "BUY": + # Short position + symbol_return = (entry_price - exit_price) / entry_price * 100 + + pair_trades.append( + ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) + ) + pair_return += symbol_return + + # Print pair returns + if pair_trades: + print(f" {pair}:") + for ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) in pair_trades: + print( + f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%" + ) + print(f" Pair Total Return: {pair_return:.2f}%") + day_return += pair_return + + # Print day total return + if day_return != 0: + print(f" Day Total Return: {day_return:.2f}%") + +if __name__ == "__main__": + # Initialize a dictionary to store all trade results + all_results = {} + + # Process each data file + price_column = CONFIG["price_column"] + for datafile in CONFIG["datafiles"]: + print(f"\n====== Processing {datafile} ======") + + # Clear the TRADES global dictionary for the new file + TRADES.clear() + + # Process data for this file + try: + file_results = run_pairs( + summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}'), + price_column=price_column + ) + + # Store results with file name as key + filename = datafile.split("/")[-1] + all_results[filename] = {"trades": TRADES.copy(), "results": file_results} + + print(f"Successfully processed {filename}") + except Exception as e: + print(f"Error processing {datafile}: {str(e)}") + + # print_results_suummary(all_results) + calculate_returns(all_results) diff --git a/src/pt_backtest_slide.py b/src/pt_backtest_slide.py new file mode 100644 index 0000000..3b60723 --- /dev/null +++ b/src/pt_backtest_slide.py @@ -0,0 +1,523 @@ +import datetime +import sys +import json + +from typing import Any, Dict, List, Tuple, Optional + +import pandas as pd +import numpy as np + +# ============= statsmodels =================== +from statsmodels.tsa.vector_ar.vecm import VECM + +NanoPerMin = 1e9 +UNSET_FLOAT: float = 
sys.float_info.max +UNSET_INT: int = sys.maxsize + +# ------------------------ Configuration ------------------------ +# Default configuration +CONFIG: Dict = { + "exchange_id": "ALPACA", + "data_directory": "./data/equity", + "datafiles": [ + "20250508.alpaca_sim_md.db", + # "20250509.alpaca_sim_md.db", + # "20250512.alpaca_sim_md.db", + # "20250513.alpaca_sim_md.db", + # "20250514.alpaca_sim_md.db", + # "20250515.alpaca_sim_md.db", + # "20250516.alpaca_sim_md.db", + # "20250519.alpaca_sim_md.db", + # "20250520.alpaca_sim_md.db" + ], + "instruments": [ + "COIN", + "GBTC", + "HOOD", + "MSTR", + "PYPL", + ], + "trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"}, + "price_column": "close", + "min_required_points": 30, + "zero_threshold": 1e-10, + "equilibrium_threshold": 10.0, + # "training_minutes": 120, + "training_minutes": 120, +} + +# ====== later =================== +# # Try to load configuration from file, fall back to defaults if not found +# CONFIG_FILE = "config.json" +# try: +# with open(CONFIG_FILE, "r") as f: +# user_config = json.load(f) +# CONFIG.update(user_config) +# print(f"Loaded configuration from {CONFIG_FILE}") +# except (FileNotFoundError, json.JSONDecodeError) as e: +# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}") +# # Create a default config file if it doesn't exist +# try: +# with open(CONFIG_FILE, "w") as f: +# json.dump(CONFIG, f, indent=4) +# print(f"Created default configuration file: {CONFIG_FILE}") +# except Exception as e: +# print(f"Warning: Could not create default config file: {str(e)}") +# ------------------------ Settings ------------------------ + +TRADES = {} +class Pair: + symbol_a_: str + symbol_b_: str + price_column_: str + + def __init__(self, symbol_a: str, symbol_b: str, price_column: str): + self.symbol_a_ = symbol_a + self.symbol_b_ = symbol_b + self.price_column_ = price_column + + def colname_a(self) -> str: + return f"{self.price_column_}_{self.symbol_a_}" + + def colname_b(self) -> str: + return f"{self.price_column_}_{self.symbol_b_}" + + def __repr__(self) ->str: + return f"{self.symbol_a_} & {self.symbol_b_}" + +def load_market_data(datafile: str) -> pd.DataFrame: + from tools.data_loader import load_sqlite_to_dataframe + + instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]] + exchange_id = CONFIG["exchange_id"] + + query = "select tstamp" + query += ", tstamp_ns as time_ns" + query += ", substr(instrument_id, 7) as symbol" + query += ", open" + query += ", high" + query += ", low" + query += ", close" + query += ", volume" + query += ", num_trades" + query += ", vwap" + + query += " from md_1min_bars" + query += f" where exchange_id ='{exchange_id}'" + query += f" and instrument_id in ({','.join(instrument_ids)})" + + df = load_sqlite_to_dataframe(db_path=datafile, query=query) + + # Trading Hours + date_str = df["tstamp"][0][0:10] + start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}" + end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}" + + # Perform boolean selection + df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)] + df["tstamp"] = pd.to_datetime(df["tstamp"]) + + return df + +def transform_dataframe(df: pd.DataFrame, price_column: str): + # Select only the columns we need + df_selected = df[["tstamp", "symbol", price_column]] + + # Start with unique timestamps + result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True) + + # For each unique 
symbol, add a corresponding close price column + for symbol in df_selected["symbol"].unique(): + # Filter rows for this symbol + df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True) + + # Create column name like "close-COIN" + new_price_column = f"{price_column}_{symbol}" + + # Create temporary dataframe with timestamp and price + temp_df = pd.DataFrame({ + "tstamp": df_symbol["tstamp"], + new_price_column: df_symbol[price_column] + }) + + # Join with our result dataframe + result_df = pd.merge(result_df, temp_df, on="tstamp", how="left") + result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset + + return result_df + +def get_datasets(df: pd.DataFrame, training_minutes: int, pair: Pair) -> Tuple[pd.DataFrame, pd.DataFrame]: + # Training dataset + colname_a = pair.colname_a() + colname_b = pair.colname_b() + df = df[["tstamp", colname_a, colname_b]] + df = df.dropna() + + training_df = df.iloc[:training_minutes - 1, :].copy() + training_df.reset_index(drop=True).dropna().reset_index(drop=True) + + # Testing dataset + testing_df = df.iloc[training_minutes:, :].copy() + testing_df.reset_index(drop=True).dropna().reset_index(drop=True) + + return (training_df, testing_df) + +def fit_VECM(training_pair_df, pair: Pair): + vecm_model = VECM(training_pair_df[[pair.colname_a(), pair.colname_b()]].reset_index(drop=True), coint_rank=1) + vecm_fit = vecm_model.fit() + + # Check if the model converged properly + if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None: + print(f"{pair}: VECM model failed to converge properly") + + return vecm_fit + +def create_trading_signals(vecm_fit, testing_pair_df, pair: Pair, colname_a, colname_b) -> pd.DataFrame: + result_columns = [ + "time", + "action", + "symbol", + "price", + "divergence", + "pair", + ] + + next_values = vecm_fit.predict(steps=len(testing_pair_df)) + + # Convert prediction to a DataFrame for readability + predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b]) + + beta = vecm_fit.beta + + predicted_df["equilibrium_term"] = ( + beta[0] * predicted_df[colname_a] + + beta[1] * predicted_df[colname_b] + ) + + pair_result_df = pd.merge( + testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred') + ).dropna() + + pair_result_df["testing_eqlbrm_term"] = ( + beta[0] * pair_result_df[colname_a] + + beta[1] * pair_result_df[colname_b] + ) + + pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"]) + + # Check if the first value is non-zero to avoid division by zero + pair_result_df = pair_result_df.reset_index() + initial_abs_term = pair_result_df["abs_testing_eqlbrm_term"][0] + if ( + initial_abs_term < CONFIG["zero_threshold"] + ): # Small threshold to avoid division by very small numbers + print( + f"{pair}: Skipping pair due to near-zero initial equilibrium: {initial_abs_term}" + ) + return pd.DataFrame() + + trading_signals_df = ( + pair_result_df["abs_testing_eqlbrm_term"] + < initial_abs_term / CONFIG["equilibrium_threshold"] + ) + close_row_index = next( + (index for index, value in trading_signals_df.items() if value), None + ) + + if close_row_index is None: + print(f"{pair}: NO SIGNAL FOUND") + return pd.DataFrame() + + open_row = pair_result_df.loc[0] + close_row = pair_result_df.loc[close_row_index] + + open_tstamp = open_row["tstamp"] + open_eqlbrm = open_row["testing_eqlbrm_term"] + open_px_a = open_row[f"{colname_a}"] + open_px_b = 
open_row[f"{colname_b}"] + + close_tstamp = close_row["tstamp"] + close_eqlbrm = close_row["testing_eqlbrm_term"] + close_px_a = close_row[f"{colname_a}"] + close_px_b = close_row[f"{colname_b}"] + + abs_beta = abs(beta[1]) + pred_px_b = pair_result_df.loc[0][f"{colname_b}_pred"] + pred_px_a = pair_result_df.loc[0][f"{colname_a}_pred"] + + if pred_px_b * abs_beta - pred_px_a > 0: + open_side_a = "BUY" + open_side_b = "SELL" + close_side_a = "SELL" + close_side_b = "BUY" + else: + open_side_b = "BUY" + open_side_a = "SELL" + close_side_b = "SELL" + close_side_a = "BUY" + + trd_signal_tuples = [ + ( + open_tstamp, + open_side_a, + pair.symbol_a_, + open_px_a, + open_eqlbrm, + pair, + ), + ( + open_tstamp, + open_side_b, + pair.symbol_b_, + open_px_b, + open_eqlbrm, + pair, + ), + ( + close_tstamp, + close_side_a, + pair.symbol_a_, + close_px_a, + close_eqlbrm, + pair, + ), + ( + close_tstamp, + close_side_b, + pair.symbol_b_, + close_px_b, + close_eqlbrm, + pair, + ), + ] + + # Add tuples to data frame + return pd.DataFrame( + trd_signal_tuples, + columns=result_columns, + ) + + +def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: Pair) -> Optional[pd.DataFrame]: + colname_a = f"{price_column}_{pair.symbol_a_}" + colname_b = f"{price_column}_{pair.symbol_b_}" + training_pair_df, testing_pair_df = get_datasets(df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair) + + # Check if we have enough data points for a meaningful analysis + min_required_points = CONFIG[ + "min_required_points" + ] # Minimum number of points for a reasonable VECM model + if len(training_pair_df) < min_required_points: + print( + f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}" + ) + return None + + # Check for non-finite values + if not np.isfinite(training_pair_df).all().all(): + print(f"{pair}: Data contains non-finite values (NaN or inf)") + return None + + # Fit the VECM + try: + vecm_fit = fit_VECM(training_pair_df, pair=pair) + except Exception as e: + print(f"{pair}: VECM fitting failed: {str(e)}") + return None + + # Add safeguard against division by zero + if ( + abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"] + ): # Small threshold to avoid division by very small numbers + print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}") + return None + + try: + pair_trades = create_trading_signals( + vecm_fit=vecm_fit, + testing_pair_df=testing_pair_df, + pair=pair, + colname_a=colname_a, + colname_b=colname_b + ) + except Exception as e: + print(f"{pair}: Prediction failed: {str(e)}") + return None + + return pair_trades + + +def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None: + + result_df = transform_dataframe(df=summaries_df, price_column=price_column) + + stock_price_columns = [ + column + for column in result_df.columns + if column.startswith(f"{price_column}_") + ] + + # Find the starting indices for A and B + all_indexes = range(len(stock_price_columns)) + unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j] + + pairs_trades = [] + for a_index, b_index in unique_index_pairs: + # Get the actual variable names + colname_a = stock_price_columns[a_index] + colname_b = stock_price_columns[b_index] + + symbol_a = colname_a[len(f"{price_column}-") :] + symbol_b = colname_b[len(f"{price_column}-") :].replace( + "STOCK-", "" + ) + pair = Pair(symbol_a, symbol_b, price_column) + + single_pair_trades = run_single_pair(market_data=result_df, 
price_column=price_column, pair=pair) + if single_pair_trades is not None: + pairs_trades.append(single_pair_trades) + # Check if result_list has any data before concatenating + if not pairs_trades: + print("No trading signals found for any pairs") + return None + + result = pd.concat(pairs_trades, ignore_index=True) + result["time"] = pd.to_datetime(result["time"]) + result = result.set_index("time").sort_index() + + collect_single_day_results(result) + # print_single_day_results(result) + + +def add_trade(pair, symbol, action, price): + # Ensure we always use clean names without STOCK- prefix + pair = str(pair).replace("STOCK-", "") + symbol = symbol.replace("STOCK-", "") + + if pair not in TRADES: + TRADES[pair] = {symbol: []} + if symbol not in TRADES[pair]: + TRADES[pair][symbol] = [] + TRADES[pair][symbol].append((action, price)) + +def collect_single_day_results(result): + if result is None: + return + + print("\n -------------- Suggested Trades ") + print(result) + + for row in result.itertuples(): + action = row.action + symbol = row.symbol + price = row.price + add_trade(pair=row.pair, action=action, symbol=symbol, price=price) + +def print_single_day_results(result): + for pair, symbols in TRADES.items(): + print(f"\n--- {pair} ---") + for symbol, trades in symbols.items(): + for side, price in trades: + print(f"{symbol} {side} at ${price}") + +def print_results_suummary(all_results): + # Summary of all processed files + print("\n====== Summary of All Processed Files ======") + for filename, data in all_results.items(): + trade_count = sum( + len(trades) + for symbol_trades in data["trades"].values() + for trades in symbol_trades.values() + ) + print(f"{filename}: {trade_count} trades") + +def calculate_returns(all_results: Dict): + print("\n====== Returns By Day and Pair ======") + + for filename, data in all_results.items(): + day_return = 0 + print(f"\n--- {filename} ---") + + # Process each pair + for pair, symbols in data["trades"].items(): + pair_return = 0 + pair_trades = [] + + # Calculate individual symbol returns in the pair + for symbol, trades in symbols.items(): + if len(trades) >= 2: # Need at least entry and exit + # Get entry and exit trades + entry_action, entry_price = trades[0] + exit_action, exit_price = trades[1] + + # Calculate return based on action + symbol_return = 0 + if entry_action == "BUY" and exit_action == "SELL": + # Long position + symbol_return = (exit_price - entry_price) / entry_price * 100 + elif entry_action == "SELL" and exit_action == "BUY": + # Short position + symbol_return = (entry_price - exit_price) / entry_price * 100 + + pair_trades.append( + ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) + ) + pair_return += symbol_return + + # Print pair returns + if pair_trades: + print(f" {pair}:") + for ( + symbol, + entry_action, + entry_price, + exit_action, + exit_price, + symbol_return, + ) in pair_trades: + print( + f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%" + ) + print(f" Pair Total Return: {pair_return:.2f}%") + day_return += pair_return + + # Print day total return + if day_return != 0: + print(f" Day Total Return: {day_return:.2f}%") + +if __name__ == "__main__": + # Initialize a dictionary to store all trade results + all_results = {} + + # Process each data file + price_column = CONFIG["price_column"] + for datafile in CONFIG["datafiles"]: + print(f"\n====== Processing {datafile} ======") + + # Clear the TRADES global 
dictionary for the new file
+        TRADES.clear()
+
+        # Process data for this file
+        try:
+            run_pairs(
+                summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}'),
+                price_column=price_column
+            )
+
+            # Store results with file name as key
+            filename = datafile.split("/")[-1]
+            all_results[filename] = {"trades": TRADES.copy()}
+
+            print(f"Successfully processed {filename}")
+        except Exception as e:
+            print(f"Error processing {datafile}: {str(e)}")
+
+    # print_results_suummary(all_results)
+    calculate_returns(all_results)
diff --git a/src/tools/data_loader.py b/src/tools/data_loader.py
new file mode 100644
index 0000000..67f76a6
--- /dev/null
+++ b/src/tools/data_loader.py
@@ -0,0 +1,25 @@
+import sys
+import sqlite3
+
+import pandas as pd
+
+
+# Run an arbitrary SQL query against a SQLite database and return the result
+# as a pandas DataFrame; shared by all the backtest scripts.
+def load_sqlite_to_dataframe(db_path, query):
+    try:
+        conn = sqlite3.connect(db_path)
+        df = pd.read_sql_query(query, conn)
+        return df
+    except sqlite3.Error as excpt:
+        print(f"SQLite error: {excpt}")
+        raise
+    except Exception as excpt:
+        print(f"Error: {excpt}")
+        raise
+    finally:
+        if 'conn' in locals():
+            conn.close()
+
+
+if __name__ == "__main__":
+    df1 = load_sqlite_to_dataframe(sys.argv[1], query="select * from md_1min_bars")
+
+    print(df1)
\ No newline at end of file
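
A minimal smoke test for the new src/tools/data_loader.py, separate from the
patch itself (a sketch under assumptions: it is run with src/ on the import
path, e.g. from inside src/, and the 20250508 database from
CONFIG["datafiles"] exists locally; md_1min_bars is the table every backtest
script queries):

    from tools.data_loader import load_sqlite_to_dataframe

    # Pull a few rows of the 1-minute bars the backtests consume; substr()
    # strips the "STOCK-" prefix exactly as the scripts' queries do.
    bars = load_sqlite_to_dataframe(
        db_path="./data/equity/20250508.alpaca_sim_md.db",
        query="select tstamp, substr(instrument_id, 7) as symbol, close"
              " from md_1min_bars limit 5",
    )
    print(bars)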