#!/usr/bin/env python3
"""Archive HKEXnews prospectus and allotment-result documents for open sync tasks."""

from __future__ import annotations

import argparse
import csv
import hashlib
import html
import json
import logging
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen

try:
    from pypdf import PdfReader
except ImportError:
    # Keep the pure text-parsing helpers importable without pypdf; main() and
    # the PDF readers below still refuse to run without it.
    PdfReader = None

logging.getLogger("pypdf").setLevel(logging.ERROR)

BASE_URL = "https://www1.hkexnews.hk"
ACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/activestock_sehk_e.json"
INACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/inactivestock_sehk_e.json"
TITLE_SEARCH_URL = f"{BASE_URL}/search/titlesearch.xhtml"
TITLE_SEARCH_SERVLET_URL = f"{BASE_URL}/search/titleSearchServlet.do"
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")

# offering_terms columns refreshed from a prospectus, in INSERT order
# (between ``source_id`` and ``data_as_of``).
_PROSPECTUS_TERM_COLUMNS = (
    "prospectus_date",
    "offer_price_hkd",
    "board_lot",
    "min_subscription_amount_hkd",
    "global_offer_shares",
    "hk_offer_shares_initial",
    "international_offer_shares_initial",
    "public_offer_pct_initial",
    "over_allotment_offer_shares",
    "gross_proceeds_hkd_m",
)


@dataclass(frozen=True)
class DocumentRow:
    """One document entry returned by a title search."""

    release_time: str  # whitespace-normalized release timestamp as published
    release_date: str  # ISO date derived from release_time
    headline: str
    title: str
    href: str  # link exactly as found in the search result
    url: str  # href resolved against BASE_URL


@dataclass(frozen=True)
class ArchivedSource:
    """A downloaded document plus the metadata stored in ``source_refs``."""

    source_id: str
    ticker: str
    source_type: str
    title: str
    local_path: str
    url: str
    file_sha256: str
    source_date: str
    notes: str


@dataclass(frozen=True)
class ProspectusFacts:
    """Values parsed from a prospectus PDF; ``None`` means "not found"."""

    application_start_date: str | None = None
    application_end_date: str | None = None
    allotment_results_expected_date: str | None = None
    listing_date: str | None = None
    offer_price_hkd: float | None = None
    board_lot: int | None = None
    min_subscription_amount_hkd: float | None = None
    global_offer_shares: int | None = None
    hk_offer_shares_initial: int | None = None
    international_offer_shares_initial: int | None = None
    public_offer_pct_initial: float | None = None
    over_allotment_offer_shares: int | None = None
    gross_proceeds_hkd_m: float | None = None


@dataclass(frozen=True)
class AllotmentFacts:
    """Values parsed from an allotment-results PDF; ``None`` means "not found"."""

    final_offer_price_hkd: float | None = None
    gross_proceeds_hkd_m: float | None = None
    net_proceeds_hkd_m: float | None = None
    issued_shares_upon_listing: int | None = None
    valid_applications: int | None = None
    successful_applications: int | None = None
    public_oversubscription_times: float | None = None
    international_placees: int | None = None
    international_oversubscription_times: float | None = None
    final_hk_offer_shares: int | None = None
    final_international_offer_shares: int | None = None


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.")
    parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    parser.add_argument(
        "--limit",
        type=int,
        help="Optional maximum tickers to process. Omit to process all open T0/T1 tasks.",
    )
    parser.add_argument(
        "--tickers",
        help="Comma-separated tickers to process instead of selecting from sync_tasks.",
    )
    parser.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh sync state after updating facts.",
    )
    parser.add_argument(
        "--skip-text-extraction",
        action="store_true",
        help="Do not extract text for newly archived PDFs.",
    )
    return parser.parse_args()


def fetch_bytes(url: str, headers: dict[str, str] | None = None) -> bytes:
    """Download *url* and return the raw response body.

    A browser-like ``User-Agent`` is always sent; *headers* adds to (or
    overrides) the default request headers.
    """
    request_headers = {"User-Agent": "Mozilla/5.0"}
    if headers:
        request_headers.update(headers)
    request = Request(url, headers=request_headers)
    with urlopen(request, timeout=60) as response:
        return response.read()


def parse_as_of(value: str | None) -> str:
    """Return *value* as an ISO timestamp with a ``Z`` suffix for UTC.

    Without a value the current UTC time (second precision) is used.
    """
    if value:
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def load_stock_ids() -> dict[str, int]:
    """Map stock codes to stock ids using the active and inactive stock lists.

    The first id seen for a code wins, so active listings take precedence.
    """
    stock_ids: dict[str, int] = {}
    for url in [ACTIVE_STOCK_URL, INACTIVE_STOCK_URL]:
        payload = fetch_bytes(url).decode("utf-8-sig")
        for item in json.loads(payload):
            code = item.get("c")
            stock_id = item.get("i")
            if code and stock_id:
                stock_ids.setdefault(code, int(stock_id))
    return stock_ids


def clean_html(value: str) -> str:
    """Strip tags, unescape entities and collapse whitespace in *value*."""
    text = re.sub(r"<.*?>", " ", value, flags=re.S)
    return " ".join(html.unescape(text).split())


def parse_release_date(value: str) -> str:
    """Return the ISO date of a ``dd/mm/YYYY ...`` release string."""
    return datetime.strptime(value.split()[0], "%d/%m/%Y").date().isoformat()


def parse_release_datetime(value: str) -> str:
    """Return the ISO date of a ``dd/mm/YYYY HH:MM`` release string."""
    return datetime.strptime(value, "%d/%m/%Y %H:%M").date().isoformat()


