Private
Public Access
0
0
Files
hk-ipo/scripts/archive_hkex_documents.py
T
geometrybase e746cae035 Refresh HK IPO heat ranking
Request:
- Update the latest Hong Kong IPO candidate list and rescore it based on subscription multiples.

Changes:
- Archived the 2026-06-22 HKEX Main Board New Listing Information page, adding 02697, 03952, 06715, and 06915 to the current candidate set.
- Archived and extracted the four new prospectuses, refreshed current HKEX document facts, and rebuilt the v0 analysis dataset to 311 rows.
- Archived a 2026-06-22T08:55:00Z VBKR/Jieli market-heat snapshot and wrote only still-actionable T0.95 rows to avoid look-ahead leakage for already-closed IPOs.
- Improved prospectus date parsing for split weekday/month text, glued noon/commence phrases, and current new-listing expected listing-date updates.
- Added a Chinese 2026-06-22 latest IPO report ranking candidates after the subscription-multiple overlay.

Verification:
- Ran py_compile for archive_hkex_documents.py, archive_t0_5_market_heat.py, archive_hkex_current_new_listings.py, and build_analysis_dataset.py.
- Re-ran HKEX current-page seeding, document archiving, market-heat archiving, and analysis dataset build as of 2026-06-22T08:55:00Z.
- Ran git diff --check and git diff --cached --check.
- Ran SQLite integrity_check and foreign_key_check.
- Verified source_refs paths, file existence, and SHA-256 hashes.

Next useful context:
- 01956 is the only current candidate with both strong T0 structure and >100x actionable heat in this snapshot.
- Recheck 03952 and 06715 near the 2026-06-25 cutoff; their structure is strong but 2026-06-22 heat is below 10x.
- Official T1 allotment facts for 06067 and 06132 were still unavailable at this archive timestamp.
2026-06-22 09:03:50 +00:00

1223 lines
47 KiB
Python

#!/usr/bin/env python3
"""Archive HKEXnews prospectus and allotment-result documents for open sync tasks."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import json
import logging
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from pypdf import PdfReader
logging.getLogger("pypdf").setLevel(logging.ERROR)
BASE_URL = "https://www1.hkexnews.hk"
ACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/activestock_sehk_e.json"
INACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/inactivestock_sehk_e.json"
TITLE_SEARCH_URL = f"{BASE_URL}/search/titlesearch.xhtml"
TITLE_SEARCH_SERVLET_URL = f"{BASE_URL}/search/titleSearchServlet.do"
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
@dataclass(frozen=True)
class DocumentRow:
    """One document entry taken from an HKEXnews title-search result."""

    # Release timestamp text as published, with runs of whitespace collapsed.
    release_time: str
    # ISO date (YYYY-MM-DD) derived from release_time.
    release_date: str
    # Headline text with HTML tags stripped; "" when the row has none.
    headline: str
    # Document title (the link text) with HTML tags stripped.
    title: str
    # Link target as published, HTML-unescaped.
    href: str
    # href resolved against BASE_URL.
    url: str
@dataclass(frozen=True)
class ArchivedSource:
    """Metadata for one downloaded document, shaped like a source_refs row."""

    # "<ticker>_<source_type>_<release date with underscores>_<document id>".
    source_id: str
    ticker: str
    source_type: str
    title: str
    # POSIX-style path of the downloaded file under data/raw/<ticker>/.
    local_path: str
    url: str
    # Hex SHA-256 digest of the downloaded bytes.
    file_sha256: str
    # Release date (ISO) of the document on HKEXnews.
    source_date: str
    notes: str
@dataclass(frozen=True)
class ProspectusFacts:
    """Facts extracted from a prospectus PDF; a field is None when not found."""

    # Dates are ISO strings (YYYY-MM-DD).
    application_start_date: str | None = None
    application_end_date: str | None = None
    allotment_results_expected_date: str | None = None
    listing_date: str | None = None
    offer_price_hkd: float | None = None
    # Parsed from the "minimum of N Hong Kong Offer Shares" wording.
    board_lot: int | None = None
    # Two-decimal amount that follows the board-lot figure in the text.
    min_subscription_amount_hkd: float | None = None
    global_offer_shares: int | None = None
    hk_offer_shares_initial: int | None = None
    international_offer_shares_initial: int | None = None
    # hk_offer_shares_initial / global_offer_shares rounded to 4 decimals,
    # i.e. a fraction (0.1 means 10%), not a value multiplied by 100.
    public_offer_pct_initial: float | None = None
    # 0 when the prospectus states there is no over-allotment option.
    over_allotment_offer_shares: int | None = None
    # global_offer_shares * offer_price_hkd / 1_000_000.
    gross_proceeds_hkd_m: float | None = None
@dataclass(frozen=True)
class AllotmentFacts:
    """Facts extracted from an allotment-results announcement.

    Every field defaults to None, meaning the value was not found in the text.
    """

    final_offer_price_hkd: float | None = None
    # Proceeds are in HK$ million; "billion" amounts are multiplied by 1000.
    gross_proceeds_hkd_m: float | None = None
    net_proceeds_hkd_m: float | None = None
    issued_shares_upon_listing: int | None = None
    # Hong Kong public offer figures.
    valid_applications: int | None = None
    successful_applications: int | None = None
    # Subscription level expressed in "times".
    public_oversubscription_times: float | None = None
    # International offering / placing figures.
    international_placees: int | None = None
    international_oversubscription_times: float | None = None
    final_hk_offer_shares: int | None = None
    final_international_offer_shares: int | None = None
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the archiving script.

    Returns:
        The populated namespace (db, schema, as_of, limit, tickers,
        skip_sync_state, skip_text_extraction).
    """
    cli = argparse.ArgumentParser(description=__doc__)
    option = cli.add_argument

    # Locations of the database and its schema.
    option("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.")
    option("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.")

    # What to process and which timestamp to stamp it with.
    option("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    option(
        "--limit",
        type=int,
        help="Optional maximum tickers to process. Omit to process all open T0/T1 tasks.",
    )
    option("--tickers", help="Comma-separated tickers to process instead of selecting from sync_tasks.")

    # Switches that turn optional follow-up steps off.
    option("--skip-sync-state", action="store_true", help="Do not refresh sync state after updating facts.")
    option("--skip-text-extraction", action="store_true", help="Do not extract text for newly archived PDFs.")

    return cli.parse_args()
