From 79898135344df6aa6ba790c2d1ff74ff5e07b736 Mon Sep 17 00:00:00 2001 From: Filip Stefaniuk Date: Wed, 16 Oct 2024 20:54:37 +0200 Subject: [PATCH] Update the strategies implementations --- src/strategy/evaluation.py | 27 ++- src/strategy/metrics.py | 15 +- src/strategy/plotting.py | 3 +- src/strategy/strategy.py | 398 ++++++++++++++++++++++++++++++++++++- src/strategy/util.py | 118 +++++++++++ 5 files changed, 544 insertions(+), 17 deletions(-) create mode 100644 src/strategy/util.py diff --git a/src/strategy/evaluation.py b/src/strategy/evaluation.py index 8db2160..48f7bed 100644 --- a/src/strategy/evaluation.py +++ b/src/strategy/evaluation.py @@ -18,6 +18,8 @@ def parameter_sweep( params_filter: Optional[Callable] = None, log_every: int = 200, exchange_fee: float = 0.001, + padding: int = 0, + sort_by: str = 'mod_ir', interval: str = '5min') -> pd.DataFrame: """Evaluates the strategy on a different sets of hyperparameters.""" @@ -39,20 +41,24 @@ def parameter_sweep( data, exchange_fee=exchange_fee, interval=interval, + padding=padding, include_arrays=False), map( lambda p: strategy_class( **p), chunk))) pbar.update(len(tmp)) - result += tmp + result += list(zip(tmp, map( + lambda p: strategy_class( + **p), chunk))) - return pd.DataFrame(result) + return sorted(result, key=lambda x: x[0][sort_by], reverse=True) def evaluate_strategy( data: pd.DataFrame, strategy: StrategyBase, include_arrays: bool = True, + padding: int = 0, exchange_fee: float = 0.001, interval: str = "5min"): """Evaluates a trading strategy.""" @@ -75,6 +81,12 @@ def evaluate_strategy( timestamps = data['close_time'].to_numpy() assert positions.shape[0] == timestamps.shape[0] - 1 + # Pad the results + positions = positions[padding:] + timestamps = timestamps[padding:] + long_returns = long_returns[padding:] + short_returns = short_returns[padding:] + # Compute returns of the strategy. strategy_returns = np.zeros_like(positions, dtype=np.float64) strategy_returns[positions == LONG_POSITION] = \ @@ -83,9 +95,9 @@ def evaluate_strategy( short_returns[positions == SHORT_POSITION] # Include exchange fees - positions_changed = np.append([EXIT_POSITION], positions[:-1]) != positions - strategy_returns[positions_changed] = ( - strategy_returns[positions_changed] + 1.0) * (1.0 - exchange_fee) - 1.0 + strategy_returns = (strategy_returns + 1.0) * ( + 1.0 - exchange_fee * np.abs(np.append( + [EXIT_POSITION], positions[:-1]) - positions)) - 1.0 strategy_returns = np.append([0.], strategy_returns) portfolio_value = np.cumprod(strategy_returns + 1) @@ -97,9 +109,10 @@ def evaluate_strategy( 'arc': metrics.arc(portfolio_value, interval=interval), 'asd': metrics.asd(portfolio_value, interval=interval), 'ir': metrics.ir(portfolio_value, interval=interval), + 'mod_ir': metrics.modified_ir(portfolio_value, interval=interval), 'md': metrics.max_drawdown(portfolio_value), - 'n_trades': np.sum(np.append([EXIT_POSITION], positions[:-1]) != - np.append(positions[1:], [EXIT_POSITION])), + 'n_trades': np.sum(np.abs(np.append([EXIT_POSITION], positions[:-1]) - + np.append(positions[1:], [EXIT_POSITION]))), 'long_pos': np.sum(positions == LONG_POSITION) / positions.size, 'short_pos': np.sum(positions == SHORT_POSITION) / positions.size, } diff --git a/src/strategy/metrics.py b/src/strategy/metrics.py index b4a9015..2137b10 100644 --- a/src/strategy/metrics.py +++ b/src/strategy/metrics.py @@ -6,6 +6,8 @@ from numpy.typing import NDArray NUM_INTERVALS = { 'min': 365 * 24 * 60, '5min': 365 * 24 * 12, + '15min': 365 * 24 * 4, + '30min': 365 * 24 * 2, 'hour': 365 * 24, 'day': 365 } @@ -48,7 +50,12 @@ def max_drawdown(array: NDArray[Any]): return np.max((cummax - array) / cummax) -# def modified_ir(array: NDArray[Any]): -# """Information Ratio adjusted by drawdown and ARC.""" -# return ir(array) * arc(array) * (np.sign(arc(array)) / -# max_drawdown(array)) +def modified_ir(array: NDArray[Any], interval: str = '5min'): + ret = (ir(array, interval=interval) + * np.abs(arc(array, interval=interval))) + md = max_drawdown(array) + + if md > 0: + ret = ret / md + + return ret diff --git a/src/strategy/plotting.py b/src/strategy/plotting.py index ed79b4c..e9d94ee 100644 --- a/src/strategy/plotting.py +++ b/src/strategy/plotting.py @@ -9,9 +9,10 @@ def plot_sweep_results( parameters: List[str], objective: str = 'value', top_n: int = 5, + round: int = 2, title: str = "Hyperparameters search results"): """Helper function for plotting results of hyperparameter search.""" - data = sweep_results[list(parameters) + [objective]].round(2) + data = sweep_results[list(parameters) + [objective]].round(round) fig = ff.create_table( data.sort_values( diff --git a/src/strategy/strategy.py b/src/strategy/strategy.py index b40a827..a00920f 100644 --- a/src/strategy/strategy.py +++ b/src/strategy/strategy.py @@ -1,11 +1,13 @@ +import talib import numpy as np import pandas as pd # import logging from typing import Dict, Any +# from strategy.util import rsi_obos EXIT_POSITION = 0 LONG_POSITION = 1 -SHORT_POSITION = 2 +SHORT_POSITION = -1 class StrategyBase: @@ -35,6 +37,177 @@ class BuyAndHoldStrategy(StrategyBase): dtype=np.int32) +class MACDStrategy(StrategyBase): + """Strategy based on Moving Average Convergence / Divergence.""" + + NAME = "MACD" + + def __init__( + self, + fast_window_size: int = 12, + slow_window_size: int = 26, + signal_window_size: int = 9, + short_sell: bool = False): + + if (fast_window_size == 1 or + slow_window_size == 1 or + signal_window_size == 1 or + fast_window_size >= slow_window_size): + raise ValueError + + self.fast_window_size = fast_window_size + self.slow_window_size = slow_window_size + self.signal_window_size = signal_window_size + self.short_sell = short_sell + self.name = MACDStrategy.NAME + # f"{MACDStrategy.NAME}" +\ + # "(fast={self.fast_window_size}," +\ + # " slow={self.slow_window_size}," +\ + # " signal={self.signal_window_size})" + + def info(self) -> Dict[str, Any]: + return { + 'strategy_name': self.name, + 'fast_window_size': self.fast_window_size, + 'slow_window_size': self.slow_window_size, + 'signal_window_size': self.signal_window_size, + 'short_sell': self.short_sell + } + + def run(self, data: pd.DataFrame): + array = data['close_price'].to_numpy() + macd, signal, _ = talib.MACD( + array, + fastperiod=self.fast_window_size, + slowperiod=self.slow_window_size, + signalperiod=self.signal_window_size + ) + + result = np.full_like(array, EXIT_POSITION, dtype=np.int32) + result[macd > signal] = LONG_POSITION + + if self.short_sell: + result[macd < signal] = SHORT_POSITION + + # run_info = { + # 'macd': macd, + # 'signal': signal + # } + return result # , run_info + + +class RSIStrategy(StrategyBase): + """Strategy based on RSI.""" + + NAME = "RSI" + + def __init__(self, + window_size: int = 14, + enter_long=None, + exit_long=None, + enter_short=None, + exit_short=None): + self.window_size = window_size + self.enter_long = enter_long + self.exit_long = exit_long + self.enter_short = enter_short + self.exit_short = exit_short + self.name = RSIStrategy.NAME + # f"{RSIStrategy.NAME}(" +\ + # "window={self.window_size}," +\ + # "[{self.oversold}, {self.overbought}])" + + def info(self) -> Dict[str, Any]: + return { + 'strategy_name': self.name, + 'window_size': self.window_size, + 'enter_long': self.enter_long, + 'exit_long': self.exit_long, + 'enter_short': self.enter_short, + 'exit_short': self.exit_short + } + + def run(self, data: pd.DataFrame): + array = data['close_price'].to_numpy() + + rsi = talib.RSI(array, timeperiod=self.window_size) + enter_long = rsi > (self.enter_long or np.infty) + exit_long = rsi < (self.exit_long or -np.infty) + enter_short = rsi < ( + self.enter_short or -np.infty) + exit_short = rsi > (self.exit_short or np.infty) + + positions = np.full(rsi.shape, np.nan) + positions[exit_long | exit_short] = EXIT_POSITION + positions[enter_long] = LONG_POSITION + positions[enter_short] = SHORT_POSITION + + # Fix the first position + if np.isnan(positions[0]): + positions[0] = EXIT_POSITION + + mask = np.isnan(positions) + idx = np.where(~mask, np.arange(mask.size), 0) + np.maximum.accumulate(idx, out=idx) + positions[mask] = positions[idx[mask]] + + return positions.astype(np.int32) + # result = rsi_obos(rsi, self.oversold, self.overbought) + + # run_info = { + # 'rsi': rsi + # } + + # return result # , run_info + + +class BaselineReturnsStrategy(StrategyBase): + def __init__( + self, + enter_long, + exit_long, + enter_short, + exit_short): + self.enter_long = enter_long + self.exit_long = exit_long + self.enter_short = enter_short + self.exit_short = exit_short + + def info(self): + return { + 'strategy_name': 'Baseline predictions', + 'enter_long': self.enter_long, + 'exit_long': self.exit_long, + 'enter_short': self.enter_short, + 'exit_short': self.exit_short + } + + def run(self, data): + + ret = data['returns'].to_numpy() + enter_long = ret > (self.enter_long or np.infty) + exit_long = ret < (self.exit_long or -np.infty) + enter_short = ret < ( + self.enter_short or -np.infty) + exit_short = ret > (self.exit_short or np.infty) + + positions = np.full(ret.shape, np.nan) + positions[exit_long | exit_short] = EXIT_POSITION + positions[enter_long] = LONG_POSITION + positions[enter_short] = SHORT_POSITION + + # Fix the first position + if np.isnan(positions[0]): + positions[0] = EXIT_POSITION + + mask = np.isnan(positions) + idx = np.where(~mask, np.arange(mask.size), 0) + np.maximum.accumulate(idx, out=idx) + positions[mask] = positions[idx[mask]] + + return positions.astype(np.int32) + + class ModelPredictionsStrategyBase(StrategyBase): """Base class for strategies based on model predictions.""" @@ -74,6 +247,64 @@ class ModelPredictionsStrategyBase(StrategyBase): raise NotImplementedError() +class ModelGmadlPredictionsStrategy(ModelPredictionsStrategyBase): + def __init__( + self, + predictions, + enter_long=None, + exit_long=None, + enter_short=None, + exit_short=None, + future=1, + name: str = None, + ): + super().__init__( + predictions, + name=name + ) + + self.enter_long = enter_long + self.exit_long = exit_long + self.enter_short = enter_short + self.exit_short = exit_short + self.future = future + + def info(self): + return super().info() | { + 'enter_long': self.enter_long, + 'exit_long': self.exit_long, + 'enter_short': self.enter_short, + 'exit_short': self.exit_short + } + + def get_positions(self, data): + # bfill() is a hack to make it work with non predicted data + arr_preds = np.stack(data['prediction'].ffill().bfill().to_numpy()) + arr_preds = arr_preds[:, self.future, 0] + + enter_long = arr_preds > (self.enter_long or np.infty) + exit_long = arr_preds < (self.exit_long or -np.infty) + enter_short = arr_preds < ( + self.enter_short or -np.infty) + exit_short = arr_preds > (self.exit_short or np.infty) + + positions = np.full(arr_preds.shape, np.nan) + positions[exit_long | exit_short] = EXIT_POSITION + positions[enter_long] = LONG_POSITION + positions[enter_short] = SHORT_POSITION + + # Fix the first position + if np.isnan(positions[0]): + positions[0] = EXIT_POSITION + + mask = np.isnan(positions) + idx = np.where(~mask, np.arange(mask.size), 0) + np.maximum.accumulate(idx, out=idx) + positions[mask] = positions[idx[mask]] + + return positions.astype(np.int32) + + class ModelQuantilePredictionsStrategy(ModelPredictionsStrategyBase): def __init__( self, @@ -86,7 +317,8 @@ class ModelQuantilePredictionsStrategy(ModelPredictionsStrategyBase): name: str = None, future: int = 1, target: str = 'close_price', - exchange_fee: int = 0.001 + exchange_fee: int = 0.001, + new_impl=True ): super().__init__( predictions, @@ -100,10 +332,12 @@ class ModelQuantilePredictionsStrategy(ModelPredictionsStrategyBase): self.quantile_exit_long = quantile_exit_long self.quantile_enter_short = quantile_enter_short self.quantile_exit_short = quantile_exit_short + self.new_impl = new_impl def info(self): return super().info() | { 'quantiles': self.quantiles, + 'exchange_fee': self.exchange_fee, 'quantile_enter_long': self.quantile_enter_long, 'quantile_exit_long': self.quantile_exit_long, 'quantile_enter_short': self.quantile_enter_short, @@ -111,6 +345,58 @@ class ModelQuantilePredictionsStrategy(ModelPredictionsStrategyBase): } def get_positions(self, data): + if self.new_impl: + return self.get_positions2(data) + return self.get_positions1(data) + + def get_positions2(self, data): + arr_target = data[self.target].to_numpy() + arr_preds = np.stack( + # bfill() is a hack to make it work with non predicted data + data['prediction'].ffill().bfill().to_numpy()) + + enter_long = (((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + round(1 - self.quantile_enter_long, 2))] + if self.quantile_enter_long + else np.full(arr_target.shape, -np.infty))) + - arr_target) / arr_target > self.exchange_fee + enter_short = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + self.quantile_enter_short)] + if self.quantile_enter_short + else np.full(arr_target.shape, np.infty)) + - arr_target) / arr_target < -self.exchange_fee + exit_long = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + self.quantile_exit_long)] + if self.quantile_exit_long + else np.full(arr_target.shape, np.infty)) + - arr_target) / arr_target < -self.exchange_fee + exit_short = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + round(1 - self.quantile_exit_short, 2))] + if self.quantile_exit_short + else np.full(arr_target.shape, -np.infty)) + - arr_target) / arr_target > self.exchange_fee + + positions = np.full(arr_target.shape, np.nan) + positions[exit_long | exit_short] = EXIT_POSITION + positions[enter_long] = LONG_POSITION + positions[enter_short] = SHORT_POSITION + + # Fix the first position + if np.isnan(positions[0]): + positions[0] = EXIT_POSITION + + mask = np.isnan(positions) + idx = np.where(~mask, np.arange(mask.size), 0) + np.maximum.accumulate(idx, out=idx) + positions[mask] = positions[idx[mask]] + + return positions.astype(np.int32) + + def get_positions1(self, data): arr_preds = data['prediction'].to_numpy() arr_target = data[self.target].to_numpy() @@ -171,21 +457,123 @@ class ConcatenatedStrategies(StrategyBase): each on the next `window_size` data points. """ - def __init__(self, window_size, strategies, name='Concatenated Strategy'): + def __init__( + self, + window_size, + strategies, + name='Concatenated Strategy', + padding=0): self.window_size = window_size self.strategies = strategies self.name = name + self.padding = padding def info(self): return {'strategy_name': self.name} def run(self, data): - chunks = [data[i:i+self.window_size].copy() - for i in range(0, data.shape[0], self.window_size)] + chunks = [data[i-self.padding:i+self.window_size].copy() + for i in range( + self.padding, data.shape[0], self.window_size)] assert len(chunks) <= len(self.strategies) positions = [] for chunk, strategy in zip(chunks, self.strategies): positions.append(strategy.run(chunk)) + positions = [ + pos if not i else pos[self.padding:] + for i, pos in enumerate(positions) + ] + return np.concatenate(positions) + + +class ModelQuantileReturnsPredictionsStrategy(ModelPredictionsStrategyBase): + def __init__( + self, + predictions, + quantiles, + quantile_enter_long=None, + quantile_exit_long=None, + quantile_enter_short=None, + quantile_exit_short=None, + name: str = None, + future: int = 1, + target: str = 'returns', + exchange_fee: int = 0.001, + new_impl=True + ): + super().__init__( + predictions, + name=name, + future=future, + target=target, + exchange_fee=exchange_fee) + + self.quantiles = quantiles + self.quantile_enter_long = quantile_enter_long + self.quantile_exit_long = quantile_exit_long + self.quantile_enter_short = quantile_enter_short + self.quantile_exit_short = quantile_exit_short + self.new_impl = new_impl + + def info(self): + return super().info() | { + 'quantiles': self.quantiles, + 'exchange_fee': self.exchange_fee, + 'quantile_enter_long': self.quantile_enter_long, + 'quantile_exit_long': self.quantile_exit_long, + 'quantile_enter_short': self.quantile_enter_short, + 'quantile_exit_short': self.quantile_exit_short + } + + def get_positions(self, data): + arr_target = data[self.target].to_numpy() + arr_preds = np.stack( + # bfill() is a hack to make it work with non predicted data + data['prediction'].ffill().bfill().to_numpy()) + + enter_long = (((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + round(1 - self.quantile_enter_long, 2))] + if self.quantile_enter_long + else np.full(arr_target.shape, -np.infty))) + > self.exchange_fee) + enter_short = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + self.quantile_enter_short)] + if self.quantile_enter_short + else np.full(arr_target.shape, np.infty)) + < -self.exchange_fee) + exit_long = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + self.quantile_exit_long)] + if self.quantile_exit_long + else np.full(arr_target.shape, np.infty)) + < -self.exchange_fee) + exit_short = ((arr_preds[ + :, self.future - 1, self.get_quantile_idx( + round(1 - self.quantile_exit_short, 2))] + if self.quantile_exit_short + else np.full(arr_target.shape, -np.infty)) + > self.exchange_fee) + + positions = np.full(arr_target.shape, np.nan) + positions[exit_long | exit_short] = EXIT_POSITION + positions[enter_long] = LONG_POSITION + positions[enter_short] = SHORT_POSITION + + # Fix the first position + if np.isnan(positions[0]): + positions[0] = EXIT_POSITION + + mask = np.isnan(positions) + idx = np.where(~mask, np.arange(mask.size), 0) + np.maximum.accumulate(idx, out=idx) + positions[mask] = positions[idx[mask]] + + return positions.astype(np.int32) + + def get_quantile_idx(self, quantile): + return self.quantiles.index(quantile) diff --git a/src/strategy/util.py b/src/strategy/util.py new file mode 100644 index 0000000..0ad26ad --- /dev/null +++ b/src/strategy/util.py @@ -0,0 +1,118 @@ +import wandb +import os +import torch +import pandas as pd +import numpy as np +from numba import jit +from numba import int32, float64, optional + + +def get_sweep_data_windows(sweep_id): + """Get all data windows evaluated during sweep moving window eval.""" + sweep = wandb.Api().sweep(sweep_id) + sweep_dataset = sweep.config[ + 'parameters']['data']['parameters']['dataset']['value'] + sliding_window_min = sweep.config[ + 'parameters']['data']['parameters']['sliding_window']['min'] + slidinw_window_max = sweep.config[ + 'parameters']['data']['parameters']['sliding_window']['max'] + + return get_data_windows( + sweep.project, + sweep_dataset, + min_window=sliding_window_min, + max_window=slidinw_window_max) + + +def get_data_windows(project, dataset_name, min_window=0, max_window=5): + artifact_name = f"{project}/{dataset_name}" + artifact = wandb.Api().artifact(artifact_name) + base_path = artifact.download() + name = artifact.metadata['name'] + + result = [] + for i in range(min_window, max_window+1): + in_sample_name =\ + f"in-sample-{i}" + in_sample_data = pd.read_csv(os.path.join( + base_path, name + '-' + in_sample_name + '.csv')) + out_of_sample_name =\ + f"out-of-sample-{i}" + out_of_sample_data = pd.read_csv(os.path.join( + base_path, name + '-' + out_of_sample_name + '.csv')) + result.append((in_sample_data, out_of_sample_data)) + + return result + + +def get_sweep_window_predictions(sweep_id, part): + result = [] + for run in wandb.Api().sweep(sweep_id).runs: + window_num = run.config['data']['sliding_window'] + + window_prediction = list( + filter(lambda x: ( + x.type == 'prediction' + and x.name.startswith(f'prediction-{part}')), + run.logged_artifacts())) + + assert len(window_prediction) == 1 + window_prediction = window_prediction[0] + + artifact_path = window_prediction.download() + index = torch.load(os.path.join( + artifact_path, 'index.pt'), map_location=torch.device('cpu')) + preds = torch.load(os.path.join( + artifact_path, 'predictions.pt'), map_location=torch.device('cpu')) + + result.append((window_num, index, preds.numpy())) + + result = sorted(result, key=lambda x: x[0]) + return result + + +def get_predictions_dataframe(*window_predictions): + result = [] + for _, idx, preds in window_predictions: + df = pd.DataFrame(idx) + df['prediction'] = list(preds) + result.append(df) + + result = pd.concat(result).sort_values(by='time_index') + + assert 'time_index' in result.columns + assert 'group_id' in result.columns + assert 'prediction' in result.columns + + return result + + +@jit((float64[:], int32, int32), nopython=True) +def rsi_obos(rsi_arr, oversold, overbought): + moves = np.zeros(rsi_arr.size, dtype=np.int32) + for i in range(1, rsi_arr.size): + moves[i] = 1 if rsi_arr[i - 1] < oversold and rsi_arr[i] > oversold \ + else 0 if rsi_arr[i - 1] > overbought and rsi_arr[i] < overbought \ + else moves[i - 1] + return moves + + +# @jit(( +# float64[:], +# optional(int32), +# optional(int32), +# optional(int32), +# optional(int32)), nopython=True) +# def quantile_model_pos( +# preds, +# enter_long, +# exit_long, +# enter_short, +# exit_short): +# return None +# # moves = np.zeros(preds.size, dtype=np.int32) +# # for i in range(1, preds.size): + + +# if __name__ == '__main__': +# quantile_model_pos(np.array([0.2, 0.1, 0.3]), 2, 5, None, None)