2025-02-03 23:26:28 -05:00

137 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
Market Window Analysis Script
This script performs rolling window analysis on market data using the market_predictor package.
Example Usage:
python analyze_market_windows.py \
--symbol BTC-USD \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--interval 5m \
--training-window 60 \
--inference-window 12 \
--inference-offset 0
Arguments:
--symbol: Trading pair symbol (e.g., BTC-USD)
--start-date: Analysis start date (YYYY-MM-DD)
--end-date: Analysis end date (YYYY-MM-DD)
--interval: Data interval (1m, 5m, 15m, 1h, etc.)
--training-window: Number of intervals in training window
--inference-window: Number of intervals in inference window
--inference-offset: Offset between training and inference windows
--output: Optional output file path for predictions CSV
"""
import asyncio
import argparse
from datetime import datetime
import pandas as pd
from tqdm import tqdm
import nest_asyncio
import logging # Added logging im
from market_predictor.market_data_fetcher import MarketDataFetcher
from market_predictor.data_processor import MarketDataProcessor
from market_predictor.prediction_service import PredictionService
from market_predictor.performance_metrics import PerformanceMetrics
# Enable nested event loops
nest_asyncio.apply()
async def analyze_market_data(
market_data: pd.DataFrame,
training_window_size: int = 78,
inference_window_size: int = 12,
inference_offset: int = 0
) -> pd.DataFrame:
"""
Analyze market data and generate predictions
Args:
market_data: DataFrame containing market data
training_window_size: Size of training window
inference_window_size: Size of inference window
inference_offset: Offset for inference window
Returns:
pd.DataFrame: DataFrame containing predictions
"""
# Initialize processor and service
processor = MarketDataProcessor(
df=market_data,
training_window_size=training_window_size,
inference_window_size=inference_window_size,
inference_offset=inference_offset
)
service = PredictionService(
market_data=processor.df,
training_window_size=training_window_size,
inference_window_size=inference_window_size,
inference_offset=inference_offset
)
# Get predictions and convert to DataFrame
predictions = await service.main()
if not predictions: # Handle empty list case
return pd.DataFrame()
predictions_df = pd.DataFrame(predictions)
# Generate performance report if we have predictions
if len(predictions_df) > 0:
metrics = PerformanceMetrics(predictions_df, processor.df)
report = metrics.generate_report()
print("\nPerformance Report:")
print(report)
return predictions_df
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Market Window Analysis")
parser.add_argument("--symbol", required=True, help="Trading pair symbol")
parser.add_argument("--start-date", required=True, help="Start date (YYYY-MM-DD)")
parser.add_argument("--end-date", required=True, help="End date (YYYY-MM-DD)")
parser.add_argument("--interval", default="5m", help="Data interval")
parser.add_argument("--training-window", type=int, default=60, help="Training window size")
parser.add_argument("--inference-window", type=int, default=12, help="Inference window size")
parser.add_argument("--inference-offset", type=int, default=0, help="Inference offset")
parser.add_argument("--output", help="Output file path for predictions CSV")
return parser.parse_args()
async def main():
args = parse_args()
fetcher = MarketDataFetcher(args.symbol)
market_data = fetcher.fetch_data(
start_date=args.start_date,
end_date=args.end_date,
interval=args.interval
)
print(f"Fetched {len(market_data)} rows of data")
try:
predictions_df = await analyze_market_data(
market_data,
training_window_size=args.training_window,
inference_window_size=args.inference_window,
inference_offset=args.inference_offset
)
print(predictions_df)
if args.output and not predictions_df.empty:
predictions_df.to_csv(args.output)
print(f"\nPredictions saved to: {args.output}")
except Exception as e:
print(f"Analysis failed: {str(e)}")
if __name__ == "__main__":
nest_asyncio.apply()
asyncio.run(main())