Files
hk-ipo/scripts/archive_hkex_current_new_listings.py
geometrybase e346690bb7 Archive current HKEX IPO candidates
Request:
- Use the analyst workflow to analyze the latest Hong Kong IPOs, connect their source data, and produce a current report.

Changes:
- Added a current HKEX New Listing Information page seeder that archives the official page, seeds visible tickers, and records source_refs.
- Archived current HKEX prospectus and allotment-result sources for the 16 visible Main Board candidates and extracted their text.
- Extended prospectus parsing for offer price, derived gross proceeds, HDR offerings, and listing-date text extracted with split characters.
- Rebuilt the analysis dataset and added a Chinese 2026-06-21 latest IPO report separating live T0 watchlist names from past-cutoff T1/D1 candidates.

Verification:
- Ran py_compile for update_recent_ipo_list.py, archive_hkex_current_new_listings.py, archive_hkex_documents.py, and build_analysis_dataset.py.
- Re-ran HKEX current page seeding, document archiving, and analysis dataset build as of 2026-06-21T08:44:59Z.
- Ran git diff --check and git diff --cached --check.
- Ran SQLite integrity_check and foreign_key_check.
- Verified source_refs paths, file existence, SHA-256 hashes, and report source paths.

Next useful context:
- Capture T0.95 market heat before the 2026-06-23 and 2026-06-24 order cutoffs before converting the new watchlist into execution calls.
- Treat 02667 as a stale/special HKEX page item until a fresh June timetable or official result appears.
2026-06-21 09:05:13 +00:00

346 lines
12 KiB
Python

