#!/usr/bin/env python3
"""Archive T0.5/T0.95 HK IPO market-heat snapshots from public web pages."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import subprocess
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from urllib.request import Request, urlopen
# Defaults for --db and --schema; their help text describes both as repo-relative.
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that receives the per-table CSV files written by export_snapshot().
SNAPSHOT_DIR = Path("data/snapshots")
# Default market-heat page, fetched when --url is not given.
VBKR_URL = "https://www.vbkr.com/ipo/hk/v2/ipo-hk-index"
# Accepted --stage values; the chosen one is stored in ipo_market_heat.stage.
T0_5_STAGE = "T0_5_market_heat"
T0_95_STAGE = "T0_95_final_heat"
@dataclass(frozen=True)
class MarketHeatRow:
    """One IPO row as parsed from the market-heat table.

    Numeric fields are ``None`` when the page text could not be parsed as a
    number; text fields hold the cleaned cell text.
    """

    # Five-digit stock code without the ".HK" suffix.
    ticker: str
    # First text line of the name cell.
    stock_name: str
    # Offer price range; both ends are equal when the page shows one price.
    offer_price_low_hkd: float | None
    offer_price_high_hkd: float | None
    # Board lot, truncated to an integer.
    board_lot: int | None
    min_subscription_amount_hkd: float | None
    # Number found in front of "倍" in the margin cell, if any.
    margin_subscription_multiple: float | None
    # Full cleaned text of the margin cell.
    margin_multiple_label: str
    # "YYYY-MM-DD HH:MM" when the cell has that shape, otherwise the joined cell text.
    subscription_deadline: str
    result_announcement_date: str
    listing_date: str
    # First href found in the prospectus cell, or "" when there is none.
    prospectus_url: str
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments."""
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument
    add("--db", help="Repo-relative SQLite database path.", default=str(DEFAULT_DB_PATH))
    add("--schema", help="Repo-relative schema path.", default=str(DEFAULT_SCHEMA_PATH))
    add("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    add("--url", help="Market-heat page URL.", default=VBKR_URL)
    add("--tickers", help="Comma-separated tickers to archive. Defaults to all parsed tickers in ipo_master.")
    add(
        "--stage",
        help="Decision stage represented by this snapshot. Use T0_95_final_heat only when the snapshot is still actionable before the user's order cutoff.",
        default=T0_5_STAGE,
        choices=[T0_5_STAGE, T0_95_STAGE],
    )
    return parser.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: ISO-8601 timestamp from ``--as-of``, or ``None``/empty to use
            the current time (truncated to whole seconds).

    Returns:
        The timestamp converted to UTC. A value with another offset is
        shifted to UTC; a value with no offset is taken to already be UTC.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        # "Z" is rewritten because fromisoformat() only accepts it from Python 3.11.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # Never let a naive value through: astimezone() downstream would
            # read it as machine-local time.
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")
def compact_timestamp(value: str) -> str:
    """Return ``value`` with date/time separators removed, for filenames and IDs.

    A ``+00:00`` suffix is rewritten to ``Z``. That replacement has to run
    before ``:`` is stripped, otherwise the suffix has already become
    ``+0000`` and no longer matches.
    """
    return value.replace("+00:00", "Z").replace("-", "").replace(":", "")
def source_date(value: str) -> str:
    """Return the calendar date (``YYYY-MM-DD``) of an ISO-8601 timestamp.

    The date is taken as written in ``value``; no timezone conversion is done.
    """
    iso_text = value.replace("Z", "+00:00")
    moment = datetime.fromisoformat(iso_text)
    return moment.date().isoformat()
def stage_slug(stage: str) -> str:
    """Return the lower-cased stage name used inside source identifiers."""
    slug = stage.lower()
    return slug
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a browser-like User-Agent header and uses a
    60-second timeout.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    page_request = Request(url, headers=browser_headers)
    with urlopen(page_request, timeout=60) as response:
        body = response.read()
    return body
def sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of ``payload`` as lowercase hex."""
    digest = hashlib.sha256()
    digest.update(payload)
    return digest.hexdigest()
def clean_cell(value: str) -> str:
    """Convert the inner HTML of a table cell to plain text.

    ``<br>`` tags become line breaks, every other tag becomes a space, HTML
    entities are decoded, and whitespace inside each line is collapsed.
    Blank lines are dropped.

    Returns:
        The remaining lines joined with ``"\\n"``; ``""`` for an empty cell.
    """
    # Turn <br> into newlines first so multi-line cells keep their structure.
    value = re.sub(r"<br[^>]*>", "\n", value, flags=re.I)
    # Remaining tags carry no text; a space keeps neighbouring words apart.
    value = re.sub(r"<.*?>", " ", value, flags=re.S)
    value = html.unescape(value)
    return "\n".join(" ".join(line.split()) for line in value.splitlines() if line.strip())
def as_float(value: str) -> float | None:
    """Parse a numeric cell such as ``"HKD 1,234.5"`` into a float.

    Thousands separators, ``HKD`` and ``$`` are removed before parsing.
    Returns ``None`` for an empty cell, a lone ``-``, or text that is not a
    number.
    """
    text = value
    for noise in (",", "HKD", "$"):
        text = text.replace(noise, "")
    text = text.strip()
    if text in ("", "-"):
        return None
    try:
        parsed = float(text)
    except ValueError:
        return None
    return parsed
def as_int(value: str) -> int | None:
    """Parse a numeric cell into an int, truncating any fractional part.

    Returns ``None`` when the cell is not a number, or when the parsed value
    is infinite or NaN and so has no integer form.
    """
    number = as_float(value)
    if number is None:
        return None
    try:
        return int(number)
    except (OverflowError, ValueError):
        # float() accepts text such as "inf" and "nan"; int() rejects both.
        return None
def parse_offer_price(value: str) -> tuple[float | None, float | None]:
    """Return the ``(low, high)`` offer price from a cleaned price cell.

    Each line of the cell is parsed on its own and unparseable lines are
    ignored. The first number is returned as the low end and the last as the
    high end, so a single price yields the same value twice.
    ``(None, None)`` is returned when no line parses.
    """
    prices = []
    for line in value.splitlines():
        parsed = as_float(line)
        if parsed is not None:
            prices.append(parsed)
    if not prices:
        return None, None
    return prices[0], prices[-1]
def parse_margin_multiple(value: str) -> float | None:
    """Extract the number that precedes ``倍`` in the margin cell text.

    Thousands separators are accepted. Returns ``None`` when the text has no
    such number.
    """
    found = re.search(r"([0-9][0-9,]*(?:\.[0-9]+)?)\s*倍", value)
    return float(found.group(1).replace(",", "")) if found else None
def parse_deadline(value: str) -> str:
    """Normalise the subscription-deadline cell to a single line.

    When the first two non-blank lines are a ``YYYY-MM-DD`` date followed by
    an ``H:MM``/``HH:MM`` time, they are returned as ``"<date> <time>"`` and
    any further lines are dropped. Otherwise all non-blank lines are joined
    with single spaces.
    """
    parts = []
    for raw_line in value.splitlines():
        stripped = raw_line.strip()
        if stripped:
            parts.append(stripped)
    has_date_and_time = (
        len(parts) >= 2
        and re.match(r"\d{4}-\d{2}-\d{2}$", parts[0]) is not None
        and re.match(r"\d{1,2}:\d{2}$", parts[1]) is not None
    )
    if not has_date_and_time:
        return " ".join(parts)
    date_part, time_part = parts[0], parts[1]
    return f"{date_part} {time_part}"
def is_still_actionable(row: MarketHeatRow, as_of: str) -> bool:
    """Return whether ``as_of`` falls strictly before the row's subscription deadline.

    The deadline text is compared against ``as_of`` converted to UTC+8 wall
    time. NOTE(review): this presumes the page shows Hong Kong time — confirm
    against the source page.

    Args:
        row: Parsed row; only ``subscription_deadline`` is read.
        as_of: ISO-8601 observation timestamp. A value with no offset is
            treated as UTC.

    Returns:
        ``False`` when the deadline is not in ``YYYY-MM-DD HH:MM`` form or has
        already been reached, otherwise ``True``.

    Raises:
        ValueError: If ``as_of`` is not a valid ISO-8601 timestamp.
    """
    try:
        deadline_hkt = datetime.strptime(row.subscription_deadline, "%Y-%m-%d %H:%M")
    except ValueError:
        return False
    observed_at = datetime.fromisoformat(as_of.replace("Z", "+00:00"))
    if observed_at.tzinfo is None:
        # astimezone() would otherwise assume the machine's local timezone.
        observed_at = observed_at.replace(tzinfo=timezone.utc)
    # Drop tzinfo after shifting so it can be compared with the naive deadline.
    observed_hkt = observed_at.astimezone(timezone(timedelta(hours=8))).replace(tzinfo=None)
    return observed_hkt < deadline_hkt
def parse_rows(page: str) -> list[MarketHeatRow]:
    """Extract IPO rows from the market-heat page HTML.

    Only ``<tr>`` elements that mention a ``NNNNN.HK`` code and have at least
    eleven ``<td>`` cells are considered. Cells are read by position: 0 code,
    1 name, 3 offer price, 4 board lot, 5 minimum subscription amount,
    6 margin multiple, 7 subscription deadline, 8 result date, 9 listing
    date, 10 prospectus link.

    Args:
        page: Decoded HTML of the page.

    Returns:
        One ``MarketHeatRow`` per recognised table row, in page order.
    """
    rows: list[MarketHeatRow] = []
    for tr in re.findall(r"<tr[^>]*>(.*?)</tr>", page, flags=re.S):
        # Cheap pre-filter: skip header and decoration rows.
        if not re.search(r"\d{5}\.HK", tr):
            continue
        cells = re.findall(r"<td[^>]*>(.*?)</td>", tr, flags=re.S)
        if len(cells) < 11:
            continue
        text_cells = [clean_cell(cell) for cell in cells]
        code_match = re.search(r"(\d{5})\.HK", text_cells[0])
        if not code_match:
            continue
        # The link is read from the raw cell because clean_cell() strips tags.
        prospectus_match = re.search(r'href="([^"]+)"', cells[10])
        offer_low, offer_high = parse_offer_price(text_cells[3])
        # An empty name cell has no lines; fall back to "" instead of raising.
        name_lines = text_cells[1].splitlines()
        rows.append(
            MarketHeatRow(
                ticker=code_match.group(1),
                stock_name=name_lines[0] if name_lines else "",
                offer_price_low_hkd=offer_low,
                offer_price_high_hkd=offer_high,
                board_lot=as_int(text_cells[4]),
                min_subscription_amount_hkd=as_float(text_cells[5]),
                margin_subscription_multiple=parse_margin_multiple(text_cells[6]),
                margin_multiple_label=text_cells[6],
                subscription_deadline=parse_deadline(text_cells[7]),
                result_announcement_date=text_cells[8],
                listing_date=text_cells[9],
                prospectus_url=html.unescape(prospectus_match.group(1)) if prospectus_match else "",
            )
        )
    return rows
def save_raw_snapshot(payload: bytes, as_of: str) -> tuple[str, str]:
    """Store the fetched page under ``data/raw/market_heat`` and fingerprint it.

    The file name embeds the compacted ``as_of`` timestamp. The file is only
    written when it is missing or its current content differs from
    ``payload``.

    Returns:
        ``(posix_path, sha256_hex)`` of the stored snapshot.
    """
    target_dir = Path("data/raw/market_heat")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"vbkr_ipo_hk_index_{compact_timestamp(as_of)}.html"
    already_saved = target.exists() and target.read_bytes() == payload
    if not already_saved:
        target.write_bytes(payload)
    return target.as_posix(), sha256_bytes(payload)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump a whole table to ``SNAPSHOT_DIR/<table>.csv`` with a header row.

    Args:
        conn: Open database connection.
        table: Table name. It is interpolated into the SQL text, so it must
            come from trusted code, never from user input.
        order_by: ORDER BY expression, interpolated the same way. Defaults to
            the first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    # The cursor already describes its columns, so no second query is needed.
    columns = [description[0] for description in cursor.description]
    # Fetch before opening the file so a failed read cannot leave a truncated CSV.
    rows = cursor.fetchall()
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(rows)
def known_tickers(conn: sqlite3.Connection) -> set[str]:
    """Return the set of tickers present in the ``ipo_master`` table."""
    cursor = conn.execute("SELECT ticker FROM ipo_master")
    tickers = set()
    for record in cursor:
        tickers.add(record[0])
    return tickers
def selected_tickers(value: str | None, parsed_rows: list[MarketHeatRow], conn: sqlite3.Connection) -> set[str]:
    """Decide which tickers to archive.

    Args:
        value: Comma-separated ticker list from ``--tickers``; entries are
            stripped and left-padded with zeros to five characters.
        parsed_rows: Rows parsed from the page; used only when ``value`` is
            empty.
        conn: Database connection; queried only when ``value`` is empty.

    Returns:
        The requested tickers, or, when none were requested, the parsed
        tickers that also exist in ``ipo_master``.
    """
    if not value:
        in_master = known_tickers(conn)
        return {row.ticker for row in parsed_rows if row.ticker in in_master}
    requested = (piece.strip() for piece in value.split(","))
    return {piece.zfill(5) for piece in requested if piece}
def upsert_rows(
    conn: sqlite3.Connection,
    rows: list[MarketHeatRow],
    selected: set[str],
    local_path: str,
    file_sha256: str,
    url: str,
    as_of: str,
    stage: str,
) -> int:
    """Insert or update one source reference and one heat record per selected row.

    Rows whose ticker is not in ``selected`` are skipped. For the
    ``T0_95_final_heat`` stage, rows whose subscription deadline is no longer
    ahead of ``as_of`` are skipped as well. This function does not commit;
    the caller owns the transaction.

    Args:
        conn: Open database connection.
        rows: Rows parsed from the page.
        selected: Tickers to archive.
        local_path: Path of the stored raw snapshot.
        file_sha256: SHA-256 hex digest of the raw snapshot.
        url: Page URL the snapshot came from.
        as_of: Observation timestamp, stored as archived_at/observed_at/data_as_of.
        stage: Decision stage of this snapshot.

    Returns:
        The number of rows written.
    """
    final_stage = stage == T0_95_STAGE
    slug = stage_slug(stage)
    if final_stage:
        title = "VBKR IPO near-deadline final heat snapshot"
        source_notes = "Non-official near-deadline market heat snapshot archived before the user's order cutoff. Final HKEX subscription data must still come from allotment results."
    else:
        title = "VBKR IPO expected margin multiple snapshot"
        source_notes = "Non-official subscription-period market heat snapshot. Final subscription data must come from HKEX allotment results."

    written = 0
    for row in rows:
        if row.ticker not in selected or (final_stage and not is_still_actionable(row, as_of)):
            continue
        # The same identifier keys both the source reference and the heat record.
        source_id = f"{row.ticker}_{slug}_vbkr_{compact_timestamp(as_of)}"
        heat_id = source_id

        source_params = (
            source_id,
            row.ticker,
            slug,
            title,
            local_path,
            url,
            file_sha256,
            source_date(as_of),
            as_of,
            source_notes,
        )
        conn.execute(
            """
            INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
            )
            VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
            ON CONFLICT(source_id) DO UPDATE SET
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
            """,
            source_params,
        )

        heat_params = (
            heat_id,
            row.ticker,
            source_id,
            stage,
            as_of,
            row.margin_subscription_multiple,
            row.margin_multiple_label,
            row.offer_price_low_hkd,
            row.offer_price_high_hkd,
            row.board_lot,
            row.min_subscription_amount_hkd,
            row.subscription_deadline,
            row.result_announcement_date,
            row.listing_date,
            as_of,
            f"Stock name on source page: {row.stock_name}. Prospectus URL shown by source: {row.prospectus_url}",
        )
        conn.execute(
            """
            INSERT INTO ipo_market_heat (
            heat_id, ticker, source_id, stage, provider, observed_at,
            margin_subscription_multiple, margin_multiple_label,
            offer_price_low_hkd, offer_price_high_hkd, board_lot,
            min_subscription_amount_hkd, subscription_deadline,
            result_announcement_date, listing_date, data_as_of, notes
            )
            VALUES (?, ?, ?, ?, 'VBKR/Jieli', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(ticker, provider, observed_at) DO UPDATE SET
            source_id = excluded.source_id,
            stage = excluded.stage,
            margin_subscription_multiple = excluded.margin_subscription_multiple,
            margin_multiple_label = excluded.margin_multiple_label,
            offer_price_low_hkd = excluded.offer_price_low_hkd,
            offer_price_high_hkd = excluded.offer_price_high_hkd,
            board_lot = excluded.board_lot,
            min_subscription_amount_hkd = excluded.min_subscription_amount_hkd,
            subscription_deadline = excluded.subscription_deadline,
            result_announcement_date = excluded.result_announcement_date,
            listing_date = excluded.listing_date,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
            """,
            heat_params,
        )
        written += 1
    return written
def main() -> int:
    """Fetch the market-heat page, archive it, and refresh the CSV snapshots.

    Steps: download the page, store the raw HTML, upsert the selected rows
    into the database, export ``ipo_market_heat`` and ``source_refs`` to CSV,
    then run ``scripts/update_sync_state.py`` and print a summary.

    Returns:
        ``0`` on success.

    Raises:
        subprocess.CalledProcessError: If the sync-state script exits non-zero.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    payload = fetch_bytes(args.url)
    page = payload.decode("utf-8", "replace")
    parsed_rows = parse_rows(page)
    local_path, file_sha256 = save_raw_snapshot(payload, as_of)
    db_path = Path(args.db)
    schema_path = Path(args.schema)
    # sqlite3's own context manager only commits or rolls back; closing()
    # makes sure the connection is released before the follow-up script runs.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.executescript(schema_path.read_text(encoding="utf-8"))
            selected = selected_tickers(args.tickers, parsed_rows, conn)
            written = upsert_rows(conn, parsed_rows, selected, local_path, file_sha256, args.url, as_of, args.stage)
            export_snapshot(conn, "ipo_market_heat", "ticker, observed_at")
            export_snapshot(conn, "source_refs", "source_id")
    subprocess.run(
        [
            ".venv/bin/python",
            "scripts/update_sync_state.py",
            "--as-of",
            as_of,
            "--mode",
            "market_heat_source_update",
            "--summary-limit",
            "5",
        ],
        check=True,
    )
    print("market heat archived")
    print(f"as_of: {as_of}")
    print(f"stage: {args.stage}")
    print(f"raw_snapshot: {local_path}")
    print(f"parsed_rows: {len(parsed_rows)}")
    print(f"written_rows: {written}")
    return 0
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())