# Module-level logger shared by the sequence-creation helpers below.
logger = logging.getLogger(__name__)


def _stack_target_column(raw_values, t_name, stack_rows, split_name):
    """Convert one target column (already trimmed to the target rows) to float32.

    Args:
        raw_values (np.ndarray): Column values, one entry per generated sequence.
        t_name (str): Target column name, used in log/error messages.
        stack_rows (bool): True for ternary direction targets, whose entries may
            be per-row vectors (e.g. one-hot lists) that are stacked into 2D.
        split_name (str): Name of the split, used in log/error messages.

    Returns:
        np.ndarray: float32 array whose first axis indexes the sequences.

    Raises:
        SystemExit: If the entries cannot be combined into one regular array.
    """
    # Plain numeric/bool columns need no per-element handling.
    if raw_values.dtype.kind in 'biuf':
        return raw_values.astype(np.float32)

    # Object columns: only Python lists of ternary targets are pre-converted,
    # everything else is handed to numpy unchanged.
    items = [
        np.array(value, dtype=np.float32) if stack_rows and isinstance(value, list) else value
        for value in raw_values
    ]
    try:
        if stack_rows:
            # np.stack requires every entry to have the same shape.
            return np.stack(items).astype(np.float32)
        return np.array(items, dtype=np.float32)
    except ValueError as e:
        from collections import Counter

        logger.error(f"{split_name}: Error stacking target '{t_name}': {e}. Check target consistency (especially ternary).", exc_info=True)
        shapes = [getattr(item, 'shape', type(item)) for item in items]
        logger.error(f"Target shapes/types found: {Counter(shapes)}")
        raise SystemExit(f"{split_name}: Inconsistent target shapes for '{t_name}' during sequence creation.") from e


def _save_imputed_summary(io, split_name, summary_data):
    """Best-effort save of the imputed-sequence summary; failures are only logged."""
    if not io:
        logger.warning(f"IOManager not available, cannot save imputed sequence summary for {split_name}.")
        return
    filename = f"imputed_sequence_summary_{split_name.lower()}.json"
    try:
        io.save_json(summary_data, filename, section='results', indent=4)
    except Exception as e:  # deliberate: a failed report must not abort the pipeline
        logger.error(f"Failed to save imputed sequence summary for {split_name}: {e}")
    else:
        logger.info(f"Saved imputed sequence summary to results/{filename}")


