This commit is contained in:
Oleg Sheynin 2025-10-18 23:10:22 +00:00
parent 2fafbc0831
commit af89a6fc08
11 changed files with 12 additions and 284 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ nohup.out
.DS_Store
__pycache__
*.html
.history

View File

@@ -5,7 +5,7 @@ sys.path.append(f'{os.environ["HOME"]}/develop/cvtt2')
import torch
from cvttpy.tools.config import Config
from cvttpy_base.tools.config import Config
from research.ai.chronos.vwap_predictor import ChronosVwapPredictor, draw_result
import warnings

View File

@@ -28,8 +28,8 @@ from research.ai.chronos.training.chronos_training_tools import (
)
from research.ai.chronos.training.chronos_dataset_loader import ModelDatasetLoader
from cvttpy.tools.logger import Log
from cvttpy.tools.timeutils import current_milliseconds
from cvttpy_base.tools.logger import Log
from cvttpy_base.tools.timeutils import current_milliseconds
from chronos import ChronosConfig

View File

@@ -9,7 +9,7 @@ from research.tools.ai.settings import (
)
from research.tools.ai.t5 import extrapolate_future_timestamps
from cvttpy.tools.config import Config
from cvttpy_base.tools.config import Config
class ChronosVwapPredictor:

View File

@@ -23,7 +23,7 @@
"\n",
"import torch\n",
"\n",
"from cvttpy.tools.config import Config\n",
"from cvttpy_base.tools.config import Config\n",
"from research.ai.chronos.vwap_predictor import ChronosVwapPredictor, draw_result\n",
"\n",
"CONFIG_FILE = f'file://{os.environ[\"HOME\"]}/develop/cvtt2/research/ai/chronos/chronos_h5.cfg'\n"

View File

@@ -3,7 +3,7 @@
import datetime
import zoneinfo
from cvttpy.tools.timeutils import parse_to_nanos
from cvttpy_base.tools.timeutils import parse_to_nanos
ExchangeTsNsT = int

View File

@@ -5,7 +5,7 @@ from zipfile import ZipFile
from typing import Generator
from cvttpy.research.eqt_ticks import EqtDataTick
from cvttpy.tools.logger import Log
from cvttpy_base.tools.logger import Log
def eqt_data_tick_gen(zpfile: str) -> Generator[EqtDataTick, str, None]:
with ZipFile(zpfile) as zp:

View File