#!/usr/bin/env python3
"""Seed current HKEX New Listing Information page entries into the archive."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from html.parser import HTMLParser
from pathlib import Path
from urllib.request import Request, urlopen
# HKEXnews Main Board "New Listing Information" page; default for --archive-page.
ARCHIVE_PAGE_URL = "https://www2.hkexnews.hk/New-Listings/New-Listing-Information/Main-Board?sc_lang=en"
# Repo-relative SQLite database; default for --db.
DB_PATH = Path("data/hk_ipo.sqlite")
# Repo-relative SQL schema script executed against the database; default for --schema.
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that export_snapshot() writes one "<table>.csv" file per table into.
SNAPSHOT_DIR = Path("data/snapshots")
# Directory that archive_page() stores the raw "main_board_<YYYYMMDD>.html" copies in.
RAW_DIR = Path("data/raw/hkex_new_listing_information")
@dataclass(frozen=True)
class CurrentListingEntry:
    """One company row scraped from the HKEX New Listing Information table.

    Instances are built by ``parse_entries`` from the first five cells of a
    table row and are immutable (``frozen=True``).
    """

    # Stock code reduced to its digits and left-padded with zeros to at
    # least five characters (see normalize_ticker).
    ticker: str
    # Whitespace-normalised text of the second table cell.
    company_name_en: str
    # First href found in the third cell, or None when the cell has no link.
    announcement_url: str | None
    # First href found in the fourth cell, or None when the cell has no link.
    prospectus_url: str | None
    # First href found in the fifth cell, or None when the cell has no link.
    allotment_results_url: str | None
class TableParser(HTMLParser):
    """Collect the body rows of the ``rte-table-mobile-list`` table.

    After ``feed()``, ``rows`` holds one list per ``<tr>`` found inside the
    ``<tbody>`` of a table whose ``class`` attribute contains
    ``rte-table-mobile-list``. Each row is a list of cell dicts with two keys:

    * ``"text"``  - list of raw text fragments seen inside the ``<td>``
    * ``"links"`` - list of ``href`` values of ``<a>`` tags inside the ``<td>``

    Only ``<td>`` cells are collected; ``<th>`` cells and rows outside
    ``<tbody>`` are ignored. HTML that omits the optional ``</td>`` and
    ``</tr>`` end tags is handled: a new ``<tr>`` (or the closing
    ``</tbody>``) finishes the row that is still open instead of discarding
    it or leaving a stale cell index behind.
    """

    def __init__(self) -> None:
        super().__init__()
        self.in_target_table = False
        self.in_body = False
        self.in_row = False
        self.in_cell = False
        # Index of the cell being filled within current_row; -1 when no cell
        # has been opened in the current row yet.
        self.current_cell = -1
        self.current_row: list[dict[str, object]] = []
        self.rows: list[list[dict[str, object]]] = []

    def _flush_row(self) -> None:
        """Store the open row (if it has cells) and reset all per-row state."""
        if self.current_row:
            self.rows.append(self.current_row)
        # Rebind instead of clearing so the list just stored is not mutated.
        self.current_row = []
        self.current_cell = -1
        self.in_row = False
        self.in_cell = False

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        attrs_dict = dict(attrs)
        if tag == "table" and "rte-table-mobile-list" in (attrs_dict.get("class") or ""):
            self.in_target_table = True
        elif self.in_target_table and tag == "tbody":
            self.in_body = True
        elif self.in_body and tag == "tr":
            # A <tr> while a row is still open means </tr> was omitted:
            # keep the previous row rather than dropping it.
            if self.in_row:
                self._flush_row()
            self.in_row = True
            self.current_row = []
            self.current_cell = -1
        elif self.in_row and tag == "td":
            self.in_cell = True
            self.current_row.append({"text": [], "links": []})
            self.current_cell = len(self.current_row) - 1
        elif self.in_cell and tag == "a":
            href = attrs_dict.get("href")
            if href:
                # Always target the most recently opened cell; indexing by a
                # separately tracked counter can go out of range.
                self.current_row[-1]["links"].append(href)

    def handle_endtag(self, tag: str) -> None:
        if tag == "td" and self.in_cell:
            self.in_cell = False
        elif tag == "tr" and self.in_row:
            self._flush_row()
        elif tag == "tbody" and self.in_body:
            # The last row may legally omit its </tr>.
            if self.in_row:
                self._flush_row()
            self.in_body = False
        elif tag == "table" and self.in_target_table:
            self.in_target_table = False

    def handle_data(self, data: str) -> None:
        if self.in_cell:
            self.current_row[-1]["text"].append(data)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the seeder script.

    Returns:
        Namespace with ``db``, ``schema``, ``archive_page``, ``as_of`` and
        ``skip_sync_state`` attributes.
    """
    option_specs = (
        ("--db", {"default": str(DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(SCHEMA_PATH), "help": "Repo-relative schema path."}),
        (
            "--archive-page",
            {"default": ARCHIVE_PAGE_URL, "help": "HKEXnews New Listing Information page."},
        ),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        (
            "--skip-sync-state",
            {
                "action": "store_true",
                "help": "Do not refresh ticker sync state after updating facts.",
            },
        ),
    )
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: ISO-8601 timestamp supplied on the command line, or a falsy
            value to use the current time. A trailing ``Z`` is accepted. A
            timestamp without an offset is taken to be UTC; one with another
            offset is converted to UTC.

    Returns:
        The timestamp in UTC with ``+00:00`` rendered as ``Z``. The current
        time is truncated to whole seconds; an explicit value keeps the
        precision it was given with.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    if value:
        stamp = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if stamp.tzinfo is None:
            # Every timestamp this script emits is UTC, so a bare timestamp is
            # read the same way rather than being stored without a zone.
            stamp = stamp.replace(tzinfo=timezone.utc)
        else:
            # Normalise foreign offsets so stored values share one zone and
            # the date part used downstream is the UTC date.
            stamp = stamp.astimezone(timezone.utc)
    else:
        stamp = datetime.now(timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a ``Mozilla/5.0`` User-Agent header and uses a
    60-second timeout.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=browser_headers), timeout=60) as reply:
        payload = reply.read()
    return payload
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def clean_text(parts: list[str]) -> str:
    """Join text fragments into one whitespace-normalised string.

    The fragments are joined with single spaces, HTML entities are decoded,
    and every run of whitespace collapses to one space with the ends trimmed.
    """
    combined = " ".join(parts)
    decoded = html.unescape(combined)
    words = decoded.split()
    return " ".join(words)
def normalize_ticker(value: str) -> str | None:
    """Reduce ``value`` to its digits, zero-padded to at least five places.

    Returns:
        The padded digit string, or None when ``value`` contains no digits.
    """
    only_digits = "".join(re.findall(r"\d", value))
    return only_digits.zfill(5) if only_digits else None
def source_date_from_page(page: str) -> str | None:
    """Extract the page's "Updated: <day> <month> <year>" date as ISO text.

    Args:
        page: Decoded HTML of the New Listing Information page.

    Returns:
        The date as ``YYYY-MM-DD``, or None when the page has no "Updated:"
        stamp or the stamp is not a real date.
    """
    match = re.search(r"Updated:\s*(\d{1,2}\s+[A-Za-z]+\s+\d{4})", page)
    if not match:
        return None
    stamp = match.group(1)
    # The regex accepts any alphabetic month, so try the abbreviated form
    # ("Jun") and then the full form ("June"); "%b" alone rejects the latter.
    for month_format in ("%d %b %Y", "%d %B %Y"):
        try:
            return datetime.strptime(stamp, month_format).date().isoformat()
        except ValueError:
            continue
    # Unrecognised month or impossible day: treat like a missing stamp so the
    # caller's existing None handling applies instead of aborting the run.
    return None