def create_sequences_fold(
    X_data: pd.DataFrame,
    y_data: pd.DataFrame,
    target_names: List[str],  # e.g., ['mu', 'dir3'] or ['mu', 'dir']
    lookback: int,
    name: str,  # e.g., "Train", "Validation", "Test"
    config: dict,  # For gru.use_ternary / gru.drop_imputed_sequences
    io: Optional["IOManager"],  # For saving artefact
) -> Tuple[Optional[np.ndarray], Optional[Dict], Optional[pd.Index], int]:
    """
    Build 3D GRU input sequences and their aligned targets for one data split.

    For every row position ``i >= lookback`` the feature window is rows
    ``[i - lookback, i)`` and the targets are taken from row ``i``, so a window
    never contains the row it predicts. All columns of ``X_data`` (including
    'bar_imputed') are part of the feature tensor. When
    ``config['gru']['drop_imputed_sequences']`` is truthy, every window that
    contains at least one row flagged in 'bar_imputed' is discarded and a JSON
    summary is saved through ``io`` (if provided).

    Args:
        X_data (pd.DataFrame): Features for the split; must contain 'bar_imputed'.
        y_data (pd.DataFrame): Targets for the split, with the same index as X_data.
        target_names (List[str]): Target column names in y_data (e.g., ['mu', 'dir3']).
        lookback (int): Sequence length; must be positive.
        name (str): Name of the split (e.g., "Train") used in logs and file names.
        config (dict): Pipeline configuration; the 'gru' section is read.
        io (Optional[IOManager]): Used only to save the summary artefact.

    Returns:
        Tuple containing:
        - X_seq (np.ndarray or None): float32 array (n_sequences, lookback, n_columns).
        - y_seq_dict (Dict or None): target name -> float32 array, first axis n_sequences.
        - target_indices (pd.Index or None): Index labels of the target rows
          (the index name of y_data is preserved).
        - dropped_count (int): Number of sequences dropped due to imputed bars.
        Returns (None, None, None, 0) when inputs are missing/empty or too short,
        and (None, None, None, dropped_count) when dropping leaves no sequence.

    Raises:
        SystemExit: On critical errors: missing 'bar_imputed' column, misaligned
            indices, missing target columns, non-positive lookback, or targets
            that cannot be stacked into a regular array.
    """
    logger.info(f"--- Creating {name} Sequences ---")
    gru_cfg = config.get('gru') or {}
    use_ternary = gru_cfg.get('use_ternary', False)
    drop_imputed = gru_cfg.get('drop_imputed_sequences', False)
    imputed_col_name = 'bar_imputed'

    # --- Input Validation --- #
    if X_data is None or y_data is None or X_data.empty or y_data.empty:
        logger.error(f"{name}: Missing or empty features/targets for sequence creation.")
        return None, None, None, 0

    if imputed_col_name not in X_data.columns:
        logger.error(f"{name}: Required column '{imputed_col_name}' not found in features. Cannot handle imputed sequences.")
        raise SystemExit(f"{name}: '{imputed_col_name}' column missing. Sequence creation halted.")

    # Strict anti-leakage check. An explicit test is used instead of `assert`
    # because assertions are removed when Python runs with -O.
    if not X_data.index.equals(y_data.index):
        misaligned_msg = f"{name}: Features and targets indices misaligned!"
        logger.error(f"Data alignment check failed: {misaligned_msg}. Potential data leakage. Aborting.")
        raise SystemExit(misaligned_msg)

    missing_targets = set(target_names) - set(y_data.columns)
    if missing_targets:
        logger.error(f"{name}: Target columns {missing_targets} not found in y_data. Aborting.")
        raise SystemExit(f"{name}: Missing target columns for sequencing.")

    if lookback <= 0:
        logger.error(f"{name}: lookback must be a positive integer, got {lookback}. Aborting.")
        raise SystemExit(f"{name}: Invalid lookback ({lookback}) for sequencing.")

    n_rows = len(X_data)
    if n_rows <= lookback:
        logger.warning(f"{name}: DataFrame length ({n_rows}) is not greater than lookback ({lookback}). Cannot create sequences.")
        return None, None, None, 0
    # --- End Input Validation --- #

    features = X_data.to_numpy(dtype=np.float32)
    imputed_flags = X_data[imputed_col_name].to_numpy().astype(bool)

    # Row s of window_idx holds the positions [s, s + lookback); its target is
    # the row at position s + lookback.
    orig_n = n_rows - lookback
    window_idx = np.arange(orig_n)[:, None] + np.arange(lookback)[None, :]
    target_pos = np.arange(lookback, n_rows)

    # Targets are validated/stacked for all candidate sequences before any
    # filtering, so inconsistent targets are always reported.
    y_seq_dict_np = {
        t_name: _stack_target_column(
            y_data[t_name].to_numpy()[lookback:],
            t_name,
            bool(use_ternary) and 'dir' in t_name,
            name,
        )
        for t_name in target_names
    }

    dropped_count = 0
    if drop_imputed:
        logger.info(f"{name}: Dropping sequences containing imputed bars (drop_imputed_sequences=True)...")
        keep = ~imputed_flags[window_idx].any(axis=1)
        window_idx = window_idx[keep]
        target_pos = target_pos[keep]
        y_seq_dict_np = {t_name: values[keep] for t_name, values in y_seq_dict_np.items()}

        remaining = len(target_pos)
        dropped_count = orig_n - remaining
        logger.info(f"{name}: Generated {orig_n} sequences, dropped {dropped_count} containing imputed bars. Remaining: {remaining}")

        _save_imputed_summary(io, name, {
            "split_name": name,
            "total_sequences_generated": orig_n,
            "sequences_dropped_imputed": dropped_count,
            "sequences_remaining": remaining,
            "drop_imputed_sequences_config": drop_imputed,
        })
    else:
        logger.info(f"{name}: Generated {orig_n} sequences. Keeping sequences with imputed bars (drop_imputed_sequences=False).")

    if len(target_pos) == 0:
        logger.error(f"{name}: No valid sequences remaining after potential filtering. Aborting.")
        # The real drop count is reported so callers can see why nothing is left.
        return None, None, None, dropped_count

    # Gather only the windows that survived filtering.
    X_seq = features[window_idx]
    target_indices_pd = y_data.index[target_pos]

    logger.info(f"Sequence shapes created for {name}:")
    logger.info(f"  X={X_seq.shape}, y_keys={list(y_seq_dict_np.keys())}, indices={len(target_indices_pd)}")

    return X_seq, y_seq_dict_np, target_indices_pd, dropped_count
