Files
hk-ipo/scripts/archive_t0_5_market_heat.py
T
geometrybase 222f55c140 Add T0.5 market heat IPO analysis
Request:
- Test whether subscription-period T0.5 market heat data can be captured and incorporated into IPO analysis.

Changes:
- Add an ipo_market_heat table for non-official market-heat snapshots.
- Add a VBKR/Jieli archive script for expected margin subscription multiples.
- Archive the 2026-06-15T18:40:00Z heat snapshot for 01392, 02335, 06067, 06106, and 06132.
- Add an experimental T0.5 overlay rule file and a Chinese cross-IPO trial report.
- Update archivist and analyst skills so T0.5 remains separate from official T1 allotment demand.

Verification:
- .venv/bin/python -m py_compile scripts/archive_t0_5_market_heat.py scripts/build_analysis_dataset.py scripts/update_sync_state.py
- Python sqlite3 PRAGMA integrity_check returned ok and foreign_key_check returned zero rows.
- Confirmed 5 ipo_market_heat rows and 5 t0_5_market_heat source_refs for the frozen snapshot.
- git diff --cached --check

Next useful context:
- T0.5 data is non-official and should be resampled during the subscription window, then compared against T1 official allotment results.
2026-06-15 15:44:32 +00:00

315 lines
11 KiB
Python

#!/usr/bin/env python3
"""Archive T0.5 HK IPO market-heat snapshots from public web pages."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
# Default SQLite database path (repo-relative), used when --db is not given.
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
# Default schema script (repo-relative), executed against the database on every run.
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that receives the per-table CSV exports.
SNAPSHOT_DIR = Path("data/snapshots")
# Default market-heat page fetched when --url is not given.
VBKR_URL = "https://www.vbkr.com/ipo/hk/v2/ipo-hk-index"
@dataclass(frozen=True)
class MarketHeatRow:
ticker: str
stock_name: str
offer_price_low_hkd: float | None
offer_price_high_hkd: float | None
board_lot: int | None
min_subscription_amount_hkd: float | None
margin_subscription_multiple: float | None
margin_multiple_label: str
subscription_deadline: str
result_announcement_date: str
listing_date: str
prospectus_url: str
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the archive script.

    Returns:
        The namespace with ``db``, ``schema``, ``as_of``, ``url`` and
        ``tickers`` attributes.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--db", {"default": str(DEFAULT_DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(DEFAULT_SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--url", {"default": VBKR_URL, "help": "Market-heat page URL."}),
        (
            "--tickers",
            {"help": "Comma-separated tickers to archive. Defaults to all parsed tickers in ipo_master."},
        ),
    )
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
if value:
return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def compact_timestamp(value: str) -> str:
    """Return ``value`` without "-" and ":" separators, for ids and file names.

    A "+00:00" UTC offset is written as "Z". That substitution has to happen
    before the colons are stripped; afterwards the offset reads "+0000" and
    would no longer match.
    """
    return value.replace("+00:00", "Z").replace("-", "").replace(":", "")
def source_date(value: str) -> str:
    """Return the calendar date (YYYY-MM-DD) of an ISO-8601 timestamp string.

    A trailing "Z" is accepted. The date is taken in the timestamp's own
    offset; no timezone conversion is applied.
    """
    iso_text = value.replace("Z", "+00:00")
    moment = datetime.fromisoformat(iso_text)
    return moment.date().isoformat()
def fetch_bytes(url: str) -> bytes:
    """Fetch ``url`` and return the raw response body.

    The request carries a "Mozilla/5.0" User-Agent header and uses a
    60-second timeout.
    """
    request_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=request_headers), timeout=60) as response:
        body = response.read()
    return body
def sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of ``payload`` as lowercase hex."""
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
def clean_cell(value: str) -> str:
    """Convert the inner HTML of a table cell to plain text.

    ``<br>`` tags become line breaks, every other tag becomes a space, and
    HTML entities are decoded. Runs of whitespace inside a line collapse to a
    single space and blank lines are dropped.
    """
    with_breaks = re.sub(r"<br[^>]*>", "\n", value, flags=re.I)
    without_tags = re.sub(r"<.*?>", " ", with_breaks, flags=re.S)
    decoded = html.unescape(without_tags)
    kept_lines = []
    for line in decoded.splitlines():
        words = line.split()
        if words:
            kept_lines.append(" ".join(words))
    return "\n".join(kept_lines)