def parse_entries(page: str) -> list[CurrentListingEntry]:
    """Turn the listing table in ``page`` into ``CurrentListingEntry`` objects.

    Rows with fewer than five cells, or without a usable ticker or company
    name, are skipped. Cells are read positionally: ticker, company name,
    announcement link, prospectus link, allotment-results link.
    """
    table = TableParser()
    table.feed(page)
    parsed: list[CurrentListingEntry] = []
    for cells in table.rows:
        if len(cells) >= 5:
            code_cell, name_cell, notice_cell, prospectus_cell, results_cell = cells[:5]
            stock_code = normalize_ticker(clean_text(code_cell["text"]))
            name = clean_text(name_cell["text"])
            if stock_code and name:
                listing = CurrentListingEntry(
                    ticker=stock_code,
                    company_name_en=name,
                    announcement_url=first_link(notice_cell),
                    prospectus_url=first_link(prospectus_cell),
                    allotment_results_url=first_link(results_cell),
                )
                parsed.append(listing)
    return parsed
def first_link(cell: dict[str, object]) -> str | None:
    """Return the first entry of ``cell["links"]`` as a string, else None.

    None is returned when the value is not a list or the list is empty.
    """
    hrefs = cell["links"]
    if not isinstance(hrefs, list) or not hrefs:
        return None
    return str(hrefs[0])
def archive_page(url: str, source_date: str | None, data: bytes) -> tuple[str, str]:
    """Write the raw page bytes under ``RAW_DIR`` and describe the stored file.

    The file is named ``main_board_<YYYYMMDD>.html``. The date comes from
    ``source_date`` (dashes removed), or from today's UTC date when
    ``source_date`` is falsy. ``url`` is accepted but not used here.

    Returns:
        ``(posix_path, sha256_hex)`` of the written file.
    """
    if source_date:
        stamp = source_date.replace("-", "")
    else:
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    target = RAW_DIR / f"main_board_{stamp}.html"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)
    stored_path = target.as_posix()
    digest = sha256_bytes(data)
    return stored_path, digest
def source_rows(
    entries: list[CurrentListingEntry],
    page_url: str,
    local_path: str,
    file_hash: str,
    source_date: str | None,
    as_of: str,
) -> list[dict[str, object]]:
    """Build one ``source_refs`` row per entry, all pointing at the archived page.

    The ``source_id`` embeds the ticker and a date key taken from
    ``source_date`` or, when that is falsy, from the date part of ``as_of``.
    ``notes`` lists which direct links the entry carries.
    """
    effective_date = source_date if source_date else as_of.split("T", 1)[0]
    date_key = effective_date.replace("-", "_")
    # (label used in the notes, attribute holding the link)
    link_fields = (
        ("announcement", "announcement_url"),
        ("prospectus", "prospectus_url"),
        ("allotment_results", "allotment_results_url"),
    )
    refs: list[dict[str, object]] = []
    for listing in entries:
        present = [label for label, attr in link_fields if getattr(listing, attr)]
        link_summary = ", ".join(present) or "none"
        ref = {
            "source_id": f"{listing.ticker}_new_listing_information_main_{date_key}",
            "ticker": listing.ticker,
            "source_type": "new_listing_information",
            "title": "HKEXnews Main Board New Listing Information",
            "path_base": "repo_root",
            "local_path": local_path,
            "url": page_url,
            "file_sha256": file_hash,
            "source_date": source_date,
            "archived_at": as_of,
            "notes": "Current HKEX New Listing Information page. Direct links present: "
            + link_summary,
        }
        refs.append(ref)
    return refs
def master_rows(entries: list[CurrentListingEntry], as_of: str) -> list[dict[str, object]]:
    """Build one ``ipo_master`` seed row per entry.

    Only the ticker and English company name come from the entry. Exchange,
    board and status are fixed values, ``data_as_of`` is ``as_of``, and every
    remaining field is None.
    """
    # Fields this page does not provide.
    unknown_names = dict.fromkeys(("company_name_zh", "stock_short_name"))
    unknown_terms = dict.fromkeys(
        (
            "listing_date",
            "application_start_date",
            "application_end_date",
            "allotment_results_expected_date",
            "industry_label",
        )
    )
    seeded: list[dict[str, object]] = []
    for listing in entries:
        seeded.append(
            {
                "ticker": listing.ticker,
                "company_name_en": listing.company_name_en,
                **unknown_names,
                "exchange": "HKEX",
                "board": "Main Board",
                "status": "new_listing_information",
                **unknown_terms,
                "data_as_of": as_of,
                "notes": "Seeded from HKEXnews Main Board New Listing Information page; detailed terms require prospectus archive.",
            }
        )
    return seeded
