import glob
import os
import re
from typing import Dict, List, Set, Tuple

# ---
from cvttpy_tools.config import Config

# ---
from cvttpy_trading.trading.instrument import ExchangeInstrument

DayT = str
DataFileNameT = str

# Suffix appended to a relative date pattern to form the data file name.
_DATAFILE_SUFFIX = ".mktdata.ohlcv.db"

# Captures the 8-digit day that immediately precedes the data file suffix.
_DAY_IN_FILENAME_RE = re.compile(r"(\d{8})\.mktdata\.ohlcv\.db$")


def resolve_datafiles(
    config: Config, date_pattern: str, instruments: List[ExchangeInstrument]
) -> List[Tuple[DayT, DataFileNameT]]:
    """Resolve a date pattern into a sorted, de-duplicated list of data files.

    For every instrument, the data directory is read from ``config`` under
    ``market_data_loading/<instrument_type>/data_directory``, where
    ``<instrument_type>`` comes from the instrument's ``user_data_`` mapping
    (falling back to the placeholder ``"?instrument_type?"``).

    * A relative ``date_pattern`` is expanded to
      ``<data_directory>/<date_pattern>.mktdata.ohlcv.db``.
    * An absolute ``date_pattern`` is used verbatim (no suffix is appended).
    * If the pattern contains ``*`` or ``?`` it is globbed, and the day of
      each matched file is taken from the 8 digits preceding
      ``.mktdata.ohlcv.db`` in its name.
    * Otherwise the pattern is treated as one explicit file; it is not
      checked for existence, and ``date_pattern`` itself is reported as the
      day.

    Args:
        config: Configuration object queried for the per-instrument-type
            data directory.
        date_pattern: A day string, a glob pattern, or an absolute path.
        instruments: Instruments whose data directories are searched.

    Returns:
        Unique ``(day, file_name)`` tuples in ascending order.

    Raises:
        ValueError: If a globbed file name does not end in
            ``<8 digits>.mktdata.ohlcv.db``. The previous implementation
            used ``assert`` here, which is skipped under ``python -O``.
    """
    # Both properties depend only on date_pattern, so compute them once.
    has_wildcard = "*" in date_pattern or "?" in date_pattern
    is_absolute = os.path.isabs(date_pattern)

    resolved: Set[Tuple[DayT, DataFileNameT]] = set()

    for exch_inst in instruments:
        inst_type = exch_inst.user_data_.get("instrument_type", "?instrument_type?")
        data_dir = config.get_value(f"market_data_loading/{inst_type}/data_directory")

        if is_absolute:
            path_or_glob = date_pattern
        else:
            path_or_glob = os.path.join(data_dir, f"{date_pattern}{_DATAFILE_SUFFIX}")

        if not has_wildcard:
            # Explicit file: the caller-supplied pattern doubles as the day.
            resolved.add((date_pattern, path_or_glob))
            continue

        for matched_file in glob.glob(path_or_glob):
            match = _DAY_IN_FILENAME_RE.search(matched_file)
            if match is None:
                # Fail loudly: a glob hit whose name carries no day cannot be
                # placed in the (day, file) result.
                raise ValueError(
                    f"Cannot extract day from data file name {matched_file!r} "
                    f"(matched by pattern {path_or_glob!r}); expected a name "
                    f"ending in '<YYYYMMDD>{_DATAFILE_SUFFIX}'"
                )
            resolved.add((match.group(1), matched_file))

    # Instruments sharing a data directory yield the same files; the set has
    # already removed those duplicates.
    return sorted(resolved)