def as_float(value: str) -> float | None:
cleaned = value.replace(",", "").replace("HKD", "").replace("$", "").strip()
if not cleaned or cleaned == "-":
return None
try:
return float(cleaned)
except ValueError:
return None
def as_int(value: str) -> int | None:
    """Parse a cell with ``as_float`` and truncate the result to an integer.

    Returns ``None`` when ``as_float`` finds no number.
    """
    parsed = as_float(value)
    return None if parsed is None else int(parsed)
def parse_offer_price(value: str) -> tuple[float | None, float | None]:
    """Return the (low, high) offer price found in a multi-line cell.

    Each line is parsed on its own and lines that are not numbers are
    ignored. The first number is the low bound and the last the high bound,
    so a single price fills both; ``(None, None)`` means no line parsed.
    """
    prices = []
    for line in value.splitlines():
        price = as_float(line)
        if price is not None:
            prices.append(price)
    if not prices:
        return None, None
    # With one entry, prices[0] and prices[-1] are the same element.
    return prices[0], prices[-1]
def parse_margin_multiple(value: str) -> float | None:
match = re.search(r"([0-9][0-9,]*(?:\.[0-9]+)?)\s*倍", value)
if not match:
return None
return float(match.group(1).replace(",", ""))
def parse_deadline(value: str) -> str:
    """Normalize the subscription-deadline cell to a single line.

    When the first two non-blank lines are a ``YYYY-MM-DD`` date and an
    ``H:MM``/``HH:MM`` time, returns "date time" (any further lines are
    dropped). Otherwise returns all non-blank lines joined by spaces.
    """
    parts = [piece.strip() for piece in value.splitlines() if piece.strip()]
    if len(parts) >= 2:
        date_part, time_part = parts[0], parts[1]
        if re.match(r"\d{4}-\d{2}-\d{2}$", date_part) and re.match(r"\d{1,2}:\d{2}$", time_part):
            return f"{date_part} {time_part}"
    return " ".join(parts)
def parse_rows(page: str) -> list[MarketHeatRow]:
    """Extract one ``MarketHeatRow`` per IPO table row found in ``page``.

    A ``<tr>`` is considered only if it mentions a "NNNNN.HK" code and has at
    least 11 ``<td>`` cells. The cells are read by position: 0 stock code,
    1 name, 3 offer price, 4 board lot, 5 minimum subscription amount,
    6 margin multiple, 7 subscription deadline, 8 result date, 9 listing
    date, 10 prospectus link.

    Args:
        page: Decoded HTML of the market-heat page.

    Returns:
        The parsed rows in page order; rows that do not match are skipped.
    """
    rows: list[MarketHeatRow] = []
    for tr in re.findall(r"<tr[^>]*>(.*?)</tr>", page, flags=re.S):
        if not re.search(r"\d{5}\.HK", tr):
            continue
        cells = re.findall(r"<td[^>]*>(.*?)</td>", tr, flags=re.S)
        if len(cells) < 11:
            continue
        text_cells = [clean_cell(cell) for cell in cells]
        code_match = re.search(r"(\d{5})\.HK", text_cells[0])
        if not code_match:
            continue
        # An empty name cell yields no lines; fall back to "" instead of
        # letting an IndexError abort the whole parse.
        name_lines = text_cells[1].splitlines()
        stock_name = name_lines[0] if name_lines else ""
        # The href must come from the raw cell: clean_cell() strips the tags.
        prospectus_match = re.search(r'href="([^"]+)"', cells[10])
        offer_low, offer_high = parse_offer_price(text_cells[3])
        rows.append(
            MarketHeatRow(
                ticker=code_match.group(1),
                stock_name=stock_name,
                offer_price_low_hkd=offer_low,
                offer_price_high_hkd=offer_high,
                board_lot=as_int(text_cells[4]),
                min_subscription_amount_hkd=as_float(text_cells[5]),
                margin_subscription_multiple=parse_margin_multiple(text_cells[6]),
                margin_multiple_label=text_cells[6],
                subscription_deadline=parse_deadline(text_cells[7]),
                result_announcement_date=text_cells[8],
                listing_date=text_cells[9],
                prospectus_url=html.unescape(prospectus_match.group(1)) if prospectus_match else "",
            )
        )
    return rows
