feat: Handle imputed bars in sequence creation (logic, tests, whitelist)
This commit is contained in:
parent
9d6641a5f2
commit
4b1b542430
182
gru_sac_predictor/src/pipeline_stages/sequence_creation.py
Normal file
182
gru_sac_predictor/src/pipeline_stages/sequence_creation.py
Normal file
@ -0,0 +1,182 @@
|
||||
# Stage functions for creating GRU input sequences
|
||||
import logging
|
||||
import sys
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Tuple, Dict, Optional, List
|
||||
import json # Added for saving artefact
|
||||
|
||||
# Assuming IOManager is importable from parent src directory
|
||||
from ..io_manager import IOManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def create_sequences_fold(
    X_data: pd.DataFrame,
    y_data: pd.DataFrame,
    target_names: List[str],  # e.g., ['mu', 'dir3'] or ['mu', 'dir']
    lookback: int,
    name: str,  # e.g., "Train", "Validation", "Test"
    config: dict,  # For gru.drop_imputed_sequences
    io: Optional[IOManager]  # For saving artefact
) -> Tuple[Optional[np.ndarray], Optional[Dict], Optional[pd.Index], int]:
    """
    Build 3D GRU input sequences and aligned targets for one data split.

    Each sequence is the window of ``lookback`` consecutive rows
    ``[i - lookback, i)`` of ``X_data``; its targets are read from row ``i``
    of ``y_data``. Every column of ``X_data`` (including ``'bar_imputed'``)
    is part of the feature window. When ``gru.drop_imputed_sequences`` is
    true, any sequence whose window contains a flagged bar is removed and a
    JSON summary is saved through ``io`` (if provided).

    Args:
        X_data (pd.DataFrame): Features for the split. Must contain a
            'bar_imputed' column.
        y_data (pd.DataFrame): Targets for the split; its index must equal
            the index of X_data.
        target_names (List[str]): Target column names in y_data
            (e.g., ['mu', 'dir3']).
        lookback (int): Sequence length; must be >= 1.
        name (str): Name of the split (e.g., "Train", "Validation", "Test"),
            used in log messages and in the summary file name.
        config (dict): Pipeline configuration. Reads 'gru.use_ternary' and
            'gru.drop_imputed_sequences' (both default to False).
        io (Optional[IOManager]): IOManager used to save the summary artefact.

    Returns:
        Tuple containing:
        - X_seq (np.ndarray or None): float32 feature sequences.
        - y_seq_dict (Dict or None): Target arrays keyed by target name.
        - target_indices (pd.Index or None): Index labels of the target rows.
        - dropped_count (int): Number of sequences dropped due to imputed bars.
        Returns (None, None, None, 0) if the inputs are missing/empty or the
        split is not longer than ``lookback``. Returns
        (None, None, None, dropped_count) if every sequence was dropped.

    Raises:
        SystemExit: If 'bar_imputed' is missing, the indices are misaligned,
            a target column is missing, ``lookback`` is not positive, or the
            target values cannot be stacked into a consistent array.
    """
    logger.info(f"--- Creating {name} Sequences ---")
    use_ternary = config.get('gru', {}).get('use_ternary', False)
    drop_imputed = config.get('gru', {}).get('drop_imputed_sequences', False)
    imputed_col_name = 'bar_imputed'  # Assuming this is the column name

    # --- Input Validation --- #
    if X_data is None or y_data is None or X_data.empty or y_data.empty:
        logger.error(f"{name}: Missing or empty features/targets for sequence creation.")
        return None, None, None, 0

    # The imputed flag is required even when sequences are not dropped.
    if imputed_col_name not in X_data.columns:
        logger.error(f"{name}: Required column '{imputed_col_name}' not found in features. Cannot handle imputed sequences.")
        raise SystemExit(f"{name}: '{imputed_col_name}' column missing. Sequence creation halted.")

    # Strict anti-leakage check. Done with an explicit test rather than
    # `assert`, because assert statements are removed under `python -O`.
    if not X_data.index.equals(y_data.index):
        alignment_error = f"{name}: Features and targets indices misaligned!"
        logger.error(f"Data alignment check failed: {alignment_error}. Potential data leakage. Aborting.")
        raise SystemExit(f"{name}: {alignment_error}")

    # Check target columns exist
    if not all(col in y_data.columns for col in target_names):
        missing_targets = set(target_names) - set(y_data.columns)
        logger.error(f"{name}: Target columns {missing_targets} not found in y_data. Aborting.")
        raise SystemExit(f"{name}: Missing target columns for sequencing.")

    # A zero or negative lookback would yield empty or wrapped-around windows.
    if lookback < 1:
        logger.error(f"{name}: lookback must be a positive integer, got {lookback}. Aborting.")
        raise SystemExit(f"{name}: Invalid lookback ({lookback}) for sequence creation.")
    # --- End Input Validation --- #

    if len(X_data) <= lookback:
        logger.warning(f"{name}: DataFrame length ({len(X_data)}) is not greater than lookback ({lookback}). Cannot create sequences.")
        return None, None, None, 0

    features_np = X_data.values
    imputed_flag_np = X_data[imputed_col_name].values.astype(bool)  # Ensure boolean type
    targets_dict_np = {t_name: y_data[t_name].values for t_name in target_names}

    # One window per target row i in [lookback, n): features from [i - lookback, i).
    window_starts = range(len(features_np) - lookback)
    X_seq_list = [features_np[start:start + lookback] for start in window_starts]
    mask_seq_list = [imputed_flag_np[start:start + lookback] for start in window_starts]

    y_seq_dict_list = {}
    for t_name in target_names:
        target_values = targets_dict_np[t_name][lookback:]
        if use_ternary and 'dir' in t_name:
            # Ternary labels may be stored as Python lists; convert so they stack.
            y_seq_dict_list[t_name] = [
                np.array(value, dtype=np.float32) if isinstance(value, list) else value
                for value in target_values
            ]
        else:
            y_seq_dict_list[t_name] = list(target_values)

    # Index labels of the rows the targets were taken from.
    target_indices_pd = pd.Index(list(y_data.index[lookback:]))

    # Convert lists to numpy arrays
    X_seq = np.array(X_seq_list, dtype=np.float32)
    mask_seq = np.array(mask_seq_list, dtype=bool)

    y_seq_dict_np = {}
    for t_name in target_names:
        try:
            # Stacking requires every item to have the same shape.
            if use_ternary and 'dir' in t_name:
                y_seq_dict_np[t_name] = np.stack(y_seq_dict_list[t_name]).astype(np.float32)
            else:  # Assuming other targets are scalar
                y_seq_dict_np[t_name] = np.array(y_seq_dict_list[t_name], dtype=np.float32)
        except ValueError as e:
            from collections import Counter

            logger.error(f"{name}: Error stacking target '{t_name}': {e}. Check target consistency (especially ternary).", exc_info=True)
            shapes = [getattr(item, 'shape', type(item)) for item in y_seq_dict_list[t_name]]
            logger.error(f"Target shapes/types found: {Counter(shapes)}")
            raise SystemExit(f"{name}: Inconsistent target shapes for '{t_name}' during sequence creation.") from e

    orig_n = X_seq.shape[0]
    dropped_count = 0

    # Conditionally drop sequences containing imputed bars
    if drop_imputed:
        logger.info(f"{name}: Dropping sequences containing imputed bars (drop_imputed_sequences=True)...")
        # Keep only windows in which no bar is flagged as imputed.
        valid_mask = ~mask_seq.any(axis=1)
        X_seq = X_seq[valid_mask]
        for t_name in target_names:
            y_seq_dict_np[t_name] = y_seq_dict_np[t_name][valid_mask]
        target_indices_pd = target_indices_pd[valid_mask]

        dropped_count = orig_n - X_seq.shape[0]
        logger.info(f"{name}: Generated {orig_n} sequences, dropped {dropped_count} containing imputed bars. Remaining: {X_seq.shape[0]}")

        # Save summary artifact (best effort: a failed save is logged, not fatal)
        if io:
            summary_data = {
                "split_name": name,
                "total_sequences_generated": orig_n,
                "sequences_dropped_imputed": dropped_count,
                "sequences_remaining": X_seq.shape[0],
                "drop_imputed_sequences_config": drop_imputed
            }
            try:
                filename = f"imputed_sequence_summary_{name.lower()}.json"
                io.save_json(summary_data, filename, section='results', indent=4)
                logger.info(f"Saved imputed sequence summary to results/{filename}")
            except Exception as e:
                logger.error(f"Failed to save imputed sequence summary for {name}: {e}")
        else:
            logger.warning(f"IOManager not available, cannot save imputed sequence summary for {name}.")
    else:
        logger.info(f"{name}: Generated {orig_n} sequences. Keeping sequences with imputed bars (drop_imputed_sequences=False).")

    # Final checks
    if X_seq.shape[0] == 0:
        logger.error(f"{name}: No valid sequences remaining after potential filtering. Aborting.")
        # The drop count is still reported so callers can see why nothing is left.
        return None, None, None, dropped_count

    # Log final shapes
    logger.info(f"Sequence shapes created for {name}:")
    logger.info(f"  X={X_seq.shape}, y_keys={list(y_seq_dict_np.keys())}, indices={len(target_indices_pd)}")

    return X_seq, y_seq_dict_np, target_indices_pd, dropped_count
