#!/usr/bin/env python3 """Seed current HKEX New Listing Information page entries into the archive.""" from __future__ import annotations import argparse import csv import hashlib import html import re import sqlite3 import subprocess import sys from dataclasses import dataclass from datetime import datetime, timezone from html.parser import HTMLParser from pathlib import Path from urllib.request import Request, urlopen ARCHIVE_PAGE_URL = "https://www2.hkexnews.hk/New-Listings/New-Listing-Information/Main-Board?sc_lang=en" DB_PATH = Path("data/hk_ipo.sqlite") SCHEMA_PATH = Path("schema/hk_ipo.schema.sql") SNAPSHOT_DIR = Path("data/snapshots") RAW_DIR = Path("data/raw/hkex_new_listing_information") @dataclass(frozen=True) class CurrentListingEntry: ticker: str company_name_en: str announcement_url: str | None prospectus_url: str | None allotment_results_url: str | None class TableParser(HTMLParser): def __init__(self) -> None: super().__init__() self.in_target_table = False self.in_body = False self.in_row = False self.in_cell = False self.current_cell = -1 self.current_row: list[dict[str, object]] = [] self.rows: list[list[dict[str, object]]] = [] def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: attrs_dict = dict(attrs) if tag == "table" and "rte-table-mobile-list" in (attrs_dict.get("class") or ""): self.in_target_table = True elif self.in_target_table and tag == "tbody": self.in_body = True elif self.in_body and tag == "tr": self.in_row = True self.current_row = [] elif self.in_row and tag == "td": self.in_cell = True self.current_cell += 1 self.current_row.append({"text": [], "links": []}) elif self.in_cell and tag == "a": href = attrs_dict.get("href") if href: self.current_row[self.current_cell]["links"].append(href) def handle_endtag(self, tag: str) -> None: if tag == "td" and self.in_cell: self.in_cell = False elif tag == "tr" and self.in_row: if self.current_row: self.rows.append(self.current_row) self.in_row = False self.current_cell = -1 elif tag == "tbody" and self.in_body: self.in_body = False elif tag == "table" and self.in_target_table: self.in_target_table = False def handle_data(self, data: str) -> None: if self.in_cell: self.current_row[self.current_cell]["text"].append(data) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.") parser.add_argument("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.") parser.add_argument("--archive-page", default=ARCHIVE_PAGE_URL, help="HKEXnews New Listing Information page.") parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.") parser.add_argument("--skip-sync-state", action="store_true", help="Do not refresh ticker sync state after updating facts.") return parser.parse_args() def parse_as_of(value: str | None) -> str: if value: return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z") return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def fetch_bytes(url: str) -> bytes: request = Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urlopen(request, timeout=60) as response: return response.read() def sha256_bytes(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def clean_text(parts: list[str]) -> str: return " ".join(html.unescape(" ".join(parts)).split()) def normalize_ticker(value: str) -> str | None: digits = re.sub(r"\D", "", value) if not digits: return None return digits.zfill(5) def source_date_from_page(page: str) -> str | None: match = re.search(r"Updated:\s*(\d{1,2}\s+[A-Za-z]+\s+\d{4})", page) if not match: return None return datetime.strptime(match.group(1), "%d %b %Y").date().isoformat() def parse_entries(page: str) -> list[CurrentListingEntry]: parser = TableParser() parser.feed(page) entries: list[CurrentListingEntry] = [] for row in parser.rows: if len(row) < 5: continue ticker = normalize_ticker(clean_text(row[0]["text"])) company_name = clean_text(row[1]["text"]) if not ticker or not company_name: continue entries.append( CurrentListingEntry( ticker=ticker, company_name_en=company_name, announcement_url=first_link(row[2]), prospectus_url=first_link(row[3]), allotment_results_url=first_link(row[4]), ) ) return entries def first_link(cell: dict[str, object]) -> str | None: links = cell["links"] if isinstance(links, list) and links: return str(links[0]) return None def archive_page(url: str, source_date: str | None, data: bytes) -> tuple[str, str]: suffix = source_date.replace("-", "") if source_date else datetime.now(timezone.utc).strftime("%Y%m%d") local_path = RAW_DIR / f"main_board_{suffix}.html" local_path.parent.mkdir(parents=True, exist_ok=True) local_path.write_bytes(data) return local_path.as_posix(), sha256_bytes(data) def source_rows( entries: list[CurrentListingEntry], page_url: str, local_path: str, file_hash: str, source_date: str | None, as_of: str, ) -> list[dict[str, object]]: rows = [] date_key = (source_date or as_of.split("T", 1)[0]).replace("-", "_") for entry in entries: links = [] if entry.announcement_url: links.append("announcement") if entry.prospectus_url: links.append("prospectus") if entry.allotment_results_url: links.append("allotment_results") rows.append( { "source_id": f"{entry.ticker}_new_listing_information_main_{date_key}", "ticker": entry.ticker, "source_type": "new_listing_information", "title": "HKEXnews Main Board New Listing Information", "path_base": "repo_root", "local_path": local_path, "url": page_url, "file_sha256": file_hash, "source_date": source_date, "archived_at": as_of, "notes": "Current HKEX New Listing Information page. Direct links present: " + (", ".join(links) if links else "none"), } ) return rows def master_rows(entries: list[CurrentListingEntry], as_of: str) -> list[dict[str, object]]: return [ { "ticker": entry.ticker, "company_name_en": entry.company_name_en, "company_name_zh": None, "stock_short_name": None, "exchange": "HKEX", "board": "Main Board", "status": "new_listing_information", "listing_date": None, "application_start_date": None, "application_end_date": None, "allotment_results_expected_date": None, "industry_label": None, "data_as_of": as_of, "notes": "Seeded from HKEXnews Main Board New Listing Information page; detailed terms require prospectus archive.", } for entry in entries ] def upsert_master(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None: conn.executemany( """ INSERT INTO ipo_master ( ticker, company_name_en, company_name_zh, stock_short_name, exchange, board, status, listing_date, application_start_date, application_end_date, allotment_results_expected_date, industry_label, data_as_of, notes ) VALUES ( :ticker, :company_name_en, :company_name_zh, :stock_short_name, :exchange, :board, :status, :listing_date, :application_start_date, :application_end_date, :allotment_results_expected_date, :industry_label, :data_as_of, :notes ) ON CONFLICT(ticker) DO UPDATE SET company_name_en = CASE WHEN ipo_master.company_name_en = '' THEN excluded.company_name_en ELSE ipo_master.company_name_en END, exchange = excluded.exchange, board = excluded.board, status = CASE WHEN ipo_master.status = 'listed' THEN ipo_master.status ELSE excluded.status END, data_as_of = excluded.data_as_of, notes = CASE WHEN ipo_master.notes IS NULL THEN excluded.notes ELSE ipo_master.notes END """, rows, ) def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None: conn.executemany( """ INSERT INTO source_refs ( source_id, ticker, source_type, title, path_base, local_path, url, file_sha256, source_date, archived_at, notes ) VALUES ( :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url, :file_sha256, :source_date, :archived_at, :notes ) ON CONFLICT(source_id) DO UPDATE SET source_type = excluded.source_type, title = excluded.title, path_base = excluded.path_base, local_path = excluded.local_path, url = excluded.url, file_sha256 = excluded.file_sha256, source_date = excluded.source_date, archived_at = excluded.archived_at, notes = excluded.notes """, rows, ) def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None: SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True) cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}") columns = [description[0] for description in cursor.description] with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle: writer = csv.writer(handle, lineterminator="\n") writer.writerow(columns) writer.writerows(cursor.fetchall()) def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None: subprocess.run( [ sys.executable, "scripts/update_sync_state.py", "--db", db_path, "--schema", schema_path, "--as-of", as_of, "--mode", "current_new_listing_information", "--summary-limit", "25", ], check=True, ) def main() -> int: args = parse_args() as_of = parse_as_of(args.as_of) data = fetch_bytes(args.archive_page) page = data.decode("utf-8", "replace") source_date = source_date_from_page(page) entries = parse_entries(page) if not entries: raise SystemExit("No current HKEX new listing entries were parsed.") local_path, file_hash = archive_page(args.archive_page, source_date, data) with sqlite3.connect(args.db) as conn: conn.executescript(Path(args.schema).read_text(encoding="utf-8")) upsert_master(conn, master_rows(entries, as_of)) upsert_source_refs(conn, source_rows(entries, args.archive_page, local_path, file_hash, source_date, as_of)) for table in ["ipo_master", "source_refs", "data_gaps"]: export_snapshot(conn, table) if not args.skip_sync_state: refresh_sync_state(args.db, args.schema, as_of) print("current HKEX new listing information archived") print(f"source_date: {source_date or 'unknown'}") print(f"entries parsed: {len(entries)}") print("tickers: " + ",".join(entry.ticker for entry in entries)) print(f"page: {local_path}") return 0 if __name__ == "__main__": raise SystemExit(main())