This commit is contained in:
Oleg Sheynin 2025-05-25 23:05:17 -04:00
commit 5c6841820b
9 changed files with 1711 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
# SpecStory explanation file
.specstory/
.history/
.cursorindexingignore
data
.vscode/
cvttpy

Binary file not shown.

521
src/cointegration-03.py Normal file
View File

@ -0,0 +1,521 @@
import datetime
import sys
import json
from typing import Any, Dict, List
from cvttpy.trading.instrument import ExchangeInstrument
from cvttpy.tools.timeutils import NanoPerMin
import pandas as pd
import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize
# ------------------------ Configuration ------------------------
# Default configuration
CONFIG = {
"exchange_id": "ALPACA",
"datafiles": [
"./data/equity/20250508.alpaca_sim_md.db",
# "./data/equity/20250509.alpaca_sim_md.db",
# "./data/equity/20250512.alpaca_sim_md.db",
# "./data/equity/20250513.alpaca_sim_md.db",
# "./data/equity/20250514.alpaca_sim_md.db",
# "./data/equity/20250515.alpaca_sim_md.db",
# "./data/equity/20250516.alpaca_sim_md.db",
# "./data/equity/20250519.alpaca_sim_md.db",
# "./data/equity/20250520.alpaca_sim_md.db"
],
"instruments": [
"COIN",
"GBTC",
# "HOOD",
# "MSTR",
# "PYPL",
],
"trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"},
"price_aggregate": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"equilibrium_threshold": 10.0,
"training_minutes": 120,
}
# ====== later ===================
# # Try to load configuration from file, fall back to defaults if not found
# CONFIG_FILE = "config.json"
# try:
# with open(CONFIG_FILE, "r") as f:
# user_config = json.load(f)
# CONFIG.update(user_config)
# print(f"Loaded configuration from {CONFIG_FILE}")
# except (FileNotFoundError, json.JSONDecodeError) as e:
# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}")
# # Create a default config file if it doesn't exist
# try:
# with open(CONFIG_FILE, "w") as f:
# json.dump(CONFIG, f, indent=4)
# print(f"Created default configuration file: {CONFIG_FILE}")
# except Exception as e:
# print(f"Warning: Could not create default config file: {str(e)}")
# ------------------------ Settings ------------------------
TRADES = {}
def load_summaries(datafile: str) -> pd.DataFrame:
from tools.data_loader import load_sqlite_to_dataframe
instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]]
exchange_id = CONFIG["exchange_id"]
query = "select tstamp"
query += ", tstamp_ns as time_ns"
query += ", substr(instrument_id, 7) as symbol"
query += ", open"
query += ", high"
query += ", low"
query += ", close"
query += ", volume"
query += ", num_trades"
query += ", vwap"
query += " from md_1min_bars"
query += f" where exchange_id ='{exchange_id}'"
query += f" and instrument_id in ({','.join(instrument_ids)})"
df = load_sqlite_to_dataframe(db_path=datafile, query=query)
# Trading Hours
date_str = df["tstamp"][0][0:10]
start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}"
end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}"
# Perform boolean selection
df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)]
df["tstamp"] = pd.to_datetime(df["tstamp"])
return df
def transform_dataframe(df):
# Select only the columns we need
px_col = CONFIG["price_aggregate"]
selected_columns = ["tstamp", "symbol", px_col]
df_selected = df[selected_columns]
# Start with unique timestamps
result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True)
# For each unique symbol, add a corresponding close price column
for symbol in df_selected["symbol"].unique():
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
# Create column name like "close-COIN"
price_column = f"{px_col}-{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
price_column: df_symbol[px_col]
})
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
return result_df
def process_summaries(summaries_df: pd.DataFrame) -> None:
result_df = transform_dataframe(summaries_df)
price_columns = [
column
for column in result_df.columns
if column.startswith(f"{CONFIG['price_aggregate']}-")
]
# ========================= Split into training and testing datasets =========================
# Training dataset: first CONFIG[training_minutes] rows
training_ts_df = result_df.iloc[:CONFIG["training_minutes"], :].copy().reset_index(drop=True)
# Testing dataset: remaining rows after training period
testing_ts_df = result_df.iloc[CONFIG["training_minutes"]:, :].copy().reset_index(drop=True)
# Store timestamps for later use
testing_timestamps = testing_ts_df["tstamp"].copy()
# Find the starting indices for A and B
all_indexes = range(len(price_columns))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
result_columns = [
"time",
"action",
"symbol",
"price",
"divergence",
"pair",
]
result_list = []
px_col = CONFIG["price_aggregate"]
for a_index, b_index in unique_index_pairs:
try:
# Get the actual variable names
col_name_a = price_columns[a_index]
col_name_b = price_columns[b_index]
symbol_a = col_name_a[len(f"{px_col}-") :]
symbol_b = col_name_b[len(f"{px_col}-") :].replace(
"STOCK-", ""
)
pair = f"{symbol_a} & {symbol_b}"
# ===== Training dataset =====
training_pair_df = pd.DataFrame(
{
f"{col_name_a}": training_ts_df[col_name_a],
f"{col_name_b}": training_ts_df[col_name_b],
}
).dropna().reset_index(drop=True)
# Check if we have enough data points for a meaningful analysis
min_required_points = CONFIG[
"min_required_points"
] # Minimum number of points for a reasonable VECM model
if len(training_pair_df) < min_required_points:
print(
f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
)
continue
# Check for non-finite values
if not np.isfinite(training_pair_df).all().all():
print(f"{pair}: Data contains non-finite values (NaN or inf)")
continue
# ===== Testing dataset =====
testing_pair_df = pd.DataFrame(
{
f"{col_name_a}": testing_ts_df[col_name_a],
f"{col_name_b}": testing_ts_df[col_name_b],
}
).dropna()
testing_pair_df = testing_pair_df.reset_index(drop=True)
testing_pair_df = pd.merge(testing_timestamps.to_frame(), testing_pair_df , left_index=True, right_index=True)
# Fit the VECM
try:
vecm_model = VECM(training_pair_df, coint_rank=1)
vecm_fit = vecm_model.fit()
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{pair}: VECM model failed to converge properly")
continue
beta = vecm_fit.beta
# Predict the next step
try:
next_values = vecm_fit.predict(steps=len(testing_pair_df))
except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}")
continue
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[col_name_a, col_name_b])
except Exception as e:
print(f"{pair}: VECM model fitting failed: {str(e)}")
continue
predicted_df["equilibrium_term"] = (
beta[0] * predicted_df[col_name_a]
+ beta[1] * predicted_df[col_name_b]
)
pair_result_df = pd.merge(
testing_pair_df, predicted_df, left_index=True, right_index=True, suffixes=('', '_pred')
).dropna()
pair_result_df["testing_eqlbrm_term"] = (
beta[0] * pair_result_df[col_name_a]
+ beta[1] * pair_result_df[col_name_b]
)
pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"])
# Check if the first value is non-zero to avoid division by zero
initial_abs_term = pair_result_df.loc[0, "abs_testing_eqlbrm_term"]
if (
initial_abs_term < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(
f"{pair}: Skipping due to near-zero initial equilibrium term: {initial_abs_term}"
)
continue
condition = (
pair_result_df["abs_testing_eqlbrm_term"]
< initial_abs_term / CONFIG["equilibrium_threshold"]
)
first_row_index = next(
(index for index, value in condition.items() if value), None
)
if first_row_index is not None:
first_row = pair_result_df.loc[first_row_index]
fr_df = first_row.to_frame().T
# Add safeguard against division by zero
if (
abs(beta[1]) < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(f"{pair}: Skipping due to near-zero beta[1] value: {beta[1]}")
continue
if predicted_df.iloc[0, 1] > predicted_df.iloc[0, 0] / abs(beta[1]):
my_tuple_a1 = (
pair_result_df["tstamp"].iloc[0],
"BUY",
symbol_a,
pair_result_df[f"{col_name_a}"].iloc[0],
pair_result_df["testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_b1 = (
pair_result_df["tstamp"].iloc[0],
"SELL",
symbol_b,
pair_result_df[f"{col_name_b}"].iloc[0],
pair_result_df["testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_a2 = (
fr_df["tstamp"].iloc[0],
"SELL",
symbol_a,
fr_df[f"{col_name_a}"].iloc[0],
fr_df["abs_testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_b2 = (
fr_df["tstamp"].iloc[0],
"BUY",
symbol_b,
fr_df[f"{col_name_b}"].iloc[0],
fr_df["abs_testing_eqlbrm_term"].iloc[0],
pair,
)
else:
my_tuple_a1 = (
pair_result_df["tstamp"].iloc[0],
"SELL",
symbol_a,
pair_result_df[f"testing_{col_name_a}"].iloc[0],
pair_result_df["testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_b1 = (
pair_result_df["tstamp"].iloc[0],
"BUY",
symbol_b,
pair_result_df[f"testing_{col_name_b}"].iloc[0],
pair_result_df["testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_a2 = (
fr_df["tstamp"].iloc[0],
"BUY",
symbol_a,
fr_df[f"testing_{col_name_a}"].iloc[0],
fr_df["abs_testing_eqlbrm_term"].iloc[0],
pair,
)
my_tuple_b2 = (
fr_df["tstamp"].iloc[0],
"SELL",
symbol_b,
fr_df[f"testing_{col_name_b}"].iloc[0],
fr_df["abs_testing_eqlbrm_term"].iloc[0],
pair,
)
# Add tuples to data frame
tuple_df = pd.DataFrame(
[my_tuple_a1, my_tuple_b1, my_tuple_a2, my_tuple_b2],
columns=result_columns,
)
# print(tuple_df)
result_list.append(tuple_df)
else:
print(f"{pair}: NO SIGNAL FOUND")
except KeyError:
print(
f"Error: Column '{price_columns[a_index]}' or '{price_columns[b_index]}' not found."
)
return []
# Check if result_list has any data before concatenating
if not result_list:
print("No trading signals found for any pairs")
return None
result = pd.concat(result_list, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
collect_single_day_results(result)
# print_single_day_results(result)
def add_trade(pair, symbol, action, price):
# Ensure we always use clean names without STOCK- prefix
pair = pair.replace("STOCK-", "")
symbol = symbol.replace("STOCK-", "")
if pair not in TRADES:
TRADES[pair] = {symbol: []}
if symbol not in TRADES[pair]:
TRADES[pair][symbol] = []
TRADES[pair][symbol].append((action, price))
def collect_single_day_results(result):
if result is None:
return
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
action = row.action
symbol = row.symbol
price = row.price
add_trade(pair=row.pair, action=action, symbol=symbol, price=price)
def print_single_day_results(result):
for pair, symbols in TRADES.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for side, price in trades:
print(f"{symbol} {side} at ${price}")
def print_results_suummary(all_results):
# Summary of all processed files
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(all_results):
print("\n====== Returns By Day and Pair ======")
for filename, data in all_results.items():
day_return = 0
print(f"\n--- {filename} ---")
# Process each pair
for pair, symbols in data["trades"].items():
pair_return = 0
pair_trades = []
# Calculate individual symbol returns in the pair
for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit
# Get entry and exit trades
entry_action, entry_price = trades[0]
exit_action, exit_price = trades[1]
# Calculate return based on action
symbol_return = 0
if entry_action == "BUY" and exit_action == "SELL":
# Long position
symbol_return = (exit_price - entry_price) / entry_price * 100
elif entry_action == "SELL" and exit_action == "BUY":
# Short position
symbol_return = (entry_price - exit_price) / entry_price * 100
pair_trades.append(
(
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
)
)
pair_return += symbol_return
# Print pair returns
if pair_trades:
print(f" {pair}:")
for (
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
) in pair_trades:
print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
)
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
if __name__ == "__main__":
# Initialize a dictionary to store all trade results
all_results = {}
# Process each data file
for datafile in CONFIG["datafiles"]:
print(f"\n====== Processing {datafile} ======")
# Clear the TRADES global dictionary for the new file
TRADES.clear()
# Process data for this file
try:
file_results = process_summaries(
summaries_df=load_summaries(datafile)
)
# Store results with file name as key
filename = datafile.split("/")[-1]
all_results[filename] = {"trades": TRADES.copy(), "results": file_results}
print(f"Successfully processed {filename}")
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")
# print_results_suummary(all_results)
calculate_returns(all_results)

43
src/direct_solution.py Normal file
View File

@ -0,0 +1,43 @@
import pandas as pd
# This example shows how to transform your dataframe to have columns like close-COIN, close-PYPL, etc.
# Assuming df is your input dataframe with the structure shown in your question
def transform_dataframe(df):
# Select only the columns we need
selected_columns = ["tstamp", "symbol", "close"]
df_selected = df[selected_columns]
# Start with unique timestamps
result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True)
# For each unique symbol, add a corresponding close price column
for symbol in df_selected["symbol"].unique():
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
# Create column name like "close-COIN"
price_column = f"close-{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
price_column: df_symbol["close"]
})
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
return result_df
# Example usage (assuming df is your input dataframe):
# result_df = transform_dataframe(df)
# print(result_df.head())
"""
The resulting dataframe will look like:
tstamp close-COIN close-GBTC close-HOOD close-MSTR close-PYPL
0 2025-05-20 14:30:00 262.3650 45.1234 21.567 935.42 72.1611
1 2025-05-20 14:31:00 262.5850 45.2100 21.589 935.67 72.1611
...
"""

77
src/pivot_example.py Normal file
View File

@ -0,0 +1,77 @@
import pandas as pd
# Assuming your dataframe is named 'df'
def pivot_dataframe(df):
# Convert timestamp to datetime if it's not already
df['tstamp'] = pd.to_datetime(df['tstamp'])
# Create a pivot table with 'tstamp' as index and 'symbol' as columns
# You can choose any aggregation function (mean, first, last, etc.)
# if there are multiple values per timestamp/symbol
pivoted_df = pd.pivot_table(
df,
values=['close', 'open', 'high', 'low', 'volume'],
index='tstamp',
columns='symbol',
aggfunc='first' # Use 'first' if there's only one value per timestamp/symbol
)
# Flatten the multi-level column names
pivoted_df.columns = [f"{col[0]}-{col[1]}" for col in pivoted_df.columns]
# Reset index to make tstamp a column
pivoted_df = pivoted_df.reset_index()
return pivoted_df
# Example usage:
# pivoted_df = pivot_dataframe(your_dataframe)
# print(pivoted_df.head())
# Alternative approach (similar to your code):
def pivot_alternative(df):
# Create an empty dataframe with just the timestamp
result_df = df['tstamp'].drop_duplicates().reset_index(drop=True)
# For each symbol, create a separate column for each metric
for symbol in df['symbol'].unique():
# Filter dataframe for this symbol
df_symbol = df[df['symbol'] == symbol].reset_index(drop=True)
# Create columns for each price/volume metric
for metric in ['close', 'open', 'high', 'low', 'volume']:
column_name = f"{metric}-{symbol}"
# Create a temporary dataframe with tstamp and the new column
temp_df = pd.DataFrame({
'tstamp': df_symbol['tstamp'],
column_name: df_symbol[metric]
})
# Merge with the result dataframe
result_df = pd.merge(result_df, temp_df, on='tstamp', how='left')
return result_df
# Example using the code similar to what's in your main code:
"""
# Selected columns from original dataframe
selected_columns = ["tstamp", "symbol", "close"]
df_selected = df[selected_columns]
# Create result dataframe starting with timestamps
result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True)
# For each symbol, add a column with its close price
for symbol in df_selected["symbol"].unique():
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
price_column = f"close-{symbol}"
# Create temp dataframe with tstamp and the price column
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
price_column: df_symbol["close"]
})
# Merge with result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
"""

515
src/pt_backtest.py Normal file
View File

@ -0,0 +1,515 @@
import datetime
import sys
import json
from typing import Any, Dict, List, Tuple, Optional
import pandas as pd
import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize
# ------------------------ Configuration ------------------------
# Default configuration
CONFIG = {
"exchange_id": "ALPACA",
"data_directory": "./data/equity",
"datafiles": [
"20250508.alpaca_sim_md.db",
# "20250509.alpaca_sim_md.db",
# "20250512.alpaca_sim_md.db",
# "20250513.alpaca_sim_md.db",
# "20250514.alpaca_sim_md.db",
# "20250515.alpaca_sim_md.db",
# "20250516.alpaca_sim_md.db",
# "20250519.alpaca_sim_md.db",
# "20250520.alpaca_sim_md.db"
],
"instruments": [
"COIN",
"GBTC",
"HOOD",
"MSTR",
"PYPL",
],
"trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"},
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"equilibrium_threshold": 10.0,
# "training_minutes": 120,
"training_minutes": 180,
}
# ====== later ===================
# # Try to load configuration from file, fall back to defaults if not found
# CONFIG_FILE = "config.json"
# try:
# with open(CONFIG_FILE, "r") as f:
# user_config = json.load(f)
# CONFIG.update(user_config)
# print(f"Loaded configuration from {CONFIG_FILE}")
# except (FileNotFoundError, json.JSONDecodeError) as e:
# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}")
# # Create a default config file if it doesn't exist
# try:
# with open(CONFIG_FILE, "w") as f:
# json.dump(CONFIG, f, indent=4)
# print(f"Created default configuration file: {CONFIG_FILE}")
# except Exception as e:
# print(f"Warning: Could not create default config file: {str(e)}")
# ------------------------ Settings ------------------------
TRADES = {}
class Pair:
symbol_a_: str
symbol_b_: str
def __init__(self, symbol_a: str, symbol_b: str):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
def __repr__(self) ->str:
return f"{self.symbol_a_} & {self.symbol_b_}"
def load_market_data(datafile: str) -> pd.DataFrame:
from tools.data_loader import load_sqlite_to_dataframe
instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]]
exchange_id = CONFIG["exchange_id"]
query = "select tstamp"
query += ", tstamp_ns as time_ns"
query += ", substr(instrument_id, 7) as symbol"
query += ", open"
query += ", high"
query += ", low"
query += ", close"
query += ", volume"
query += ", num_trades"
query += ", vwap"
query += " from md_1min_bars"
query += f" where exchange_id ='{exchange_id}'"
query += f" and instrument_id in ({','.join(instrument_ids)})"
df = load_sqlite_to_dataframe(db_path=datafile, query=query)
# Trading Hours
date_str = df["tstamp"][0][0:10]
start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}"
end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}"
# Perform boolean selection
df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)]
df["tstamp"] = pd.to_datetime(df["tstamp"])
return df
def transform_dataframe(df: pd.DataFrame, price_column: str):
# Select only the columns we need
df_selected = df[["tstamp", "symbol", price_column]]
# Start with unique timestamps
result_df = df_selected["tstamp"].drop_duplicates().reset_index(drop=True)
# For each unique symbol, add a corresponding close price column
for symbol in df_selected["symbol"].unique():
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
# Create column name like "close-COIN"
new_price_column = f"{price_column}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[price_column]
})
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset
return result_df
def get_datasets(df: pd.DataFrame, training_minutes: int, colname_a: str, colname_b: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
# Training dataset
df = df[["tstamp", colname_a, colname_b]]
df = df.dropna()
training_df = df.iloc[:training_minutes - 1, :].copy()
training_df.reset_index(drop=True).dropna().reset_index(drop=True)
# Testing dataset
testing_df = df.iloc[training_minutes:, :].copy()
testing_df.reset_index(drop=True).dropna().reset_index(drop=True)
return (training_df, testing_df)
def fit_VECM(training_pair_df, colname_a, colname_b):
vecm_model = VECM(training_pair_df[[colname_a, colname_b]].reset_index(drop=True), coint_rank=1)
vecm_fit = vecm_model.fit()
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{pair}: VECM model failed to converge properly")
return vecm_fit
def create_trading_signals(vecm_fit, testing_pair_df, symbol_a, symbol_b, colname_a, colname_b) -> pd.DataFrame:
result_columns = [
"time",
"action",
"symbol",
"price",
"divergence",
"pair",
]
pair = Pair(symbol_a=symbol_a, symbol_b=symbol_b)
next_values = vecm_fit.predict(steps=len(testing_pair_df))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])
beta = vecm_fit.beta
predicted_df["equilibrium_term"] = (
beta[0] * predicted_df[colname_a]
+ beta[1] * predicted_df[colname_b]
)
pair_result_df = pd.merge(
testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred')
).dropna()
pair_result_df["testing_eqlbrm_term"] = (
beta[0] * pair_result_df[colname_a]
+ beta[1] * pair_result_df[colname_b]
)
pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"])
# Check if the first value is non-zero to avoid division by zero
pair_result_df = pair_result_df.reset_index()
initial_abs_term = pair_result_df["abs_testing_eqlbrm_term"][0]
if (
initial_abs_term < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(
f"{pair}: Skipping pair due to near-zero initial equilibrium: {initial_abs_term}"
)
return pd.DataFrame()
trading_signals_df = (
pair_result_df["abs_testing_eqlbrm_term"]
< initial_abs_term / CONFIG["equilibrium_threshold"]
)
close_row_index = next(
(index for index, value in trading_signals_df.items() if value), None
)
if close_row_index is None:
print(f"{pair}: NO SIGNAL FOUND")
return pd.DataFrame()
open_row = pair_result_df.loc[0]
close_row = pair_result_df.loc[close_row_index]
open_tstamp = open_row["tstamp"]
open_eqlbrm = open_row["testing_eqlbrm_term"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
close_tstamp = close_row["tstamp"]
close_eqlbrm = close_row["testing_eqlbrm_term"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
abs_beta = abs(beta[1])
pred_px_b = pair_result_df.loc[0][f"{colname_b}_pred"]
pred_px_a = pair_result_df.loc[0][f"{colname_a}_pred"]
if pred_px_b * abs_beta - pred_px_a > 0:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
else:
open_side_b = "BUY"
open_side_a = "SELL"
close_side_b = "SELL"
close_side_a = "BUY"
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
symbol_a,
open_px_a,
open_eqlbrm,
pair,
),
(
open_tstamp,
open_side_b,
symbol_b,
open_px_b,
open_eqlbrm,
pair,
),
(
close_tstamp,
close_side_a,
symbol_a,
close_px_a,
close_eqlbrm,
pair,
),
(
close_tstamp,
close_side_b,
symbol_b,
close_px_b,
close_eqlbrm,
pair,
),
]
# Add tuples to data frame
return pd.DataFrame(
trd_signal_tuples,
columns=result_columns,
)
def run_single_pair(market_data: pd.DataFrame, price_column:str, symbol_a: str, symbol_b: str) -> Optional[pd.DataFrame]:
colname_a = f"{price_column}_{symbol_a}"
colname_b = f"{price_column}_{symbol_b}"
training_pair_df, testing_pair_df = get_datasets(df=market_data, training_minutes=CONFIG["training_minutes"], colname_a=colname_a, colname_b=colname_b)
# Check if we have enough data points for a meaningful analysis
min_required_points = CONFIG[
"min_required_points"
] # Minimum number of points for a reasonable VECM model
if len(training_pair_df) < min_required_points:
print(
f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
)
return None
# Check for non-finite values
if not np.isfinite(training_pair_df).all().all():
print(f"{pair}: Data contains non-finite values (NaN or inf)")
return None
# Fit the VECM
try:
vecm_fit = fit_VECM(training_pair_df, colname_a=colname_a, colname_b=colname_b)
except Exception as e:
print(f"{pair}: VECM fitting failed: {str(e)}")
return None
# Add safeguard against division by zero
if (
abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
return None
try:
pair_trades = create_trading_signals(
vecm_fit=vecm_fit,
testing_pair_df=testing_pair_df,
symbol_a=symbol_a,
symbol_b=symbol_b,
colname_a=colname_a,
colname_b=colname_b
)
except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}")
return
return pair_trades
def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None:
result_df = transform_dataframe(df=summaries_df, price_column=price_column)
stock_price_columns = [
column
for column in result_df.columns
if column.startswith(f"{price_column}_")
]
# Find the starting indices for A and B
all_indexes = range(len(stock_price_columns))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs_trades = []
for a_index, b_index in unique_index_pairs:
# Get the actual variable names
colname_a = stock_price_columns[a_index]
colname_b = stock_price_columns[b_index]
symbol_a = colname_a[len(f"{price_column}-") :]
symbol_b = colname_b[len(f"{price_column}-") :].replace(
"STOCK-", ""
)
pair = f"{symbol_a} & {symbol_b}"
single_pair_trades = run_single_pair(market_data=result_df, price_column=price_column, symbol_a=symbol_a, symbol_b=symbol_b)
if single_pair_trades is not None:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if not pairs_trades:
print("No trading signals found for any pairs")
return None
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
collect_single_day_results(result)
# print_single_day_results(result)
def add_trade(pair, symbol, action, price):
# Ensure we always use clean names without STOCK- prefix
pair = str(pair).replace("STOCK-", "")
symbol = symbol.replace("STOCK-", "")
if pair not in TRADES:
TRADES[pair] = {symbol: []}
if symbol not in TRADES[pair]:
TRADES[pair][symbol] = []
TRADES[pair][symbol].append((action, price))
def collect_single_day_results(result):
if result is None:
return
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
action = row.action
symbol = row.symbol
price = row.price
add_trade(pair=row.pair, action=action, symbol=symbol, price=price)
def print_single_day_results(result):
for pair, symbols in TRADES.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for side, price in trades:
print(f"{symbol} {side} at ${price}")
def print_results_suummary(all_results):
# Summary of all processed files
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(all_results: Dict):
print("\n====== Returns By Day and Pair ======")
for filename, data in all_results.items():
day_return = 0
print(f"\n--- {filename} ---")
# Process each pair
for pair, symbols in data["trades"].items():
pair_return = 0
pair_trades = []
# Calculate individual symbol returns in the pair
for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit
# Get entry and exit trades
entry_action, entry_price = trades[0]
exit_action, exit_price = trades[1]
# Calculate return based on action
symbol_return = 0
if entry_action == "BUY" and exit_action == "SELL":
# Long position
symbol_return = (exit_price - entry_price) / entry_price * 100
elif entry_action == "SELL" and exit_action == "BUY":
# Short position
symbol_return = (entry_price - exit_price) / entry_price * 100
pair_trades.append(
(
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
)
)
pair_return += symbol_return
# Print pair returns
if pair_trades:
print(f" {pair}:")
for (
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
) in pair_trades:
print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
)
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
if __name__ == "__main__":
# Initialize a dictionary to store all trade results
all_results = {}
# Process each data file
price_column = CONFIG["price_column"]
for datafile in CONFIG["datafiles"]:
print(f"\n====== Processing {datafile} ======")
# Clear the TRADES global dictionary for the new file
TRADES.clear()
# Process data for this file
try:
file_results = run_pairs(
summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}'),
price_column=price_column
)
# Store results with file name as key
filename = datafile.split("/")[-1]
all_results[filename] = {"trades": TRADES.copy(), "results": file_results}
print(f"Successfully processed {filename}")
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")
# print_results_suummary(all_results)
calculate_returns(all_results)