@@ -1,273 +0,0 @@
from __future__ import annotations
import datetime
import gzip
import os
import sqlite3
from typing import Any, Dict, List
from cvttpy.tools.app import App
from cvttpy.tools.base import NamedObject
from cvttpy.tools.config import Config
from cvttpy.tools.secrets import Secrets, Secret
from cvttpy.tools.cvtt_tools import CvttAppConfig
from cvttpy.tools.db.db_client import DatabaseClient
from cvttpy.tools.hdf5 import save_df_to_hdf5
from cvttpy.tools.logger import Log
from cvttpy.tools.timeutils import NanoPerSec, format_nanos_utc, parse_to_nanos
# Per-day market-data tables and views exported from each database schema.
TABLES = ["md_trades", "md_booktops", "md_booksnaps"]
VIEWS = ["ohlcv_1min"]
class DataSaver(NamedObject):
    """Export one day of recorded market data from a database to a local file.

    For every schema given via ``--schemas``, dumps the tables in ``TABLES``
    and the views in ``VIEWS``, restricted to a single date
    (``--date YYYYMMDD``), into
    ``<root_dir>/YYYY/MM/<YYYYMMDD>.mktdata.h5`` (HDF5) or
    ``...mktdata.db`` (SQLite).  The SQLite output may optionally be
    gzip-compressed (``--compress``).  Work is scheduled on the App
    framework: argument resolution at ``App.Stage.Config`` and the export
    itself at ``App.Stage.Run``.
    """

    # tables_: List[str]
    schemas_: List[str]  # schemas to export, parsed from --schemas
    format_: str  # output format: "HDF5" or "SQLite"
    file_name_: str  # output file path, built in _on_config()
    ISO_date_: str  # export date as "YYYY-MM-DD", used in SQL WHERE clauses

    def __init__(self) -> None:
        """Register command-line arguments and schedule config/run stages."""
        App.instance().add_cmdline_arg(
            "--root_dir",
            type=str,
            default=".",
            required=False,
            help="Output Root Directory",
        )
        App.instance().add_cmdline_arg(
            "--db_credentials_key",
            type=str,
            default=None,
            required=True,
            help="Database Access Credentials Key",
        )
        App.instance().add_cmdline_arg(
            "--date",
            type=int,
            default=None,
            required=True,
            help="Date: YYYYMMDD",
        )
        App.instance().add_cmdline_arg(
            "--schemas",
            type=str,
            default="public",
            required=False,
            help="Database Schemas",
        )
        App.instance().add_cmdline_arg(
            "--format",
            type=str,
            default="HDF5",
            choices=["HDF5", "SQLite"],
            required=False,
            help="Data Format",
        )
        App.instance().add_cmdline_arg(
            "--compress",
            action="store_true",
            default=False,
            help="To compress result",
        )
        # NOTE(review): coroutine *objects* are passed here (the async
        # methods are invoked), not callables — presumably App.add_call()
        # awaits the given awaitable at that stage; confirm against the
        # App framework's add_call contract.
        App.instance().add_call(App.Stage.Config, self._on_config())
        App.instance().add_call(App.Stage.Run, self.run())

    async def _on_config(self) -> None:
        """Resolve CLI arguments into format, schemas, date and output path.

        Creates the ``<root_dir>/YYYY/MM`` directory if needed and derives
        ``file_name_``, appending ``.h5`` or ``.db`` per ``--format``.
        """
        self.format_ = App.instance().get_argument("format")
        self.schemas_ = App.instance().get_argument("schemas", "public").split(",")
        date = App.instance().get_argument("date", None)
        # --date arrives as int YYYYMMDD; round-trip through nanoseconds to
        # obtain both the ISO date (SQL filter) and the file-name date.
        nanos = parse_to_nanos(tm=str(date), format="%Y%m%d")
        self.ISO_date_ = format_nanos_utc(nanos=nanos, fmt="%Y-%m-%d")
        file_date = format_nanos_utc(nanos=nanos, fmt="%Y%m%d")
        root_dir = App.instance().get_argument("root_dir")
        outdir = f"{root_dir}/" + format_nanos_utc(nanos=nanos, fmt="%Y/%m")
        os.makedirs(outdir, exist_ok=True)
        self.file_name_ = f"{outdir}/{file_date}.mktdata"
        if self.format_ == "HDF5":
            self.file_name_ += ".h5"
        elif self.format_ == "SQLite":
            self.file_name_ += ".db"

    async def run(self) -> None:
        """Create the database client and export in the selected format."""
        cfg: Config = CvttAppConfig.instance().get_subconfig("md_recorder/db", Config())
        secret: Secret = Secrets.instance().get(
            App.instance().args_.get("db_credentials_key", "db_credentials_key?")
        )
        self.db_client_ = DatabaseClient.create(
            class_name=cfg.get_value("class", "class?"),
            secret=secret
        )
        if self.format_ == "HDF5":
            await self._create_hdf5(dbclnt=self.db_client_)
        elif self.format_ == "SQLite":
            await self._create_sqlite(dbclnt=self.db_client_)
        await self.db_client_.stop()

    async def _create_sqlite(self, dbclnt: DatabaseClient) -> None:
        """Stream the day's tables and views into a SQLite file.

        Rows are fetched via a per-record callback, buffered as INSERT
        statements, and applied in transactions of up to 100k rows.
        Optionally gzip-compresses the finished file and deletes the
        uncompressed original (``--compress``).
        """
        conn = sqlite3.connect(self.file_name_)
        sqlite_table: str = ""  # current destination table; read by closures below
        insert_statements: List[str] = []  # INSERTs pending the next _flush()
        count: int = 0  # rows written so far to the current destination table

        def _create_table(
            conn: sqlite3.Connection, tname: str, columns: List, prim_key: str = ""
        ):
            # Create the destination table, mapping source column type names
            # to SQLite affinities.  NOTE(review): a type_display outside
            # col_sql_type yields "None" as the column type; assumed the four
            # mapped types cover every source column — confirm.
            col_sql_type = {
                "timestamptz": "INTEGER",
                "text": "TEXT",
                "float8": "REAL",
                "int8": "INTEGER",
            }
            col_defs: List[str] = []
            for col in columns:
                col_defs.append(f"{col.name} {col_sql_type.get(col.type_display)}")
            qry = f"CREATE TABLE IF NOT EXISTS {tname} ({','.join(col_defs)}"
            if prim_key:
                qry += f", {prim_key})"
            else:
                qry += ")"
            Log.info(qry)
            conn.execute(qry)

        def _val_to_str(val: Any) -> str:
            # Render a Python value as a SQL literal; datetimes become UTC
            # nanosecond integers.  NOTE(review): strings are quoted but not
            # escaped — an embedded single quote would break the INSERT;
            # assumed safe for this data set.
            if isinstance(val, str):
                return f"'{val}'"
            elif isinstance(val, datetime.datetime):
                return str(
                    int(
                        val.replace(tzinfo=datetime.timezone.utc).timestamp()
                        * NanoPerSec
                    )
                )
            else:
                return str(val)

        def _flush() -> None:
            # Apply all buffered INSERTs in one explicit transaction,
            # then reset the buffer.
            nonlocal insert_statements, conn, count, sqlite_table
            count += len(insert_statements)
            Log.info(
                f"Flushing next {len(insert_statements)} {sqlite_table} rows. Total={count} ..."
            )
            query = "BEGIN TRANSACTION;"
            conn.execute(query)
            for query in insert_statements:
                conn.execute(query)
            query = "COMMIT;"
            conn.execute(query)
            insert_statements = []

        def _on_record(data: Dict, columns: List[str]) -> bool:
            # Per-row callback for dbclnt.fetch(): queue one INSERT and flush
            # every 100k rows.  Returns True to keep fetching.
            nonlocal sqlite_table
            nonlocal insert_statements
            colnames = columns
            vals: List = [_val_to_str(data.get(col)) for col in colnames]
            qry = f"INSERT INTO {sqlite_table} ({','.join(colnames)}) values ({','.join(vals)});"
            insert_statements.append(qry)
            if len(insert_statements) >= 100000:
                _flush()
            return True

        def _compress() -> None:
            # Gzip the finished database file and remove the original.
            Log.info(f"Compressing the file {self.file_name_}")
            with open(self.file_name_, "rb") as fin:
                with gzip.open(f"{self.file_name_}.gz", "wb") as fout:
                    fout.writelines(fin)
            os.unlink(self.file_name_)

        for schema in self.schemas_:
            for view in VIEWS:
                sqlite_table = f"{schema}_{view}"
                qry_view = f"{schema}.{view}"
                qry = f"select * from {qry_view} where cast(tstamp as DATE) = '{self.ISO_date_}'"
                # fetch one record to get column attributes
                res = await dbclnt.fetch_one_record(qry + " limit 1")
                if not res:
                    Log.warning(f"{self.fname()} No results for query {qry=}")
                    continue
                _, column_attributes = res
                _create_table(
                    conn=conn,
                    tname=sqlite_table,
                    columns=column_attributes,
                    prim_key="PRIMARY KEY (tstamp, exchange_id, instrument_id)",
                )
                Log.info(f"{self.fname()} {qry=}")
                await dbclnt.fetch(query=qry, on_record_cb=_on_record)
                # Flush the tail batch and reset the per-table row counter.
                _flush()
                count = 0
            for table in TABLES:
                sqlite_table = f"{schema}_{table}"
                qry_table = f"{schema}.{table}"
                qry = f"select * from {qry_table} where cast(time as DATE) = '{self.ISO_date_}'"
                # fetch one record to get column attributes
                res = await dbclnt.fetch_one_record(qry + " limit 1")
                if not res:
                    Log.warning(f"{self.fname()} No results for query {qry=}")
                    continue
                _, column_attributes = res
                _create_table(
                    conn=conn,
                    tname=sqlite_table,
                    columns=column_attributes,
                    # prim_key="PRIMARY KEY (time, exchange_id, instrument_id)",
                )
                Log.info(f"{self.fname()} {qry=}")
                await dbclnt.fetch(query=qry, on_record_cb=_on_record)
                # Flush the tail batch and reset the per-table row counter.
                _flush()
                count = 0
        conn.close()
        if App.instance().get_argument("compress", False):
            _compress()

    async def _create_hdf5(self, dbclnt: DatabaseClient) -> None:
        """Dump the day's tables and views into an HDF5 file.

        Each table/view is fetched as one whole DataFrame and stored under
        key ``<schema>_<name>`` with compression level 7.
        """
        ## COULD BE REAL SLOW TO CREATE DataFrame
        for schema in self.schemas_:
            for table in TABLES:
                qry_table = f"{schema}.{table}"
                qry = f"select * from {qry_table} where cast(time as DATE) = '{self.ISO_date_}'"
                Log.info(f"{self.fname()} {qry=}")
                df = await dbclnt.fetch_as_dataframe(qry)
                hdf5_key = f"{schema}_{table}"
                save_df_to_hdf5(
                    df=df, key=hdf5_key, file_name=self.file_name_, complevel=7
                )
            for view in VIEWS:
                qry_view = f"{schema}.{view}"
                qry = f"select * from {qry_view} where cast(tstamp as DATE) = '{self.ISO_date_}'"
                Log.info(f"{self.fname()} {qry=}")
                df = await dbclnt.fetch_as_dataframe(qry)
                hdf5_key = f"{schema}_{view}"
                save_df_to_hdf5(
                    df=df, key=hdf5_key, file_name=self.file_name_, complevel=7
                )
