222f55c140
Request: - Test whether subscription-period T0.5 market heat data can be captured and incorporated into IPO analysis. Changes: - Add an ipo_market_heat table for non-official market-heat snapshots. - Add a VBKR/Jieli archive script for expected margin subscription multiples. - Archive the 2026-06-15T18:40:00Z heat snapshot for 01392, 02335, 06067, 06106, and 06132. - Add an experimental T0.5 overlay rule file and a Chinese cross-IPO trial report. - Update archivist and analyst skills so T0.5 remains separate from official T1 allotment demand. Verification: - .venv/bin/python -m py_compile scripts/archive_t0_5_market_heat.py scripts/build_analysis_dataset.py scripts/update_sync_state.py - Python sqlite3 PRAGMA integrity_check returned ok and foreign_key_check returned zero rows. - Confirmed 5 ipo_market_heat rows and 5 t0_5_market_heat source_refs for the frozen snapshot. - git diff --cached --check Next useful context: - T0.5 data is non-official and should be resampled during the subscription window, then compared against T1 official allotment results.
315 lines
11 KiB
Python
315 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Archive T0.5 HK IPO market-heat snapshots from public web pages."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import hashlib
|
|
import html
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
# Relative locations used when the matching CLI flag is omitted.
DEFAULT_DB_PATH = Path("data") / "hk_ipo.sqlite"
DEFAULT_SCHEMA_PATH = Path("schema") / "hk_ipo.schema.sql"
# Directory that receives the per-table CSV exports.
SNAPSHOT_DIR = Path("data") / "snapshots"
# Page fetched when --url is not supplied.
VBKR_URL = "https://www.vbkr.com/ipo/hk/v2/ipo-hk-index"
|
|
|
|
|
|
@dataclass(frozen=True)
class MarketHeatRow:
    """One table row parsed from the market-heat page.

    Instances are immutable.  The field comments describe how ``parse_rows``
    fills each value from the cleaned ``<td>`` cells of a row.
    """

    # Five-digit code captured from the "NNNNN.HK" text in cell 0.
    ticker: str
    # Stock name taken from the first text line of cell 1.
    stock_name: str
    # First numeric line of cell 3; None when no number could be parsed.
    offer_price_low_hkd: float | None
    # Last numeric line of cell 3 (equals the low value for a single price).
    offer_price_high_hkd: float | None
    # Cell 4 parsed as a number and truncated to int; None when unparseable.
    board_lot: int | None
    # Cell 5 parsed as a float; None when unparseable.
    min_subscription_amount_hkd: float | None
    # Number that precedes "倍" in cell 6; None when the pattern is absent.
    margin_subscription_multiple: float | None
    # Cleaned text of cell 6, kept verbatim alongside the parsed number.
    margin_multiple_label: str
    # Cell 7 after normalisation by parse_deadline.
    subscription_deadline: str
    # Cleaned text of cell 8.
    result_announcement_date: str
    # Cleaned text of cell 9.
    listing_date: str
    # First href found in the raw HTML of cell 10, or "" when none exists.
    prospectus_url: str
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    The module docstring is reused as the parser description.  ``--as-of``
    and ``--tickers`` have no default, so they are ``None`` when omitted.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # (flag, add_argument keyword arguments), registered in this order.
    option_table = (
        ("--db", {"default": str(DEFAULT_DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(DEFAULT_SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--url", {"default": VBKR_URL, "help": "Market-heat page URL."}),
        (
            "--tickers",
            {"help": "Comma-separated tickers to archive. Defaults to all parsed tickers in ipo_master."},
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: ISO-8601 timestamp from the command line, or ``None``/empty to
            use the current UTC time truncated to whole seconds.

    Returns:
        The timestamp expressed in UTC with a ``Z`` suffix.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.

    Explicit values are always normalised to UTC.  The result feeds the
    ``observed_at`` column and the generated IDs, so the same instant written
    with a different offset (or with no offset) must not yield a different key.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        # "Z" is rewritten because fromisoformat() only accepts it on 3.11+.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # The CLI documents UTC as the reference, so treat naive input as UTC.
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def compact_timestamp(value: str) -> str:
    """Return ``value`` with the ``-`` and ``:`` separators removed.

    A ``+00:00`` offset is rewritten to ``Z``.  This must happen before the
    colons are stripped: afterwards the offset reads ``+0000`` and would no
    longer match.

    Args:
        value: ISO-8601 timestamp such as ``2026-06-15T18:40:00Z``.

    Returns:
        A compact form such as ``20260615T184000Z``, used in file names and IDs.
    """
    return value.replace("+00:00", "Z").replace("-", "").replace(":", "")
|
|
|
|
|
|
def source_date(value: str) -> str:
    """Return the calendar date (``YYYY-MM-DD``) of an ISO-8601 timestamp.

    A trailing ``Z`` is accepted.  The date is read from the timestamp as
    written; no timezone conversion is applied.
    """
    iso_text = value.replace("Z", "+00:00")
    moment = datetime.fromisoformat(iso_text)
    return moment.date().isoformat()
|
|
|
|
|
|
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a ``Mozilla/5.0`` User-Agent header and uses a
    60-second timeout.  Errors raised by ``urlopen`` propagate to the caller.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    response = urlopen(Request(url, headers=browser_headers), timeout=60)
    with response:
        return response.read()
|
|
|
|
|
|
def sha256_bytes(payload: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``payload``."""
    digest = hashlib.sha256()
    digest.update(payload)
    return digest.hexdigest()
|
|
|
|
|
|
def clean_cell(value: str) -> str:
    """Convert the inner HTML of a table cell to plain text.

    ``<br>`` tags become line breaks, every other tag becomes a space, and
    HTML entities are decoded.  Runs of whitespace inside a line collapse to a
    single space, and lines that end up empty are dropped.
    """
    with_breaks = re.sub(r"<br[^>]*>", "\n", value, flags=re.I)
    without_tags = re.sub(r"<.*?>", " ", with_breaks, flags=re.S)
    decoded = html.unescape(without_tags)
    kept_lines = []
    for raw_line in decoded.splitlines():
        words = raw_line.split()
        if words:
            kept_lines.append(" ".join(words))
    return "\n".join(kept_lines)
|
|
|
|
|
|
def as_float(value: str) -> float | None:
    """Parse a displayed amount into a float.

    Thousands separators, the text ``HKD`` and ``$`` are removed before
    parsing.  Returns ``None`` for an empty cell, a lone ``-`` placeholder,
    or anything ``float()`` rejects.
    """
    text = value
    for noise in (",", "HKD", "$"):
        text = text.replace(noise, "")
    text = text.strip()
    if text in ("", "-"):
        return None
    try:
        return float(text)
    except ValueError:
        return None
|
|
|
|
|
|
def as_int(value: str) -> int | None:
    """Parse a displayed amount with ``as_float`` and truncate it to an int.

    Returns ``None`` whenever ``as_float`` cannot parse the text.
    """
    parsed = as_float(value)
    return None if parsed is None else int(parsed)
|
|
|
|
|
|
def parse_offer_price(value: str) -> tuple[float | None, float | None]:
    """Split an offer-price cell into a ``(low, high)`` pair.

    Each line of the cell is parsed with ``as_float`` and unparseable lines
    are ignored.  The first and last parsed numbers are returned, so a single
    price yields the same value twice.  Returns ``(None, None)`` when no line
    parses.
    """
    prices = []
    for part in value.splitlines():
        price = as_float(part)
        if price is not None:
            prices.append(price)
    if prices:
        return prices[0], prices[-1]
    return None, None
|
|
|
|
|
|
def parse_margin_multiple(value: str) -> float | None:
    """Extract the number that immediately precedes ``倍`` in ``value``.

    Thousands separators are accepted.  Returns ``None`` when the label has
    no such number.
    """
    found = re.search(r"([0-9][0-9,]*(?:\.[0-9]+)?)\s*倍", value)
    if found is None:
        return None
    digits = found.group(1).replace(",", "")
    return float(digits)
|
|
|
|
|
|
def parse_deadline(value: str) -> str:
    """Collapse a multi-line deadline cell into one line.

    When the first two non-empty lines are a ``YYYY-MM-DD`` date and an
    ``H:MM``/``HH:MM`` time, only those two are returned, joined by a space.
    Otherwise all non-empty lines are joined by spaces.
    """
    parts = [piece for piece in map(str.strip, value.splitlines()) if piece]
    date_then_time = (
        len(parts) >= 2
        and re.match(r"\d{4}-\d{2}-\d{2}$", parts[0]) is not None
        and re.match(r"\d{1,2}:\d{2}$", parts[1]) is not None
    )
    if not date_then_time:
        return " ".join(parts)
    return f"{parts[0]} {parts[1]}"
|
|
|
|
|
|
def parse_rows(page: str) -> list[MarketHeatRow]:
    """Extract one ``MarketHeatRow`` per IPO table row found in ``page``.

    Args:
        page: Decoded HTML of the market-heat page.

    Returns:
        Parsed rows in page order.  A ``<tr>`` is skipped when it has no
        ``NNNNN.HK`` code, has fewer than 11 ``<td>`` cells, or has no code in
        the text of its first cell.
    """
    rows: list[MarketHeatRow] = []
    for tr in re.findall(r"<tr[^>]*>(.*?)</tr>", page, flags=re.S):
        # Cheap pre-filter: header and unrelated rows carry no stock code.
        if not re.search(r"\d{5}\.HK", tr):
            continue
        cells = re.findall(r"<td[^>]*>(.*?)</td>", tr, flags=re.S)
        if len(cells) < 11:
            continue
        text_cells = [clean_cell(cell) for cell in cells]
        code_match = re.search(r"(\d{5})\.HK", text_cells[0])
        if not code_match:
            continue
        # The link must come from the raw HTML; clean_cell strips the tags.
        prospectus_match = re.search(r'href="([^"]+)"', cells[10])
        offer_low, offer_high = parse_offer_price(text_cells[3])
        # An empty name cell cleans to "", whose splitlines() is []; indexing
        # it would raise IndexError and abort the whole archive run.
        name_lines = text_cells[1].splitlines()
        rows.append(
            MarketHeatRow(
                ticker=code_match.group(1),
                stock_name=name_lines[0] if name_lines else "",
                offer_price_low_hkd=offer_low,
                offer_price_high_hkd=offer_high,
                board_lot=as_int(text_cells[4]),
                min_subscription_amount_hkd=as_float(text_cells[5]),
                margin_subscription_multiple=parse_margin_multiple(text_cells[6]),
                margin_multiple_label=text_cells[6],
                subscription_deadline=parse_deadline(text_cells[7]),
                result_announcement_date=text_cells[8],
                listing_date=text_cells[9],
                prospectus_url=html.unescape(prospectus_match.group(1)) if prospectus_match else "",
            )
        )
    return rows
|
|
|
|
|
|
def save_raw_snapshot(payload: bytes, as_of: str) -> tuple[str, str]:
    """Store the fetched page under ``data/raw/market_heat``.

    The file name embeds the compact form of ``as_of``.  The file is written
    unless one with identical bytes already exists at that path.

    Returns:
        ``(posix_path, sha256_hex)`` for the stored payload.
    """
    target_dir = Path("data/raw/market_heat")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"vbkr_ipo_hk_index_{compact_timestamp(as_of)}.html"
    already_saved = target.exists() and target.read_bytes() == payload
    if not already_saved:
        target.write_bytes(payload)
    return target.as_posix(), sha256_bytes(payload)
|
|
|
|
|
|
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    The CSV is UTF-8 with ``\\n`` line endings and starts with a header row of
    column names.  ``table`` and ``order_by`` are interpolated straight into
    the SQL text, so callers must pass trusted identifiers only.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    records = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}").fetchall()
    # A zero-row query is enough to read the column names.
    probe = conn.execute(f"SELECT * FROM {table} LIMIT 0")
    header = [column[0] for column in probe.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerows([header, *records])
|
|
|
|
|
|
def known_tickers(conn: sqlite3.Connection) -> set[str]:
    """Return the distinct ``ticker`` values stored in ``ipo_master``."""
    cursor = conn.execute("SELECT ticker FROM ipo_master")
    return {record[0] for record in cursor.fetchall()}
|
|
|
|
|
|
def selected_tickers(value: str | None, parsed_rows: list[MarketHeatRow], conn: sqlite3.Connection) -> set[str]:
    """Decide which tickers should be archived.

    Args:
        value: Comma-separated tickers from the command line, or ``None``.
        parsed_rows: Rows parsed from the page.
        conn: Database connection, consulted only when ``value`` is empty.

    Returns:
        With ``value``: the listed tickers, stripped and left-padded with
        zeros to five characters.  Without it: the parsed tickers that also
        exist in ``ipo_master``.
    """
    if not value:
        in_master = known_tickers(conn)
        return {row.ticker for row in parsed_rows if row.ticker in in_master}
    requested = (piece.strip() for piece in value.split(","))
    return {piece.zfill(5) for piece in requested if piece}
|
|
|
|
|
|
def upsert_rows(
    conn: sqlite3.Connection,
    rows: list[MarketHeatRow],
    selected: set[str],
    local_path: str,
    file_sha256: str,
    url: str,
    as_of: str,
) -> int:
    """Insert or update the source reference and heat record for each selected row.

    Args:
        conn: Open database connection; committing is left to the caller.
        rows: Rows parsed from the page.
        selected: Tickers to write; rows with any other ticker are ignored.
        local_path: Path of the archived raw page.
        file_sha256: SHA-256 hex digest of the archived raw page.
        url: URL the page was fetched from.
        as_of: Snapshot timestamp, stored as ``archived_at``, ``observed_at``
            and ``data_as_of``.

    Returns:
        The number of rows written.
    """
    source_ref_sql = """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        )
        VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_id) DO UPDATE SET
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """
    market_heat_sql = """
        INSERT INTO ipo_market_heat (
            heat_id, ticker, source_id, stage, provider, observed_at,
            margin_subscription_multiple, margin_multiple_label,
            offer_price_low_hkd, offer_price_high_hkd, board_lot,
            min_subscription_amount_hkd, subscription_deadline,
            result_announcement_date, listing_date, data_as_of, notes
        )
        VALUES (?, ?, ?, 'T0_5_market_heat', 'VBKR/Jieli', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker, provider, observed_at) DO UPDATE SET
            source_id = excluded.source_id,
            margin_subscription_multiple = excluded.margin_subscription_multiple,
            margin_multiple_label = excluded.margin_multiple_label,
            offer_price_low_hkd = excluded.offer_price_low_hkd,
            offer_price_high_hkd = excluded.offer_price_high_hkd,
            board_lot = excluded.board_lot,
            min_subscription_amount_hkd = excluded.min_subscription_amount_hkd,
            subscription_deadline = excluded.subscription_deadline,
            result_announcement_date = excluded.result_announcement_date,
            listing_date = excluded.listing_date,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """

    count = 0
    for row in (candidate for candidate in rows if candidate.ticker in selected):
        # The same identifier keys both the source reference and the heat record.
        record_id = f"{row.ticker}_t0_5_market_heat_vbkr_{compact_timestamp(as_of)}"

        source_ref_params = (
            record_id,
            row.ticker,
            "t0_5_market_heat",
            "VBKR IPO expected margin multiple snapshot",
            local_path,
            url,
            file_sha256,
            source_date(as_of),
            as_of,
            "Non-official subscription-period market heat snapshot. Final subscription data must come from HKEX allotment results.",
        )
        conn.execute(source_ref_sql, source_ref_params)

        market_heat_params = (
            record_id,  # heat_id
            row.ticker,
            record_id,  # source_id
            as_of,  # observed_at
            row.margin_subscription_multiple,
            row.margin_multiple_label,
            row.offer_price_low_hkd,
            row.offer_price_high_hkd,
            row.board_lot,
            row.min_subscription_amount_hkd,
            row.subscription_deadline,
            row.result_announcement_date,
            row.listing_date,
            as_of,  # data_as_of
            f"Stock name on source page: {row.stock_name}. Prospectus URL shown by source: {row.prospectus_url}",
        )
        conn.execute(market_heat_sql, market_heat_params)
        count += 1
    return count
|
|
|
|
|
|
def main() -> int:
    """Fetch the market-heat page, archive it, and record the parsed rows.

    Steps: download the page, save the raw bytes, apply the schema script,
    upsert the selected rows, export the ``ipo_market_heat`` and
    ``source_refs`` tables to CSV, then run ``scripts/update_sync_state.py``.
    The helper script and the default paths are relative, so the command is
    presumably meant to be run from the repository root.

    Returns:
        ``0`` on success.

    Raises:
        subprocess.CalledProcessError: If the sync-state script exits non-zero.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    payload = fetch_bytes(args.url)
    page = payload.decode("utf-8", "replace")
    parsed_rows = parse_rows(page)
    local_path, file_sha256 = save_raw_snapshot(payload, as_of)

    db_path = Path(args.db)
    schema_path = Path(args.schema)
    # "with conn" only commits or rolls back and never closes the connection.
    # Close it explicitly so the handle is released before the helper
    # subprocess starts.
    conn = sqlite3.connect(db_path)
    try:
        with conn:
            conn.executescript(schema_path.read_text(encoding="utf-8"))
            selected = selected_tickers(args.tickers, parsed_rows, conn)
            written = upsert_rows(conn, parsed_rows, selected, local_path, file_sha256, args.url, as_of)
            export_snapshot(conn, "ipo_market_heat", "ticker, observed_at")
            export_snapshot(conn, "source_refs", "source_id")
    finally:
        conn.close()

    subprocess.run(
        [
            ".venv/bin/python",
            "scripts/update_sync_state.py",
            "--as-of",
            as_of,
            "--mode",
            "market_heat_source_update",
            "--summary-limit",
            "5",
        ],
        check=True,
    )
    print("market heat archived")
    print(f"as_of: {as_of}")
    print(f"raw_snapshot: {local_path}")
    print(f"parsed_rows: {len(parsed_rows)}")
    print(f"written_rows: {written}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|