523
src/pt_backtest_slide.py Normal file
View File

@ -0,0 +1,523 @@
import datetime
import sys
import json
from typing import Any, Dict, List, Tuple, Optional
import pandas as pd
import numpy as np
# ============= statsmodels ===================
from statsmodels.tsa.vector_ar.vecm import VECM
NanoPerMin = 1e9
UNSET_FLOAT: float = sys.float_info.max
UNSET_INT: int = sys.maxsize
# ------------------------ Configuration ------------------------
# Default configuration
CONFIG: Dict = {
"exchange_id": "ALPACA",
"data_directory": "./data/equity",
"datafiles": [
"20250508.alpaca_sim_md.db",
# "20250509.alpaca_sim_md.db",
# "20250512.alpaca_sim_md.db",
# "20250513.alpaca_sim_md.db",
# "20250514.alpaca_sim_md.db",
# "20250515.alpaca_sim_md.db",
# "20250516.alpaca_sim_md.db",
# "20250519.alpaca_sim_md.db",
# "20250520.alpaca_sim_md.db"
],
"instruments": [
"COIN",
"GBTC",
"HOOD",
"MSTR",
"PYPL",
],
"trading_hours": {"begin_session": "14:30:00", "end_session": "21:00:00"},
"price_column": "close",
"min_required_points": 30,
"zero_threshold": 1e-10,
"equilibrium_threshold": 10.0,
# "training_minutes": 120,
"training_minutes": 120,
}
# ====== later ===================
# # Try to load configuration from file, fall back to defaults if not found
# CONFIG_FILE = "config.json"
# try:
# with open(CONFIG_FILE, "r") as f:
# user_config = json.load(f)
# CONFIG.update(user_config)
# print(f"Loaded configuration from {CONFIG_FILE}")
# except (FileNotFoundError, json.JSONDecodeError) as e:
# print(f"Using default configuration. Error loading {CONFIG_FILE}: {str(e)}")
# # Create a default config file if it doesn't exist
# try:
# with open(CONFIG_FILE, "w") as f:
# json.dump(CONFIG, f, indent=4)
# print(f"Created default configuration file: {CONFIG_FILE}")
# except Exception as e:
# print(f"Warning: Could not create default config file: {str(e)}")
# ------------------------ Settings ------------------------
TRADES = {}
class Pair:
symbol_a_: str
symbol_b_: str
price_column_: str
def __init__(self, symbol_a: str, symbol_b: str, price_column: str):
self.symbol_a_ = symbol_a
self.symbol_b_ = symbol_b
self.price_column_ = price_column
def colname_a(self) -> str:
return f"{self.price_column_}_{self.symbol_a_}"
def colname_b(self) -> str:
return f"{self.price_column_}_{self.symbol_b_}"
def __repr__(self) ->str:
return f"{self.symbol_a_} & {self.symbol_b_}"
def load_market_data(datafile: str) -> pd.DataFrame:
from tools.data_loader import load_sqlite_to_dataframe
instrument_ids = ["\"" + "STOCK-" + instrument + "\"" for instrument in CONFIG["instruments"]]
exchange_id = CONFIG["exchange_id"]
query = "select tstamp"
query += ", tstamp_ns as time_ns"
query += ", substr(instrument_id, 7) as symbol"
query += ", open"
query += ", high"
query += ", low"
query += ", close"
query += ", volume"
query += ", num_trades"
query += ", vwap"
query += " from md_1min_bars"
query += f" where exchange_id ='{exchange_id}'"
query += f" and instrument_id in ({','.join(instrument_ids)})"
df = load_sqlite_to_dataframe(db_path=datafile, query=query)
# Trading Hours
date_str = df["tstamp"][0][0:10]
start_time = f"{date_str} {CONFIG['trading_hours']['begin_session']}"
end_time = f"{date_str} {CONFIG['trading_hours']['end_session']}"
# Perform boolean selection
df = df[(df["tstamp"] >= start_time) & (df["tstamp"] <= end_time)]
df["tstamp"] = pd.to_datetime(df["tstamp"])
return df
def transform_dataframe(df: pd.DataFrame, price_column: str):
# Select only the columns we need
df_selected = df[["tstamp", "symbol", price_column]]
# Start with unique timestamps
result_df: pd.DataFrame = pd.DataFrame(df_selected["tstamp"]).drop_duplicates().reset_index(drop=True)
# For each unique symbol, add a corresponding close price column
for symbol in df_selected["symbol"].unique():
# Filter rows for this symbol
df_symbol = df_selected[df_selected["symbol"] == symbol].reset_index(drop=True)
# Create column name like "close-COIN"
new_price_column = f"{price_column}_{symbol}"
# Create temporary dataframe with timestamp and price
temp_df = pd.DataFrame({
"tstamp": df_symbol["tstamp"],
new_price_column: df_symbol[price_column]
})
# Join with our result dataframe
result_df = pd.merge(result_df, temp_df, on="tstamp", how="left")
result_df = result_df.reset_index(drop=True) # do not dropna() since irrelevant symbol would affect dataset
return result_df
def get_datasets(df: pd.DataFrame, training_minutes: int, pair: Pair) -> Tuple[pd.DataFrame, pd.DataFrame]:
# Training dataset
colname_a = pair.colname_a()
colname_b = pair.colname_b()
df = df[["tstamp", colname_a, colname_b]]
df = df.dropna()
training_df = df.iloc[:training_minutes - 1, :].copy()
training_df.reset_index(drop=True).dropna().reset_index(drop=True)
# Testing dataset
testing_df = df.iloc[training_minutes:, :].copy()
testing_df.reset_index(drop=True).dropna().reset_index(drop=True)
return (training_df, testing_df)
def fit_VECM(training_pair_df, pair: Pair):
vecm_model = VECM(training_pair_df[[pair.colname_a(), pair.colname_b()]].reset_index(drop=True), coint_rank=1)
vecm_fit = vecm_model.fit()
# Check if the model converged properly
if not hasattr(vecm_fit, "beta") or vecm_fit.beta is None:
print(f"{pair}: VECM model failed to converge properly")
return vecm_fit
def create_trading_signals(vecm_fit, testing_pair_df, pair: Pair, colname_a, colname_b) -> pd.DataFrame:
result_columns = [
"time",
"action",
"symbol",
"price",
"divergence",
"pair",
]
next_values = vecm_fit.predict(steps=len(testing_pair_df))
# Convert prediction to a DataFrame for readability
predicted_df = pd.DataFrame(next_values, columns=[colname_a, colname_b])
beta = vecm_fit.beta
predicted_df["equilibrium_term"] = (
beta[0] * predicted_df[colname_a]
+ beta[1] * predicted_df[colname_b]
)
pair_result_df = pd.merge(
testing_pair_df.reset_index(drop=True), predicted_df, left_index=True, right_index=True, suffixes=('', '_pred')
).dropna()
pair_result_df["testing_eqlbrm_term"] = (
beta[0] * pair_result_df[colname_a]
+ beta[1] * pair_result_df[colname_b]
)
pair_result_df["abs_testing_eqlbrm_term"] = np.abs(pair_result_df["testing_eqlbrm_term"])
# Check if the first value is non-zero to avoid division by zero
pair_result_df = pair_result_df.reset_index()
initial_abs_term = pair_result_df["abs_testing_eqlbrm_term"][0]
if (
initial_abs_term < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(
f"{pair}: Skipping pair due to near-zero initial equilibrium: {initial_abs_term}"
)
return pd.DataFrame()
trading_signals_df = (
pair_result_df["abs_testing_eqlbrm_term"]
< initial_abs_term / CONFIG["equilibrium_threshold"]
)
close_row_index = next(
(index for index, value in trading_signals_df.items() if value), None
)
if close_row_index is None:
print(f"{pair}: NO SIGNAL FOUND")
return pd.DataFrame()
open_row = pair_result_df.loc[0]
close_row = pair_result_df.loc[close_row_index]
open_tstamp = open_row["tstamp"]
open_eqlbrm = open_row["testing_eqlbrm_term"]
open_px_a = open_row[f"{colname_a}"]
open_px_b = open_row[f"{colname_b}"]
close_tstamp = close_row["tstamp"]
close_eqlbrm = close_row["testing_eqlbrm_term"]
close_px_a = close_row[f"{colname_a}"]
close_px_b = close_row[f"{colname_b}"]
abs_beta = abs(beta[1])
pred_px_b = pair_result_df.loc[0][f"{colname_b}_pred"]
pred_px_a = pair_result_df.loc[0][f"{colname_a}_pred"]
if pred_px_b * abs_beta - pred_px_a > 0:
open_side_a = "BUY"
open_side_b = "SELL"
close_side_a = "SELL"
close_side_b = "BUY"
else:
open_side_b = "BUY"
open_side_a = "SELL"
close_side_b = "SELL"
close_side_a = "BUY"
trd_signal_tuples = [
(
open_tstamp,
open_side_a,
pair.symbol_a_,
open_px_a,
open_eqlbrm,
pair,
),
(
open_tstamp,
open_side_b,
pair.symbol_b_,
open_px_b,
open_eqlbrm,
pair,
),
(
close_tstamp,
close_side_a,
pair.symbol_a_,
close_px_a,
close_eqlbrm,
pair,
),
(
close_tstamp,
close_side_b,
pair.symbol_b_,
close_px_b,
close_eqlbrm,
pair,
),
]
# Add tuples to data frame
return pd.DataFrame(
trd_signal_tuples,
columns=result_columns,
)
def run_single_pair(market_data: pd.DataFrame, price_column:str, pair: Pair) -> Optional[pd.DataFrame]:
colname_a = f"{price_column}_{pair.symbol_a_}"
colname_b = f"{price_column}_{pair.symbol_b_}"
training_pair_df, testing_pair_df = get_datasets(df=market_data, training_minutes=CONFIG["training_minutes"], pair=pair)
# Check if we have enough data points for a meaningful analysis
min_required_points = CONFIG[
"min_required_points"
] # Minimum number of points for a reasonable VECM model
if len(training_pair_df) < min_required_points:
print(
f"{pair}: Not enough data points for analysis. Found {len(training_pair_df)}, need at least {min_required_points}"
)
return None
# Check for non-finite values
if not np.isfinite(training_pair_df).all().all():
print(f"{pair}: Data contains non-finite values (NaN or inf)")
return None
# Fit the VECM
try:
vecm_fit = fit_VECM(training_pair_df, pair=pair)
except Exception as e:
print(f"{pair}: VECM fitting failed: {str(e)}")
return None
# Add safeguard against division by zero
if (
abs(vecm_fit.beta[1]) < CONFIG["zero_threshold"]
): # Small threshold to avoid division by very small numbers
print(f"{pair}: Skipping due to near-zero beta[1] value: {vecm_fit.beta[1]}")
return None
try:
pair_trades = create_trading_signals(
vecm_fit=vecm_fit,
testing_pair_df=testing_pair_df,
pair=pair,
colname_a=colname_a,
colname_b=colname_b
)
except Exception as e:
print(f"{pair}: Prediction failed: {str(e)}")
return None
return pair_trades
def run_pairs(summaries_df: pd.DataFrame, price_column: str) -> None:
result_df = transform_dataframe(df=summaries_df, price_column=price_column)
stock_price_columns = [
column
for column in result_df.columns
if column.startswith(f"{price_column}_")
]
# Find the starting indices for A and B
all_indexes = range(len(stock_price_columns))
unique_index_pairs = [(i, j) for i in all_indexes for j in all_indexes if i < j]
pairs_trades = []
for a_index, b_index in unique_index_pairs:
# Get the actual variable names
colname_a = stock_price_columns[a_index]
colname_b = stock_price_columns[b_index]
symbol_a = colname_a[len(f"{price_column}-") :]
symbol_b = colname_b[len(f"{price_column}-") :].replace(
"STOCK-", ""
)
pair = Pair(symbol_a, symbol_b, price_column)
single_pair_trades = run_single_pair(market_data=result_df, price_column=price_column, pair=pair)
if single_pair_trades is not None:
pairs_trades.append(single_pair_trades)
# Check if result_list has any data before concatenating
if not pairs_trades:
print("No trading signals found for any pairs")
return None
result = pd.concat(pairs_trades, ignore_index=True)
result["time"] = pd.to_datetime(result["time"])
result = result.set_index("time").sort_index()
collect_single_day_results(result)
# print_single_day_results(result)
def add_trade(pair, symbol, action, price):
# Ensure we always use clean names without STOCK- prefix
pair = str(pair).replace("STOCK-", "")
symbol = symbol.replace("STOCK-", "")
if pair not in TRADES:
TRADES[pair] = {symbol: []}
if symbol not in TRADES[pair]:
TRADES[pair][symbol] = []
TRADES[pair][symbol].append((action, price))
def collect_single_day_results(result):
if result is None:
return
print("\n -------------- Suggested Trades ")
print(result)
for row in result.itertuples():
action = row.action
symbol = row.symbol
price = row.price
add_trade(pair=row.pair, action=action, symbol=symbol, price=price)
def print_single_day_results(result):
for pair, symbols in TRADES.items():
print(f"\n--- {pair} ---")
for symbol, trades in symbols.items():
for side, price in trades:
print(f"{symbol} {side} at ${price}")
def print_results_suummary(all_results):
# Summary of all processed files
print("\n====== Summary of All Processed Files ======")
for filename, data in all_results.items():
trade_count = sum(
len(trades)
for symbol_trades in data["trades"].values()
for trades in symbol_trades.values()
)
print(f"{filename}: {trade_count} trades")
def calculate_returns(all_results: Dict):
print("\n====== Returns By Day and Pair ======")
for filename, data in all_results.items():
day_return = 0
print(f"\n--- {filename} ---")
# Process each pair
for pair, symbols in data["trades"].items():
pair_return = 0
pair_trades = []
# Calculate individual symbol returns in the pair
for symbol, trades in symbols.items():
if len(trades) >= 2: # Need at least entry and exit
# Get entry and exit trades
entry_action, entry_price = trades[0]
exit_action, exit_price = trades[1]
# Calculate return based on action
symbol_return = 0
if entry_action == "BUY" and exit_action == "SELL":
# Long position
symbol_return = (exit_price - entry_price) / entry_price * 100
elif entry_action == "SELL" and exit_action == "BUY":
# Short position
symbol_return = (entry_price - exit_price) / entry_price * 100
pair_trades.append(
(
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
)
)
pair_return += symbol_return
# Print pair returns
if pair_trades:
print(f" {pair}:")
for (
symbol,
entry_action,
entry_price,
exit_action,
exit_price,
symbol_return,
) in pair_trades:
print(
f" {symbol}: {entry_action} @ ${entry_price:.2f}, {exit_action} @ ${exit_price:.2f}, Return: {symbol_return:.2f}%"
)
print(f" Pair Total Return: {pair_return:.2f}%")
day_return += pair_return
# Print day total return
if day_return != 0:
print(f" Day Total Return: {day_return:.2f}%")
if __name__ == "__main__":
# Initialize a dictionary to store all trade results
all_results = {}
# Process each data file
price_column = CONFIG["price_column"]
for datafile in CONFIG["datafiles"]:
print(f"\n====== Processing {datafile} ======")
# Clear the TRADES global dictionary for the new file
TRADES.clear()
# Process data for this file
try:
run_pairs(
summaries_df=load_market_data(f'{CONFIG["data_directory"]}/{datafile}'),
price_column=price_column
)
# Store results with file name as key
filename = datafile.split("/")[-1]
all_results[filename] = {"trades": TRADES.copy()}
print(f"Successfully processed {filename}")
except Exception as e:
print(f"Error processing {datafile}: {str(e)}")
# print_results_suummary(all_results)
calculate_returns(all_results)

Binary file not shown.

25
src/tools/data_loader.py Normal file
View File

@ -0,0 +1,25 @@
import sys
import sqlite3
import pandas as pd
def load_sqlite_to_dataframe(db_path, query):
try:
conn = sqlite3.connect(db_path)
df = pd.read_sql_query(query, conn)
return df
except sqlite3.Error as excpt:
print(f"SQLite error: {excpt}")
raise
except Exception as e:
print(f"Error: {excpt}")
raise
finally:
if 'conn' in locals():
conn.close()
if __name__ == "__main__":
df1 = load_sqlite_to_dataframe(sys.argv[1], table_name='md_1min_bars')
print(df1)