|
||||
186
gru_sac_predictor/tests/test_sequence_creation.py
Normal file
186
gru_sac_predictor/tests/test_sequence_creation.py
Normal file
@ -0,0 +1,186 @@
|
||||
import pytest
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from omegaconf import OmegaConf
|
||||
from unittest.mock import MagicMock
|
||||
import os
|
||||
import tempfile
|
||||
import json
|
||||
|
||||
# Adjust the import path based on your project structure
|
||||
from gru_sac_predictor.src.pipeline_stages.sequence_creation import create_sequences_fold
|
||||
from gru_sac_predictor.src.io_manager import IOManager # Adjust path if needed
|
||||
|
||||
# --- Test Fixtures ---
|
||||
|
||||
@pytest.fixture
def sample_data_with_imputed():
    """Create sample X and y dataframes with a 'bar_imputed' column.

    Returns a tuple ``(X_data, y_data)`` of 20 one-minute rows sharing the
    same DatetimeIndex. X_data has three random feature columns plus a boolean
    'bar_imputed' column flagged at positions 2, 8 and 15. y_data has a random
    'mu' column and a 'dir3' column of one-hot lists of length 3.
    """
    # Seeded generator so every test run sees the same data.
    rng = np.random.default_rng(0)
    # 'min' is the minute alias; 'T' is deprecated in recent pandas releases.
    dates = pd.date_range('2023-01-01', periods=20, freq='min')
    n_features_orig = 3
    n_samples = len(dates)

    # Features (bar_imputed is appended below)
    X_data = pd.DataFrame(
        rng.standard_normal((n_samples, n_features_orig)),
        index=dates,
        columns=[f'feat_{i}' for i in range(n_features_orig)]
    )

    # Add bar_imputed column - mark some bars as imputed
    imputed_flags = np.zeros(n_samples, dtype=bool)
    imputed_flags[2] = True   # Imputed within first potential sequence
    imputed_flags[8] = True   # Imputed within a later potential sequence
    imputed_flags[15] = True  # Imputed near the end
    X_data['bar_imputed'] = imputed_flags

    # Targets (mu and dir3)
    one_hot_rows = np.eye(3)[rng.integers(0, 3, n_samples)]
    y_data = pd.DataFrame({
        'mu': rng.standard_normal(n_samples),
        'dir3': [list(row) for row in one_hot_rows]  # Example one-hot
    }, index=dates)

    return X_data, y_data
