#!/usr/bin/env python3
"""Archive recent HKEX IPO targets from HKEXnews new listing reports."""

from __future__ import annotations

import argparse
import csv
import hashlib
import re
import sqlite3
import subprocess
import sys
from contextlib import closing
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
from urllib.request import Request, urlopen

try:
    from openpyxl import load_workbook
except ImportError as _import_error:
    # Defer the failure until a workbook is actually opened so that ``--help``
    # and the pure helper functions stay usable without openpyxl installed.
    _OPENPYXL_IMPORT_ERROR = _import_error

    def load_workbook(*args: object, **kwargs: object):
        """Stand-in for ``openpyxl.load_workbook`` that reports the missing dependency."""
        raise ImportError(
            "openpyxl is required to read HKEX new listing reports"
        ) from _OPENPYXL_IMPORT_ERROR


ARCHIVE_PAGE_URL = "https://www2.hkexnews.hk/New-Listings/New-Listing-Information/Main-Board?sc_lang=en"
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
RAW_REPORT_DIR = Path("data/raw/hkex_new_listing_reports")


@dataclass(frozen=True)
class ReportLink:
    """One downloadable annual new listing report discovered on the archive page."""

    year: int  # Four-digit year found in the report href.
    board_key: str  # "gem" or "main"; also used as the local sub-directory name.
    board_name: str  # "GEM" or "Main Board".
    url: str  # Absolute download URL.
    local_path: str  # POSIX-style path under RAW_REPORT_DIR where the file is archived.


@dataclass(frozen=True)
class ReportEntry:
    """One listing row parsed from an annual new listing report."""

    ticker: str  # Zero-padded five-digit stock code.
    report_year: int
    board: str  # Copied from ReportLink.board_name.
    source_id: str  # "<ticker>_new_listing_report_<board_key>_<year>".
    company_name_en: str
    prospectus_date: str | None  # ISO date; always None for GEM rows.
    listing_date: str  # ISO date.
    offer_price_hkd: float | None
    funds_raised_hkd: float | None
    subscription_ratio_times: float | None
    market_cap_hkd: float | None
    outstanding_shares_at_listing: int | None
    listing_method: str | None
    industry_label: str | None
    place_of_incorporation: str | None
    sponsors: str | None
    reporting_accountants: str | None
    valuers: str | None
    notes: str | None


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.")
    parser.add_argument("--archive-page", default=ARCHIVE_PAGE_URL, help="HKEXnews new listing information page.")
    parser.add_argument(
        "--end-date",
        default=date.today().isoformat(),
        help="Inclusive end date for recent IPO listings.",
    )
    parser.add_argument("--years", type=int, default=3, help="Lookback years ending at --end-date.")
    parser.add_argument("--start-date", help="Inclusive start date. Defaults to --end-date minus --years.")
    parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    parser.add_argument(
        "--include-non-ipo",
        action="store_true",
        help="Include report rows without an IPO offer price.",
    )
    parser.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh ticker sync state after updating facts.",
    )
    return parser.parse_args()


def utc_now() -> datetime:
    """Return the current UTC time truncated to whole seconds."""
    return datetime.now(timezone.utc).replace(microsecond=0)


def parse_as_of(value: str | None) -> str:
    """Return *value* as an ISO timestamp, spelling a ``+00:00`` offset as ``Z``.

    Falls back to the current UTC time when *value* is empty or ``None``.
    Raises ``ValueError`` when *value* is not a valid ISO timestamp.
    """
    if value:
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
    return utc_now().isoformat().replace("+00:00", "Z")


def years_before(day: date, years: int) -> date:
    """Return the same calendar day *years* earlier."""
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # 29 February has no counterpart in a non-leap target year.
        return day.replace(year=day.year - years, day=28)


def fetch_bytes(url: str) -> bytes:
    """Download *url* and return the raw response body."""
    request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urlopen(request, timeout=60) as response:
        return response.read()