def fetch_bytes(url: str) -> bytes:
    """Download *url* with a browser-like User-Agent and return the raw body.

    The request times out after 60 seconds; urllib errors propagate.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=browser_headers), timeout=60) as reply:
        body = reply.read()
    return body
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 string.

    A supplied *value* is parsed (a trailing "Z" is accepted) and re-emitted,
    with a "+00:00" offset rendered as "Z".  Without a value the current UTC
    time, truncated to whole seconds, is used.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return moment.isoformat().replace("+00:00", "Z")
def load_stock_ids() -> dict[str, int]:
    """Map stock codes to HKEXnews stock ids.

    Reads the active list first and then the inactive list; when a code
    appears more than once the first id seen is kept.
    """
    ids_by_code: dict[str, int] = {}
    for listing_url in (ACTIVE_STOCK_URL, INACTIVE_STOCK_URL):
        entries = json.loads(fetch_bytes(listing_url).decode("utf-8-sig"))
        for entry in entries:
            code, stock_id = entry.get("c"), entry.get("i")
            if not (code and stock_id):
                continue
            ids_by_code.setdefault(code, int(stock_id))
    return ids_by_code
def clean_html(value: str) -> str:
    """Strip tags from an HTML fragment and collapse whitespace.

    Each tag is replaced by a space, entities are unescaped, and runs of
    whitespace are reduced to single spaces with the ends trimmed.
    """
    without_tags = re.sub(r"<.*?>", " ", value, flags=re.S)
    unescaped = html.unescape(without_tags)
    return " ".join(unescaped.split())
def parse_release_date(value: str) -> str:
    """Return the ISO date of a "DD/MM/YYYY ..." release string.

    Only the first whitespace-separated token is parsed; a malformed token
    raises ValueError and an empty string raises IndexError.
    """
    day_token = value.split()[0]
    parsed = datetime.strptime(day_token, "%d/%m/%Y")
    return parsed.date().isoformat()
def parse_release_datetime(value: str) -> str:
    """Return the ISO date of a "DD/MM/YYYY HH:MM" release timestamp.

    The whole string must match that format, otherwise ValueError is raised.
    Only the date part is returned.
    """
    parsed = datetime.strptime(value, "%d/%m/%Y %H:%M")
    return parsed.date().isoformat()
def latest_title_search_rows(stock_id: int) -> list[DocumentRow]:
    """Scrape the default HKEXnews title-search page for one stock.

    Table rows without a release time or without a link are skipped.  Rows
    are returned in page order.
    """
    search_url = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    markup = fetch_bytes(search_url).decode("utf-8", "replace")

    documents: list[DocumentRow] = []
    for cells in re.findall(r"<tr>(.*?)</tr>", markup, flags=re.S):
        released = re.search(r"Release Time: </span>(.*?)</td>", cells, flags=re.S)
        headline = re.search(r'<div class="headline">(.*?)</div>', cells, flags=re.S)
        anchor = re.search(r'<a href="([^"]+)"[^>]*>(.*?)</a>', cells, flags=re.S)
        if released is None or anchor is None:
            continue

        stamp = " ".join(released.group(1).split())
        target = html.unescape(anchor.group(1))
        document = DocumentRow(
            release_time=stamp,
            release_date=parse_release_date(stamp),
            headline="" if headline is None else clean_html(headline.group(1)),
            title=clean_html(anchor.group(2)),
            href=target,
            url=urljoin(BASE_URL, target),
        )
        documents.append(document)
    return documents
def window_title_search_rows(stock_id: int, from_date: date, to_date: date) -> list[DocumentRow]:
    """Query the HKEXnews title-search servlet for one stock in a date window.

    Args:
        stock_id: HKEXnews stock id (not the ticker).
        from_date: First release date to include.
        to_date: Last release date to include.

    Returns:
        One DocumentRow per returned item that has both a file link and a
        timestamp, in the order the servlet returned them.  An empty list
        when the servlet reports no result set.

    Raises:
        urllib errors on network failure, json.JSONDecodeError on a malformed
        response, and ValueError when a timestamp is not "DD/MM/YYYY HH:MM".
    """
    params = {
        "sortDir": "0",
        "sortByOptions": "DateTime",
        "category": "0",
        "market": "SEHK",
        "stockId": str(stock_id),
        "documentType": "-1",
        "fromDate": from_date.strftime("%Y%m%d"),
        "toDate": to_date.strftime("%Y%m%d"),
        "title": "",
        "searchType": "0",
        "t1code": "-2",
        "t2Gcode": "-2",
        "t2code": "-2",
        "rowRange": "500",
        "lang": "en",
    }
    url = f"{TITLE_SEARCH_SERVLET_URL}?{urlencode(params)}"
    request = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0",
            # The servlet is called with the search page as referrer.
            "Referer": f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}",
        },
    )
    with urlopen(request, timeout=60) as response:
        payload = response.read().decode("utf-8", "replace")
    response_data = json.loads(payload)

    # "result" is itself a JSON document encoded as a string.  If it decodes
    # to null (or anything that is not a list) treat it as "no rows" instead
    # of failing on iteration.
    # NOTE(review): an empty search appears to come back as the string
    # "null" -- confirm against a live response.
    result = json.loads(response_data.get("result") or "[]")
    if not isinstance(result, list):
        result = []

    rows: list[DocumentRow] = []
    for item in result:
        href = html.unescape(item.get("FILE_LINK") or "")
        release_time = " ".join((item.get("DATE_TIME") or "").split())
        if not href or not release_time:
            continue
        rows.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_datetime(release_time),
                headline=clean_html(item.get("SHORT_TEXT") or ""),
                title=clean_html(item.get("TITLE") or ""),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return rows
def title_search_rows(stock_id: int, listing_date: str | None, prospectus_date: str | None) -> list[DocumentRow]:
    """Fetch title-search rows using the best date window available.

    With a listing date the window is 90 days before to 14 days after it;
    otherwise, with a prospectus date, 14 days before to 60 days after it;
    with neither, the default (latest) search page is scraped.
    """
    # Both dates are parsed up front, so a malformed value raises either way.
    listing_anchor = parse_iso_date(listing_date)
    prospectus_anchor = parse_iso_date(prospectus_date)

    if listing_anchor is not None:
        window_start = listing_anchor - timedelta(days=90)
        window_end = listing_anchor + timedelta(days=14)
        return window_title_search_rows(stock_id, window_start, window_end)

    if prospectus_anchor is not None:
        window_start = prospectus_anchor - timedelta(days=14)
        window_end = prospectus_anchor + timedelta(days=60)
        return window_title_search_rows(stock_id, window_start, window_end)

    return latest_title_search_rows(stock_id)
def parse_iso_date(value: str | None) -> date | None:
    """Parse an ISO date string; None or "" yields None.

    A non-empty string that is not a valid ISO date raises ValueError.
    """
    return date.fromisoformat(value) if value else None
def date_distance(left: str, right: str) -> int:
    """Return the absolute number of days between two ISO date strings."""
    gap = date.fromisoformat(left) - date.fromisoformat(right)
    return abs(gap.days)
def archiveable_document(row: DocumentRow) -> bool:
    """Tell whether the row links to a PDF or HTML file (case-insensitive)."""
    extension = Path(row.href.lower()).suffix
    return extension in (".pdf", ".htm", ".html")
def choose_prospectus(rows: list[DocumentRow], prospectus_date: str | None, listing_date: str | None) -> DocumentRow | None:
    """Pick the row most likely to be the prospectus PDF.

    Candidates are PDF links filed under a "listing documents" headline whose
    title mentions "global offering" or "prospectus", or is exactly
    "share offer" / "public offer".  With a known prospectus date the
    candidate released closest to it wins (earlier release on ties).
    Otherwise candidates released 0-60 days before the listing date are
    preferred when any exist, and the latest release is returned.
    """

    def looks_like_prospectus(row: DocumentRow) -> bool:
        if not row.href.lower().endswith(".pdf"):
            return False
        if "listing documents" not in row.headline.lower():
            return False
        title = row.title.lower()
        return "global offering" in title or "prospectus" in title or title in {"share offer", "public offer"}

    shortlist = [row for row in rows if looks_like_prospectus(row)]
    if not shortlist:
        return None

    if prospectus_date:
        # min() keeps the first of equal keys, as a stable sort's head would.
        return min(
            shortlist,
            key=lambda row: (date_distance(row.release_date, prospectus_date), row.release_date),
        )

    listed = parse_iso_date(listing_date)
    if listed:
        before_listing = [
            row
            for row in shortlist
            if 0 <= (listed - date.fromisoformat(row.release_date)).days <= 60
        ]
        if before_listing:
            shortlist = before_listing

    # Stable sort: among equal release dates the later input row is chosen.
    ordered = sorted(shortlist, key=lambda row: row.release_date)
    return ordered[-1]
def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> DocumentRow | None:
    """Pick the row most likely to be the allotment-results announcement.

    Candidates are archiveable documents whose headline or title mentions
    "allotment results".  When a listing date is known, candidates released
    from 10 days before to 5 days after it are preferred if any exist.  The
    winner has the highest rank: title quality first, then closeness to the
    listing date, then the later release date.
    """

    def mentions_allotment(row: DocumentRow) -> bool:
        phrase = "allotment results"
        return phrase in row.headline.lower() or phrase in row.title.lower()

    pool = [row for row in rows if archiveable_document(row) and mentions_allotment(row)]
    if not pool:
        return None

    listed = parse_iso_date(listing_date)
    if listed:
        near_listing = [
            row
            for row in pool
            if -5 <= (listed - date.fromisoformat(row.release_date)).days <= 10
        ]
        if near_listing:
            pool = near_listing

    def allotment_rank(row: DocumentRow) -> tuple[int, int, str]:
        title = row.title.lower()
        is_amendment = "clarification" in title or "supplemental" in title
        is_main_announcement = "allotment results" in title and (
            "final offer price" in title or title.startswith("announcement of allotment")
        )
        # Amendments rank lowest, the main announcement highest.
        quality = 0 if is_amendment else (2 if is_main_announcement else 1)
        distance = date_distance(row.release_date, listing_date) if listing_date else 0
        return quality, -distance, row.release_date

    ranked = sorted(pool, key=allotment_rank)
    return ranked[-1]
def sha256_bytes(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of *data*."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def download_document(ticker: str, source_type: str, row: DocumentRow) -> ArchivedSource:
    """Download one document into data/raw/<ticker>/ and describe it.

    The file is named "<source_type>_<release_date>_<document id><suffix>";
    a link without a suffix is stored as ".pdf".  An existing file is only
    rewritten when its bytes differ from the download.
    """
    payload = fetch_bytes(row.url)

    link = Path(row.href)
    doc_id = link.stem
    extension = link.suffix.lower() or ".pdf"
    target = Path("data/raw") / ticker / f"{source_type}_{row.release_date}_{doc_id}{extension}"
    target.parent.mkdir(parents=True, exist_ok=True)

    already_current = target.exists() and target.read_bytes() == payload
    if not already_current:
        target.write_bytes(payload)

    dated = row.release_date.replace("-", "_")
    return ArchivedSource(
        source_id=f"{ticker}_{source_type}_{dated}_{doc_id}",
        ticker=ticker,
        source_type=source_type,
        title=row.title,
        local_path=target.as_posix(),
        url=row.url,
        file_sha256=sha256_bytes(payload),
        source_date=row.release_date,
        notes=f"HKEXnews {row.headline}.",
    )
def first_pdf_text(local_path: str, max_pages: int) -> str:
    """Extract the first *max_pages* pages of a PDF as one flat string.

    Page texts are joined with spaces and all whitespace runs, including
    line breaks, are collapsed to single spaces.
    """
    reader = PdfReader(local_path)
    leading_pages = reader.pages[: min(max_pages, len(reader.pages))]
    page_texts = [page.extract_text() or "" for page in leading_pages]
    return " ".join(" ".join(page_texts).split())
def first_pdf_text_with_lines(local_path: str, max_pages: int) -> str:
    """Extract the first *max_pages* pages of a PDF, keeping line breaks.

    Page texts are joined with newlines and otherwise left as extracted.
    """
    reader = PdfReader(local_path)
    leading_pages = reader.pages[: min(max_pages, len(reader.pages))]
    return "\n".join(page.extract_text() or "" for page in leading_pages)
def normalize_pdf_text(text: str) -> str:
    """Repair common PDF text-extraction glitches so date phrases can be parsed.

    The substitutions run in a fixed order and later ones rely on earlier
    ones:

    1. letter-spaced key phrases are collapsed to normal words;
    2. split "a t", "o n" and "f r o m" are rejoined and a glued
       "commenceo n" becomes "commence on";
    3. weekday and month names with stray spaces are collapsed and given
       canonical capitalisation (matching is case-insensitive);
    4. a month name glued to a digit gets a separating space;
    5. digit-spaced "D D , Y Y Y Y" dates are rejoined;
    6. glued or truncated "noon on <Word>" phrases are repaired.

    Args:
        text: Flattened PDF text.

    Returns:
        The normalised text.
    """
    replacements = {
        "H o n g K o n g P u b l i c O f f e r i n g c o m m e n c e s": "Hong Kong Public Offering commences",
        "H o n gK o n gP u b l i cO f f e r i n gc o m m e n c e s": "Hong Kong Public Offering commences",
        "a t o r b e f o r e": "at or before",
        "n o l a t e r": "no later",
        "o n o r b e f o r e": "on or before",
        "c o m m e n c e": "commence",
        "e x p e c t e d t o": "expected to",
        "e x p e c t e dt o": "expected to",
    }
    for source, target in replacements.items():
        text = text.replace(source, target)
    text = re.sub(r"\ba\s+t\b", "at", text)
    text = re.sub(r"\bo\s+n\b", "on", text)
    text = re.sub(r"\bf\s+r\s+o\s+m\b", "from", text)
    text = re.sub(r"\bexpected\s*tocommenceo\s*n\b", "expected to commence on", text, flags=re.I)
    text = re.sub(r"\bexpected\s+to\s+commenceo\s*n\b", "expected to commence on", text, flags=re.I)
    text = re.sub(r"\bcommenceo\s*n\b", "commence on", text, flags=re.I)
    for word in [
        "Monday",
        "Tuesday",
        "Wednesday",
        "Thursday",
        "Friday",
        "Saturday",
        "Sunday",
        "January",
        "February",
        "March",
        "April",
        "May",
        "June",
        "July",
        "August",
        "September",
        "October",
        "November",
        "December",
    ]:
        # Allow optional whitespace between every letter of the name.
        pattern = r"(?<![A-Za-z])" + r"\s*".join(word) + r"(?![A-Za-z])"
        text = re.sub(pattern, word, text, flags=re.I)
    text = re.sub(
        r"\b(January|February|March|April|May|June|July|August|September|October|November|December)(\d)",
        r"\1 \2",
        text,
    )
    text = re.sub(r"\b(\d)\s+(\d)\s*,\s*(\d)\s+(\d)\s+(\d)\s+(\d)\b", r"\1\2, \3\4\5\6", text)
    text = re.sub(r"\b(\d{1,2})\s*,\s*(\d)\s+(\d)\s+(\d)\s+(\d)\b", r"\1, \2\3\4\5", text)
    text = re.sub(r"(?<![A-Za-z])n\s*o\s*o\s*n\s*o\s*n\s+([A-Z][a-z]+)", r"noon on \1", text, flags=re.I)
    # Only the "noon" part is case-insensitive here.  With a global re.I the
    # capitalised-word group also matched the "on" of an already correct
    # "noon on Monday", producing "noon on on Monday".  Weekday and month
    # names are already capitalised by the loop above.
    text = re.sub(r"\b(?i:no\s*o\s*n)\s+([A-Z][a-z]+)", r"noon on \1", text)
    return text
def integer_after(pattern: str, text: str) -> int | None:
    """Return group 1 of the first case-insensitive match as an int.

    Commas and spaces inside the captured digits are dropped.  None is
    returned when nothing matches or the capture is empty after cleaning.
    """
    found = re.search(pattern, text, flags=re.I)
    if found is None:
        return None
    digits = found.group(1).replace(",", "").replace(" ", "")
    return int(digits) if digits else None
def float_after(pattern: str, text: str) -> float | None:
    """Return group 1 of the first case-insensitive match as a float.

    Thousands separators are removed.  Callers capture with classes such as
    ``[\\d,.]+``, which can swallow the full stop that ends a sentence
    ("HK$12.50.") or consist of punctuation only; trailing dots are therefore
    stripped, and a capture that still is not a number yields None instead of
    raising ValueError.

    Args:
        pattern: Regular expression whose group 1 captures the number.
        text: Text to search.

    Returns:
        The parsed number, or None when there is no match or no usable
        number in the capture.
    """
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    cleaned = match.group(1).replace(",", "").rstrip(".")
    try:
        return float(cleaned)
    except ValueError:
        return None
def money_m_after(pattern: str, text: str) -> float | None:
    """Return a matched money amount expressed in millions.

    Group 1 of *pattern* captures the number and optional group 2 the unit.
    A unit starting with "b" (billion) multiplies the amount by 1000; any
    other unit, or no unit at all, returns the amount unchanged.

    Trailing dots swallowed by the number capture (a sentence-ending full
    stop) are stripped, and a capture that is not a number yields None
    instead of raising ValueError.

    Args:
        pattern: Regular expression with the amount in group 1 and the
            optional unit in group 2.
        text: Text to search (case-insensitively).

    Returns:
        The amount, or None when there is no match or no usable number.
    """
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    cleaned = match.group(1).replace(",", "").rstrip(".")
    try:
        amount = float(cleaned)
    except ValueError:
        return None
    unit = (match.group(2) or "").lower()
    if unit.startswith("b"):
        return amount * 1000
    return amount
def strict_money_m_after(pattern: str, text: str) -> float | None:
    """Return a matched money amount in millions; the unit is mandatory.

    Group 1 of *pattern* is the number and group 2 the unit, which must have
    matched.  A unit starting with "b" (billion) multiplies the amount by
    1000.  Returns None when the pattern does not match at all.
    """
    found = re.search(pattern, text, flags=re.I)
    if found is None:
        return None
    amount = float(found.group(1).replace(",", ""))
    unit = found.group(2).lower()
    scale = 1000 if unit.startswith("b") else 1
    return amount * scale if scale != 1 else amount
def date_after(label_pattern: str, text: str) -> str | None:
    """Find the first date that follows a label and return it in ISO form.

    After *label_pattern*, up to 600 characters are skipped lazily until a
    connector ("on", "on or about", "from" or "at or before") is found,
    optionally followed by a weekday and comma, and then a date written as
    "Month D, YYYY" or "D Month YYYY".  Matching is case-insensitive.

    The regex accepts any spacing after the comma ("June 22,2026"), while
    ``strptime`` requires whitespace wherever the format has it, so the comma
    spacing is normalised before parsing.

    Args:
        label_pattern: Regular expression locating the label.
        text: Flattened, normalised prospectus text.

    Returns:
        The date as "YYYY-MM-DD", or None when nothing matches or the
        matched text is not a valid date with a full month name.
    """
    match = re.search(
        label_pattern
        + r".{0,600}?\b(?:on(?: or about)?|from|at or before)\b\s*(?:[.\s]+)?(?:[A-Z][a-z]+\s*,\s*)?"
        + r"([A-Z][a-z]+ \d{1,2},\s*\d{4}|\d{1,2} [A-Z][a-z]+ \d{4})",
        text,
        flags=re.I,
    )
    if not match:
        return None
    # Canonical ", " so the "%B %d, %Y" format can match glued dates.
    value = re.sub(r",\s*", ", ", match.group(1))
    for date_format in ("%B %d, %Y", "%d %B %Y"):
        try:
            return datetime.strptime(value, date_format).date().isoformat()
        except ValueError:
            continue
    return None
def parse_offer_price_hkd(text: str) -> float | None:
    """Extract the offer price per share (or HDR) in HK$.

    Three phrasings are tried in order and the first one that yields a
    number wins; None when none of them matches.
    """
    share_unit = r"(?:H\s+)?(?:Share|Shares|Offer Share|Offer Shares|HDR|HDRs|Offer HDR|Offer HDRs)"
    phrasings = (
        rf"(?:Maximum\s+)?Offer Price\s*:?\s*HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
        rf"Offer Price will (?:be|not be more than)\s+HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
        rf"maximum Offer Price of HK\$?\s*([\d,]+(?:\.\d+)?)\s+per\s+{share_unit}",
    )
    # The generator is lazy, so later phrasings are only tried when needed.
    prices = (float_after(phrasing, text) for phrasing in phrasings)
    return next((price for price in prices if price is not None), None)
def parse_over_allotment_offer_shares(local_path: str, global_offer_shares: int | None) -> int | None:
    """Determine the size of the over-allotment option from a prospectus PDF.

    Reads up to the first 320 pages.  Returns 0 when the text says there is
    no over-allotment option, the explicit "up to N Shares" figure when one
    follows a mention of the option, or 15% of *global_offer_shares* when the
    option and a 15% figure are both mentioned.  Otherwise returns None.
    """
    text = normalize_pdf_text(first_pdf_text(local_path, 320))

    if re.search(r"\bno\s+over-?allotment\s+option\b", text, flags=re.I):
        return 0

    stated = integer_after(
        r"over-?allotment option.{0,500}?up to\s+([\d][\d,\s]*)\s+(?:additional\s+)?(?:H\s+)?(?:Shares|HDRs)",
        text,
    )
    if stated is not None:
        return stated

    if not global_offer_shares:
        return None
    mentions_option = re.search(r"over-?allotment option", text, flags=re.I)
    mentions_fifteen = re.search(r"(?:15%|15\s+per\s+cent|fifteen\s+per\s+cent)", text, flags=re.I)
    if mentions_option and mentions_fifteen:
        return round(global_offer_shares * 0.15)
    return None
def parse_prospectus_facts(local_path: str) -> ProspectusFacts:
    """Extract offering terms and timetable dates from a prospectus PDF.

    Terms and dates come from the normalised text of the first 8 pages; the
    over-allotment size is looked up separately over a larger page range.
    Any value that cannot be found is left as None.
    """
    text = normalize_pdf_text(first_pdf_text(local_path, 8))

    # --- Lot size and the matching minimum subscription amount -------------
    board_lot = integer_after(r"minimum\s*of\s*([\d][\d,\s]*)\s*Hong\s*Kong\s*Offer\s*(?:Shares|HDRs)", text)
    min_amount = None
    if board_lot:
        # Try the lot size with thousands separators first, then without.
        for lot_text in (f"{board_lot:,}", f"{board_lot}"):
            min_amount = float_after(rf"\b{lot_text}\b\s+([\d,]+\.\d{{2}})", text)
            if min_amount is not None:
                break

    # --- Share counts ------------------------------------------------------
    global_shares = None
    for shares_pattern in (
        r"Number of Offer (?:Shares|HDRs) (?:under|in) the Global Offering\s*:?\s+([\d][\d,\s]*)",
        r"Number of Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)\s+(?:H\s+)?(?:Shares|HDRs)",
    ):
        global_shares = integer_after(shares_pattern, text)
        if global_shares is not None:
            break
    hk_shares = integer_after(r"Number of Hong Kong Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)", text)
    intl_shares = integer_after(r"Number of International Offer (?:Shares|HDRs)\s*:?\s+([\d][\d,\s]*)", text)

    # --- Price and derived figures -----------------------------------------
    offer_price = parse_offer_price_hkd(text)
    over_allotment = parse_over_allotment_offer_shares(local_path, global_shares)
    public_pct = None
    if global_shares and hk_shares:
        public_pct = round(hk_shares / global_shares, 4)
    gross_proceeds = None
    if global_shares and offer_price:
        gross_proceeds = round(global_shares * offer_price / 1_000_000, 6)

    # --- Timetable dates: first label that yields a date wins --------------
    allotment_date = None
    for allotment_label in (
        r"Announcement of the level of indications.*?basis of allocation",
        r"The results of allocations",
        r"allotment results announcement",
        r"Announcement of",
        r"Announcement of.*?Offer Price",
    ):
        allotment_date = date_after(allotment_label, text)
        if allotment_date:
            break

    start_date = date_after(r"Hong Kong Public Offering commences", text)
    if not start_date:
        start_date = date_after(r"Application lists.*?open", text)
    end_date = date_after(r"Application lists.*?close", text)
    dealing_date = date_after(
        r"Dealings in (?:our\s+|the\s+)?(?:H\s+)?(?:Shares|HDRs).*?(?:expected to commence|to commence)",
        text,
    )

    return ProspectusFacts(
        application_start_date=start_date,
        application_end_date=end_date,
        allotment_results_expected_date=allotment_date,
        listing_date=dealing_date,
        offer_price_hkd=offer_price,
        board_lot=board_lot,
        min_subscription_amount_hkd=min_amount,
        global_offer_shares=global_shares,
        hk_offer_shares_initial=hk_shares,
        international_offer_shares_initial=intl_shares,
        public_offer_pct_initial=public_pct,
        over_allotment_offer_shares=over_allotment,
        gross_proceeds_hkd_m=gross_proceeds,
    )
def section_between(text: str, start: str, end: str | None, use_last_start: bool = False) -> str:
    """Return the text between a start pattern and the next end pattern.

    The section begins right after the first match of *start* (or the last
    one when *use_last_start* is set) and stops before the first later match
    of *end*.  It runs to the end of the text when *end* is falsy or never
    matches.  "" is returned when *start* does not match.  Both patterns are
    case-insensitive.
    """
    start_ends = [hit.end() for hit in re.finditer(start, text, flags=re.I)]
    if not start_ends:
        return ""
    begin = start_ends[-1] if use_last_start else start_ends[0]
    remainder = text[begin:]
    if not end:
        return remainder
    stop = re.search(end, remainder, flags=re.I)
    return remainder if stop is None else remainder[: stop.start()]
def allotment_detail_sections(text: str) -> tuple[str, str]:
    """Split flat allotment text into Hong Kong and international sections.

    Each returned section starts with its leading label ("No. of valid
    applications" / "No. of placees") followed by the text up to the next
    section marker.  A section that cannot be located is "".
    """
    hk_found = re.search(
        r"HONG KONG PUBLIC OFFERING\s+No\. of valid applications(.*?)INTERNATIONAL OFFERING\s+No\. of placees",
        text,
        flags=re.I,
    )
    intl_found = re.search(
        r"INTERNATIONAL OFFERING\s+No\. of placees(.*?)(?:The Directors|LOCK-UP|Allottees with|$)",
        text,
        flags=re.I,
    )

    hk_section = ""
    if hk_found:
        hk_section = "No. of valid applications" + hk_found.group(1)
    intl_section = ""
    if intl_found:
        intl_section = "No. of placees" + intl_found.group(1)
    return hk_section, intl_section
def normalized_lines(text: str) -> list[str]:
    """Split text into non-blank lines with whitespace collapsed.

    Non-breaking spaces are treated as ordinary spaces; lines that contain
    only whitespace are dropped.
    """
    cleaned: list[str] = []
    for raw_line in text.splitlines():
        if not raw_line.strip():
            continue
        cleaned.append(" ".join(raw_line.replace("\xa0", " ").split()))
    return cleaned
def number_from_text(value: str) -> int | None:
    """Return the first comma-grouped integer found in *value*, or None."""
    found = re.search(r"([\d][\d,]*)", value)
    return int(found.group(1).replace(",", "")) if found else None
def float_from_text(value: str) -> float | None:
    """Return the first number (optionally with decimals) in *value*, or None."""
    found = re.search(r"([\d][\d,]*(?:\.\d+)?)", value)
    return float(found.group(1).replace(",", "")) if found else None
def value_after_line_label(
    lines: list[str],
    label_patterns: list[str],
    *,
    value_type: str = "int",
    max_lines: int = 6,
) -> int | float | None:
    """Find the value that follows a label in line-oriented text.

    A window of *max_lines* consecutive lines, joined by spaces, slides over
    *lines*.  In each window the label patterns are tried in order and the
    value is taken from the text after the label.

    Scanning continues until a value is actually found.  The first window
    that contains a label usually ends on the label line, so its tail is
    empty; returning at that point would miss every value printed on the
    line after its label.

    Args:
        lines: Normalised text lines.
        label_patterns: Case-insensitive regular expressions for the label.
        value_type: "times" returns the number before the word "times"
            (falling back to the first number) as a float; anything else
            returns the first integer.
        max_lines: Number of lines joined into each window.

    Returns:
        The value, or None when no label is followed by a number.
    """
    labels = [re.compile(label_pattern, flags=re.I) for label_pattern in label_patterns]
    for index in range(len(lines)):
        window = " ".join(lines[index : index + max_lines])
        for label in labels:
            match = label.search(window)
            if not match:
                continue
            tail = window[match.end() :]
            if not tail.strip():
                # Label sits at the end of this window; a later window will
                # include the lines that follow it.
                continue
            if value_type == "times":
                times_match = re.search(r"([\d][\d,]*(?:\.\d+)?)\s*times", tail, flags=re.I)
                if times_match:
                    return float(times_match.group(1).replace(",", ""))
                value = float_from_text(tail)
            else:
                value = number_from_text(tail)
            if value is not None:
                return value
    return None
def section_lines(
    lines: list[str],
    start_patterns: list[str],
    end_patterns: list[str],
    *,
    start_index: int = 0,
) -> list[str]:
    """Return the lines from a start marker up to (not including) an end marker.

    The section starts at the first line at or after *start_index* that
    matches any start pattern and ends before the first later line that
    matches any end pattern, or at the end of *lines*.  An empty list is
    returned when no start line is found.  Matching is case-insensitive.
    """

    def matches_any(line: str, patterns: list[str]) -> bool:
        return any(re.search(pattern, line, flags=re.I) for pattern in patterns)

    total = len(lines)
    begin = next(
        (position for position in range(start_index, total) if matches_any(lines[position], start_patterns)),
        None,
    )
    if begin is None:
        return []
    finish = next(
        (position for position in range(begin + 1, total) if matches_any(lines[position], end_patterns)),
        total,
    )
    return lines[begin:finish]
def allotment_detail_line_sections(text: str) -> tuple[list[str], list[str], list[str]]:
    """Locate the detail, public-offer and international sections by line.

    The detail block is 320 lines starting at the first heading line found
    within the first 700 lines (or at line 0 when no heading is found).
    Inside it, the public and international sections are found by their
    headings; when a heading is missing, a window around the first
    "valid applications" / "placees" label line is used instead.

    Returns:
        (detail_lines, public_lines, international_lines); a section list is
        empty when neither the heading nor the fallback label is present.
    """
    lines = normalized_lines(text)

    heading = r"ALLOTMENT RESULTS DETAILS|APPLICATIONS AND INDICATIONS"
    detail_start = next(
        (position for position, line in enumerate(lines[:700]) if re.search(heading, line, flags=re.I)),
        0,
    )
    detail_lines = lines[detail_start : detail_start + 320]

    def window_around(label: str, lines_after: int) -> list[str]:
        """Lines from 10 before the first label line to *lines_after* past it."""
        for position, line in enumerate(detail_lines):
            if re.search(label, line, flags=re.I):
                return detail_lines[max(0, position - 10) : position + lines_after]
        return []

    public_lines = section_lines(
        detail_lines,
        [r"^(HONG KONG )?PUBLIC OFFER", r"PUBLIC OFFER SHARES"],
        [r"^INTERNATIONAL OFFER", r"^PLACING$", r"^EMPLOYEE PREFERENTIAL OFFERING"],
    )
    international_lines = section_lines(
        detail_lines,
        [r"^INTERNATIONAL OFFER", r"^PLACING$"],
        [
            r"^EMPLOYEE PREFERENTIAL OFFERING",
            r"^THE DIRECTORS CONFIRM",
            r"^THE PLACEES",
            r"^LOCK-UP",
            r"^BASIS OF ALLOCATION",
        ],
    )

    if not public_lines:
        public_lines = window_around(r"No\. of valid applications|Number of valid applications", 70)
    if not international_lines:
        international_lines = window_around(r"No\. of placees|Number of placees", 90)
    return detail_lines, public_lines, international_lines
def parse_allotment_facts_from_text(text: str) -> AllotmentFacts:
    """Extract allotment facts from announcement text that still has line breaks.

    Each figure is first looked up by label inside its line section (public
    offer or international offering) and, failing that, by a prose pattern.
    Figures that cannot be found are None.

    Args:
        text: Announcement text with its original line structure.

    Returns:
        The extracted facts.
    """
    flat_text = " ".join(text.split())
    _, public_lines, international_lines = allotment_detail_line_sections(text)
    # Prose fallbacks for international figures are limited to that section
    # when it was found, otherwise they search the whole text.
    # NOTE(review): the public-offer prose fallbacks search the whole text;
    # an equivalent public-section string used to be built here but was never
    # read -- confirm that searching the whole text is intended.
    international_text = " ".join(international_lines) or flat_text

    valid_applications = value_after_line_label(
        public_lines,
        [r"No\. of valid applications", r"Number of valid applications"],
    )
    if valid_applications is None:
        valid_applications = integer_after(r"A total of\s+([\d,]+)\s+valid applications", flat_text)

    successful_applications = value_after_line_label(
        public_lines,
        [r"No\. of successful applications", r"Number of successful applications"],
    )
    if successful_applications is None:
        successful_applications = integer_after(r"allocated to\s+([\d,]+)\s+successful applicants", flat_text)

    public_oversubscription_times = value_after_line_label(
        public_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if public_oversubscription_times is None:
        public_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Public Offer|Hong Kong Public Offering)",
            flat_text,
        )

    international_placees = value_after_line_label(
        international_lines,
        [r"No\. of placees", r"Number of placees"],
    )
    if international_placees is None:
        international_placees = integer_after(
            r"(?:There (?:are|is) a total of|total of)\s+([\d,]+)\s+placees",
            international_text,
        )

    international_oversubscription_times = value_after_line_label(
        international_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if international_oversubscription_times is None:
        international_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Placing|International Offer)",
            international_text,
        )

    final_hk_offer_shares = value_after_line_label(
        public_lines,
        [r"Final no\. of Offer Shares under the (?:Hong Kong )?Public Offer(?:ing)?"],
    )
    if final_hk_offer_shares is None:
        final_hk_offer_shares = integer_after(
            r"final number of Offer Shares under the (?:Public Offer|Hong Kong Public Offering) is\s+([\d,]+)",
            flat_text,
        )

    final_international_offer_shares = value_after_line_label(
        international_lines,
        [r"Final no\. of Offer Shares under the International Offer(?:ing)?", r"Final no\. of Offer Shares under the Placing"],
    )
    if final_international_offer_shares is None:
        final_international_offer_shares = integer_after(
            r"final number of Offer Shares under (?:the )?(?:Placing|International Offer(?:ing)?) is\s+([\d,]+)",
            international_text,
        )

    return AllotmentFacts(
        # The number must start with a digit and may carry one decimal part,
        # so a sentence-ending full stop ("HK$12.50.") is not captured.
        final_offer_price_hkd=float_after(
            r"Final Offer Price\s*:?\s*HK\$?([\d][\d,]*(?:\.\d+)?)",
            flat_text,
        ),
        gross_proceeds_hkd_m=strict_money_m_after(r"Gross proceeds.{0,300}?HK\$([\d,.]+)\s*(million|billion)", flat_text),
        net_proceeds_hkd_m=strict_money_m_after(r"Net proceeds.{0,500}?HK\$([\d,.]+)\s*(million|billion)", flat_text),
        # Start the capture on a digit: after a lazy ".*?" a bare "[\d,]+"
        # can stop on a lone comma and lose the figure.
        issued_shares_upon_listing=integer_after(
            r"Number of issued shares upon Listing.*?([\d][\d,]*)",
            flat_text,
        ),
        # The isinstance checks keep counts integral and levels float.
        valid_applications=valid_applications if isinstance(valid_applications, int) else None,
        successful_applications=successful_applications if isinstance(successful_applications, int) else None,
        public_oversubscription_times=public_oversubscription_times if isinstance(public_oversubscription_times, float) else None,
        international_placees=international_placees if isinstance(international_placees, int) else None,
        international_oversubscription_times=(
            international_oversubscription_times if isinstance(international_oversubscription_times, float) else None
        ),
        final_hk_offer_shares=final_hk_offer_shares if isinstance(final_hk_offer_shares, int) else None,
        final_international_offer_shares=(
            final_international_offer_shares if isinstance(final_international_offer_shares, int) else None
        ),
    )
def parse_allotment_facts(local_path: str) -> AllotmentFacts:
    """Extract allotment facts from an announcement PDF.

    The line-based parser runs on the first 12 pages.  Its result is used
    when it produced at least one truthy application, placee or subscription
    figure; otherwise a flat-text parser over the first 8 pages is used.
    """
    line_based = parse_allotment_facts_from_text(first_pdf_text_with_lines(local_path, 12))
    headline_figures = (
        line_based.valid_applications,
        line_based.successful_applications,
        line_based.public_oversubscription_times,
        line_based.international_placees,
        line_based.international_oversubscription_times,
    )
    if any(headline_figures):
        return line_based

    # Fallback: flat text split into Hong Kong and international sections.
    flat = first_pdf_text(local_path, 8)
    hk_part, intl_part = allotment_detail_sections(flat)
    return AllotmentFacts(
        final_offer_price_hkd=float_after(r"Final Offer Price\s+HK\$([\d,.]+)", flat),
        gross_proceeds_hkd_m=money_m_after(r"Gross proceeds.*?HK\$([\d,.]+)\s*(million|billion)?", flat),
        net_proceeds_hkd_m=money_m_after(r"Net proceeds\s+HK\$([\d,.]+)\s*(million|billion)?", flat),
        issued_shares_upon_listing=integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", flat),
        valid_applications=integer_after(r"No\. of valid applications\s+([\d,]+)", hk_part),
        successful_applications=integer_after(r"No\. of successful applications\s+([\d,]+)", hk_part),
        public_oversubscription_times=float_after(r"Subscription level\s+([\d,.]+)\s+times", hk_part),
        international_placees=integer_after(r"No\. of placees\s+([\d,]+)", intl_part),
        international_oversubscription_times=float_after(r"Subscription level.*?([\d,.]+)\s+times", intl_part),
        final_hk_offer_shares=integer_after(
            r"Final no\. of Offer Shares under the Hong Kong Public Offering.*?([\d][\d,\s]*)",
            hk_part,
        ),
        final_international_offer_shares=integer_after(
            r"Final no\. of Offer Shares under the International Offering.*?([\d][\d,\s]*)",
            intl_part,
        ),
    )
def select_tickers(conn: sqlite3.Connection, limit: int | None, tickers: str | None) -> list[str]:
    """Choose the tickers to process.

    An explicit comma-separated *tickers* string wins: entries are trimmed,
    blanks dropped and each code left-padded with zeros to five characters.
    Otherwise the distinct tickers with an open T0_prospectus or
    T1_allotment task are read from the database, newest listing date first,
    optionally capped at *limit*.
    """
    if tickers:
        requested = (part.strip() for part in tickers.split(","))
        return [code.zfill(5) for code in requested if code]

    query = """
        SELECT DISTINCT m.ticker
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        WHERE t.task_status = 'open'
          AND t.stage IN ('T0_prospectus', 'T1_allotment')
        ORDER BY m.listing_date DESC, m.ticker
    """
    bindings: tuple[object, ...] = ()
    if limit is not None:
        query += " LIMIT ?"
        bindings = (limit,)
    return [record[0] for record in conn.execute(query, bindings).fetchall()]
def ticker_dates(conn: sqlite3.Connection, ticker: str) -> tuple[str | None, str | None]:
    """Look up a ticker's listing date and prospectus date.

    The prospectus date comes from the new-listing report entry with the
    highest report year, if any.  (None, None) is returned for an unknown
    ticker.
    """
    query = """
        SELECT m.listing_date, r.prospectus_date
        FROM ipo_master m
        LEFT JOIN new_listing_report_entries r ON r.ticker = m.ticker
        WHERE m.ticker = ?
        ORDER BY r.report_year DESC
        LIMIT 1
    """
    record = conn.execute(query, (ticker,)).fetchone()
    if record is None:
        return None, None
    listing_date, prospectus_date = record[0], record[1]
    return listing_date, prospectus_date
def upsert_source_refs(conn: sqlite3.Connection, sources: list[ArchivedSource], as_of: str) -> None:
    """Insert or refresh source_refs rows for the archived documents.

    Rows are keyed by source_id.  On conflict every descriptive column is
    overwritten with the new values; ticker, source_type and path_base of an
    existing row are left untouched.  New rows get path_base 'repo_root' and
    all rows get archived_at = *as_of*.  The caller commits.
    """
    statement = """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        )
        VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_id) DO UPDATE SET
            title = excluded.title,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
    """
    bindings = []
    for item in sources:
        bindings.append(
            (
                item.source_id,
                item.ticker,
                item.source_type,
                item.title,
                item.local_path,
                item.url,
                item.file_sha256,
                item.source_date,
                as_of,
                item.notes,
            )
        )
    conn.executemany(statement, bindings)
def update_master_from_prospectus(conn: sqlite3.Connection, ticker: str, facts: ProspectusFacts, as_of: str) -> None:
    """Write prospectus timetable dates into the ticker's ipo_master row.

    Parsed dates replace the stored ones and a missing parsed date keeps the
    stored value.  The listing date is the exception for rows with status
    'listed': there the stored date is kept and the parsed one only fills a
    NULL.  data_as_of is always set to *as_of*.  The caller commits.
    """
    statement = """
        UPDATE ipo_master
        SET application_start_date = COALESCE(?, application_start_date),
            application_end_date = COALESCE(?, application_end_date),
            allotment_results_expected_date = COALESCE(?, allotment_results_expected_date),
            listing_date = CASE
                WHEN status = 'listed' THEN COALESCE(listing_date, ?)
                ELSE COALESCE(?, listing_date)
            END,
            data_as_of = ?
        WHERE ticker = ?
    """
    parsed_listing_date = facts.listing_date
    bindings = (
        facts.application_start_date,
        facts.application_end_date,
        facts.allotment_results_expected_date,
        # Bound twice: once for each branch of the CASE expression.
        parsed_listing_date,
        parsed_listing_date,
        as_of,
        ticker,
    )
    conn.execute(statement, bindings)
def update_terms_from_prospectus(
    conn: sqlite3.Connection,
    ticker: str,
    source_id: str,
    source_date: str,
    facts: ProspectusFacts,
    as_of: str,
) -> None:
    """Insert or merge prospectus-derived offering terms for *ticker*.

    A ticker without an ``offering_terms`` row gets a new one. For an existing
    row the merge depends on the ``source_id`` currently stored on it:

    * If it equals *source_id* or matches ``<ticker>_prospectus_%``, every term
      column (and ``prospectus_date``) is overwritten with the new value,
      including nulls.
    * Otherwise stored values win and the new values only fill nulls.

    The stored ``source_id`` itself is replaced when it matches
    ``%_new_listing_report_%``, equals *source_id*, or matches
    ``<ticker>_prospectus_%``; any other stored id is kept. ``data_as_of`` is
    always set to *as_of*.

    Args:
        conn: Open SQLite connection; this function does not commit.
        ticker: Ticker the terms belong to (conflict key).
        source_id: Identifier of the prospectus source being applied.
        source_date: Value written to ``prospectus_date``.
        facts: Parsed prospectus facts providing the term attributes.
        as_of: Value written to ``data_as_of``.
    """
    statement = """
        INSERT INTO offering_terms (
            ticker, source_id, prospectus_date, offer_price_hkd, board_lot, min_subscription_amount_hkd,
            global_offer_shares, hk_offer_shares_initial, international_offer_shares_initial,
            public_offer_pct_initial, over_allotment_offer_shares, gross_proceeds_hkd_m, data_as_of
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET
            source_id = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
                    OR offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.source_id
                ELSE offering_terms.source_id
            END,
            prospectus_date = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.prospectus_date
                ELSE COALESCE(offering_terms.prospectus_date, excluded.prospectus_date)
            END,
            offer_price_hkd = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.offer_price_hkd
                ELSE COALESCE(offering_terms.offer_price_hkd, excluded.offer_price_hkd)
            END,
            board_lot = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.board_lot
                ELSE COALESCE(offering_terms.board_lot, excluded.board_lot)
            END,
            min_subscription_amount_hkd = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.min_subscription_amount_hkd
                ELSE COALESCE(offering_terms.min_subscription_amount_hkd, excluded.min_subscription_amount_hkd)
            END,
            global_offer_shares = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.global_offer_shares
                ELSE COALESCE(offering_terms.global_offer_shares, excluded.global_offer_shares)
            END,
            hk_offer_shares_initial = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.hk_offer_shares_initial
                ELSE COALESCE(offering_terms.hk_offer_shares_initial, excluded.hk_offer_shares_initial)
            END,
            international_offer_shares_initial = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.international_offer_shares_initial
                ELSE COALESCE(
                    offering_terms.international_offer_shares_initial,
                    excluded.international_offer_shares_initial
                )
            END,
            public_offer_pct_initial = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.public_offer_pct_initial
                ELSE COALESCE(offering_terms.public_offer_pct_initial, excluded.public_offer_pct_initial)
            END,
            over_allotment_offer_shares = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.over_allotment_offer_shares
                ELSE COALESCE(offering_terms.over_allotment_offer_shares, excluded.over_allotment_offer_shares)
            END,
            gross_proceeds_hkd_m = CASE
                WHEN offering_terms.source_id = excluded.source_id
                    OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
                THEN excluded.gross_proceeds_hkd_m
                ELSE COALESCE(offering_terms.gross_proceeds_hkd_m, excluded.gross_proceeds_hkd_m)
            END,
            data_as_of = excluded.data_as_of
        """
    # Identity columns first, then the term values in INSERT column order.
    identity = (ticker, source_id, source_date)
    terms = (
        facts.offer_price_hkd,
        facts.board_lot,
        facts.min_subscription_amount_hkd,
        facts.global_offer_shares,
        facts.hk_offer_shares_initial,
        facts.international_offer_shares_initial,
        facts.public_offer_pct_initial,
        facts.over_allotment_offer_shares,
        facts.gross_proceeds_hkd_m,
    )
    conn.execute(statement, identity + terms + (as_of,))
def update_terms_from_allotment(conn: sqlite3.Connection, ticker: str, facts: AllotmentFacts, as_of: str) -> None:
    """Overlay allotment-results figures onto the ``offering_terms`` row for *ticker*.

    The final offer price, gross and net proceeds and issued share count each
    replace the stored value only when the parsed fact is non-null;
    ``data_as_of`` is always set to *as_of*. Nothing is inserted when the
    ticker has no ``offering_terms`` row.

    Args:
        conn: Open SQLite connection; this function does not commit.
        ticker: Ticker of the row to update.
        facts: Parsed allotment facts providing the figures.
        as_of: Value written to ``data_as_of``.
    """
    statement = """
        UPDATE offering_terms
        SET offer_price_hkd = COALESCE(?, offer_price_hkd),
            gross_proceeds_hkd_m = COALESCE(?, gross_proceeds_hkd_m),
            net_proceeds_hkd_m = COALESCE(?, net_proceeds_hkd_m),
            issued_shares_upon_listing = COALESCE(?, issued_shares_upon_listing),
            data_as_of = ?
        WHERE ticker = ?
        """
    figures = (
        facts.final_offer_price_hkd,
        facts.gross_proceeds_hkd_m,
        facts.net_proceeds_hkd_m,
        facts.issued_shares_upon_listing,
    )
    conn.execute(statement, (*figures, as_of, ticker))
def upsert_demand(conn: sqlite3.Connection, ticker: str, source_id: str, source_date: str, facts: AllotmentFacts, as_of: str) -> None:
    """Insert or replace the allotment-stage ``ipo_demand`` row for a source.

    Nothing is written when none of the five demand figures (valid and
    successful application counts, public and international oversubscription
    multiples, international placees) is truthy, so ``None`` and zero both
    count as missing. The row key is *source_id* with ``_allotment_results_``
    shortened to ``_allotment_``; an existing row with that key has every
    non-key column except ``ticker`` overwritten.

    Args:
        conn: Open SQLite connection; this function does not commit.
        ticker: Ticker the demand figures belong to.
        source_id: Identifier of the allotment-results source.
        source_date: Value written to ``stage_date``.
        facts: Parsed allotment facts providing the demand figures.
        as_of: Value written to ``data_as_of``.
    """
    demand_signals = (
        facts.valid_applications,
        facts.successful_applications,
        facts.public_oversubscription_times,
        facts.international_placees,
        facts.international_oversubscription_times,
    )
    has_demand = any(demand_signals)
    if not has_demand:
        return

    statement = """
        INSERT INTO ipo_demand (
            demand_id, ticker, source_id, stage_date, valid_applications, successful_applications,
            public_oversubscription_times, international_placees, international_oversubscription_times,
            final_hk_offer_shares, final_international_offer_shares, data_as_of, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(demand_id) DO UPDATE SET
            source_id = excluded.source_id,
            stage_date = excluded.stage_date,
            valid_applications = excluded.valid_applications,
            successful_applications = excluded.successful_applications,
            public_oversubscription_times = excluded.public_oversubscription_times,
            international_placees = excluded.international_placees,
            international_oversubscription_times = excluded.international_oversubscription_times,
            final_hk_offer_shares = excluded.final_hk_offer_shares,
            final_international_offer_shares = excluded.final_international_offer_shares,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """
    row_key = source_id.replace("_allotment_results_", "_allotment_")
    bindings = (
        row_key,
        ticker,
        source_id,
        source_date,
        *demand_signals,
        facts.final_hk_offer_shares,
        facts.final_international_offer_shares,
        as_of,
        "Parsed from HKEXnews allotment results announcement.",
    )
    conn.execute(statement, bindings)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump every row of *table* to ``<SNAPSHOT_DIR>/<table>.csv``.

    The snapshot directory is created if needed. The file starts with a header
    row of column names, uses ``\\n`` line endings and UTF-8 encoding, and
    replaces any existing file of the same name.

    Args:
        conn: Open SQLite connection to read from.
        table: Table name; interpolated into the SQL text as-is, so it must be
            a trusted identifier.
        order_by: ``ORDER BY`` expression, also interpolated as-is. The default
            ``"1"`` sorts by the first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    # newline="" stops the text layer translating the "\n" the writer emits.
    with target.open("w", newline="", encoding="utf-8") as out:
        sink = csv.writer(out, lineterminator="\n")
        sink.writerow(header)
        for record in result:
            sink.writerow(record)
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` in ``hkex_document_archive`` mode.

    The script is started with the current interpreter and a summary limit of
    25. Its path is relative, so it is resolved against the working directory
    of this process.

    Args:
        db_path: Passed to the script as ``--db``.
        schema_path: Passed to the script as ``--schema``.
        as_of: Passed to the script as ``--as-of``.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "hkex_document_archive",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    # Dicts preserve insertion order, so flags are emitted in the order above.
    for flag, value in options.items():
        command += [flag, value]
    subprocess.run(command, check=True)
def refresh_extracted_text(db_path: str, sources: list[ArchivedSource]) -> None:
    """Run ``scripts/extract_pdf_text.py`` for the PDF sources in *sources*.

    Only sources whose local path ends in ``.pdf`` (case-insensitive) are
    passed on, de-duplicated and sorted by source id, one ``--source-id`` flag
    each. When there is no PDF source the script is not started at all.

    Args:
        db_path: Passed to the script as ``--db``.
        sources: Archived sources to consider.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    unique_pdf_ids = {
        item.source_id
        for item in sources
        if Path(item.local_path).suffix.lower() == ".pdf"
    }
    if not unique_pdf_ids:
        return

    command = [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path]
    # Sorted so the command line is deterministic for a given set of sources.
    for pdf_id in sorted(unique_pdf_ids):
        command += ["--source-id", pdf_id]
    subprocess.run(command, check=True)
def main() -> int:
    """Archive HKEXnews documents for the selected tickers and persist parsed facts.

    For every selected ticker this searches HKEXnews titles, downloads the
    chosen prospectus and/or allotment-results document, parses facts from it
    and writes them to the database. CSV snapshots of the core tables are then
    exported, followed by the optional sync-state and text-extraction helper
    scripts.

    A failure while handling one ticker is recorded and reported at the end so
    that the remaining tickers are still processed. Each downloaded document is
    registered in ``source_refs`` before any fact citing its ``source_id`` is
    written, so a later failure for the same ticker cannot leave those facts
    pointing at an unregistered source.

    Returns:
        Always ``0``; per-ticker failures are reported on stdout rather than
        through the exit status.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    stock_ids = load_stock_ids()
    archived_sources: list[ArchivedSource] = []
    processed = 0
    missing_stock_ids: list[str] = []
    missing_docs: list[str] = []
    failed_tickers: list[tuple[str, str]] = []

    # ``with conn`` only commits or rolls back; it does not close the handle.
    # Close it explicitly so it is released before the helper scripts, which
    # are given the same database path, are started.
    conn = sqlite3.connect(args.db)
    try:
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            tickers = select_tickers(conn, args.limit, args.tickers)
            for index, ticker in enumerate(tickers, start=1):
                print(f"[{index}/{len(tickers)}] {ticker}", flush=True)
                try:
                    stock_id = stock_ids.get(ticker)
                    if stock_id is None:
                        missing_stock_ids.append(ticker)
                        continue
                    listing_date, prospectus_date = ticker_dates(conn, ticker)
                    rows = title_search_rows(stock_id, listing_date, prospectus_date)
                    prospectus_row = choose_prospectus(rows, prospectus_date, listing_date)
                    allotment_row = choose_allotment(rows, listing_date)
                    if not prospectus_row and not allotment_row:
                        missing_docs.append(ticker)
                        continue
                    sources_for_ticker: list[ArchivedSource] = []
                    if prospectus_row:
                        prospectus_source = download_document(ticker, "prospectus", prospectus_row)
                        # Register the source first: the terms written below
                        # store its source_id, and a later error in this
                        # iteration is swallowed and still committed.
                        upsert_source_refs(conn, [prospectus_source], as_of)
                        sources_for_ticker.append(prospectus_source)
                        prospectus_facts = parse_prospectus_facts(prospectus_source.local_path)
                        update_master_from_prospectus(conn, ticker, prospectus_facts, as_of)
                        update_terms_from_prospectus(
                            conn,
                            ticker,
                            prospectus_source.source_id,
                            prospectus_source.source_date,
                            prospectus_facts,
                            as_of,
                        )
                    if allotment_row:
                        allotment_source = download_document(ticker, "allotment_results", allotment_row)
                        # Same ordering as above: reference row before demand facts.
                        upsert_source_refs(conn, [allotment_source], as_of)
                        sources_for_ticker.append(allotment_source)
                        # Only PDF allotment documents are parsed for facts.
                        if Path(allotment_source.local_path).suffix.lower() == ".pdf":
                            allotment_facts = parse_allotment_facts(allotment_source.local_path)
                            update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
                            upsert_demand(
                                conn,
                                ticker,
                                allotment_source.source_id,
                                allotment_source.source_date,
                                allotment_facts,
                                as_of,
                            )
                    # Only fully processed tickers count towards the summary
                    # and the text-extraction step.
                    archived_sources.extend(sources_for_ticker)
                    processed += 1
                except Exception as exc:  # Keep full refreshes moving; report failures at the end.
                    failed_tickers.append((ticker, str(exc)))
            for table in [
                "ipo_master",
                "offering_terms",
                "ipo_demand",
                "source_refs",
                "data_gaps",
            ]:
                export_snapshot(conn, table)
    finally:
        conn.close()

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)
    if not args.skip_text_extraction:
        refresh_extracted_text(args.db, archived_sources)

    print("hkex documents archived")
    print(f"tickers selected: {len(tickers)}")
    print(f"tickers processed: {processed}")
    print(f"sources archived: {len(archived_sources)}")
    if missing_stock_ids:
        print("missing stock ids: " + ", ".join(missing_stock_ids))
    if missing_docs:
        print("missing target docs: " + ", ".join(missing_docs))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0
# Script entry point: hand main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    sys.exit(main())