import os import glob from typing import Dict, List, Tuple # --- from cvttpy_tools.config import CvttAppConfig # --- from cvttpy_trading.trading.instrument import ExchangeInstrument DayT = str DataFileNameT = str def resolve_datafiles( config: Dict, date_pattern: str, instruments: List[ExchangeInstrument] ) -> List[Tuple[DayT, DataFileNameT]]: resolved_files: List[Tuple[DayT, DataFileNameT]] = [] for exch_inst in instruments: pattern = date_pattern inst_type = exch_inst.user_data_.get("instrument_type", "?instrument_type?") data_dir = config["market_data_loading"][inst_type]["data_directory"] if "*" in pattern or "?" in pattern: # Handle wildcards if not os.path.isabs(pattern): pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db") matched_files = glob.glob(pattern) for matched_file in matched_files: import re match = re.search(r"(\d{8})\.mktdata\.ohlcv\.db$", matched_file) assert match is not None day = match.group(1) resolved_files.append((day, matched_file)) else: # Handle explicit file path if not os.path.isabs(pattern): pattern = os.path.join(data_dir, f"{pattern}.mktdata.ohlcv.db") resolved_files.append((date_pattern, pattern)) return sorted(list(set(resolved_files))) # Remove duplicates and sort