def clean_text(value: object) -> str | None:
    """Collapse whitespace in a cell value; empty strings and ``None`` become ``None``.

    Non-string values are returned as ``str(value)``.
    """
    if value is None:
        return None
    if isinstance(value, str):
        cleaned = " ".join(value.replace("\r", "\n").replace("\n", " ").split())
        return cleaned or None
    return str(value)


def numeric(value: object) -> float | None:
    """Convert a cell value to ``float``, or ``None`` when it is not a number.

    Booleans, ``None``, blank strings, strings starting with ``N/A`` and
    unparsable strings all yield ``None``. Thousands separators are ignored.
    """
    if isinstance(value, bool) or value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        cleaned = value.replace(",", "").strip()
        if not cleaned or cleaned.upper().startswith("N/A"):
            return None
        try:
            return float(cleaned)
        except ValueError:
            return None
    return None


def whole_number(value: object) -> int | None:
    """Convert a cell value to a rounded ``int``, or ``None`` when not numeric."""
    number = numeric(value)
    if number is None:
        return None
    return int(round(number))


def excel_date(value: object) -> str | None:
    """Return the ISO date of a ``date``/``datetime`` cell, else ``None``."""
    # datetime is a subclass of date, so it must be tested first.
    if isinstance(value, datetime):
        return value.date().isoformat()
    if isinstance(value, date):
        return value.isoformat()
    return None


def normalize_ticker(value: object) -> str | None:
    """Return a zero-padded five-digit stock code, or ``None`` when no digits are present."""
    if value is None:
        return None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return f"{int(value):05d}"
    text = str(value).strip()
    if not text:
        return None
    digits = re.sub(r"\D", "", text)
    if not digits:
        return None
    return digits.zfill(5)


