From 066bdbdb93f80d7502aa63761087798922ede1bd Mon Sep 17 00:00:00 2001
From: Oleg Sheynin
Date: Mon, 5 May 2025 15:31:56 -0400
Subject: [PATCH] new aggregate features are added to research

---
 build/run_build.sh                            |   2 +-
 research/aggregate_features.sh                | 321 ++++++++++++++++++
 .../__DEPRECATED__/archive_yesterday_md.sh    |  44 ---
 3 files changed, 322 insertions(+), 45 deletions(-)
 create mode 100755 research/aggregate_features.sh
 delete mode 100755 scripts/__DEPRECATED__/archive_yesterday_md.sh

diff --git a/build/run_build.sh b/build/run_build.sh
index afac142..5bac889 100755
--- a/build/run_build.sh
+++ b/build/run_build.sh
@@ -10,7 +10,7 @@ if [ "" == "${prj}" ] ; then
     usage
 fi
 
-Cmd="pushd /home/oleg/develop/cvtt2"
+Cmd="pushd /home/oleg/develop/cvtt2-ops"
 Cmd="${Cmd} && (cd ${prj}"
 Cmd="${Cmd} && git pushall)"
 Cmd="${Cmd} && ./build_release.sh -p ${prj}"
diff --git a/research/aggregate_features.sh b/research/aggregate_features.sh
new file mode 100755
--- /dev/null
+++ b/research/aggregate_features.sh
@@ -0,0 +1,321 @@
+#!/bin/bash
+#
+# Build 1-minute feature tables in a destination SQLite database from the
+# md_trades, md_quotes and md_1min_bars tables of a source SQLite database.
+#
+# Usage: aggregate_features.sh <source_db> <dest_db>
+#
+# Objects (re)created in <dest_db>; any previous copy is dropped first:
+#   md_1min_trade_features  per-minute trade aggregates from md_trades
+#   md_1min_quote_features  per-minute bid/ask quote aggregates from md_quotes
+#   md_1min_bars            copy of the source md_1min_bars table
+#   md_1min_features_view   bars LEFT JOINed with quote and trade features
+#
+# Requires a sqlite3 CLI with window-function support (SQLite 3.25 or newer).
+
+# Stop at the first failing command so that a failed sqlite3 step is never
+# followed by the final success message.
+set -euo pipefail
+
+if [ $# -ne 2 ]; then
+    echo "Usage: $0 <source_db> <dest_db>" >&2
+    exit 1
+fi
+
+SRC_DB=$1
+DEST_DB=$2
+
+if [ ! -f "$SRC_DB" ]; then
+    echo "Error: Source database file $SRC_DB does not exist" >&2
+    exit 1
+fi
+
+# ATTACH takes the destination path as a SQL string literal, so double any
+# single quote in it; otherwise such a path would break the statement.
+SQ="'"
+DEST_DB_SQL=${DEST_DB//$SQ/$SQ$SQ}
+
+echo "Creating feature tables in $DEST_DB using data from $SRC_DB..."
+
+# Create md_1min_trade_features table
+echo "Creating md_1min_trade_features table..."
+sqlite3 "$DEST_DB" "
+DROP TABLE IF EXISTS md_1min_trade_features;
+CREATE TABLE IF NOT EXISTS md_1min_trade_features (
+    bin_tstamp TEXT,
+    tstamp_ns INTEGER,
+    exchange_id TEXT,
+    instrument_id TEXT,
+    price_mean REAL,
+    price_median REAL,
+    volume REAL,
+    vwap REAL,
+    signed_volume REAL,
+    order_flow_imbalance REAL,
+    num_trades INTEGER,
+    avg_trade_size REAL,
+    PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
+);"
+
+# Create index for md_1min_trade_features
+# NOTE(review): the PRIMARY KEY already implies a unique index on the same
+# columns, so this index looks redundant - confirm before removing it.
+echo "Creating index for md_1min_trade_features..."
+sqlite3 "$DEST_DB" "
+CREATE UNIQUE INDEX IF NOT EXISTS md_1min_trade_features_uidx
+ON md_1min_trade_features(bin_tstamp, exchange_id, instrument_id);"
+
+# Populate md_1min_trade_features using source database.
+# The median is the average of the middle one (odd count) or middle two (even
+# count) prices, selected by rank with integer division on the bin row count.
+# NOTE(review): order_flow_imbalance as written equals signed_volume (buy qty
+# minus sell qty); confirm whether a normalised ratio was intended.
+echo "Populating md_1min_trade_features..."
+sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB_SQL' AS dest;
+WITH trade_metrics AS (
+    SELECT
+        strftime('%Y-%m-%d %H:%M:00', tstamp) as bin_tstamp,
+        exchange_id,
+        instrument_id,
+        px as price,
+        qty,
+        CASE
+            WHEN condition = 'B' THEN qty
+            WHEN condition = 'S' THEN -qty
+            ELSE 0
+        END as signed_qty
+    FROM md_trades
+),
+trade_metrics_agg AS (
+    SELECT
+        bin_tstamp,
+        exchange_id,
+        instrument_id,
+        COUNT(*) as cnt
+    FROM trade_metrics
+    GROUP BY bin_tstamp, exchange_id, instrument_id
+)
+INSERT INTO dest.md_1min_trade_features
+SELECT
+    tm.bin_tstamp,
+    CAST(strftime('%s', tm.bin_tstamp) * 1000000000 AS INTEGER) as tstamp_ns,
+    tm.exchange_id,
+    tm.instrument_id,
+    AVG(price) as price_mean,
+    AVG(CASE WHEN rank_num IN ((cnt + 1) / 2, (cnt + 2) / 2) THEN price END) as price_median,
+    SUM(qty) as volume,
+    SUM(price * qty) / SUM(qty) as vwap,
+    SUM(signed_qty) as signed_volume,
+    SUM(CASE WHEN signed_qty > 0 THEN signed_qty ELSE 0 END) -
+    SUM(CASE WHEN signed_qty < 0 THEN ABS(signed_qty) ELSE 0 END) as order_flow_imbalance,
+    COUNT(*) as num_trades,
+    AVG(qty) as avg_trade_size
+FROM (
+    SELECT
+        tm.*,
+        tma.cnt,
+        ROW_NUMBER() OVER (PARTITION BY tm.bin_tstamp, tm.exchange_id, tm.instrument_id ORDER BY price) as rank_num
+    FROM trade_metrics tm
+    JOIN trade_metrics_agg tma
+        ON tm.bin_tstamp = tma.bin_tstamp
+        AND tm.exchange_id = tma.exchange_id
+        AND tm.instrument_id = tma.instrument_id
+) tm
+GROUP BY tm.bin_tstamp, tm.exchange_id, tm.instrument_id;"
+
+# Create md_1min_quote_features table in destination database
+echo "Creating md_1min_quote_features table..."
+sqlite3 "$DEST_DB" "
+DROP TABLE IF EXISTS md_1min_quote_features;
+CREATE TABLE IF NOT EXISTS md_1min_quote_features (
+    bin_tstamp TEXT,
+    tstamp_ns INTEGER,
+    exchange_id TEXT,
+    instrument_id TEXT,
+    mid_price_open REAL,
+    mid_price_high REAL,
+    mid_price_low REAL,
+    mid_price_close REAL,
+    mid_price_mean REAL,
+    rel_spread_mean REAL,
+    rel_spread_min REAL,
+    rel_spread_max REAL,
+    rel_spread_first REAL,
+    rel_spread_last REAL,
+    l1_imbalance_mean REAL,
+    l1_imbalance_min REAL,
+    l1_imbalance_max REAL,
+    l1_imbalance_first REAL,
+    l1_imbalance_last REAL,
+    micro_price_mean REAL,
+    micro_price_min REAL,
+    micro_price_max REAL,
+    micro_price_first REAL,
+    micro_price_last REAL,
+    weighted_mid_mean REAL,
+    weighted_mid_min REAL,
+    weighted_mid_max REAL,
+    weighted_mid_first REAL,
+    weighted_mid_last REAL,
+    PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
+);"
+
+# Create index for md_1min_quote_features
+echo "Creating index for md_1min_quote_features..."
+sqlite3 "$DEST_DB" "
+CREATE UNIQUE INDEX IF NOT EXISTS md_1min_quote_features_uidx
+ON md_1min_quote_features(bin_tstamp, exchange_id, instrument_id);"
+
+# Populate md_1min_quote_features using source database.
+# Per-quote metrics are computed once, each quote is ranked within its bin from
+# both ends by tstamp, and the open/first and close/last values are then taken
+# from the rows ranked 1. A window function placed directly in the GROUP BY
+# query would only see the already-grouped rows, not the individual quotes.
+echo "Populating md_1min_quote_features..."
+sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB_SQL' AS dest;
+WITH quote_metrics AS (
+    SELECT
+        tstamp,
+        strftime('%Y-%m-%d %H:%M:00', tstamp) as bin_tstamp,
+        exchange_id,
+        instrument_id,
+        (ask_px + bid_px) / 2.0 as mid_price,
+        (ask_px - bid_px) / ((ask_px + bid_px) / 2.0) as rel_spread,
+        (bid_qty - ask_qty) / (bid_qty + ask_qty) as l1_imbalance,
+        (ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty) as micro_price,
+        (ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty) as weighted_mid
+    FROM md_quotes
+),
+quote_ranked AS (
+    SELECT
+        qm.*,
+        ROW_NUMBER() OVER (PARTITION BY bin_tstamp, exchange_id, instrument_id ORDER BY tstamp) as rn_first,
+        ROW_NUMBER() OVER (PARTITION BY bin_tstamp, exchange_id, instrument_id ORDER BY tstamp DESC) as rn_last
+    FROM quote_metrics qm
+)
+INSERT INTO dest.md_1min_quote_features
+SELECT
+    bin_tstamp,
+    CAST(strftime('%s', bin_tstamp) * 1000000000 AS INTEGER) as tstamp_ns,
+    exchange_id,
+    instrument_id,
+    MAX(CASE WHEN rn_first = 1 THEN mid_price END) as mid_price_open,
+    MAX(mid_price) as mid_price_high,
+    MIN(mid_price) as mid_price_low,
+    MAX(CASE WHEN rn_last = 1 THEN mid_price END) as mid_price_close,
+    AVG(mid_price) as mid_price_mean,
+    AVG(rel_spread) as rel_spread_mean,
+    MIN(rel_spread) as rel_spread_min,
+    MAX(rel_spread) as rel_spread_max,
+    MAX(CASE WHEN rn_first = 1 THEN rel_spread END) as rel_spread_first,
+    MAX(CASE WHEN rn_last = 1 THEN rel_spread END) as rel_spread_last,
+    AVG(l1_imbalance) as l1_imbalance_mean,
+    MIN(l1_imbalance) as l1_imbalance_min,
+    MAX(l1_imbalance) as l1_imbalance_max,
+    MAX(CASE WHEN rn_first = 1 THEN l1_imbalance END) as l1_imbalance_first,
+    MAX(CASE WHEN rn_last = 1 THEN l1_imbalance END) as l1_imbalance_last,
+    AVG(micro_price) as micro_price_mean,
+    MIN(micro_price) as micro_price_min,
+    MAX(micro_price) as micro_price_max,
+    MAX(CASE WHEN rn_first = 1 THEN micro_price END) as micro_price_first,
+    MAX(CASE WHEN rn_last = 1 THEN micro_price END) as micro_price_last,
+    AVG(weighted_mid) as weighted_mid_mean,
+    MIN(weighted_mid) as weighted_mid_min,
+    MAX(weighted_mid) as weighted_mid_max,
+    MAX(CASE WHEN rn_first = 1 THEN weighted_mid END) as weighted_mid_first,
+    MAX(CASE WHEN rn_last = 1 THEN weighted_mid END) as weighted_mid_last
+FROM quote_ranked
+GROUP BY bin_tstamp, exchange_id, instrument_id;"
+
+# Copy the md_1min_bars table to destination database
+echo "Copying md_1min_bars to destination..."
+sqlite3 "$DEST_DB" "
+DROP TABLE IF EXISTS md_1min_bars;
+CREATE TABLE IF NOT EXISTS md_1min_bars (
+    bin_tstamp TEXT,
+    tstamp_ns INTEGER,
+    exchange_id TEXT,
+    instrument_id TEXT,
+    open REAL,
+    high REAL,
+    low REAL,
+    close REAL,
+    volume REAL,
+    vwap REAL,
+    num_trades INTEGER,
+    PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
+);"
+
+echo "Creating index for md_1min_bars..."
+sqlite3 "$DEST_DB" "
+CREATE UNIQUE INDEX IF NOT EXISTS md_1min_bars_uidx
+ON md_1min_bars(bin_tstamp, exchange_id, instrument_id);"
+
+# NOTE(review): SELECT * copies columns by position, so this assumes the source
+# md_1min_bars has the same column order as the table created above - confirm.
+echo "Populating md_1min_bars..."
+sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB_SQL' AS dest;
+INSERT INTO dest.md_1min_bars
+SELECT * FROM md_1min_bars;"
+
+# Create the combined features view in destination database
+echo "Creating combined features view..."
+sqlite3 "$DEST_DB" "
+DROP VIEW IF EXISTS md_1min_features_view;
+CREATE VIEW IF NOT EXISTS md_1min_features_view AS
+SELECT
+    b.bin_tstamp,
+    b.tstamp_ns,
+    b.exchange_id,
+    b.instrument_id,
+    -- OHLCV data from md_1min_bars
+    b.open,
+    b.high,
+    b.low,
+    b.close,
+    b.volume,
+    b.vwap,
+    b.num_trades,
+    -- Quote features
+    q.mid_price_open,
+    q.mid_price_high,
+    q.mid_price_low,
+    q.mid_price_close,
+    q.mid_price_mean,
+    q.rel_spread_mean,
+    q.rel_spread_min,
+    q.rel_spread_max,
+    q.rel_spread_first,
+    q.rel_spread_last,
+    q.l1_imbalance_mean,
+    q.l1_imbalance_min,
+    q.l1_imbalance_max,
+    q.l1_imbalance_first,
+    q.l1_imbalance_last,
+    q.micro_price_mean,
+    q.micro_price_min,
+    q.micro_price_max,
+    q.micro_price_first,
+    q.micro_price_last,
+    q.weighted_mid_mean,
+    q.weighted_mid_min,
+    q.weighted_mid_max,
+    q.weighted_mid_first,
+    q.weighted_mid_last,
+    -- Trade features
+    t.price_mean as trade_price_mean,
+    t.price_median as trade_price_median,
+    t.signed_volume,
+    t.order_flow_imbalance,
+    t.avg_trade_size
+FROM md_1min_bars b
+LEFT JOIN md_1min_quote_features q
+    ON b.bin_tstamp = q.bin_tstamp
+    AND b.exchange_id = q.exchange_id
+    AND b.instrument_id = q.instrument_id
+LEFT JOIN md_1min_trade_features t
+    ON b.bin_tstamp = t.bin_tstamp
+    AND b.exchange_id = t.exchange_id
+    AND b.instrument_id = t.instrument_id;"
+
+echo "Feature tables created and populated successfully in $DEST_DB!"
diff --git a/scripts/__DEPRECATED__/archive_yesterday_md.sh b/scripts/__DEPRECATED__/archive_yesterday_md.sh
deleted file mode 100755
index 1b93651..0000000
--- a/scripts/__DEPRECATED__/archive_yesterday_md.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/bin/bash
-
-Python=/home/cvtt/.pyenv/python3.12-venv/bin/python3.12
-RootDir=/home/cvtt/prod
-export PYTHONPATH=${RootDir}
-
-host=${1}
-if [ "${host}" == "cvttdata" ]
-then
-    ArchiveRootDir=/home/cvtt/prod/archive/md_archive/crypto/cvttdata
-    CredKey=TSDB_MD_CVTTDATA_RO
-elif [ "${host}" == "cloud21" ]
-then
-    ArchiveRootDir=/home/cvtt/prod/archive/md_archive/crypto/cloud21
-    CredKey=TSDB_MD_CLD21_RO
-else
-    echo "Unknown host ${host}. ${0} Aborted."
-    exit 1
-fi
-
-mkdir -p ${ArchiveRootDir}
-
-yesterday=$(date -d "yesterday" +%Y%m%d)
-Schemas=${2}
-if [ "${Schemas}" == "" ]
-then
-    Schemas="coinbase,bnbspot,bnbfut"
-fi
-echo "Schemas=${Schemas}"
-
-Cmd=
-Cmd="${Python}"
-Cmd="${Cmd} ${RootDir}/cvttpy/research/utils/archive_ts_md.py"
-Cmd="${Cmd} --config=http://cloud16.cvtt.vpn:6789/apps/md_recorder"
-Cmd="${Cmd} --db_credentials_key=${CredKey}"
-Cmd="${Cmd} --date=${yesterday}"
-Cmd="${Cmd} --schemas=${Schemas}"
-Cmd="${Cmd} --root_dir=${ArchiveRootDir}"
-Cmd="${Cmd} --format=SQLite"
-Cmd="${Cmd} --compress"
-echo ${Cmd}
-eval ${Cmd}
-
-echo "${0} ${*} Done."