def _parse_title_search_page(page: str) -> list[DocumentRow]:
    """Extract document rows from the HTML of a title-search page."""
    # NOTE(review): the tag portions of these patterns were reconstructed
    # (table rows, a "Release Time:" cell, a "headline" div and the document
    # link); confirm them against a live titlesearch.xhtml response.
    rows: list[DocumentRow] = []
    for row in re.findall(r"<tr\b[^>]*>(.*?)</tr>", page, flags=re.S):
        release_match = re.search(r"Release Time:\s*</span>(.*?)</td>", row, flags=re.S)
        headline_match = re.search(r'<div class="headline">(.*?)</div>', row, flags=re.S)
        link_match = re.search(r'<a href="([^"]+)"[^>]*>(.*?)</a>', row, flags=re.S)
        if not release_match or not link_match:
            continue
        release_time = clean_html(release_match.group(1))
        if not release_time:
            # An empty cell cannot be dated; skip instead of failing the page.
            continue
        href = html.unescape(link_match.group(1))
        rows.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_date(release_time),
                headline=clean_html(headline_match.group(1)) if headline_match else "",
                title=clean_html(link_match.group(2)),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return rows


def latest_title_search_rows(stock_id: int) -> list[DocumentRow]:
    """Return the documents listed on the default title-search page of a stock."""
    url = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    page = fetch_bytes(url).decode("utf-8", "replace")
    return _parse_title_search_page(page)


def window_title_search_rows(stock_id: int, from_date: date, to_date: date) -> list[DocumentRow]:
    """Return the documents of a stock released between two dates (inclusive bounds sent as given)."""
    params = {
        "sortDir": "0",
        "sortByOptions": "DateTime",
        "category": "0",
        "market": "SEHK",
        "stockId": str(stock_id),
        "documentType": "-1",
        "fromDate": from_date.strftime("%Y%m%d"),
        "toDate": to_date.strftime("%Y%m%d"),
        "title": "",
        "searchType": "0",
        "t1code": "-2",
        "t2Gcode": "-2",
        "t2code": "-2",
        "rowRange": "500",
        "lang": "en",
    }
    url = f"{TITLE_SEARCH_SERVLET_URL}?{urlencode(params)}"
    referer = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    payload = fetch_bytes(url, headers={"Referer": referer}).decode("utf-8", "replace")
    response_data = json.loads(payload)
    # "result" is itself a JSON-encoded string; treat a missing or null
    # payload as an empty result list.
    result = json.loads(response_data.get("result") or "[]") or []
    rows: list[DocumentRow] = []
    for item in result:
        href = html.unescape(item.get("FILE_LINK") or "")
        release_time = " ".join((item.get("DATE_TIME") or "").split())
        if not href or not release_time:
            continue
        rows.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_datetime(release_time),
                headline=clean_html(item.get("SHORT_TEXT") or ""),
                title=clean_html(item.get("TITLE") or ""),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return rows


def title_search_rows(stock_id: int, listing_date: str | None, prospectus_date: str | None) -> list[DocumentRow]:
    """Search documents around the listing date, else the prospectus date, else the latest page."""
    listed = parse_iso_date(listing_date)
    prospectus = parse_iso_date(prospectus_date)
    if listed:
        return window_title_search_rows(stock_id, listed - timedelta(days=90), listed + timedelta(days=14))
    if prospectus:
        return window_title_search_rows(
            stock_id,
            prospectus - timedelta(days=14),
            prospectus + timedelta(days=60),
        )
    return latest_title_search_rows(stock_id)


def parse_iso_date(value: str | None) -> date | None:
    """Parse an ISO date string; empty or ``None`` yields ``None``."""
    if not value:
        return None
    return date.fromisoformat(value)


def date_distance(left: str, right: str) -> int:
    """Return the absolute number of days between two ISO dates."""
    return abs((date.fromisoformat(left) - date.fromisoformat(right)).days)


def archiveable_document(row: DocumentRow) -> bool:
    """Return whether the row links to a PDF or HTML document."""
    return Path(row.href.lower()).suffix in {".pdf", ".htm", ".html"}


def choose_prospectus(
    rows: list[DocumentRow],
    prospectus_date: str | None,
    listing_date: str | None,
) -> DocumentRow | None:
    """Pick the prospectus PDF among *rows*, or ``None`` when there is none.

    Candidates are PDFs filed under a "listing documents" headline whose
    title looks like an offering document. With a known prospectus date the
    candidate released closest to it wins; otherwise the most recent one,
    preferring those released within 60 days before the listing date.
    """
    candidates = []
    for row in rows:
        headline = row.headline.lower()
        title = row.title.lower()
        if not row.href.lower().endswith(".pdf"):
            continue
        if "listing documents" not in headline:
            continue
        if "global offering" in title or "prospectus" in title or title in {"share offer", "public offer"}:
            candidates.append(row)
    if not candidates:
        return None
    if prospectus_date:
        return min(
            candidates,
            key=lambda row: (date_distance(row.release_date, prospectus_date), row.release_date),
        )
    listed = parse_iso_date(listing_date)
    if listed:
        windowed = [
            row
            for row in candidates
            if 0 <= (listed - date.fromisoformat(row.release_date)).days <= 60
        ]
        if windowed:
            candidates = windowed
    # sorted()[-1] (not max) so that ties resolve to the last row, as before.
    return sorted(candidates, key=lambda row: row.release_date)[-1]