def sha256_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of *data*."""
    return hashlib.sha256(data).hexdigest()


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def discover_report_links(archive_page_url: str, start_date: date, end_date: date) -> list[ReportLink]:
    """Scrape the archive page for Main Board / GEM ``.xlsx`` report links.

    Only links whose href contains a year between ``start_date.year`` and
    ``end_date.year`` (inclusive) are kept. The result is sorted by year,
    board key and URL.
    """
    html = fetch_bytes(archive_page_url).decode("utf-8", "replace")
    hrefs = sorted(set(re.findall(r'href="([^"]*New-Listing-Report/(?:Main|GEM)/[^"]+?\.xlsx)"', html)))
    needed_years = set(range(start_date.year, end_date.year + 1))
    links: list[ReportLink] = []
    for href in hrefs:
        year_match = re.search(r"(20\d{2})", href)
        if not year_match:
            continue
        year = int(year_match.group(1))
        if year not in needed_years:
            continue
        board_key = "gem" if "/GEM/" in href else "main"
        board_name = "GEM" if board_key == "gem" else "Main Board"
        filename = href.rsplit("/", 1)[-1]
        local_path = RAW_REPORT_DIR / board_key / filename
        links.append(
            ReportLink(
                year=year,
                board_key=board_key,
                board_name=board_name,
                url=urljoin(archive_page_url, href),
                local_path=local_path.as_posix(),
            )
        )
    return sorted(links, key=lambda item: (item.year, item.board_key, item.url))


def archive_report_file(link: ReportLink) -> str:
    """Download the report behind *link* into its local path and return its SHA-256.

    The local file is only rewritten when its content differs from the download.
    """
    data = fetch_bytes(link.url)
    local_path = Path(link.local_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)
    if not (local_path.exists() and local_path.read_bytes() == data):
        local_path.write_bytes(data)
    # Identical bytes hash identically, so hashing the download covers both cases.
    return sha256_bytes(data)


def report_source_date(path: Path, board_key: str, report_year: int) -> str:
    """Return the "up to" date stated in the report title as an ISO date.

    GEM reports are read from cell A5 (``YYYY/MM/DD``) and Main Board reports
    from cell A1 (``D Month YYYY``). When no such date is found, 31 December
    of *report_year* is returned.
    """
    # Read-only workbooks keep the file open until close() is called.
    with closing(load_workbook(path, data_only=True, read_only=True)) as workbook:
        worksheet = workbook[workbook.sheetnames[0]]
        if board_key == "gem":
            title = clean_text(worksheet["A5"].value) or ""
            match = re.search(r"up to (\d{4}/\d{2}/\d{2})", title)
            if match:
                return datetime.strptime(match.group(1), "%Y/%m/%d").date().isoformat()
        else:
            title = clean_text(worksheet["A1"].value) or ""
            match = re.search(r"up to (\d{1,2} [A-Za-z]+ \d{4})", title)
            if match:
                return datetime.strptime(match.group(1), "%d %B %Y").date().isoformat()
    return date(report_year, 12, 31).isoformat()


def combined_listing_method(primary: object, following_row: tuple[object, ...] | None) -> str | None:
    """Join the listing method of a row with the one in column 17 of its following row.

    Duplicates and empty values are dropped; ``None`` is returned when nothing remains.
    """
    methods = [clean_text(primary)]
    if following_row is not None and len(following_row) > 17:
        methods.append(clean_text(following_row[17]))
    unique_methods: list[str] = []
    for method in methods:
        if method and method not in unique_methods:
            unique_methods.append(method)
    return "; ".join(unique_methods) or None


def cell_at(row: tuple[object, ...], index: int) -> object:
    """Return ``row[index]``, or ``None`` when the row is too short."""
    return row[index] if len(row) > index else None


def is_main_ditto_row(row: tuple[object, ...]) -> bool:
    """Return True when column 1 of a Main Board row holds a ditto mark (``"``)."""
    return len(row) > 1 and row[1] == '"'


def is_gem_continuation_row(row: tuple[object, ...]) -> bool:
    """Return True when a GEM row does not start a listing of its own.

    A row starts a listing when column 0 holds a date; only rows without one
    may contribute an extra listing method to the preceding entry.
    """
    return excel_date(cell_at(row, 0)) is None


def parse_main_report(
    link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool
) -> tuple[list[ReportEntry], int]:
    """Parse a Main Board report into entries listed within the date range.

    Returns ``(entries, skipped)`` where *skipped* counts in-range rows dropped
    because they have no numeric offer price and *include_non_ipo* is false.
    """
    with closing(load_workbook(link.local_path, data_only=True, read_only=True)) as workbook:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(min_row=3, values_only=True))

    entries: list[ReportEntry] = []
    skipped = 0
    for index, row in enumerate(rows):
        # Data rows carry an integer in column 0; anything else is skipped.
        if not isinstance(cell_at(row, 0), int):
            continue
        ticker = normalize_ticker(cell_at(row, 1))
        listing_date = excel_date(cell_at(row, 4))
        if ticker is None or listing_date is None:
            continue
        listing_day = date.fromisoformat(listing_date)
        if listing_day < start_date or listing_day > end_date:
            continue
        offer_price = numeric(cell_at(row, 9))
        if offer_price is None and not include_non_ipo:
            skipped += 1
            continue

        following_row = None
        if index + 1 < len(rows) and is_main_ditto_row(rows[index + 1]):
            following_row = rows[index + 1]
        funds_raised = numeric(cell_at(row, 8))
        if following_row is not None:
            following_funds = numeric(cell_at(following_row, 8))
            if following_funds is not None:
                funds_raised = (funds_raised or 0) + following_funds

        entries.append(
            ReportEntry(
                ticker=ticker,
                report_year=link.year,
                board=link.board_name,
                source_id=f"{ticker}_new_listing_report_main_{link.year}",
                company_name_en=clean_text(cell_at(row, 2)) or "",
                prospectus_date=excel_date(cell_at(row, 3)),
                listing_date=listing_date,
                offer_price_hkd=offer_price,
                funds_raised_hkd=funds_raised,
                subscription_ratio_times=None,
                market_cap_hkd=None,
                outstanding_shares_at_listing=None,
                # Without a numeric price, the text in the price column is kept as the method.
                listing_method="IPO" if offer_price is not None else clean_text(cell_at(row, 9)),
                industry_label=None,
                place_of_incorporation=None,
                sponsors=clean_text(cell_at(row, 5)),
                reporting_accountants=clean_text(cell_at(row, 6)),
                valuers=clean_text(cell_at(row, 7)),
                notes="Funds raised sums annual report rows (a) and (b) when both are present.",
            )
        )
    return entries, skipped