# Adjust the import path based on your project structure
from gru_sac_predictor.src.pipeline_stages.sequence_creation import create_sequences_fold
from gru_sac_predictor.src.io_manager import IOManager  # Adjust path if needed

# --- Test Fixtures ---

@pytest.fixture
def sample_data_with_imputed():
    """Create sample X and y dataframes; X carries a 'bar_imputed' flag column."""
    # Seeded generator so a failing run can be reproduced exactly.
    rng = np.random.default_rng(42)
    # 'min' is the minute alias; the older 'T' alias is deprecated in recent pandas.
    dates = pd.date_range('2023-01-01', periods=20, freq='min')
    n_features_orig = 3
    n_samples = len(dates)

    # Features (bar_imputed is appended as the last column)
    X_data = pd.DataFrame(
        rng.standard_normal((n_samples, n_features_orig)),
        index=dates,
        columns=[f'feat_{i}' for i in range(n_features_orig)]
    )
    imputed_flags = np.zeros(n_samples, dtype=bool)
    imputed_flags[2] = True   # Imputed within first potential sequence
    imputed_flags[8] = True   # Imputed within a later potential sequence
    imputed_flags[15] = True  # Imputed near the end
    X_data['bar_imputed'] = imputed_flags

    # Targets: scalar 'mu' and one-hot 'dir3' stored as Python lists per row
    one_hot = np.eye(3)[rng.integers(0, 3, n_samples)]
    y_data = pd.DataFrame({
        'mu': rng.standard_normal(n_samples),
        'dir3': [list(row) for row in one_hot]
    }, index=dates)

    return X_data, y_data

@pytest.fixture
def base_config():
    """Create a base OmegaConf config for testing sequence creation."""
    conf = OmegaConf.create({
        'gru': {
            'lookback': 5,
            'use_ternary': True,  # Matches sample_data_with_imputed
            'drop_imputed_sequences': True  # Default to True for testing dropping
        },
    })
    return conf

@pytest.fixture
def mock_io_manager():
    """Create a mock IOManager that records and writes saved JSON artefacts."""
    with tempfile.TemporaryDirectory() as tmpdir:
        mock_io = MagicMock(spec=IOManager)
        mock_io.results_dir = tmpdir
        saved_jsons = {}

        def mock_save_json(data, filename, section=None, **json_kwargs):
            # 'section' is an IOManager routing argument, not a json.dump option;
            # forwarding it to json.dump raises TypeError.
            saved_jsons[filename] = data
            with open(os.path.join(tmpdir, filename), 'w') as f:
                json.dump(data, f, **json_kwargs)

        mock_io.save_json.side_effect = mock_save_json
        mock_io.get_artifact_path.side_effect = lambda filename: os.path.join(tmpdir, filename)
        mock_io._saved_jsons = saved_jsons
        yield mock_io

# --- Test Functions ---

def test_sequence_creation_shapes(sample_data_with_imputed, base_config, mock_io_manager):
    """Without dropping, one sequence is produced per row position >= lookback."""
    X_data, y_data = sample_data_with_imputed
    lookback = base_config.gru.lookback
    n_features = X_data.shape[1]
    expected_n_seq = len(X_data) - lookback

    # Build an independent config instead of mutating a copy of the fixture.
    cfg_no_drop = OmegaConf.merge(base_config, {'gru': {'drop_imputed_sequences': False}})

    X_seq, y_seq_dict, indices, dropped_count = create_sequences_fold(
        X_data=X_data, y_data=y_data, target_names=['mu', 'dir3'],
        lookback=lookback, name="TestSplit", config=cfg_no_drop, io=mock_io_manager
    )

    assert dropped_count == 0
    assert X_seq is not None
    assert y_seq_dict is not None
    assert indices is not None
    assert X_seq.shape == (expected_n_seq, lookback, n_features)
    assert 'mu' in y_seq_dict and y_seq_dict['mu'].shape == (expected_n_seq,)
    assert 'dir3' in y_seq_dict and y_seq_dict['dir3'].shape == (expected_n_seq, 3)
    assert len(indices) == expected_n_seq
    # First target is the lookback-th original row, last target is the last row
    assert indices[0] == X_data.index[lookback]
    assert indices[-1] == X_data.index[-1]