def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> DocumentRow | None:
    """Pick the allotment-results announcement among *rows*, or ``None``.

    Rows released from 10 days before to 5 days after the listing date are
    preferred when any exist. Clarification/supplemental notices rank
    lowest, full announcements highest; ties go to the release closest to
    the listing date, then to the latest release.
    """
    candidates = [
        row
        for row in rows
        if archiveable_document(row)
        and ("allotment results" in row.headline.lower() or "allotment results" in row.title.lower())
    ]
    if not candidates:
        return None
    listed = parse_iso_date(listing_date)
    if listed:
        windowed = [
            row
            for row in candidates
            if -5 <= (listed - date.fromisoformat(row.release_date)).days <= 10
        ]
        if windowed:
            candidates = windowed

    def allotment_rank(row: DocumentRow) -> tuple[int, int, str]:
        title = row.title.lower()
        if "clarification" in title or "supplemental" in title:
            quality = 0
        elif "allotment results" in title and (
            "final offer price" in title or title.startswith("announcement of allotment")
        ):
            quality = 2
        else:
            quality = 1
        distance = date_distance(row.release_date, listing_date) if listing_date else 0
        return quality, -distance, row.release_date

    return sorted(candidates, key=allotment_rank)[-1]


def sha256_bytes(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of *data*."""
    return hashlib.sha256(data).hexdigest()


def download_document(ticker: str, source_type: str, row: DocumentRow) -> ArchivedSource:
    """Download *row* under ``data/raw/<ticker>/`` and describe the stored file.

    The file is only rewritten when it is missing or its content changed.
    """
    data = fetch_bytes(row.url)
    doc_id = Path(row.href).stem
    suffix = Path(row.href).suffix.lower() or ".pdf"
    local_path = Path("data/raw") / ticker / f"{source_type}_{row.release_date}_{doc_id}{suffix}"
    local_path.parent.mkdir(parents=True, exist_ok=True)
    if not local_path.exists() or local_path.read_bytes() != data:
        local_path.write_bytes(data)
    return ArchivedSource(
        source_id=f"{ticker}_{source_type}_{row.release_date.replace('-', '_')}_{doc_id}",
        ticker=ticker,
        source_type=source_type,
        title=row.title,
        local_path=local_path.as_posix(),
        url=row.url,
        file_sha256=sha256_bytes(data),
        source_date=row.release_date,
        notes=f"HKEXnews {row.headline}.",
    )


def _pdf_page_texts(local_path: str, max_pages: int) -> list[str]:
    """Return the extracted text of the first *max_pages* pages of a PDF."""
    if PdfReader is None:
        raise ImportError("pypdf is required to read PDF documents")
    reader = PdfReader(local_path)
    return [page.extract_text() or "" for page in reader.pages[: min(max_pages, len(reader.pages))]]


def first_pdf_text(local_path: str, max_pages: int) -> str:
    """Return the text of the first pages as one whitespace-normalized line."""
    return " ".join(" ".join(_pdf_page_texts(local_path, max_pages)).split())


def first_pdf_text_with_lines(local_path: str, max_pages: int) -> str:
    """Return the text of the first pages with line breaks preserved."""
    return "\n".join(_pdf_page_texts(local_path, max_pages))


def normalize_pdf_text(text: str) -> str:
    """Rejoin letter-spaced phrases and words that the date patterns rely on."""
    replacements = {
        "H o n g K o n g P u b l i c O f f e r i n g c o m m e n c e s": "Hong Kong Public Offering commences",
        "a t o r b e f o r e": "at or before",
        "n o l a t e r": "no later",
        "o n o r b e f o r e": "on or before",
        "c o m m e n c e": "commence",
    }
    for source, target in replacements.items():
        text = text.replace(source, target)
    text = re.sub(r"\ba\s+t\b", "at", text)
    text = re.sub(r"\bo\s+n\b", "on", text)
    text = re.sub(r"\bf\s+r\s+o\s+m\b", "from", text)
    return text


def _to_float(raw: str) -> float | None:
    """Convert a captured number such as ``1,234.50`` to a float.

    Captures from ``[\\d,.]+`` may swallow a sentence-final period; it is
    stripped, and anything still unparseable yields ``None``.
    """
    cleaned = raw.replace(",", "").strip().rstrip(".")
    try:
        return float(cleaned)
    except ValueError:
        return None


def _amount_in_millions(amount_text: str, unit: str) -> float | None:
    """Convert a captured amount to millions; a unit starting with "b" means billions."""
    amount = _to_float(amount_text)
    if amount is None:
        return None
    if unit.lower().startswith("b"):
        return amount * 1000
    return amount


def integer_after(pattern: str, text: str) -> int | None:
    """Return group 1 of the first case-insensitive match as an int.

    Thousands separators and whitespace inside the capture are ignored.
    Returns ``None`` when nothing matches or the capture holds no digits.
    """
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    cleaned = re.sub(r"[,\s]", "", match.group(1))
    if not cleaned.isdigit():
        return None
    return int(cleaned)


def float_after(pattern: str, text: str) -> float | None:
    """Return group 1 of the first case-insensitive match as a float, or ``None``."""
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    return _to_float(match.group(1))


def money_m_after(pattern: str, text: str) -> float | None:
    """Return the matched amount in millions; group 2 is an optional million/billion unit."""
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    return _amount_in_millions(match.group(1), match.group(2) or "")


def strict_money_m_after(pattern: str, text: str) -> float | None:
    """Like :func:`money_m_after`, but the unit group must have matched."""
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    unit = match.group(2)
    if unit is None:
        return None
    return _amount_in_millions(match.group(1), unit)


def date_after(label_pattern: str, text: str) -> str | None:
    """Return the first date following *label_pattern* as an ISO string.

    The date must appear within 600 characters of the label, after "on",
    "from" or "at or before", optionally preceded by a weekday name.
    """
    match = re.search(
        label_pattern
        + r".{0,600}?(?:on|from|at or before)\s+(?:[.\s]+)?(?:[A-Z][a-z]+,\s+)?"
        + r"([A-Z][a-z]+ \d{1,2}, \d{4}|\d{1,2} [A-Z][a-z]+ \d{4})",
        text,
        flags=re.I,
    )
    if not match:
        return None
    value = match.group(1)
    for date_format in ["%B %d, %Y", "%d %B %Y"]:
        try:
            return datetime.strptime(value, date_format).date().isoformat()
        except ValueError:
            pass
    return None


def parse_offer_price_hkd(text: str) -> float | None:
    """Return the (maximum) offer price per share found in *text*, if any."""
    share_unit = r"(?:H\s+)?(?:Share|Shares|Offer Share|Offer Shares|HDR|HDRs|Offer HDR|Offer HDRs)"
    patterns = [
        rf"(?:Maximum\s+)?Offer Price\s*:?\s*HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
        rf"Offer Price will (?:be|not be more than)\s+HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
        rf"maximum Offer Price of HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
    ]
    for pattern in patterns:
        price = float_after(pattern, text)
        if price is not None:
            return price
    return None


def parse_over_allotment_offer_shares(local_path: str, global_offer_shares: int | None) -> int | None:
    """Return the over-allotment option size in shares.

    Returns 0 when the text states there is no over-allotment option, the
    explicit "up to N Shares" figure when present, 15% of
    *global_offer_shares* when only a 15% option is mentioned, else ``None``.
    """
    text = normalize_pdf_text(first_pdf_text(local_path, 320))
    if re.search(r"\bno\s+over-?allotment\s+option\b", text, flags=re.I):
        return 0
    explicit_shares = integer_after(
        r"over-?allotment option.{0,500}?up to\s+([\d][\d,\s]*)\s+(?:additional\s+)?(?:H\s+)?(?:Shares|HDRs)",
        text,
    )
    if explicit_shares is not None:
        return explicit_shares
    if global_offer_shares and re.search(r"over-?allotment option", text, flags=re.I):
        if re.search(r"(?:15%|15\s+per\s+cent|fifteen\s+per\s+cent)", text, flags=re.I):
            return round(global_offer_shares * 0.15)
    return None


def parse_prospectus_facts(local_path: str) -> ProspectusFacts:
    """Parse offering terms and timetable dates from the first pages of a prospectus PDF."""
    text = normalize_pdf_text(first_pdf_text(local_path, 8))
    board_lot = integer_after(r"minimum\s*of\s*([\d][\d,\s]*)\s*Hong\s*Kong\s*Offer\s*(?:Shares|HDRs)", text)
    min_amount = None
    if board_lot:
        # The minimum payable amount is looked up next to the board lot,
        # written with or without thousands separators.
        pattern = rf"\b{board_lot:,}\b\s+([\d,]+\.\d{{2}})"
        min_amount = float_after(pattern, text)
        if min_amount is None:
            pattern = rf"\b{board_lot}\b\s+([\d,]+\.\d{{2}})"
            min_amount = float_after(pattern, text)
    global_shares = integer_after(
        r"Number of Offer (?:Shares|HDRs) (?:under|in) the Global Offering\s*:?\s+([\d][\d,\s]*)",
        text,
    )
    if global_shares is None:
        global_shares = integer_after(
            r"Number of Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)\s+(?:H\s+)?(?:Shares|HDRs)",
            text,
        )
    hk_shares = integer_after(r"Number of Hong Kong Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)", text)
    intl_shares = integer_after(r"Number of International Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)", text)
    offer_price = parse_offer_price_hkd(text)
    over_allotment = parse_over_allotment_offer_shares(local_path, global_shares)
    public_pct = round(hk_shares / global_shares, 4) if global_shares and hk_shares else None
    gross_proceeds = round(global_shares * offer_price / 1_000_000, 6) if global_shares and offer_price else None
    allotment_date = (
        date_after(r"Announcement of the level of indications.*?basis of allocation", text)
        or date_after(r"The results of allocations", text)
        or date_after(r"Announcement of", text)
        or date_after(r"Announcement of.*?Offer Price", text)
    )
    return ProspectusFacts(
        application_start_date=date_after(r"Hong Kong Public Offering commences", text),
        application_end_date=date_after(r"Application lists.*?close", text),
        allotment_results_expected_date=allotment_date,
        listing_date=date_after(
            r"Dealings in (?:our\s+|the\s+)?(?:H\s+)?(?:Shares|HDRs).*?(?:expected to commence|to commence)",
            text,
        ),
        offer_price_hkd=offer_price,
        board_lot=board_lot,
        min_subscription_amount_hkd=min_amount,
        global_offer_shares=global_shares,
        hk_offer_shares_initial=hk_shares,
        international_offer_shares_initial=intl_shares,
        public_offer_pct_initial=public_pct,
        over_allotment_offer_shares=over_allotment,
        gross_proceeds_hkd_m=gross_proceeds,
    )


def section_between(text: str, start: str, end: str | None, use_last_start: bool = False) -> str:
    """Return the text after a *start* match up to the next *end* match.

    Uses the first *start* match unless *use_last_start* is set. Returns ""
    when *start* is absent and runs to the end of *text* when *end* is
    ``None`` or not found.
    """
    start_matches = list(re.finditer(start, text, flags=re.I))
    if not start_matches:
        return ""
    start_match = start_matches[-1] if use_last_start else start_matches[0]
    section_start = start_match.end()
    if not end:
        return text[section_start:]
    end_match = re.search(end, text[section_start:], flags=re.I)
    section_end = section_start + end_match.start() if end_match else len(text)
    return text[section_start:section_end]


def allotment_detail_sections(text: str) -> tuple[str, str]:
    """Split flat allotment text into its public-offer and international sections."""
    hk_match = re.search(
        r"HONG KONG PUBLIC OFFERING\s+No\. of valid applications(.*?)INTERNATIONAL OFFERING\s+No\. of placees",
        text,
        flags=re.I,
    )
    intl_match = re.search(
        r"INTERNATIONAL OFFERING\s+No\. of placees(.*?)(?:The Directors|LOCK-UP|Allottees with|$)",
        text,
        flags=re.I,
    )
    hk_section = "No. of valid applications" + hk_match.group(1) if hk_match else ""
    intl_section = "No. of placees" + intl_match.group(1) if intl_match else ""
    return hk_section, intl_section


def normalized_lines(text: str) -> list[str]:
    """Return the non-blank lines of *text* with whitespace collapsed."""
    return [" ".join(line.replace("\xa0", " ").split()) for line in text.splitlines() if line.strip()]


def number_from_text(value: str) -> int | None:
    """Return the first integer (thousands separators allowed) in *value*."""
    match = re.search(r"([\d][\d,]*)", value)
    if not match:
        return None
    return int(match.group(1).replace(",", ""))


def float_from_text(value: str) -> float | None:
    """Return the first decimal number (thousands separators allowed) in *value*."""
    match = re.search(r"([\d][\d,]*(?:\.\d+)?)", value)
    if not match:
        return None
    return float(match.group(1).replace(",", ""))


def value_after_line_label(
    lines: list[str],
    label_patterns: list[str],
    *,
    value_type: str = "int",
    max_lines: int = 6,
) -> int | float | None:
    """Return the number that follows the first matching label.

    Each line is joined with the following ``max_lines - 1`` lines so a
    value printed below its label is still found. With
    ``value_type="times"`` a number followed by "times" is preferred and the
    result is a float; otherwise the first integer after the label is used.
    """
    for index in range(len(lines)):
        window = " ".join(lines[index : index + max_lines])
        for label_pattern in label_patterns:
            match = re.search(label_pattern, window, flags=re.I)
            if not match:
                continue
            tail = window[match.end() :]
            if value_type == "times":
                times_match = re.search(r"([\d][\d,]*(?:\.\d+)?)\s*times", tail, flags=re.I)
                if times_match:
                    return float(times_match.group(1).replace(",", ""))
                return float_from_text(tail)
            return number_from_text(tail)
    return None


def section_lines(
    lines: list[str],
    start_patterns: list[str],
    end_patterns: list[str],
    *,
    start_index: int = 0,
) -> list[str]:
    """Return the lines from the first start match up to (not including) the next end match."""
    section_start = None
    for index in range(start_index, len(lines)):
        if any(re.search(pattern, lines[index], flags=re.I) for pattern in start_patterns):
            section_start = index
            break
    if section_start is None:
        return []
    section_end = len(lines)
    for index in range(section_start + 1, len(lines)):
        if any(re.search(pattern, lines[index], flags=re.I) for pattern in end_patterns):
            section_end = index
            break
    return lines[section_start:section_end]


def allotment_detail_line_sections(text: str) -> tuple[list[str], list[str], list[str]]:
    """Return the detail, public-offer and international line sections of an announcement.

    The detail section is at most 320 lines starting at the first details
    heading within the first 700 lines (or at the top when none is found).
    When a sub-section heading is missing, a window around the first
    applications/placees label is used instead.
    """
    lines = normalized_lines(text)
    detail_start = 0
    for index, line in enumerate(lines[:700]):
        if re.search(r"ALLOTMENT RESULTS DETAILS|APPLICATIONS AND INDICATIONS", line, flags=re.I):
            detail_start = index
            break
    detail_lines = lines[detail_start : detail_start + 320]
    public_lines = section_lines(
        detail_lines,
        [r"^(HONG KONG )?PUBLIC OFFER", r"PUBLIC OFFER SHARES"],
        [r"^INTERNATIONAL OFFER", r"^PLACING$", r"^EMPLOYEE PREFERENTIAL OFFERING"],
    )
    international_lines = section_lines(
        detail_lines,
        [r"^INTERNATIONAL OFFER", r"^PLACING$"],
        [
            r"^EMPLOYEE PREFERENTIAL OFFERING",
            r"^THE DIRECTORS CONFIRM",
            r"^THE PLACEES",
            r"^LOCK-UP",
            r"^BASIS OF ALLOCATION",
        ],
    )
    if not public_lines:
        for index, line in enumerate(detail_lines):
            if re.search(r"No\. of valid applications|Number of valid applications", line, flags=re.I):
                public_lines = detail_lines[max(0, index - 10) : index + 70]
                break
    if not international_lines:
        for index, line in enumerate(detail_lines):
            if re.search(r"No\. of placees|Number of placees", line, flags=re.I):
                international_lines = detail_lines[max(0, index - 10) : index + 90]
                break
    return detail_lines, public_lines, international_lines


def parse_allotment_facts_from_text(text: str) -> AllotmentFacts:
    """Parse allotment facts from announcement text that still has line breaks.

    Each figure is first looked up by its table label inside the relevant
    section, then by a narrative sentence pattern as a fallback.
    """
    flat_text = " ".join(text.split())
    _, public_lines, international_lines = allotment_detail_line_sections(text)
    international_text = " ".join(international_lines) or flat_text

    valid_applications = value_after_line_label(
        public_lines,
        [r"No\. of valid applications", r"Number of valid applications"],
    )
    if valid_applications is None:
        valid_applications = integer_after(r"A total of\s+([\d,]+)\s+valid applications", flat_text)

    successful_applications = value_after_line_label(
        public_lines,
        [r"No\. of successful applications", r"Number of successful applications"],
    )
    if successful_applications is None:
        successful_applications = integer_after(r"allocated to\s+([\d,]+)\s+successful applicants", flat_text)

    public_oversubscription_times = value_after_line_label(
        public_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if public_oversubscription_times is None:
        public_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Public Offer|Hong Kong Public Offering)",
            flat_text,
        )

    international_placees = value_after_line_label(
        international_lines,
        [r"No\. of placees", r"Number of placees"],
    )
    if international_placees is None:
        international_placees = integer_after(
            r"(?:There (?:are|is) a total of|total of)\s+([\d,]+)\s+placees",
            international_text,
        )

    international_oversubscription_times = value_after_line_label(
        international_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if international_oversubscription_times is None:
        international_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Placing|International Offer)",
            international_text,
        )

    final_hk_offer_shares = value_after_line_label(
        public_lines,
        [r"Final no\. of Offer Shares under the (?:Hong Kong )?Public Offer(?:ing)?"],
    )
    if final_hk_offer_shares is None:
        final_hk_offer_shares = integer_after(
            r"final number of Offer Shares under the (?:Public Offer|Hong Kong Public Offering) is\s+([\d,]+)",
            flat_text,
        )

    final_international_offer_shares = value_after_line_label(
        international_lines,
        [
            r"Final no\. of Offer Shares under the International Offer(?:ing)?",
            r"Final no\. of Offer Shares under the Placing",
        ],
    )
    if final_international_offer_shares is None:
        final_international_offer_shares = integer_after(
            r"final number of Offer Shares under (?:the )?(?:Placing|International Offer(?:ing)?) is\s+([\d,]+)",
            international_text,
        )

    # The isinstance guards keep each field on its declared type, whichever
    # lookup produced the value.
    return AllotmentFacts(
        final_offer_price_hkd=float_after(r"Final Offer Price\s*:?\s*HK\$?([\d,.]+)", flat_text),
        gross_proceeds_hkd_m=strict_money_m_after(
            r"Gross proceeds.{0,300}?HK\$([\d,.]+)\s*(million|billion)",
            flat_text,
        ),
        net_proceeds_hkd_m=strict_money_m_after(
            r"Net proceeds.{0,500}?HK\$([\d,.]+)\s*(million|billion)",
            flat_text,
        ),
        issued_shares_upon_listing=integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", flat_text),
        valid_applications=valid_applications if isinstance(valid_applications, int) else None,
        successful_applications=successful_applications if isinstance(successful_applications, int) else None,
        public_oversubscription_times=(
            public_oversubscription_times if isinstance(public_oversubscription_times, float) else None
        ),
        international_placees=international_placees if isinstance(international_placees, int) else None,
        international_oversubscription_times=(
            international_oversubscription_times
            if isinstance(international_oversubscription_times, float)
            else None
        ),
        final_hk_offer_shares=final_hk_offer_shares if isinstance(final_hk_offer_shares, int) else None,
        final_international_offer_shares=(
            final_international_offer_shares if isinstance(final_international_offer_shares, int) else None
        ),
    )


def parse_allotment_facts(local_path: str) -> AllotmentFacts:
    """Parse allotment facts from a PDF.

    The line-aware parser runs first; when it finds no demand figure at all,
    the flat-text patterns are applied to the first 8 pages instead.
    """
    text_with_lines = first_pdf_text_with_lines(local_path, 12)
    facts = parse_allotment_facts_from_text(text_with_lines)
    if any(
        [
            facts.valid_applications,
            facts.successful_applications,
            facts.public_oversubscription_times,
            facts.international_placees,
            facts.international_oversubscription_times,
        ]
    ):
        return facts
    text = first_pdf_text(local_path, 8)
    hk_section, intl_section = allotment_detail_sections(text)
    return AllotmentFacts(
        final_offer_price_hkd=float_after(r"Final Offer Price\s+HK\$([\d,.]+)", text),
        gross_proceeds_hkd_m=money_m_after(r"Gross proceeds.*?HK\$([\d,.]+)\s*(million|billion)?", text),
        net_proceeds_hkd_m=money_m_after(r"Net proceeds\s+HK\$([\d,.]+)\s*(million|billion)?", text),
        issued_shares_upon_listing=integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", text),
        valid_applications=integer_after(r"No\. of valid applications\s+([\d,]+)", hk_section),
        successful_applications=integer_after(r"No\. of successful applications\s+([\d,]+)", hk_section),
        public_oversubscription_times=float_after(r"Subscription level\s+([\d,.]+)\s+times", hk_section),
        international_placees=integer_after(r"No\. of placees\s+([\d,]+)", intl_section),
        international_oversubscription_times=float_after(r"Subscription level.*?([\d,.]+)\s+times", intl_section),
        final_hk_offer_shares=integer_after(
            r"Final no\. of Offer Shares under the Hong Kong Public Offering.*?([\d][\d,\s]*)",
            hk_section,
        ),
        final_international_offer_shares=integer_after(
            r"Final no\. of Offer Shares under the International Offering.*?([\d][\d,\s]*)",
            intl_section,
        ),
    )


def select_tickers(conn: sqlite3.Connection, limit: int | None, tickers: str | None) -> list[str]:
    """Return the tickers to process.

    An explicit comma-separated list (zero-padded to five characters) takes
    precedence; otherwise tickers with open T0/T1 sync tasks are selected,
    most recent listing first, optionally capped by *limit*.
    """
    if tickers:
        return [ticker.strip().zfill(5) for ticker in tickers.split(",") if ticker.strip()]
    sql = """
        SELECT DISTINCT m.ticker
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        WHERE t.task_status = 'open'
          AND t.stage IN ('T0_prospectus', 'T1_allotment')
        ORDER BY m.listing_date DESC, m.ticker
    """
    params: tuple[object, ...] = ()
    if limit is not None:
        sql += " LIMIT ?"
        params = (limit,)
    rows = conn.execute(sql, params).fetchall()
    return [row[0] for row in rows]


def ticker_dates(conn: sqlite3.Connection, ticker: str) -> tuple[str | None, str | None]:
    """Return ``(listing_date, prospectus_date)`` for *ticker*, or ``(None, None)`` if unknown."""
    row = conn.execute(
        """
        SELECT m.listing_date, r.prospectus_date
        FROM ipo_master m
        LEFT JOIN new_listing_report_entries r ON r.ticker = m.ticker
        WHERE m.ticker = ?
        ORDER BY r.report_year DESC
        LIMIT 1
        """,
        (ticker,),
    ).fetchone()
    if row is None:
        return None, None
    return row[0], row[1]


def upsert_source_refs(conn: sqlite3.Connection, sources: list[ArchivedSource], as_of: str) -> None:
    """Insert or refresh one ``source_refs`` row per archived source."""
    conn.executemany(
        """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path,
            url, file_sha256, source_date, archived_at, notes
        )
        VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_id) DO UPDATE SET
            title = excluded.title,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """,
        [
            (
                source.source_id,
                source.ticker,
                source.source_type,
                source.title,
                source.local_path,
                source.url,
                source.file_sha256,
                source.source_date,
                as_of,
                source.notes,
            )
            for source in sources
        ],
    )


def update_master_from_prospectus(conn: sqlite3.Connection, ticker: str, facts: ProspectusFacts, as_of: str) -> None:
    """Apply prospectus timetable dates to ``ipo_master``.

    Parsed dates overwrite stored ones, except ``listing_date``, which is
    only filled when it is still empty.
    """
    conn.execute(
        """
        UPDATE ipo_master SET
            application_start_date = COALESCE(?, application_start_date),
            application_end_date = COALESCE(?, application_end_date),
            allotment_results_expected_date = COALESCE(?, allotment_results_expected_date),
            listing_date = COALESCE(listing_date, ?),
            data_as_of = ?
        WHERE ticker = ?
        """,
        (
            facts.application_start_date,
            facts.application_end_date,
            facts.allotment_results_expected_date,
            facts.listing_date,
            as_of,
            ticker,
        ),
    )


def _prospectus_terms_upsert_sql() -> str:
    """Build the ``offering_terms`` upsert used for prospectus facts."""
    # A row that already came from a prospectus (this one or an earlier one)
    # is overwritten; any other row only has its empty columns filled.
    from_prospectus = (
        "offering_terms.source_id = excluded.source_id"
        " OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'"
    )
    assignments = [
        "source_id = CASE"
        " WHEN offering_terms.source_id LIKE '%_new_listing_report_%'"
        f" OR {from_prospectus}"
        " THEN excluded.source_id ELSE offering_terms.source_id END"
    ]
    assignments.extend(
        f"{column} = CASE WHEN {from_prospectus} THEN excluded.{column}"
        f" ELSE COALESCE(offering_terms.{column}, excluded.{column}) END"
        for column in _PROSPECTUS_TERM_COLUMNS
    )
    assignments.append("data_as_of = excluded.data_as_of")
    columns = ("ticker", "source_id", *_PROSPECTUS_TERM_COLUMNS, "data_as_of")
    placeholders = ", ".join("?" for _ in columns)
    return (
        f"INSERT INTO offering_terms ({', '.join(columns)}) VALUES ({placeholders}) "
        "ON CONFLICT(ticker) DO UPDATE SET " + ", ".join(assignments)
    )


def update_terms_from_prospectus(
    conn: sqlite3.Connection,
    ticker: str,
    source_id: str,
    source_date: str,
    facts: ProspectusFacts,
    as_of: str,
) -> None:
    """Insert or update the ``offering_terms`` row of *ticker* from prospectus facts."""
    conn.execute(
        _prospectus_terms_upsert_sql(),
        (
            ticker,
            source_id,
            source_date,  # stored as prospectus_date
            facts.offer_price_hkd,
            facts.board_lot,
            facts.min_subscription_amount_hkd,
            facts.global_offer_shares,
            facts.hk_offer_shares_initial,
            facts.international_offer_shares_initial,
            facts.public_offer_pct_initial,
            facts.over_allotment_offer_shares,
            facts.gross_proceeds_hkd_m,
            as_of,
        ),
    )


def update_terms_from_allotment(conn: sqlite3.Connection, ticker: str, facts: AllotmentFacts, as_of: str) -> None:
    """Overwrite price and proceeds in ``offering_terms`` with parsed allotment values."""
    conn.execute(
        """
        UPDATE offering_terms SET
            offer_price_hkd = COALESCE(?, offer_price_hkd),
            gross_proceeds_hkd_m = COALESCE(?, gross_proceeds_hkd_m),
            net_proceeds_hkd_m = COALESCE(?, net_proceeds_hkd_m),
            issued_shares_upon_listing = COALESCE(?, issued_shares_upon_listing),
            data_as_of = ?
        WHERE ticker = ?
        """,
        (
            facts.final_offer_price_hkd,
            facts.gross_proceeds_hkd_m,
            facts.net_proceeds_hkd_m,
            facts.issued_shares_upon_listing,
            as_of,
            ticker,
        ),
    )


def upsert_demand(
    conn: sqlite3.Connection,
    ticker: str,
    source_id: str,
    source_date: str,
    facts: AllotmentFacts,
    as_of: str,
) -> None:
    """Insert or refresh the ``ipo_demand`` row derived from an allotment announcement.

    Nothing is written when no demand figure was parsed.
    """
    if not any(
        [
            facts.valid_applications,
            facts.successful_applications,
            facts.public_oversubscription_times,
            facts.international_placees,
            facts.international_oversubscription_times,
        ]
    ):
        return
    demand_id = source_id.replace("_allotment_results_", "_allotment_")
    conn.execute(
        """
        INSERT INTO ipo_demand (
            demand_id, ticker, source_id, stage_date, valid_applications,
            successful_applications, public_oversubscription_times,
            international_placees, international_oversubscription_times,
            final_hk_offer_shares, final_international_offer_shares,
            data_as_of, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(demand_id) DO UPDATE SET
            source_id = excluded.source_id,
            stage_date = excluded.stage_date,
            valid_applications = excluded.valid_applications,
            successful_applications = excluded.successful_applications,
            public_oversubscription_times = excluded.public_oversubscription_times,
            international_placees = excluded.international_placees,
            international_oversubscription_times = excluded.international_oversubscription_times,
            final_hk_offer_shares = excluded.final_hk_offer_shares,
            final_international_offer_shares = excluded.final_international_offer_shares,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """,
        (
            demand_id,
            ticker,
            source_id,
            source_date,
            facts.valid_applications,
            facts.successful_applications,
            facts.public_oversubscription_times,
            facts.international_placees,
            facts.international_oversubscription_times,
            facts.final_hk_offer_shares,
            facts.final_international_offer_shares,
            as_of,
            "Parsed from HKEXnews allotment results announcement.",
        ),
    )


def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write all rows of *table* to ``<SNAPSHOT_DIR>/<table>.csv``.

    *table* and *order_by* are interpolated into the SQL text, so they must
    be trusted identifiers, never user input.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    columns = [description[0] for description in cursor.description]
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(cursor.fetchall())


def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py``; raises if the script fails."""
    subprocess.run(
        [
            sys.executable,
            "scripts/update_sync_state.py",
            "--db",
            db_path,
            "--schema",
            schema_path,
            "--as-of",
            as_of,
            "--mode",
            "hkex_document_archive",
            "--summary-limit",
            "25",
        ],
        check=True,
    )


def refresh_extracted_text(db_path: str, sources: list[ArchivedSource]) -> None:
    """Run ``scripts/extract_pdf_text.py`` for the archived PDF sources, if any."""
    pdf_source_ids = [
        source.source_id
        for source in sources
        if Path(source.local_path).suffix.lower() == ".pdf"
    ]
    if not pdf_source_ids:
        return
    command = [
        sys.executable,
        "scripts/extract_pdf_text.py",
        "--db",
        db_path,
    ]
    for source_id in sorted(set(pdf_source_ids)):
        command.extend(["--source-id", source_id])
    subprocess.run(command, check=True)


def main() -> int:
    """Archive documents for the selected tickers and refresh derived data.

    Per-ticker failures are collected and reported at the end instead of
    aborting the run. Returns the process exit code.
    """
    if PdfReader is None:
        raise SystemExit("pypdf is required to parse archived PDF documents")
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    stock_ids = load_stock_ids()
    archived_sources: list[ArchivedSource] = []
    processed = 0
    missing_stock_ids: list[str] = []
    missing_docs: list[str] = []
    failed_tickers: list[tuple[str, str]] = []

    conn = sqlite3.connect(args.db)
    try:
        # ``with conn`` commits (or rolls back on an unhandled error) but
        # does not close; the connection is closed below so the follow-up
        # scripts open the database without this process holding it.
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            tickers = select_tickers(conn, args.limit, args.tickers)
            for index, ticker in enumerate(tickers, start=1):
                print(f"[{index}/{len(tickers)}] {ticker}", flush=True)
                try:
                    stock_id = stock_ids.get(ticker)
                    if stock_id is None:
                        missing_stock_ids.append(ticker)
                        continue
                    listing_date, prospectus_date = ticker_dates(conn, ticker)
                    rows = title_search_rows(stock_id, listing_date, prospectus_date)
                    prospectus_row = choose_prospectus(rows, prospectus_date, listing_date)
                    allotment_row = choose_allotment(rows, listing_date)
                    if not prospectus_row and not allotment_row:
                        missing_docs.append(ticker)
                        continue
                    sources_for_ticker: list[ArchivedSource] = []
                    if prospectus_row:
                        prospectus_source = download_document(ticker, "prospectus", prospectus_row)
                        sources_for_ticker.append(prospectus_source)
                        prospectus_facts = parse_prospectus_facts(prospectus_source.local_path)
                        update_master_from_prospectus(conn, ticker, prospectus_facts, as_of)
                        update_terms_from_prospectus(
                            conn,
                            ticker,
                            prospectus_source.source_id,
                            prospectus_source.source_date,
                            prospectus_facts,
                            as_of,
                        )
                    if allotment_row:
                        allotment_source = download_document(ticker, "allotment_results", allotment_row)
                        sources_for_ticker.append(allotment_source)
                        if Path(allotment_source.local_path).suffix.lower() == ".pdf":
                            allotment_facts = parse_allotment_facts(allotment_source.local_path)
                            update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
                            upsert_demand(
                                conn,
                                ticker,
                                allotment_source.source_id,
                                allotment_source.source_date,
                                allotment_facts,
                                as_of,
                            )
                    upsert_source_refs(conn, sources_for_ticker, as_of)
                    archived_sources.extend(sources_for_ticker)
                    processed += 1
                except Exception as exc:
                    # Keep full refreshes moving; report failures at the end.
                    failed_tickers.append((ticker, f"{type(exc).__name__}: {exc}"))
            for table in [
                "ipo_master",
                "offering_terms",
                "ipo_demand",
                "source_refs",
                "data_gaps",
            ]:
                export_snapshot(conn, table)
    finally:
        conn.close()

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)
    if not args.skip_text_extraction:
        refresh_extracted_text(args.db, archived_sources)

    print("hkex documents archived")
    print(f"tickers selected: {len(tickers)}")
    print(f"tickers processed: {processed}")
    print(f"sources archived: {len(archived_sources)}")
    if missing_stock_ids:
        print("missing stock ids: " + ", ".join(missing_stock_ids))
    if missing_docs:
        print("missing target docs: " + ", ".join(missing_docs))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())