def parse_gem_report(
    link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool
) -> tuple[list[ReportEntry], int]:
    """Parse a GEM report into entries listed within the date range.

    Returns ``(entries, skipped)`` where *skipped* counts in-range rows dropped
    because they have no numeric offer price and *include_non_ipo* is false.
    """
    with closing(load_workbook(link.local_path, data_only=True, read_only=True)) as workbook:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(min_row=12, values_only=True))

    entries: list[ReportEntry] = []
    skipped = 0
    for index, row in enumerate(rows):
        listing_date = excel_date(cell_at(row, 0))
        if listing_date is None:
            continue
        ticker = normalize_ticker(cell_at(row, 1))
        if ticker is None:
            continue
        listing_day = date.fromisoformat(listing_date)
        if listing_day < start_date or listing_day > end_date:
            continue
        offer_price = numeric(cell_at(row, 5))
        if offer_price is None and not include_non_ipo:
            skipped += 1
            continue

        # Only a row that does not start its own listing may extend this one;
        # otherwise the next company's listing method would leak into this entry.
        following_row = None
        if index + 1 < len(rows) and is_gem_continuation_row(rows[index + 1]):
            following_row = rows[index + 1]

        entries.append(
            ReportEntry(
                ticker=ticker,
                report_year=link.year,
                board=link.board_name,
                source_id=f"{ticker}_new_listing_report_gem_{link.year}",
                company_name_en=clean_text(cell_at(row, 2)) or "",
                prospectus_date=None,
                listing_date=listing_date,
                offer_price_hkd=offer_price,
                funds_raised_hkd=numeric(cell_at(row, 9)),
                subscription_ratio_times=numeric(cell_at(row, 7)),
                market_cap_hkd=numeric(cell_at(row, 12)),
                outstanding_shares_at_listing=whole_number(cell_at(row, 10)),
                listing_method=combined_listing_method(cell_at(row, 17), following_row),
                industry_label=clean_text(cell_at(row, 14)),
                place_of_incorporation=clean_text(cell_at(row, 16)),
                sponsors=clean_text(cell_at(row, 19)),
                reporting_accountants=clean_text(cell_at(row, 21)),
                valuers=None,
                notes="GEM annual report does not provide prospectus date.",
            )
        )
    return entries, skipped


def parse_report(
    link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool
) -> tuple[list[ReportEntry], int]:
    """Dispatch to the GEM or Main Board parser based on ``link.board_key``."""
    if link.board_key == "gem":
        return parse_gem_report(link, start_date, end_date, include_non_ipo)
    return parse_main_report(link, start_date, end_date, include_non_ipo)


def source_ref_rows(
    links: list[ReportLink], entries_by_source: dict[str, list[ReportEntry]], as_of: str
) -> list[dict[str, object]]:
    """Build one ``source_refs`` row per entry, pointing at its archived report file."""
    # Source date and file hash are computed once per (board, year) report.
    report_meta = {
        (link.board_key, link.year): (
            link,
            report_source_date(Path(link.local_path), link.board_key, link.year),
            sha256_file(Path(link.local_path)),
        )
        for link in links
    }
    rows: list[dict[str, object]] = []
    for entries in entries_by_source.values():
        if not entries:
            continue
        sample = entries[0]
        board_key = "gem" if sample.board == "GEM" else "main"
        link, source_date, file_hash = report_meta[(board_key, sample.report_year)]
        rows.extend(
            {
                "source_id": entry.source_id,
                "ticker": entry.ticker,
                "source_type": "new_listing_report",
                "title": f"HKEXnews {entry.board} New Listing Report {entry.report_year}",
                "path_base": "repo_root",
                "local_path": link.local_path,
                "url": link.url,
                "file_sha256": file_hash,
                "source_date": source_date,
                "archived_at": as_of,
                "notes": "Annual HKEXnews new listing report used to seed recent IPO target coverage.",
            }
            for entry in entries
        )
    return rows


