#!/usr/bin/env python3
"""
Market Window Analysis Script
This script performs rolling window analysis on market data using the market_predictor package.
Example Usage:
python analyze_market_windows.py \
--symbol BTC-USD \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--interval 5m \
--training-window 60 \
--inference-window 12 \
--inference-offset 0
Arguments:
--symbol: Trading pair symbol (e.g., BTC-USD)
--start-date: Analysis start date (YYYY-MM-DD)
--end-date: Analysis end date (YYYY-MM-DD)
--interval: Data interval (1m, 5m, 15m, 1h, etc.)
--training-window: Number of intervals in training window
--inference-window: Number of intervals in inference window
--inference-offset: Offset between training and inference windows
--output: Optional output file path for predictions CSV
"""
import asyncio
import argparse
from datetime import datetime
import pandas as pd
from tqdm import tqdm
import nest_asyncio
from market_predictor.market_data_fetcher import MarketDataFetcher
from market_predictor.data_processor import MarketDataProcessor
from market_predictor.prediction_service import PredictionService
from market_predictor.performance_metrics import PerformanceMetrics
# Enable nested event loops so asyncio.run() works even when an event loop
# is already running (e.g. inside Jupyter). Applied once at import time.
nest_asyncio.apply()
async def analyze_market_data(
    market_data: pd.DataFrame,
    training_window_size: int = 60,
    inference_window_size: int = 12,
    inference_offset: int = 0
) -> pd.DataFrame:
    """Analyze market data using rolling windows.

    Args:
        market_data: Price frame providing 'Close', 'VWAP' and 'Volume'
            columns, or enough yfinance-style columns ('Adj Close', 'High',
            'Low', 'Close', 'Volume') to derive them.
        training_window_size: Number of intervals in each training window.
        inference_window_size: Number of intervals in each inference window.
        inference_offset: Gap (in intervals) between the training and
            inference windows.

    Returns:
        DataFrame of per-window predictions; empty when no predictions
        were produced.

    Raises:
        ValueError: If required columns are missing and cannot be derived.
    """
    required_columns = {'Close', 'VWAP', 'Volume'}
    missing_columns = required_columns - set(market_data.columns)
    if missing_columns:
        # Map yfinance-style columns onto the required names.  Only rename
        # 'Adj Close' when 'Close' is absent, so the frame never ends up
        # with two 'Close' columns (the original unconditional mapping
        # could produce that duplicate).
        if 'Close' not in market_data.columns and 'Adj Close' in market_data.columns:
            market_data = market_data.rename(columns={'Adj Close': 'Close'})
        # Derive a cumulative VWAP from typical price * volume when missing.
        # Work on a copy so the caller's frame is not mutated.
        if ('VWAP' not in market_data.columns
                and {'High', 'Low', 'Close', 'Volume'} <= set(market_data.columns)):
            market_data = market_data.copy()
            market_data['VWAP'] = (
                (market_data['High'] + market_data['Low'] + market_data['Close']) / 3 *
                market_data['Volume']
            ).cumsum() / market_data['Volume'].cumsum()
        # Re-validate: fail fast with a clear message instead of letting a
        # KeyError surface deep inside the processing pipeline.
        still_missing = required_columns - set(market_data.columns)
        if still_missing:
            raise ValueError(f"Missing required columns: {sorted(still_missing)}")
    processor = MarketDataProcessor(market_data)
    processed_data = processor.df
    service = PredictionService(
        market_data=processed_data,
        training_window_size=training_window_size,
        inference_window_size=inference_window_size,
        inference_offset=inference_offset
    )
    total_size = training_window_size + inference_offset + inference_window_size
    # Clamp at zero so tqdm is never given a negative total on short inputs.
    total_windows = max(0, len(processed_data) - total_size)
    predictions = []
    with tqdm(total=total_windows, desc="Processing", ncols=80) as pbar:
        async for pred in service.generate_rolling_predictions():
            if pred:
                predictions.append(pred)
            pbar.update(1)
    predictions_df = pd.DataFrame(predictions) if predictions else pd.DataFrame()
    if not predictions_df.empty:
        metrics = PerformanceMetrics(predictions_df, processed_data)
        report = metrics.generate_report()
        print("\nPerformance Report:")
        print(report)
    return predictions_df
def parse_args():
    """Define and evaluate the command-line interface for the analysis run."""
    parser = argparse.ArgumentParser(description="Market Window Analysis")
    # Mandatory string options identifying the data request.
    for flag, text in (
        ("--symbol", "Trading pair symbol"),
        ("--start-date", "Start date (YYYY-MM-DD)"),
        ("--end-date", "End date (YYYY-MM-DD)"),
    ):
        parser.add_argument(flag, required=True, help=text)
    parser.add_argument("--interval", default="5m", help="Data interval")
    # Integer window-geometry options, each with a default.
    for flag, fallback, text in (
        ("--training-window", 60, "Training window size"),
        ("--inference-window", 12, "Inference window size"),
        ("--inference-offset", 0, "Inference offset"),
    ):
        parser.add_argument(flag, type=int, default=fallback, help=text)
    parser.add_argument("--output", help="Output file path for predictions CSV")
    return parser.parse_args()
async def main():
    """CLI entry point: fetch market data, run the rolling-window analysis,
    and optionally save predictions to CSV.

    Exits with a non-zero status when the analysis fails, so shell callers
    can detect the error (the original handler printed the failure but
    still returned success).
    """
    args = parse_args()
    fetcher = MarketDataFetcher(args.symbol)
    market_data = fetcher.fetch_data(
        start_date=args.start_date,
        end_date=args.end_date,
        interval=args.interval
    )
    print(f"Fetched {len(market_data)} rows of data")
    try:
        predictions_df = await analyze_market_data(
            market_data,
            training_window_size=args.training_window,
            inference_window_size=args.inference_window,
            inference_offset=args.inference_offset
        )
        if args.output and not predictions_df.empty:
            predictions_df.to_csv(args.output)
            print(f"\nPredictions saved to: {args.output}")
    except Exception as e:
        # Top-level boundary: report and exit non-zero instead of swallowing
        # the error and exiting 0.
        print(f"Analysis failed: {e}")
        raise SystemExit(1) from e
if __name__ == "__main__":
nest_asyncio.apply()
asyncio.run(main())