def save_raw_snapshot(payload: bytes, as_of: str) -> tuple[str, str]:
    """Store the fetched page under ``data/raw/market_heat``.

    The file is named after the compacted ``as_of`` timestamp and is written
    only when it is missing or its current content differs from ``payload``.

    Returns:
        ``(posix_path, sha256_hex)`` for the stored payload.
    """
    target_dir = Path("data/raw/market_heat")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"vbkr_ipo_hk_index_{compact_timestamp(as_of)}.html"
    already_stored = target.exists() and target.read_bytes() == payload
    if not already_stored:
        target.write_bytes(payload)
    return target.as_posix(), sha256_bytes(payload)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    The file is UTF-8 with "\\n" line endings: a header line with the column
    names, then the rows in ``order_by`` order.

    Args:
        conn: Open database connection.
        table: Table name. It is interpolated into the SQL text, so it must
            be a trusted identifier, never user input.
        order_by: ORDER BY expression, interpolated the same way. Defaults
            to the first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    # The cursor already describes the result columns (also when the table is
    # empty), so no second query is needed to obtain the header.
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(rows)
def known_tickers(conn: sqlite3.Connection) -> set[str]:
    """Return the set of tickers present in the ``ipo_master`` table."""
    cursor = conn.execute("SELECT ticker FROM ipo_master")
    return {record[0] for record in cursor.fetchall()}
def selected_tickers(value: str | None, parsed_rows: list[MarketHeatRow], conn: sqlite3.Connection) -> set[str]:
    """Decide which tickers to archive.

    Args:
        value: Comma-separated ticker list from the command line, or
            ``None``/empty for the default selection.
        parsed_rows: Rows parsed from the page.
        conn: Open database connection; only queried for the default
            selection.

    Returns:
        With ``value``: the listed tickers, stripped and zero-padded to five
        characters, with empty entries ignored. Without it: the tickers of
        ``parsed_rows`` that already exist in ``ipo_master``.
    """
    if not value:
        on_file = known_tickers(conn)
        return {row.ticker for row in parsed_rows if row.ticker in on_file}
    requested = set()
    for piece in value.split(","):
        code = piece.strip()
        if code:
            requested.add(code.zfill(5))
    return requested
