new aggregate features are added to research

This commit is contained in:
Oleg Sheynin 2025-05-05 15:31:56 -04:00
parent 9aaf356048
commit 066bdbdb93
3 changed files with 276 additions and 45 deletions

View File

@ -10,7 +10,7 @@ if [ "" == "${prj}" ] ; then
usage
fi
Cmd="pushd /home/oleg/develop/cvtt2"
Cmd="pushd /home/oleg/develop/cvtt2-ops"
Cmd="${Cmd} && (cd ${prj}"
Cmd="${Cmd} && git pushall)"
Cmd="${Cmd} && ./build_release.sh -p ${prj}"

275
research/aggregate_features.sh Executable file
View File

@ -0,0 +1,275 @@
#!/bin/bash
if [ $# -ne 2 ]; then
echo "Usage: $0 <source_database_file> <features_database_file>"
exit 1
fi
SRC_DB=$1
DEST_DB=$2
if [ ! -f "$SRC_DB" ]; then
echo "Error: Source database file $SRC_DB does not exist"
exit 1
fi
echo "Creating feature tables in $DEST_DB using data from $SRC_DB..."
# Create md_1min_trade_features table
echo "Creating md_1min_trade_features table..."
sqlite3 "$DEST_DB" "
DROP TABLE IF EXISTS md_1min_trade_features;
CREATE TABLE IF NOT EXISTS md_1min_trade_features (
bin_tstamp TEXT,
tstamp_ns INTEGER,
exchange_id TEXT,
instrument_id TEXT,
price_mean REAL,
price_median REAL,
volume REAL,
vwap REAL,
signed_volume REAL,
order_flow_imbalance REAL,
num_trades INTEGER,
avg_trade_size REAL,
PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
);"
# Create index for md_1min_trade_features
echo "Creating index for md_1min_trade_features..."
sqlite3 "$DEST_DB" "
CREATE UNIQUE INDEX IF NOT EXISTS md_1min_trade_features_uidx
ON md_1min_trade_features(bin_tstamp, exchange_id, instrument_id);"
# Populate md_1min_trade_features using source database
echo "Populating md_1min_trade_features..."
sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB' AS dest;
WITH trade_metrics AS (
SELECT
tstamp,
strftime('%Y-%m-%d %H:%M:00', tstamp) as bin_tstamp,
exchange_id,
instrument_id,
px as price,
qty,
CASE
WHEN condition = 'B' THEN qty
WHEN condition = 'S' THEN -qty
ELSE 0
END as signed_qty
FROM md_trades
),
trade_metrics_agg AS (
SELECT
bin_tstamp,
exchange_id,
instrument_id,
COUNT(*) as cnt,
MIN(tstamp) as min_tstamp,
MAX(tstamp) as max_tstamp
FROM trade_metrics
GROUP BY bin_tstamp, exchange_id, instrument_id
)
INSERT INTO dest.md_1min_trade_features
SELECT
tm.bin_tstamp,
CAST(strftime('%s', tm.bin_tstamp) * 1000000000 AS INTEGER) as tstamp_ns,
tm.exchange_id,
tm.instrument_id,
AVG(price) as price_mean,
AVG(CASE WHEN rank_num >= FLOOR(cnt/2.0) AND rank_num <= CEIL(cnt/2.0) THEN price ELSE NULL END) as price_median,
SUM(qty) as volume,
SUM(price * qty) / SUM(qty) as vwap,
SUM(signed_qty) as signed_volume,
SUM(CASE WHEN signed_qty > 0 THEN signed_qty ELSE 0 END) -
SUM(CASE WHEN signed_qty < 0 THEN ABS(signed_qty) ELSE 0 END) as order_flow_imbalance,
COUNT(*) as num_trades,
AVG(qty) as avg_trade_size
FROM (
SELECT
tm.*,
tma.cnt,
ROW_NUMBER() OVER (PARTITION BY tm.bin_tstamp, tm.exchange_id, tm.instrument_id ORDER BY price) as rank_num
FROM trade_metrics tm
JOIN trade_metrics_agg tma
ON tm.bin_tstamp = tma.bin_tstamp
AND tm.exchange_id = tma.exchange_id
AND tm.instrument_id = tma.instrument_id
) tm
GROUP BY tm.bin_tstamp, tm.exchange_id, tm.instrument_id;"
# Create md_1min_quote_features table in destination database
echo "Creating md_1min_quote_features table..."
sqlite3 "$DEST_DB" "
DROP TABLE IF EXISTS md_1min_quote_features;
CREATE TABLE IF NOT EXISTS md_1min_quote_features (
bin_tstamp TEXT,
tstamp_ns INTEGER,
exchange_id TEXT,
instrument_id TEXT,
mid_price_open REAL,
mid_price_high REAL,
mid_price_low REAL,
mid_price_close REAL,
mid_price_mean REAL,
rel_spread_mean REAL,
rel_spread_min REAL,
rel_spread_max REAL,
rel_spread_first REAL,
rel_spread_last REAL,
l1_imbalance_mean REAL,
l1_imbalance_min REAL,
l1_imbalance_max REAL,
l1_imbalance_first REAL,
l1_imbalance_last REAL,
micro_price_mean REAL,
micro_price_min REAL,
micro_price_max REAL,
micro_price_first REAL,
micro_price_last REAL,
weighted_mid_mean REAL,
weighted_mid_min REAL,
weighted_mid_max REAL,
weighted_mid_first REAL,
weighted_mid_last REAL,
PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
);"
# Create index for md_1min_quote_features
echo "Creating index for md_1min_quote_features..."
sqlite3 "$DEST_DB" "
CREATE UNIQUE INDEX IF NOT EXISTS md_1min_quote_features_uidx
ON md_1min_quote_features(bin_tstamp, exchange_id, instrument_id);"
# Populate md_1min_quote_features using source database
echo "Populating md_1min_quote_features..."
sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB' AS dest;
INSERT INTO dest.md_1min_quote_features
SELECT
strftime('%Y-%m-%d %H:%M:00', tstamp) as bin_tstamp,
CAST(strftime('%s', tstamp) * 1000000000 AS INTEGER) as tstamp_ns,
exchange_id,
instrument_id,
FIRST_VALUE((ask_px + bid_px) / 2.0) OVER w as mid_price_open,
MAX((ask_px + bid_px) / 2.0) as mid_price_high,
MIN((ask_px + bid_px) / 2.0) as mid_price_low,
LAST_VALUE((ask_px + bid_px) / 2.0) OVER w as mid_price_close,
AVG((ask_px + bid_px) / 2.0) as mid_price_mean,
AVG((ask_px - bid_px) / ((ask_px + bid_px) / 2.0)) as rel_spread_mean,
MIN((ask_px - bid_px) / ((ask_px + bid_px) / 2.0)) as rel_spread_min,
MAX((ask_px - bid_px) / ((ask_px + bid_px) / 2.0)) as rel_spread_max,
FIRST_VALUE((ask_px - bid_px) / ((ask_px + bid_px) / 2.0)) OVER w as rel_spread_first,
LAST_VALUE((ask_px - bid_px) / ((ask_px + bid_px) / 2.0)) OVER w as rel_spread_last,
AVG((bid_qty - ask_qty) / (bid_qty + ask_qty)) as l1_imbalance_mean,
MIN((bid_qty - ask_qty) / (bid_qty + ask_qty)) as l1_imbalance_min,
MAX((bid_qty - ask_qty) / (bid_qty + ask_qty)) as l1_imbalance_max,
FIRST_VALUE((bid_qty - ask_qty) / (bid_qty + ask_qty)) OVER w as l1_imbalance_first,
LAST_VALUE((bid_qty - ask_qty) / (bid_qty + ask_qty)) OVER w as l1_imbalance_last,
AVG((ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty)) as micro_price_mean,
MIN((ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty)) as micro_price_min,
MAX((ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty)) as micro_price_max,
FIRST_VALUE((ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty)) OVER w as micro_price_first,
LAST_VALUE((ask_px * bid_qty + bid_px * ask_qty) / (bid_qty + ask_qty)) OVER w as micro_price_last,
AVG((ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty)) as weighted_mid_mean,
MIN((ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty)) as weighted_mid_min,
MAX((ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty)) as weighted_mid_max,
FIRST_VALUE((ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty)) OVER w as weighted_mid_first,
LAST_VALUE((ask_px * ask_qty + bid_px * bid_qty) / (bid_qty + ask_qty)) OVER w as weighted_mid_last
FROM md_quotes
GROUP BY strftime('%Y-%m-%d %H:%M:00', tstamp), exchange_id, instrument_id
WINDOW w AS (
PARTITION BY strftime('%Y-%m-%d %H:%M:00', tstamp), exchange_id, instrument_id
ORDER BY tstamp
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
);"
# Copy the md_1min_bars table to destination database
echo "Copying md_1min_bars to destination..."
sqlite3 "$DEST_DB" "
DROP TABLE IF EXISTS md_1min_bars;
CREATE TABLE IF NOT EXISTS md_1min_bars (
bin_tstamp TEXT,
tstamp_ns INTEGER,
exchange_id TEXT,
instrument_id TEXT,
open REAL,
high REAL,
low REAL,
close REAL,
volume REAL,
vwap REAL,
num_trades INTEGER,
PRIMARY KEY (bin_tstamp, exchange_id, instrument_id)
);"
echo "Creating index for md_1min_bars..."
sqlite3 "$DEST_DB" "
CREATE UNIQUE INDEX IF NOT EXISTS md_1min_bars_uidx
ON md_1min_bars(bin_tstamp, exchange_id, instrument_id);"
echo "Populating md_1min_bars..."
sqlite3 "$SRC_DB" "ATTACH DATABASE '$DEST_DB' AS dest;
INSERT INTO dest.md_1min_bars
SELECT * FROM md_1min_bars;"
# Create the combined features view in destination database
echo "Creating combined features view..."
sqlite3 "$DEST_DB" "
DROP VIEW IF EXISTS md_1min_features_view;
CREATE VIEW IF NOT EXISTS md_1min_features_view AS
SELECT
b.bin_tstamp,
b.tstamp_ns,
b.exchange_id,
b.instrument_id,
-- OHLCV data from md_1min_bars
b.open,
b.high,
b.low,
b.close,
b.volume,
b.vwap,
b.num_trades,
-- Quote features
q.mid_price_open,
q.mid_price_high,
q.mid_price_low,
q.mid_price_close,
q.mid_price_mean,
q.rel_spread_mean,
q.rel_spread_min,
q.rel_spread_max,
q.rel_spread_first,
q.rel_spread_last,
q.l1_imbalance_mean,
q.l1_imbalance_min,
q.l1_imbalance_max,
q.l1_imbalance_first,
q.l1_imbalance_last,
q.micro_price_mean,
q.micro_price_min,
q.micro_price_max,
q.micro_price_first,
q.micro_price_last,
q.weighted_mid_mean,
q.weighted_mid_min,
q.weighted_mid_max,
q.weighted_mid_first,
q.weighted_mid_last,
-- Trade features
t.price_mean as trade_price_mean,
t.price_median as trade_price_median,
t.signed_volume,
t.order_flow_imbalance,
t.avg_trade_size
FROM md_1min_bars b
LEFT JOIN md_1min_quote_features q
ON b.bin_tstamp = q.bin_tstamp
AND b.exchange_id = q.exchange_id
AND b.instrument_id = q.instrument_id
LEFT JOIN md_1min_trade_features t
ON b.bin_tstamp = t.bin_tstamp
AND b.exchange_id = t.exchange_id
AND b.instrument_id = t.instrument_id;"
echo "Feature tables created and populated successfully in $DEST_DB!"

View File

@ -1,44 +0,0 @@
#!/bin/bash
Python=/home/cvtt/.pyenv/python3.12-venv/bin/python3.12
RootDir=/home/cvtt/prod
export PYTHONPATH=${RootDir}
host=${1}
if [ "${host}" == "cvttdata" ]
then
ArchiveRootDir=/home/cvtt/prod/archive/md_archive/crypto/cvttdata
CredKey=TSDB_MD_CVTTDATA_RO
elif [ "${host}" == "cloud21" ]
then
ArchiveRootDir=/home/cvtt/prod/archive/md_archive/crypto/cloud21
CredKey=TSDB_MD_CLD21_RO
else
echo "Unknown host ${host}. ${0} Aborted."
exit 1
fi
mkdir -p ${ArchiveRootDir}
yesterday=$(date -d "yesterday" +%Y%m%d)
Schemas=${2}
if [ "${Schemas}" == "" ]
then
Schemas="coinbase,bnbspot,bnbfut"
fi
echo "Schemas=${Schemas}"
Cmd=
Cmd="${Python}"
Cmd="${Cmd} ${RootDir}/cvttpy/research/utils/archive_ts_md.py"
Cmd="${Cmd} --config=http://cloud16.cvtt.vpn:6789/apps/md_recorder"
Cmd="${Cmd} --db_credentials_key=${CredKey}"
Cmd="${Cmd} --date=${yesterday}"
Cmd="${Cmd} --schemas=${Schemas}"
Cmd="${Cmd} --root_dir=${ArchiveRootDir}"
Cmd="${Cmd} --format=SQLite"
Cmd="${Cmd} --compress"
echo ${Cmd}
eval ${Cmd}
echo "${0} ${*} Done."