#!/usr/bin/env python3 """Archive structured historical HK IPO data from ipohk.com.cn.""" from __future__ import annotations import argparse import csv import hashlib import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from urllib.request import Request, urlopen DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite") DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql") SNAPSHOT_DIR = Path("data/snapshots") IPOHK_URL = "http://www.ipohk.com.cn/ipo/data.php?action=listed&year=&search=" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.") parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH), help="Repo-relative schema path.") parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.") parser.add_argument("--url", default=IPOHK_URL, help="ipohk listed-data endpoint.") return parser.parse_args() def parse_as_of(value: str | None) -> str: if value: return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z") return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def compact_timestamp(value: str) -> str: return value.replace("-", "").replace(":", "").replace("+00:00", "Z") def fetch_bytes(url: str) -> bytes: request = Request(url, headers={"User-Agent": "Mozilla/5.0", "Referer": "http://www.ipohk.com.cn/ipo/"}) with urlopen(request, timeout=60) as response: return response.read() def sha256_bytes(payload: bytes) -> str: return hashlib.sha256(payload).hexdigest() def save_raw(payload: bytes, as_of: str) -> tuple[str, str]: raw_dir = Path("data/raw/external_history") raw_dir.mkdir(parents=True, exist_ok=True) path = raw_dir / f"ipohk_listed_{compact_timestamp(as_of)}.json" if not path.exists() or path.read_bytes() != payload: path.write_bytes(payload) return path.as_posix(), sha256_bytes(payload) def as_float(value: object) -> float | None: if value is None: return None cleaned = str(value).strip().replace(",", "").replace("%", "") if cleaned in {"", "-"}: return None try: return float(cleaned) except ValueError: return None def as_ticker(value: object) -> str: return str(value or "").strip().zfill(5) def source_date(as_of: str) -> str: return datetime.fromisoformat(as_of.replace("Z", "+00:00")).date().isoformat() def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None: SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True) rows = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}").fetchall() columns = [description[0] for description in conn.execute(f"SELECT * FROM {table} LIMIT 0").description] with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle: writer = csv.writer(handle, lineterminator="\n") writer.writerow(columns) writer.writerows(rows) def write_sync_run(conn: sqlite3.Connection, as_of: str, row_count: int) -> None: sync_run_id = f"external_history_ipohk_{compact_timestamp(as_of)}" conn.execute( """ INSERT INTO sync_runs (sync_run_id, mode, as_of, started_at, finished_at, status, notes) VALUES (?, 'external_history_update', ?, ?, ?, 'complete', ?) ON CONFLICT(sync_run_id) DO UPDATE SET finished_at = excluded.finished_at, status = excluded.status, notes = excluded.notes """, (sync_run_id, as_of, as_of, as_of, f"Archived {row_count} ipohk listed-history rows."), ) def upsert_history( conn: sqlite3.Connection, rows: list[dict[str, object]], local_path: str, file_sha256: str, url: str, as_of: str, notes: str, ) -> int: written = 0 for row in rows: ticker = as_ticker(row.get("stock_code")) listing_date = str(row.get("listing_date") or "").strip() if not ticker.strip("0") or not listing_date: continue source_id = f"external_ipohk_listed_{compact_timestamp(as_of)}" history_id = f"ipohk_{ticker}_{listing_date}" conn.execute( """ INSERT INTO external_ipo_history ( history_id, ticker, provider, source_id, stock_name, listing_date, price_range_low_hkd, price_range_high_hkd, issue_price_hkd, one_lot_capital_hkd, one_hand_win_rate_pct, public_oversubscription_times, total_fundraise_hkd_b, market_cap_at_listing_hkd_b, grey_market_return_pct, first_day_return_pct, sponsor, source_date, archived_at, local_path, url, file_sha256, notes ) VALUES (?, ?, 'ipohk', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(ticker, provider, listing_date) DO UPDATE SET source_id = excluded.source_id, stock_name = excluded.stock_name, price_range_low_hkd = excluded.price_range_low_hkd, price_range_high_hkd = excluded.price_range_high_hkd, issue_price_hkd = excluded.issue_price_hkd, one_lot_capital_hkd = excluded.one_lot_capital_hkd, one_hand_win_rate_pct = excluded.one_hand_win_rate_pct, public_oversubscription_times = excluded.public_oversubscription_times, total_fundraise_hkd_b = excluded.total_fundraise_hkd_b, market_cap_at_listing_hkd_b = excluded.market_cap_at_listing_hkd_b, grey_market_return_pct = excluded.grey_market_return_pct, first_day_return_pct = excluded.first_day_return_pct, sponsor = excluded.sponsor, source_date = excluded.source_date, archived_at = excluded.archived_at, local_path = excluded.local_path, url = excluded.url, file_sha256 = excluded.file_sha256, notes = excluded.notes """, ( history_id, ticker, source_id, row.get("name"), listing_date, as_float(row.get("price_range_low")), as_float(row.get("price_range_high")), as_float(row.get("issue_price")), as_float(row.get("one_lot_capital")), as_float(row.get("one_hand_win_rate")), as_float(row.get("oversub_ratio")), as_float(row.get("total_fundraise")), as_float(row.get("market_cap_at_listing")), as_float(row.get("grey_market_chg")), as_float(row.get("first_day_chg")), row.get("sponsor"), source_date(as_of), as_of, local_path, url, file_sha256, notes, ), ) written += 1 return written def main() -> int: args = parse_args() as_of = parse_as_of(args.as_of) payload = fetch_bytes(args.url) local_path, file_sha256 = save_raw(payload, as_of) parsed = json.loads(payload.decode("utf-8")) rows = parsed.get("data") or [] notes = ( "External structured IPO history. Includes final oversubscription, one-lot win rate, " "grey-market return, and first-day return where available; not a T0.5 margin snapshot." ) with sqlite3.connect(args.db) as conn: conn.executescript(Path(args.schema).read_text(encoding="utf-8")) written = upsert_history(conn, rows, local_path, file_sha256, args.url, as_of, notes) write_sync_run(conn, as_of, written) export_snapshot(conn, "external_ipo_history", "listing_date DESC, ticker") export_snapshot(conn, "sync_runs", "sync_run_id") print("ipohk history archived") print(f"as_of: {as_of}") print(f"raw_snapshot: {local_path}") print(f"rows: {written}") return 0 if __name__ == "__main__": raise SystemExit(main())