def master_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
    """Build ``ipo_master`` rows; every entry is recorded with status ``listed``."""
    return [
        {
            "ticker": entry.ticker,
            "company_name_en": entry.company_name_en,
            "company_name_zh": None,
            "stock_short_name": None,
            "exchange": "HKEX",
            "board": entry.board,
            "status": "listed",
            "listing_date": entry.listing_date,
            "application_start_date": None,
            "application_end_date": None,
            "allotment_results_expected_date": None,
            "industry_label": entry.industry_label,
            "data_as_of": as_of,
            "notes": f"Seeded from HKEXnews {entry.board} New Listing Report {entry.report_year}.",
        }
        for entry in entries
    ]


def offering_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
    """Build ``offering_terms`` rows, converting HKD amounts to HKD millions."""
    return [
        {
            "ticker": entry.ticker,
            "source_id": entry.source_id,
            "prospectus_date": entry.prospectus_date,
            "offer_price_hkd": entry.offer_price_hkd,
            "board_lot": None,
            "min_subscription_amount_hkd": None,
            "global_offer_shares": None,
            "hk_offer_shares_initial": None,
            "international_offer_shares_initial": None,
            "public_offer_pct_initial": None,
            "over_allotment_offer_shares": None,
            "offer_size_adjustment_offer_shares": None,
            # A zero amount is stored as NULL, the same as a missing one.
            "market_cap_hkd_m": entry.market_cap_hkd / 1_000_000 if entry.market_cap_hkd else None,
            "gross_proceeds_hkd_m": entry.funds_raised_hkd / 1_000_000 if entry.funds_raised_hkd else None,
            "net_proceeds_hkd_m": None,
            "issued_shares_upon_listing": entry.outstanding_shares_at_listing,
            "data_as_of": as_of,
        }
        for entry in entries
    ]


def report_entry_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
    """Build ``new_listing_report_entries`` rows keyed by ticker, board and report year."""
    return [
        {
            "report_entry_id": f"{entry.ticker}_{entry.board.lower().replace(' ', '_')}_{entry.report_year}",
            "ticker": entry.ticker,
            "report_year": entry.report_year,
            "board": entry.board,
            "source_id": entry.source_id,
            "company_name_en": entry.company_name_en,
            "prospectus_date": entry.prospectus_date,
            "listing_date": entry.listing_date,
            "offer_price_hkd": entry.offer_price_hkd,
            "funds_raised_hkd": entry.funds_raised_hkd,
            "subscription_ratio_times": entry.subscription_ratio_times,
            "market_cap_hkd": entry.market_cap_hkd,
            "outstanding_shares_at_listing": entry.outstanding_shares_at_listing,
            "listing_method": entry.listing_method,
            "industry_label": entry.industry_label,
            "place_of_incorporation": entry.place_of_incorporation,
            "sponsors": entry.sponsors,
            "reporting_accountants": entry.reporting_accountants,
            "valuers": entry.valuers,
            "data_as_of": as_of,
            "notes": entry.notes,
        }
        for entry in entries
    ]


