research/cvttpy_research/utils/archive_ts_md.py
2025-07-02 01:23:57 +00:00

274 lines
9.2 KiB
Python

from __future__ import annotations
import datetime
import gzip
import os
import sqlite3
from typing import Any, Dict, List
from cvttpy.tools.app import App
from cvttpy.tools.base import NamedObject
from cvttpy.tools.config import Config
from cvttpy.tools.secrets import Secrets, Secret
from cvttpy.tools.cvtt_tools import CvttAppConfig
from cvttpy.tools.db.db_client import DatabaseClient
from cvttpy.tools.hdf5 import save_df_to_hdf5
from cvttpy.tools.logger import Log
from cvttpy.tools.timeutils import NanoPerSec, format_nanos_utc, parse_to_nanos
TABLES = ["md_trades", "md_booktops", "md_booksnaps"]
VIEWS = ["ohlcv_1min"]
class DataSaver(NamedObject):
    """Archive one day of market data (TABLES + VIEWS) from a database
    into a single dated HDF5 or SQLite file, optionally gzip-compressed.

    Registers its command-line arguments and lifecycle hooks with the
    App singleton at construction time.
    """

    schemas_: List[str]  # database schemas to export (from --schemas, comma-separated)
    format_: str         # output format: "HDF5" or "SQLite" (from --format)
    file_name_: str      # full output path; extension depends on format_
    ISO_date_: str       # requested day as YYYY-MM-DD, used in the WHERE clauses
    db_client_: DatabaseClient  # created in run()

    def __init__(self):
        # Register all command-line arguments up front; values are read
        # later in _on_config()/run() once the App has parsed them.
        App.instance().add_cmdline_arg(
            "--root_dir",
            type=str,
            default=".",
            required=False,
            help="Output Root Directory",
        )
        App.instance().add_cmdline_arg(
            "--db_credentials_key",
            type=str,
            default=None,
            required=True,
            help="Database Access Credentials Key",
        )
        App.instance().add_cmdline_arg(
            "--date",
            type=int,
            default=None,
            required=True,
            help="Date: YYYYMMDD",
        )
        App.instance().add_cmdline_arg(
            "--schemas",
            type=str,
            default="public",
            required=False,
            help="Database Schemas",
        )
        App.instance().add_cmdline_arg(
            "--format",
            type=str,
            default="HDF5",
            choices=["HDF5", "SQLite"],
            required=False,
            help="Data Format",
        )
        App.instance().add_cmdline_arg(
            "--compress",
            action="store_true",
            default=False,
            help="To compress result",
        )
        # NOTE(review): these register coroutine OBJECTS (the async methods
        # are invoked here), not callables; this assumes App awaits each
        # registered coroutine exactly once at its stage — confirm against
        # the App.add_call contract.
        App.instance().add_call(App.Stage.Config, self._on_config())
        App.instance().add_call(App.Stage.Run, self.run())

    async def _on_config(self) -> None:
        """Resolve parsed arguments into format, schema list, date strings
        and the output file name; creates the YYYY/MM output directory."""
        self.format_ = App.instance().get_argument("format")
        self.schemas_ = App.instance().get_argument("schemas", "public").split(",")
        date = App.instance().get_argument("date", None)
        nanos = parse_to_nanos(tm=str(date), format="%Y%m%d")
        self.ISO_date_ = format_nanos_utc(nanos=nanos, fmt="%Y-%m-%d")
        file_date = format_nanos_utc(nanos=nanos, fmt="%Y%m%d")
        root_dir = App.instance().get_argument("root_dir")
        # Output layout: <root_dir>/<YYYY>/<MM>/<YYYYMMDD>.mktdata[.h5|.db]
        outdir = f"{root_dir}/" + format_nanos_utc(nanos=nanos, fmt="%Y/%m")
        os.makedirs(outdir, exist_ok=True)
        self.file_name_ = f"{outdir}/{file_date}.mktdata"
        if self.format_ == "HDF5":
            self.file_name_ += ".h5"
        elif self.format_ == "SQLite":
            self.file_name_ += ".db"

    async def run(self) -> None:
        """Create the database client and dispatch to the chosen exporter.

        The client is always stopped, even when the export raises.
        """
        cfg: Config = CvttAppConfig.instance().get_subconfig("md_recorder/db", Config())
        # Use get_argument(...) for consistency with the other lookups
        # (previously reached into App.instance().args_ directly).
        secret: Secret = Secrets.instance().get(
            App.instance().get_argument("db_credentials_key", "db_credentials_key?")
        )
        self.db_client_ = DatabaseClient.create(
            class_name=cfg.get_value("class", "class?"),
            secret=secret,
        )
        try:
            if self.format_ == "HDF5":
                await self._create_hdf5(dbclnt=self.db_client_)
            elif self.format_ == "SQLite":
                await self._create_sqlite(dbclnt=self.db_client_)
        finally:
            # Release the DB connection even if the export failed.
            await self.db_client_.stop()

    async def _create_sqlite(self, dbclnt: DatabaseClient) -> None:
        """Stream every TABLE and VIEW for self.ISO_date_ into a SQLite file.

        Rows are buffered and written in 100k-row batches, each batch in a
        single transaction via parameterized executemany (values containing
        quotes, or None, are handled correctly — the previous string-built
        INSERTs would produce invalid SQL for such values).
        """
        conn = sqlite3.connect(self.file_name_)
        sqlite_table: str = ""
        insert_sql: str = ""
        pending_rows: List[tuple] = []
        count: int = 0

        def _create_table(
            conn: sqlite3.Connection, tname: str, columns: List, prim_key: str = ""
        ):
            """CREATE TABLE from source column metadata. Source types not in
            the map fall back to TEXT (avoids emitting 'None' into the DDL;
            SQLite typing is dynamic anyway)."""
            col_sql_type = {
                "timestamptz": "INTEGER",
                "text": "TEXT",
                "float8": "REAL",
                "int8": "INTEGER",
            }
            col_defs: List[str] = [
                f"{col.name} {col_sql_type.get(col.type_display, 'TEXT')}"
                for col in columns
            ]
            qry = f"CREATE TABLE IF NOT EXISTS {tname} ({','.join(col_defs)}"
            if prim_key:
                qry += f", {prim_key})"
            else:
                qry += ")"
            Log.info(qry)
            conn.execute(qry)

        def _to_sqlite_value(val: Any) -> Any:
            """Convert a source value to a bindable SQLite parameter:
            datetimes become integer UTC nanoseconds; everything else is
            passed through unchanged (None binds as NULL)."""
            if isinstance(val, datetime.datetime):
                return int(
                    val.replace(tzinfo=datetime.timezone.utc).timestamp()
                    * NanoPerSec
                )
            return val

        def _flush() -> None:
            """Write all buffered rows in one transaction."""
            nonlocal pending_rows, count
            if not pending_rows:
                return
            count += len(pending_rows)
            Log.info(
                f"Flushing next {len(pending_rows)} {sqlite_table} rows. Total={count} ..."
            )
            # "with conn" wraps the batch in BEGIN/COMMIT (ROLLBACK on error).
            with conn:
                conn.executemany(insert_sql, pending_rows)
            pending_rows = []

        def _on_record(data: Dict, columns: List[str]) -> bool:
            """Per-row fetch callback: buffer the row, flush every 100k rows.

            insert_sql is rebuilt per record, which is cheap and keeps it in
            sync with whichever table is currently being exported (a flush
            always happens before sqlite_table changes).
            """
            nonlocal insert_sql, pending_rows
            placeholders = ",".join("?" for _ in columns)
            insert_sql = (
                f"INSERT INTO {sqlite_table} ({','.join(columns)}) values ({placeholders});"
            )
            pending_rows.append(
                tuple(_to_sqlite_value(data.get(col)) for col in columns)
            )
            if len(pending_rows) >= 100000:
                _flush()
            return True

        def _compress() -> None:
            """gzip the finished file and delete the uncompressed original."""
            Log.info(f"Compressing the file {self.file_name_}")
            with open(self.file_name_, "rb") as fin:
                with gzip.open(f"{self.file_name_}.gz", "wb") as fout:
                    fout.writelines(fin)
            os.unlink(self.file_name_)

        try:
            for schema in self.schemas_:
                for view in VIEWS:
                    sqlite_table = f"{schema}_{view}"
                    qry_view = f"{schema}.{view}"
                    qry = f"select * from {qry_view} where cast(tstamp as DATE) = '{self.ISO_date_}'"
                    # fetch one record to get column attributes
                    res = await dbclnt.fetch_one_record(qry + " limit 1")
                    if not res:
                        Log.warning(f"{self.fname()} No results for query {qry=}")
                        continue
                    _, column_attributes = res
                    _create_table(
                        conn=conn,
                        tname=sqlite_table,
                        columns=column_attributes,
                        prim_key="PRIMARY KEY (tstamp, exchange_id, instrument_id)",
                    )
                    Log.info(f"{self.fname()} {qry=}")
                    await dbclnt.fetch(query=qry, on_record_cb=_on_record)
                    _flush()
                    count = 0
                for table in TABLES:
                    sqlite_table = f"{schema}_{table}"
                    qry_table = f"{schema}.{table}"
                    qry = f"select * from {qry_table} where cast(time as DATE) = '{self.ISO_date_}'"
                    # fetch one record to get column attributes
                    res = await dbclnt.fetch_one_record(qry + " limit 1")
                    if not res:
                        Log.warning(f"{self.fname()} No results for query {qry=}")
                        continue
                    _, column_attributes = res
                    _create_table(
                        conn=conn,
                        tname=sqlite_table,
                        columns=column_attributes,
                        # NOTE(review): primary key deliberately omitted for
                        # raw tables (was commented out upstream):
                        # prim_key="PRIMARY KEY (time, exchange_id, instrument_id)",
                    )
                    Log.info(f"{self.fname()} {qry=}")
                    await dbclnt.fetch(query=qry, on_record_cb=_on_record)
                    _flush()
                    count = 0
        finally:
            # Close the output file even if the export raised mid-way.
            conn.close()
        if App.instance().get_argument("compress", False):
            _compress()

    async def _create_hdf5(self, dbclnt: DatabaseClient) -> None:
        """Export every TABLE and VIEW for self.ISO_date_ into one HDF5
        file, one dataset per "<schema>_<table>" key.

        Each query is materialized as a full DataFrame before saving,
        which can be slow/memory-heavy for large days.
        """
        for schema in self.schemas_:
            for table in TABLES:
                qry_table = f"{schema}.{table}"
                qry = f"select * from {qry_table} where cast(time as DATE) = '{self.ISO_date_}'"
                Log.info(f"{self.fname()} {qry=}")
                df = await dbclnt.fetch_as_dataframe(qry)
                hdf5_key = f"{schema}_{table}"
                save_df_to_hdf5(
                    df=df, key=hdf5_key, file_name=self.file_name_, complevel=7
                )
            for view in VIEWS:
                qry_view = f"{schema}.{view}"
                # Views filter on "tstamp" rather than "time".
                qry = f"select * from {qry_view} where cast(tstamp as DATE) = '{self.ISO_date_}'"
                Log.info(f"{self.fname()} {qry=}")
                df = await dbclnt.fetch_as_dataframe(qry)
                hdf5_key = f"{schema}_{view}"
                save_df_to_hdf5(
                    df=df, key=hdf5_key, file_name=self.file_name_, complevel=7
                )
if __name__ == "__main__":
    # Construct the framework singletons first (each is later reached via
    # its .instance() accessor), then the archiver, which registers its
    # arguments and stage callbacks in __init__; finally run the staged
    # application lifecycle (Config -> Run).
    App()
    CvttAppConfig()
    Secrets()
    DataSaver()
    App.instance().run()