if __name__ == "__main__":
App()
CvttAppConfig()
Secrets()
DataSaver()
App.instance().run()

View File

@@ -26,7 +26,7 @@
"import sys\n",
"sys.path.append(\"/home/oleg/develop/cvtt2\")\n",
"\n",
"from cvttpy.tools.config import Config\n",
"from cvttpy_base.tools.config import Config\n",
"from cvttpy.research.charts.scatter_chart import (\n",
" PlotSettings,\n",
" ScatterChartSettings,\n",

View File

@@ -30,7 +30,7 @@
"import sys\n",
"sys.path.append(\"/home/oleg/develop/cvtt2\")\n",
"\n",
"from cvttpy.tools.config import Config\n",
"from cvttpy_base.tools.config import Config\n",
"\n",
"from cvttpy.research.vwap import VwapCalculator\n",
"from cvttpy.tools.credentials import Credentials\n",

View File

@@ -17,7 +17,7 @@
"\n",
"from cvttpy.research.calculators.ma import ta_sma\n",
"from cvttpy.research.calculators.vwap import ta_vwap\n",
"from cvttpy.tools.config import Config\n",
"from cvttpy_base.tools.config import Config\n",
"from cvttpy.tools.credentials import Credentials\n",
"\n",
"from cvttpy.research.charts.scatter_chart import (\n",
@@ -3238,7 +3238,7 @@
}
],
"source": [
"from cvttpy.tools.logger import Log\n",
"from cvttpy_base.tools.logger import Log\n",
"\n",
"\n",
"Log.set_level(\"WARNING\")\n",