def upsert_master(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert seed rows into ``ipo_master`` or refresh the rows already there.

    Args:
        conn: Open SQLite connection; the caller owns commit and rollback.
        rows: Dicts keyed by the named parameters of the statement below
            (the shape produced by ``master_rows``).

    On a ticker conflict the existing row is updated as follows:

    * ``company_name_en`` is filled in only when the stored value is missing
      (NULL or empty); an existing name is kept.
    * ``exchange``, ``board`` and ``data_as_of`` take the new values.
    * ``status`` is kept when it is already ``'listed'``, otherwise replaced.
    * ``notes`` is filled in only when currently NULL.
    """
    # COALESCE is needed for the name check: in SQL, "NULL = ''" evaluates to
    # NULL (not true), so a bare comparison would never backfill a NULL name.
    conn.executemany(
        """
        INSERT INTO ipo_master (
            ticker, company_name_en, company_name_zh, stock_short_name, exchange, board,
            status, listing_date, application_start_date, application_end_date,
            allotment_results_expected_date, industry_label, data_as_of, notes
        )
        VALUES (
            :ticker, :company_name_en, :company_name_zh, :stock_short_name, :exchange, :board,
            :status, :listing_date, :application_start_date, :application_end_date,
            :allotment_results_expected_date, :industry_label, :data_as_of, :notes
        )
        ON CONFLICT(ticker) DO UPDATE SET
            company_name_en = CASE
                WHEN COALESCE(ipo_master.company_name_en, '') = '' THEN excluded.company_name_en
                ELSE ipo_master.company_name_en
            END,
            exchange = excluded.exchange,
            board = excluded.board,
            status = CASE
                WHEN ipo_master.status = 'listed' THEN ipo_master.status
                ELSE excluded.status
            END,
            data_as_of = excluded.data_as_of,
            notes = CASE
                WHEN ipo_master.notes IS NULL THEN excluded.notes
                ELSE ipo_master.notes
            END
        """,
        rows,
    )
def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert rows into ``source_refs``, overwriting rows with the same ``source_id``.

    On conflict every column except ``source_id`` and ``ticker`` is replaced
    with the incoming value. The caller owns commit and rollback.
    """
    statement = """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        )
        VALUES (
            :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url,
            :file_sha256, :source_date, :archived_at, :notes
        )
        ON CONFLICT(source_id) DO UPDATE SET
            source_type = excluded.source_type,
            title = excluded.title,
            path_base = excluded.path_base,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """
    conn.executemany(statement, rows)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    The file is UTF-8 with ``\\n`` line endings and starts with a header row
    of column names; rows follow in ``ORDER BY order_by`` order.

    ``table`` and ``order_by`` are interpolated directly into the SQL text,
    so they must be identifiers chosen by the program, never user input.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    destination = SNAPSHOT_DIR / f"{table}.csv"
    with destination.open("w", newline="", encoding="utf-8") as sink:
        out = csv.writer(sink, lineterminator="\n")
        out.writerow(header)
        for record in result.fetchall():
            out.writerow(record)
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    The script is invoked in ``current_new_listing_information`` mode with a
    summary limit of 25.

    Raises:
        subprocess.CalledProcessError: if the script exits with a non-zero status.
    """
    script_options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "current_new_listing_information",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, setting in script_options.items():
        command.extend((flag, setting))
    subprocess.run(command, check=True)
def main() -> int:
    """Archive the current HKEX listing page and seed the database from it.

    Steps: download the page, parse its "Updated:" date and table entries,
    store the raw HTML, upsert ``ipo_master`` and ``source_refs``, export CSV
    snapshots, optionally refresh the ticker sync state, and print a summary.

    Returns:
        0 on success.

    Raises:
        SystemExit: with an error message when no entries could be parsed.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    data = fetch_bytes(args.archive_page)
    page = data.decode("utf-8", "replace")
    source_date = source_date_from_page(page)
    entries = parse_entries(page)
    if not entries:
        # Abort before writing anything so a failed parse leaves no archive behind.
        raise SystemExit("No current HKEX new listing entries were parsed.")
    local_path, file_hash = archive_page(args.archive_page, source_date, data)
    # "with sqlite3.connect(...)" only commits or rolls back; it never closes
    # the connection. Close it explicitly so the handle is released before the
    # sync-state subprocess opens the same database file.
    conn = sqlite3.connect(args.db)
    try:
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            upsert_master(conn, master_rows(entries, as_of))
            upsert_source_refs(
                conn,
                source_rows(entries, args.archive_page, local_path, file_hash, source_date, as_of),
            )
            for table in ["ipo_master", "source_refs", "data_gaps"]:
                export_snapshot(conn, table)
    finally:
        conn.close()
    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)
    print("current HKEX new listing information archived")
    print(f"source_date: {source_date or 'unknown'}")
    print(f"entries parsed: {len(entries)}")
    print("tickers: " + ",".join(entry.ticker for entry in entries))
    print(f"page: {local_path}")
    return 0
# Run only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())