def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``source_refs`` rows, overwriting every non-key column except ``ticker`` on conflict."""
    conn.executemany(
        """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path,
            url, file_sha256, source_date, archived_at, notes
        ) VALUES (
            :source_id, :ticker, :source_type, :title, :path_base, :local_path,
            :url, :file_sha256, :source_date, :archived_at, :notes
        )
        ON CONFLICT(source_id) DO UPDATE SET
            source_type = excluded.source_type,
            title = excluded.title,
            path_base = excluded.path_base,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """,
        rows,
    )


def upsert_master(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``ipo_master`` rows, preserving richer existing values on conflict.

    An existing company name, listing date, industry label and notes win over
    the report values; exchange, board and ``data_as_of`` are always refreshed.
    """
    conn.executemany(
        """
        INSERT INTO ipo_master (
            ticker, company_name_en, company_name_zh, stock_short_name,
            exchange, board, status, listing_date, application_start_date,
            application_end_date, allotment_results_expected_date,
            industry_label, data_as_of, notes
        ) VALUES (
            :ticker, :company_name_en, :company_name_zh, :stock_short_name,
            :exchange, :board, :status, :listing_date, :application_start_date,
            :application_end_date, :allotment_results_expected_date,
            :industry_label, :data_as_of, :notes
        )
        ON CONFLICT(ticker) DO UPDATE SET
            company_name_en = COALESCE(
                NULLIF(ipo_master.company_name_en, ''),
                excluded.company_name_en
            ),
            exchange = excluded.exchange,
            board = excluded.board,
            status = CASE
                WHEN excluded.status = 'listed' THEN 'listed'
                ELSE ipo_master.status
            END,
            listing_date = COALESCE(ipo_master.listing_date, excluded.listing_date),
            industry_label = COALESCE(ipo_master.industry_label, excluded.industry_label),
            data_as_of = excluded.data_as_of,
            notes = COALESCE(ipo_master.notes, excluded.notes)
        """,
        rows,
    )