def test_sequence_dropping_imputed(sample_data_with_imputed, base_config, mock_io_manager):
    """Sequences whose feature window contains an imputed bar are dropped."""
    X_data, y_data = sample_data_with_imputed
    lookback = base_config.gru.lookback
    expected_n_seq_orig = len(X_data) - lookback

    # Dropping is enabled by default in the fixture
    X_seq, y_seq_dict, indices, dropped_count = create_sequences_fold(
        X_data=X_data.copy(), y_data=y_data.copy(), target_names=['mu', 'dir3'],
        lookback=lookback, name="TestDrop", config=base_config, io=mock_io_manager
    )

    assert X_seq is not None
    assert y_seq_dict is not None
    assert indices is not None

    # Sequence seq_idx uses feature rows [seq_idx, seq_idx + lookback) and the
    # target at row seq_idx + lookback.
    imputed_flags_np = X_data['bar_imputed'].values
    should_be_dropped_mask = np.array([
        imputed_flags_np[seq_idx:seq_idx + lookback].any()
        for seq_idx in range(expected_n_seq_orig)
    ])
    expected_dropped_count = int(should_be_dropped_mask.sum())
    expected_remaining_count = expected_n_seq_orig - expected_dropped_count

    assert dropped_count == expected_dropped_count
    assert X_seq.shape[0] == expected_remaining_count
    assert y_seq_dict['mu'].shape[0] == expected_remaining_count
    assert y_seq_dict['dir3'].shape[0] == expected_remaining_count
    assert len(indices) == expected_remaining_count

    # No surviving window may contain an imputed bar
    imputed_col_pos = X_data.columns.get_loc('bar_imputed')
    assert not X_seq[:, :, imputed_col_pos].any()

    # Remaining target timestamps are exactly the ones not marked for dropping
    original_indices = X_data.index[lookback:]
    expected_remaining_indices = original_indices[~should_be_dropped_mask]
    pd.testing.assert_index_equal(indices, expected_remaining_indices)

    # Check artifact saving (recorded and actually written to disk)
    summary_name = 'imputed_sequence_summary_testdrop.json'
    assert summary_name in mock_io_manager._saved_jsons
    assert os.path.exists(mock_io_manager.get_artifact_path(summary_name))
    report_data = mock_io_manager._saved_jsons[summary_name]
    assert report_data['total_sequences_generated'] == expected_n_seq_orig
    assert report_data['sequences_dropped_imputed'] == expected_dropped_count
    assert report_data['sequences_remaining'] == expected_remaining_count

def test_sequence_creation_no_imputed_col(sample_data_with_imputed, base_config, mock_io_manager):
    """A missing 'bar_imputed' column aborts sequence creation."""
    X_data, y_data = sample_data_with_imputed
    X_data_no_imputed = X_data.drop(columns=['bar_imputed'])
    lookback = base_config.gru.lookback

    with pytest.raises(SystemExit) as excinfo:
        create_sequences_fold(
            X_data=X_data_no_imputed, y_data=y_data, target_names=['mu', 'dir3'],
            lookback=lookback, name="TestNoImputedCol", config=base_config, io=mock_io_manager
        )
    assert "'bar_imputed' column missing" in str(excinfo.value)

def test_sequence_creation_insufficient_data(sample_data_with_imputed, base_config, mock_io_manager):
    """Splits with fewer rows than the lookback yield no sequences."""
    X_data, y_data = sample_data_with_imputed
    lookback = base_config.gru.lookback
    # Create data shorter than lookback
    X_short = X_data.iloc[:lookback - 1]
    y_short = y_data.iloc[:lookback - 1]

    X_seq, y_seq_dict, indices, dropped_count = create_sequences_fold(
        X_data=X_short, y_data=y_short, target_names=['mu', 'dir3'],
        lookback=lookback, name="TestShort", config=base_config, io=mock_io_manager
    )

    assert X_seq is None
    assert y_seq_dict is None
    assert indices is None
    assert dropped_count == 0