#!/usr/bin/env python3
"""
Market Window Analysis Script

This script performs rolling window analysis on market data using the
market_predictor package.

Example Usage:
    python analyze_market_windows.py \
        --symbol BTC-USD \
        --start-date 2024-01-01 \
        --end-date 2024-01-31 \
        --interval 5m \
        --training-window 60 \
        --inference-window 12 \
        --inference-offset 0

Arguments:
    --symbol: Trading pair symbol (e.g., BTC-USD)
    --start-date: Analysis start date (YYYY-MM-DD)
    --end-date: Analysis end date (YYYY-MM-DD)
    --interval: Data interval (1m, 5m, 15m, 1h, etc.)
    --training-window: Number of intervals in training window
    --inference-window: Number of intervals in inference window
    --inference-offset: Offset between training and inference windows
    --output: Optional output file path for predictions CSV
"""

import asyncio
import argparse
from datetime import datetime

import pandas as pd
from tqdm import tqdm
import nest_asyncio

from market_predictor.market_data_fetcher import MarketDataFetcher
from market_predictor.data_processor import MarketDataProcessor
from market_predictor.prediction_service import PredictionService
from market_predictor.performance_metrics import PerformanceMetrics

# Enable nested event loops (needed when run inside an already-running
# loop, e.g. Jupyter). Applying once at import time is sufficient.
nest_asyncio.apply()


def _ensure_required_columns(market_data: pd.DataFrame) -> pd.DataFrame:
    """Return a DataFrame guaranteed to contain Close, VWAP and Volume.

    Maps yfinance-style columns onto the required names and derives a
    cumulative VWAP when it is absent. The caller's DataFrame is never
    mutated; a copy is made only when changes are required.

    Args:
        market_data: Raw OHLCV data.

    Returns:
        The input DataFrame (unchanged) or a fixed-up copy.

    Raises:
        KeyError: if VWAP must be computed but High/Low/Close/Volume are
            not all present.
    """
    required_columns = {'Close', 'VWAP', 'Volume'}
    missing_columns = required_columns - set(market_data.columns)
    if not missing_columns:
        return market_data

    # Work on a copy so the caller's DataFrame is never mutated.
    market_data = market_data.copy()

    # Map 'Adj Close' onto 'Close' only when 'Close' is genuinely absent;
    # an unconditional rename would create a duplicate 'Close' column
    # whenever both are present (as they are in raw yfinance downloads).
    if 'Close' not in market_data.columns and 'Adj Close' in market_data.columns:
        market_data = market_data.rename(columns={'Adj Close': 'Close'})

    if 'VWAP' not in market_data.columns:
        needed = {'High', 'Low', 'Close', 'Volume'}
        absent = needed - set(market_data.columns)
        if absent:
            raise KeyError(
                f"Cannot compute VWAP: missing columns {sorted(absent)}"
            )
        # Cumulative VWAP: running sum of (typical price * volume)
        # divided by the running sum of volume.
        typical_price = (
            market_data['High'] + market_data['Low'] + market_data['Close']
        ) / 3
        market_data['VWAP'] = (
            (typical_price * market_data['Volume']).cumsum()
            / market_data['Volume'].cumsum()
        )

    return market_data


async def analyze_market_data(
    market_data: pd.DataFrame,
    training_window_size: int = 60,
    inference_window_size: int = 12,
    inference_offset: int = 0
) -> pd.DataFrame:
    """Analyze market data using rolling windows.

    Args:
        market_data: OHLCV data; Close/VWAP/Volume are derived if missing.
        training_window_size: Number of intervals in each training window.
        inference_window_size: Number of intervals in each inference window.
        inference_offset: Gap (in intervals) between the training and
            inference windows.

    Returns:
        DataFrame of predictions; empty if no predictions were produced.

    Raises:
        ValueError: if the data is too short to form a single window.
    """
    market_data = _ensure_required_columns(market_data)

    processor = MarketDataProcessor(market_data)
    processed_data = processor.df

    service = PredictionService(
        market_data=processed_data,
        training_window_size=training_window_size,
        inference_window_size=inference_window_size,
        inference_offset=inference_offset
    )

    total_size = training_window_size + inference_offset + inference_window_size
    total_windows = len(processed_data) - total_size
    if total_windows <= 0:
        # Without this guard tqdm would be created with a non-positive
        # total and the loop below would silently do nothing useful.
        raise ValueError(
            f"Not enough data: {len(processed_data)} rows available, "
            f"but one window requires more than {total_size}"
        )

    predictions = []
    with tqdm(total=total_windows, desc="Processing", ncols=80) as pbar:
        async for pred in service.generate_rolling_predictions():
            if pred:
                predictions.append(pred)
            # One tick per window processed, whether or not it yielded
            # a prediction, so the bar tracks total_windows.
            pbar.update(1)

    predictions_df = pd.DataFrame(predictions) if predictions else pd.DataFrame()

    if not predictions_df.empty:
        metrics = PerformanceMetrics(predictions_df, processed_data)
        report = metrics.generate_report()
        print("\nPerformance Report:")
        print(report)

    return predictions_df


def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description="Market Window Analysis")
    parser.add_argument("--symbol", required=True, help="Trading pair symbol")
    parser.add_argument("--start-date", required=True, help="Start date (YYYY-MM-DD)")
    parser.add_argument("--end-date", required=True, help="End date (YYYY-MM-DD)")
    parser.add_argument("--interval", default="5m", help="Data interval")
    parser.add_argument("--training-window", type=int, default=60, help="Training window size")
    parser.add_argument("--inference-window", type=int, default=12, help="Inference window size")
    parser.add_argument("--inference-offset", type=int, default=0, help="Inference offset")
    parser.add_argument("--output", help="Output file path for predictions CSV")
    return parser.parse_args()


async def main():
    """Fetch market data, run the rolling-window analysis, save results."""
    args = parse_args()

    fetcher = MarketDataFetcher(args.symbol)
    market_data = fetcher.fetch_data(
        start_date=args.start_date,
        end_date=args.end_date,
        interval=args.interval
    )
    print(f"Fetched {len(market_data)} rows of data")

    try:
        predictions_df = await analyze_market_data(
            market_data,
            training_window_size=args.training_window,
            inference_window_size=args.inference_window,
            inference_offset=args.inference_offset
        )
        if args.output and not predictions_df.empty:
            predictions_df.to_csv(args.output)
            print(f"\nPredictions saved to: {args.output}")
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit nonzero so
        # shell callers / CI can detect it (previously exited with 0).
        print(f"Analysis failed: {e}")
        raise SystemExit(1) from e


if __name__ == "__main__":
    # nest_asyncio.apply() already ran at import time; no second call needed.
    asyncio.run(main())