def upsert_offering_terms(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``offering_terms`` rows without clobbering data from other sources.

    On conflict, a row whose current ``source_id`` matches the new-listing-report
    pattern is overwritten; any other row only has its NULL columns filled in.
    """
    conn.executemany(
        """
        INSERT INTO offering_terms (
            ticker, source_id, prospectus_date, offer_price_hkd, board_lot,
            min_subscription_amount_hkd, global_offer_shares,
            hk_offer_shares_initial, international_offer_shares_initial,
            public_offer_pct_initial, over_allotment_offer_shares,
            offer_size_adjustment_offer_shares, market_cap_hkd_m,
            gross_proceeds_hkd_m, net_proceeds_hkd_m,
            issued_shares_upon_listing, data_as_of
        ) VALUES (
            :ticker, :source_id, :prospectus_date, :offer_price_hkd, :board_lot,
            :min_subscription_amount_hkd, :global_offer_shares,
            :hk_offer_shares_initial, :international_offer_shares_initial,
            :public_offer_pct_initial, :over_allotment_offer_shares,
            :offer_size_adjustment_offer_shares, :market_cap_hkd_m,
            :gross_proceeds_hkd_m, :net_proceeds_hkd_m,
            :issued_shares_upon_listing, :data_as_of
        )
        ON CONFLICT(ticker) DO UPDATE SET
            source_id = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.source_id
                ELSE offering_terms.source_id
            END,
            prospectus_date = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.prospectus_date
                ELSE COALESCE(offering_terms.prospectus_date, excluded.prospectus_date)
            END,
            offer_price_hkd = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.offer_price_hkd
                ELSE COALESCE(offering_terms.offer_price_hkd, excluded.offer_price_hkd)
            END,
            market_cap_hkd_m = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.market_cap_hkd_m
                ELSE COALESCE(offering_terms.market_cap_hkd_m, excluded.market_cap_hkd_m)
            END,
            gross_proceeds_hkd_m = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.gross_proceeds_hkd_m
                ELSE COALESCE(offering_terms.gross_proceeds_hkd_m, excluded.gross_proceeds_hkd_m)
            END,
            issued_shares_upon_listing = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.issued_shares_upon_listing
                ELSE COALESCE(
                    offering_terms.issued_shares_upon_listing,
                    excluded.issued_shares_upon_listing
                )
            END,
            data_as_of = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                THEN excluded.data_as_of
                ELSE offering_terms.data_as_of
            END
        """,
        rows,
    )


def upsert_report_entries(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``new_listing_report_entries`` rows, overwriting the parsed columns on conflict."""
    conn.executemany(
        """
        INSERT INTO new_listing_report_entries (
            report_entry_id, ticker, report_year, board, source_id,
            company_name_en, prospectus_date, listing_date, offer_price_hkd,
            funds_raised_hkd, subscription_ratio_times, market_cap_hkd,
            outstanding_shares_at_listing, listing_method, industry_label,
            place_of_incorporation, sponsors, reporting_accountants, valuers,
            data_as_of, notes
        ) VALUES (
            :report_entry_id, :ticker, :report_year, :board, :source_id,
            :company_name_en, :prospectus_date, :listing_date, :offer_price_hkd,
            :funds_raised_hkd, :subscription_ratio_times, :market_cap_hkd,
            :outstanding_shares_at_listing, :listing_method, :industry_label,
            :place_of_incorporation, :sponsors, :reporting_accountants, :valuers,
            :data_as_of, :notes
        )
        ON CONFLICT(report_entry_id) DO UPDATE SET
            source_id = excluded.source_id,
            company_name_en = excluded.company_name_en,
            prospectus_date = excluded.prospectus_date,
            listing_date = excluded.listing_date,
            offer_price_hkd = excluded.offer_price_hkd,
            funds_raised_hkd = excluded.funds_raised_hkd,
            subscription_ratio_times = excluded.subscription_ratio_times,
            market_cap_hkd = excluded.market_cap_hkd,
            outstanding_shares_at_listing = excluded.outstanding_shares_at_listing,
            listing_method = excluded.listing_method,
            industry_label = excluded.industry_label,
            place_of_incorporation = excluded.place_of_incorporation,
            sponsors = excluded.sponsors,
            reporting_accountants = excluded.reporting_accountants,
            valuers = excluded.valuers,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """,
        rows,
    )


def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write every row of *table* to ``SNAPSHOT_DIR/<table>.csv`` with a header row.

    NOTE: *table* and *order_by* are interpolated into the SQL text, so only
    trusted identifiers may be passed.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    columns = [description[0] for description in cursor.description]
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(cursor.fetchall())


def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    Raises ``subprocess.CalledProcessError`` when the script exits non-zero.
    """
    subprocess.run(
        [
            sys.executable,
            "scripts/update_sync_state.py",
            "--db",
            db_path,
            "--schema",
            schema_path,
            "--as-of",
            as_of,
            "--mode",
            "recent_ipo_list_refresh",
            "--summary-limit",
            "25",
        ],
        check=True,
    )


def main() -> int:
    """Download the reports, upsert the parsed listings and export CSV snapshots."""
    args = parse_args()
    end_date = date.fromisoformat(args.end_date)
    start_date = date.fromisoformat(args.start_date) if args.start_date else years_before(end_date, args.years)
    as_of = parse_as_of(args.as_of)

    links = discover_report_links(args.archive_page, start_date, end_date)
    if not links:
        raise SystemExit("No HKEXnews new listing report links were found for the requested date range.")

    all_entries: list[ReportEntry] = []
    skipped_rows = 0
    for link in links:
        archive_report_file(link)
        entries, skipped = parse_report(link, start_date, end_date, args.include_non_ipo)
        all_entries.extend(entries)
        skipped_rows += skipped

    entries_by_source: dict[str, list[ReportEntry]] = {}
    for entry in all_entries:
        entries_by_source.setdefault(entry.source_id, []).append(entry)

    Path(args.db).parent.mkdir(parents=True, exist_ok=True)
    # The connection's own context manager only commits or rolls back; closing()
    # releases the database before the sync-state subprocess opens it.
    with closing(sqlite3.connect(args.db)) as conn, conn:
        conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
        upsert_master(conn, master_rows(all_entries, as_of))
        upsert_source_refs(conn, source_ref_rows(links, entries_by_source, as_of))
        upsert_report_entries(conn, report_entry_rows(all_entries, as_of))
        upsert_offering_terms(conn, offering_rows(all_entries, as_of))
        for table in [
            "ipo_master",
            "offering_terms",
            "new_listing_report_entries",
            "source_refs",
            "data_gaps",
        ]:
            export_snapshot(conn, table)

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print("recent IPO list updated")
    print(f"date range: {start_date.isoformat()} to {end_date.isoformat()}")
    print(f"reports archived: {len(links)}")
    print(f"IPO targets parsed: {len(all_entries)}")
    if skipped_rows:
        print(f"non-IPO report rows skipped: {skipped_rows}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())