|
||||
|
||||
@pytest.fixture
def base_config():
    """Return the baseline OmegaConf config used by the sequence-creation tests.

    Only the 'gru' section is populated: lookback of 5, ternary targets
    enabled, and dropping of imputed sequences enabled.
    """
    gru_settings = {
        'lookback': 5,
        'use_ternary': True,  # Matches sample_data_with_imputed
        'drop_imputed_sequences': True,  # Default to True for testing dropping
    }
    # Add other necessary sections here if needed
    return OmegaConf.create({'gru': gru_settings})
|
||||
|
||||
@pytest.fixture
def mock_io_manager():
    """Yield a mock IOManager that records and writes saved JSON artefacts.

    The mock is a ``MagicMock(spec=IOManager)`` backed by a temporary
    directory that is removed when the fixture is torn down. Every
    ``save_json`` call stores its payload in ``mock_io._saved_jsons`` (keyed
    by filename) and also writes it to the temporary directory.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        mock_io = MagicMock(spec=IOManager)
        mock_io.results_dir = tmpdir
        saved_jsons = {}

        def mock_save_json(data, filename, section=None, **json_kwargs):
            # `section` is an IOManager routing argument, not a json.dump
            # option; forwarding it to json.dump raises TypeError, which the
            # code under test would swallow. Only the rest is passed on.
            # NOTE(review): assumes remaining kwargs (e.g. indent) are valid
            # json.dump options - confirm against IOManager.save_json.
            saved_jsons[filename] = data
            filepath = os.path.join(tmpdir, filename)
            with open(filepath, 'w') as f:
                json.dump(data, f, **json_kwargs)

        mock_io.save_json.side_effect = mock_save_json
        mock_io.get_artifact_path.side_effect = lambda filename: os.path.join(tmpdir, filename)
        mock_io._saved_jsons = saved_jsons
        yield mock_io
|
||||
|
||||
# --- Test Functions ---
|
||||
|
||||
def test_sequence_creation_shapes(sample_data_with_imputed, base_config, mock_io_manager):
    """With dropping disabled, every window is kept and all shapes line up."""
    features, targets = sample_data_with_imputed
    window = base_config.gru.lookback
    n_rows, n_cols = features.shape
    n_windows = n_rows - window

    # Disable dropping of imputed sequences for this test
    keep_all_cfg = base_config.copy()
    keep_all_cfg.gru.drop_imputed_sequences = False

    X_seq, y_seq_dict, indices, dropped_count = create_sequences_fold(
        X_data=features,
        y_data=targets,
        target_names=['mu', 'dir3'],
        lookback=window,
        name="TestSplit",
        config=keep_all_cfg,
        io=mock_io_manager,
    )

    assert dropped_count == 0
    for returned in (X_seq, y_seq_dict, indices):
        assert returned is not None

    assert X_seq.shape == (n_windows, window, n_cols)
    assert 'mu' in y_seq_dict
    assert y_seq_dict['mu'].shape == (n_windows,)
    assert 'dir3' in y_seq_dict
    assert y_seq_dict['dir3'].shape == (n_windows, 3)
    assert len(indices) == n_windows

    # First target sits at the lookback-th original row, last at the final row
    assert indices[0] == features.index[window]
    assert indices[-1] == features.index[-1]
|
||||
|
||||
def test_sequence_dropping_imputed(sample_data_with_imputed, base_config, mock_io_manager):
    """With dropping enabled, windows containing an imputed bar are removed."""
    features, targets = sample_data_with_imputed
    window = base_config.gru.lookback
    total_windows = len(features) - window

    # The fixture config already has drop_imputed_sequences=True
    X_seq, y_seq_dict, indices, dropped_count = create_sequences_fold(
        X_data=features.copy(),
        y_data=targets.copy(),
        target_names=['mu', 'dir3'],
        lookback=window,
        name="TestDrop",
        config=base_config,
        io=mock_io_manager,
    )

    for returned in (X_seq, y_seq_dict, indices):
        assert returned is not None

    # Window k covers original rows [k, k + window) and its target is row
    # k + window; it must be dropped if any row in that range is imputed.
    flags = features['bar_imputed'].values
    tainted = np.array(
        [np.any(flags[start:start + window]) for start in range(total_windows)],
        dtype=bool,
    )
    n_dropped = np.sum(tainted)
    n_kept = total_windows - n_dropped

    assert dropped_count == n_dropped
    assert X_seq.shape[0] == n_kept
    assert y_seq_dict['mu'].shape[0] == n_kept
    assert y_seq_dict['dir3'].shape[0] == n_kept
    assert len(indices) == n_kept

    # The surviving target indices are exactly those of the untainted windows
    candidate_indices = features.index[window:]
    pd.testing.assert_index_equal(indices, candidate_indices[~tainted])

    # The summary artefact must have been saved with matching counts
    summary_name = 'imputed_sequence_summary_testdrop.json'
    assert summary_name in mock_io_manager._saved_jsons
    summary = mock_io_manager._saved_jsons[summary_name]
    assert summary['total_sequences_generated'] == total_windows
    assert summary['sequences_dropped_imputed'] == n_dropped
    assert summary['sequences_remaining'] == n_kept
|
||||
|
||||
def test_sequence_creation_no_imputed_col(sample_data_with_imputed, base_config, mock_io_manager):
    """Sequence creation must abort when the 'bar_imputed' column is absent."""
    features, targets = sample_data_with_imputed
    features_without_flag = features.drop(columns=['bar_imputed'])

    call_kwargs = dict(
        X_data=features_without_flag,
        y_data=targets,
        target_names=['mu', 'dir3'],
        lookback=base_config.gru.lookback,
        name="TestNoImputedCol",
        config=base_config,
        io=mock_io_manager,
    )
    with pytest.raises(SystemExit) as excinfo:
        create_sequences_fold(**call_kwargs)

    assert "'bar_imputed' column missing" in str(excinfo.value)
|
||||
|
||||
def test_sequence_creation_insufficient_data(sample_data_with_imputed, base_config, mock_io_manager):
    """A split shorter than the lookback yields no sequences and no drops."""
    features, targets = sample_data_with_imputed
    window = base_config.gru.lookback

    # Truncate both frames to fewer rows than one full window
    n_rows = window - 1
    short_features = features.iloc[:n_rows]
    short_targets = targets.iloc[:n_rows]

    result = create_sequences_fold(
        X_data=short_features,
        y_data=short_targets,
        target_names=['mu', 'dir3'],
        lookback=window,
        name="TestShort",
        config=base_config,
        io=mock_io_manager,
    )
    X_seq, y_seq_dict, indices, dropped_count = result

    assert X_seq is None
    assert y_seq_dict is None
    assert indices is None
    assert dropped_count == 0
|
||||
Loading…
x
Reference in New Issue
Block a user