def upsert_rows(
    conn: sqlite3.Connection,
    rows: list[MarketHeatRow],
    selected: set[str],
    local_path: str,
    file_sha256: str,
    url: str,
    as_of: str,
) -> int:
    """Upsert one ``source_refs`` and one ``ipo_market_heat`` row per selected ticker.

    Args:
        conn: Open database connection. Nothing is committed here.
        rows: Rows parsed from the page.
        selected: Tickers to write; rows for other tickers are skipped.
        local_path: Path of the stored raw page snapshot.
        file_sha256: SHA-256 hex digest of that snapshot.
        url: URL the page was fetched from.
        as_of: Archive timestamp; stored as ``archived_at``, ``observed_at``
            and ``data_as_of`` and embedded in the generated ids.

    Returns:
        The number of parsed rows that were written.
    """
    # Both values depend only on as_of, so compute them once for all rows.
    stamp = compact_timestamp(as_of)
    archived_on = source_date(as_of)
    written = 0
    for row in rows:
        if row.ticker not in selected:
            continue
        source_id = f"{row.ticker}_t0_5_market_heat_vbkr_{stamp}"
        # The heat row shares the id of the source reference it points at.
        heat_id = source_id
        # The source reference is written first; the heat row refers to its id.
        conn.execute(
            """
            INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
            )
            VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
            ON CONFLICT(source_id) DO UPDATE SET
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
            """,
            (
                source_id,
                row.ticker,
                "t0_5_market_heat",
                "VBKR IPO expected margin multiple snapshot",
                local_path,
                url,
                file_sha256,
                archived_on,
                as_of,
                "Non-official subscription-period market heat snapshot. Final subscription data must come from HKEX allotment results.",
            ),
        )
        # Re-archiving the same (ticker, provider, observed_at) refreshes the
        # existing heat row instead of inserting a duplicate.
        conn.execute(
            """
            INSERT INTO ipo_market_heat (
            heat_id, ticker, source_id, stage, provider, observed_at,
            margin_subscription_multiple, margin_multiple_label,
            offer_price_low_hkd, offer_price_high_hkd, board_lot,
            min_subscription_amount_hkd, subscription_deadline,
            result_announcement_date, listing_date, data_as_of, notes
            )
            VALUES (?, ?, ?, 'T0_5_market_heat', 'VBKR/Jieli', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(ticker, provider, observed_at) DO UPDATE SET
            source_id = excluded.source_id,
            margin_subscription_multiple = excluded.margin_subscription_multiple,
            margin_multiple_label = excluded.margin_multiple_label,
            offer_price_low_hkd = excluded.offer_price_low_hkd,
            offer_price_high_hkd = excluded.offer_price_high_hkd,
            board_lot = excluded.board_lot,
            min_subscription_amount_hkd = excluded.min_subscription_amount_hkd,
            subscription_deadline = excluded.subscription_deadline,
            result_announcement_date = excluded.result_announcement_date,
            listing_date = excluded.listing_date,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
            """,
            (
                heat_id,
                row.ticker,
                source_id,
                as_of,
                row.margin_subscription_multiple,
                row.margin_multiple_label,
                row.offer_price_low_hkd,
                row.offer_price_high_hkd,
                row.board_lot,
                row.min_subscription_amount_hkd,
                row.subscription_deadline,
                row.result_announcement_date,
                row.listing_date,
                as_of,
                f"Stock name on source page: {row.stock_name}. Prospectus URL shown by source: {row.prospectus_url}",
            ),
        )
        written += 1
    return written
def main() -> int:
    """Fetch the market-heat page, archive it and refresh the exports.

    Steps: fetch and parse the page, store the raw snapshot, apply the schema
    script, upsert the selected rows, export ``ipo_market_heat`` and
    ``source_refs`` to CSV, then run ``scripts/update_sync_state.py``.

    Returns:
        ``0`` on success.

    Raises:
        subprocess.CalledProcessError: If the sync-state script exits with a
            non-zero status.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    payload = fetch_bytes(args.url)
    page = payload.decode("utf-8", "replace")
    parsed_rows = parse_rows(page)
    local_path, file_sha256 = save_raw_snapshot(payload, as_of)
    db_path = Path(args.db)
    schema_path = Path(args.schema)
    conn = sqlite3.connect(db_path)
    try:
        # "with conn" commits on success and rolls back on error, but it does
        # not close the connection; the finally clause below does.
        with conn:
            conn.executescript(schema_path.read_text(encoding="utf-8"))
            selected = selected_tickers(args.tickers, parsed_rows, conn)
            written = upsert_rows(conn, parsed_rows, selected, local_path, file_sha256, args.url, as_of)
            export_snapshot(conn, "ipo_market_heat", "ticker, observed_at")
            export_snapshot(conn, "source_refs", "source_id")
    finally:
        conn.close()
    # Runs only after the transaction is committed and the connection closed.
    subprocess.run(
        [
            ".venv/bin/python",
            "scripts/update_sync_state.py",
            "--as-of",
            as_of,
            "--mode",
            "market_heat_source_update",
            "--summary-limit",
            "5",
        ],
        check=True,
    )
    print("market heat archived")
    print(f"as_of: {as_of}")
    print(f"raw_snapshot: {local_path}")
    print(f"parsed_rows: {len(parsed_rows)}")
    print(f"written_rows: {written}")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())