Private
Public Access
0
0
Files
hk-ipo/scripts/archive_hkex_documents.py
T
geometrybase 8a0dfd88f0 Make PDF text extraction a standard archive step
Request:
- Add extracted PDF text generation to the archivist workflow as a standard step.

Changes:
- Run PDF text extraction automatically for newly archived HKEX PDF sources.
- Make the PDF text extractor incremental and manifest-preserving.
- Document extracted-text handling in the archivist skill and README.
- Mark generated extracted text as no-diff data evidence.
- Backfill extracted text for all archived PDF source references.

Verification:
- Ran git diff --cached --check.
- Ran .venv/bin/python -m py_compile scripts/extract_pdf_text.py scripts/archive_hkex_documents.py.
- Ran full PDF extraction, then confirmed an incremental rerun skips unchanged files.
- Verified 557 PDF source_refs, 557 manifest rows, all status ok, and zero missing text/hash/path issues.

Next useful context:
- HKEX HTML notices and Yahoo JSON market data remain under data/raw and are not expected in data/extracted_text.
2026-06-15 13:27:41 +00:00

873 lines
33 KiB
Python

#!/usr/bin/env python3
"""Archive HKEXnews prospectus and allotment-result documents for open sync tasks."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import json
import logging
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from pypdf import PdfReader
# Only let ERROR-and-above records from the "pypdf" logger through.
logging.getLogger("pypdf").setLevel(logging.ERROR)
# Host that every HKEXnews endpoint below is built on.
BASE_URL = "https://www1.hkexnews.hk"
# JSON stock listings read by load_stock_ids (active first, then inactive).
ACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/activestock_sehk_e.json"
INACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/inactivestock_sehk_e.json"
# Title-search HTML page and the servlet queried with an explicit date window.
TITLE_SEARCH_URL = f"{BASE_URL}/search/titlesearch.xhtml"
TITLE_SEARCH_SERVLET_URL = f"{BASE_URL}/search/titleSearchServlet.do"
# Defaults for --db / --schema and the directory that receives CSV snapshots.
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
@dataclass(frozen=True)
class DocumentRow:
    """One document entry taken from an HKEXnews title-search result."""

    release_time: str  # release-time text with runs of whitespace collapsed
    release_date: str  # ISO date (YYYY-MM-DD) derived from release_time
    headline: str  # headline text with tags stripped and entities decoded
    title: str  # link title with tags stripped and entities decoded
    href: str  # link target as found in the result, HTML-unescaped
    url: str  # href resolved against BASE_URL
@dataclass(frozen=True)
class ArchivedSource:
    """A downloaded document plus the metadata written to ``source_refs``."""

    source_id: str  # "<ticker>_<source_type>_<YYYY_MM_DD>_<doc id>"
    ticker: str
    source_type: str  # this script uses "prospectus" and "allotment_results"
    title: str
    local_path: str  # POSIX-style path of the saved file under data/raw/<ticker>/
    url: str
    file_sha256: str  # hex SHA-256 of the downloaded bytes
    source_date: str  # release date of the document (ISO format)
    notes: str
@dataclass(frozen=True)
class ProspectusFacts:
    """Values extracted from the first pages of a prospectus PDF.

    Every field defaults to ``None``, meaning the value was not found.
    """

    application_start_date: str | None = None  # ISO date
    application_end_date: str | None = None  # ISO date
    allotment_results_expected_date: str | None = None  # ISO date
    listing_date: str | None = None  # ISO date
    board_lot: int | None = None
    min_subscription_amount_hkd: float | None = None
    global_offer_shares: int | None = None
    hk_offer_shares_initial: int | None = None
    international_offer_shares_initial: int | None = None
    # Fraction hk_offer_shares_initial / global_offer_shares (not a percentage).
    public_offer_pct_initial: float | None = None
    # Computed as 15% of global_offer_shares by parse_prospectus_facts, not parsed.
    over_allotment_offer_shares: int | None = None
@dataclass(frozen=True)
class AllotmentFacts:
    """Values extracted from an allotment-results announcement PDF.

    Every field defaults to ``None``, meaning the value was not found.
    """

    final_offer_price_hkd: float | None = None
    gross_proceeds_hkd_m: float | None = None  # "billion" amounts are scaled by 1000
    net_proceeds_hkd_m: float | None = None  # "billion" amounts are scaled by 1000
    issued_shares_upon_listing: int | None = None
    valid_applications: int | None = None
    successful_applications: int | None = None
    public_oversubscription_times: float | None = None
    international_placees: int | None = None
    international_oversubscription_times: float | None = None
    final_hk_offer_shares: int | None = None
    final_international_offer_shares: int | None = None
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the archive run."""
    cli = argparse.ArgumentParser(description=__doc__)
    # (flag, keyword arguments) pairs, registered in display order.
    option_specs = [
        ("--db", {"default": str(DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        (
            "--limit",
            {
                "type": int,
                "help": "Optional maximum tickers to process. Omit to process all open T0/T1 tasks.",
            },
        ),
        (
            "--tickers",
            {"help": "Comma-separated tickers to process instead of selecting from sync_tasks."},
        ),
        (
            "--skip-sync-state",
            {"action": "store_true", "help": "Do not refresh sync state after updating facts."},
        ),
        (
            "--skip-text-extraction",
            {"action": "store_true", "help": "Do not extract text for newly archived PDFs."},
        ),
    ]
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def fetch_bytes(url: str) -> bytes:
    """Fetch *url* with a browser-like User-Agent and return the raw body."""
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=browser_headers), timeout=60) as reply:
        body = reply.read()
    return body
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: Optional ISO-8601 timestamp (a trailing ``Z`` is accepted). A value
            without a UTC offset is taken to be UTC; a value with another offset is
            converted to UTC. When empty or ``None`` the current UTC time, truncated
            to whole seconds, is used.

    Returns:
        The timestamp in UTC with the offset spelled as ``Z``.

    Raises:
        ValueError: If *value* is not a valid ISO-8601 timestamp.
    """
    if value:
        # fromisoformat() only understands "Z" on newer Python versions.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # Naive input: label it UTC so the result always carries the "Z" suffix.
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            # Normalise foreign offsets so stored timestamps are directly comparable.
            moment = moment.astimezone(timezone.utc)
    else:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    return moment.isoformat().replace("+00:00", "Z")
def load_stock_ids() -> dict[str, int]:
    """Build a stock-code -> stock-id map from the active and inactive listings."""
    code_to_id: dict[str, int] = {}
    for listing_url in (ACTIVE_STOCK_URL, INACTIVE_STOCK_URL):
        # "utf-8-sig" drops a leading byte-order mark if the payload has one.
        entries = json.loads(fetch_bytes(listing_url).decode("utf-8-sig"))
        for entry in entries:
            code, raw_id = entry.get("c"), entry.get("i")
            if not (code and raw_id):
                continue
            # First occurrence wins, so the active listing takes precedence.
            code_to_id.setdefault(code, int(raw_id))
    return code_to_id
def clean_html(value: str) -> str:
    """Strip tags, decode HTML entities and collapse whitespace in *value*."""
    # Tags become spaces so adjacent words stay separated.
    without_tags = re.sub(r"<.*?>", " ", value, flags=re.S)
    decoded = html.unescape(without_tags)
    return " ".join(decoded.split())
def parse_release_date(value: str) -> str:
    """Convert the leading ``DD/MM/YYYY`` token of *value* to an ISO date."""
    date_token = value.split()[0]
    parsed = datetime.strptime(date_token, "%d/%m/%Y")
    return parsed.date().isoformat()
def parse_release_datetime(value: str) -> str:
    """Convert a ``DD/MM/YYYY HH:MM`` timestamp to its ISO date (time is dropped)."""
    stamp = datetime.strptime(value, "%d/%m/%Y %H:%M")
    return stamp.date().isoformat()
def latest_title_search_rows(stock_id: int) -> list[DocumentRow]:
    """Scrape the title-search page for *stock_id* into document rows.

    Table rows lacking a release time or a link are skipped.
    """
    page_url = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    markup = fetch_bytes(page_url).decode("utf-8", "replace")
    documents: list[DocumentRow] = []
    for row_match in re.finditer(r"<tr>(.*?)</tr>", markup, flags=re.S):
        cells = row_match.group(1)
        released = re.search(r"Release Time: </span>(.*?)</td>", cells, flags=re.S)
        headline = re.search(r'<div class="headline">(.*?)</div>', cells, flags=re.S)
        link = re.search(r'<a href="([^"]+)"[^>]*>(.*?)</a>', cells, flags=re.S)
        if released is None or link is None:
            continue
        release_time = " ".join(released.group(1).split())
        href = html.unescape(link.group(1))
        headline_text = clean_html(headline.group(1)) if headline else ""
        documents.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_date(release_time),
                headline=headline_text,
                title=clean_html(link.group(2)),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return documents
def window_title_search_rows(stock_id: int, from_date: date, to_date: date) -> list[DocumentRow]:
    """Query the title-search servlet for documents released in a date window.

    Items without a file link or a release time are skipped.
    """
    query = {
        "sortDir": "0",
        "sortByOptions": "DateTime",
        "category": "0",
        "market": "SEHK",
        "stockId": str(stock_id),
        "documentType": "-1",
        "fromDate": from_date.strftime("%Y%m%d"),
        "toDate": to_date.strftime("%Y%m%d"),
        "title": "",
        "searchType": "0",
        "t1code": "-2",
        "t2Gcode": "-2",
        "t2code": "-2",
        "rowRange": "500",
        "lang": "en",
    }
    referer = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    request = Request(
        f"{TITLE_SEARCH_SERVLET_URL}?{urlencode(query)}",
        headers={"User-Agent": "Mozilla/5.0", "Referer": referer},
    )
    with urlopen(request, timeout=60) as response:
        body = response.read().decode("utf-8", "replace")
    # The "result" field is itself a JSON-encoded string holding the item list.
    items = json.loads(json.loads(body).get("result") or "[]")
    documents: list[DocumentRow] = []
    for item in items:
        href = html.unescape(item.get("FILE_LINK") or "")
        release_time = " ".join((item.get("DATE_TIME") or "").split())
        if not (href and release_time):
            continue
        documents.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_datetime(release_time),
                headline=clean_html(item.get("SHORT_TEXT") or ""),
                title=clean_html(item.get("TITLE") or ""),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return documents
def title_search_rows(stock_id: int, listing_date: str | None, prospectus_date: str | None) -> list[DocumentRow]:
    """Search documents for *stock_id*, windowed around a known date if any.

    The listing date is preferred (90 days before to 14 days after), then the
    prospectus date (14 days before to 60 days after). With neither, the plain
    title-search page is scraped instead.
    """
    listed = parse_iso_date(listing_date)
    prospectus = parse_iso_date(prospectus_date)
    if listed:
        window = (listed - timedelta(days=90), listed + timedelta(days=14))
    elif prospectus:
        window = (prospectus - timedelta(days=14), prospectus + timedelta(days=60))
    else:
        return latest_title_search_rows(stock_id)
    return window_title_search_rows(stock_id, *window)
def parse_iso_date(value: str | None) -> date | None:
    """Parse an ISO ``YYYY-MM-DD`` string; empty or ``None`` yields ``None``."""
    return date.fromisoformat(value) if value else None
def date_distance(left: str, right: str) -> int:
    """Return the absolute number of days between two ISO date strings."""
    gap = date.fromisoformat(left) - date.fromisoformat(right)
    return abs(gap.days)
def archiveable_document(row: DocumentRow) -> bool:
    """Tell whether the row links to a PDF or HTML file (case-insensitive)."""
    extension = Path(row.href.lower()).suffix
    return extension in (".pdf", ".htm", ".html")
def choose_prospectus(rows: list[DocumentRow], prospectus_date: str | None, listing_date: str | None) -> DocumentRow | None:
    """Pick the search row most likely to be the prospectus PDF.

    Candidates are PDFs under a "listing documents" headline whose title mentions
    "global offering" or "prospectus", or is exactly "share offer"/"public offer".
    With a prospectus date, the candidate released closest to it wins (earlier
    release on ties). Otherwise the latest release wins, preferring candidates
    released 0-60 days before the listing date when that date is known.
    """

    def looks_like_prospectus(row: DocumentRow) -> bool:
        if not row.href.lower().endswith(".pdf"):
            return False
        if "listing documents" not in row.headline.lower():
            return False
        title = row.title.lower()
        if "global offering" in title or "prospectus" in title:
            return True
        return title in {"share offer", "public offer"}

    matches = [row for row in rows if looks_like_prospectus(row)]
    if not matches:
        return None
    if prospectus_date:
        # min() keeps the first of equally ranked rows, like a stable sort's head.
        return min(
            matches,
            key=lambda row: (date_distance(row.release_date, prospectus_date), row.release_date),
        )
    listed = parse_iso_date(listing_date)
    if listed:
        recent = [
            row
            for row in matches
            if 0 <= (listed - date.fromisoformat(row.release_date)).days <= 60
        ]
        # Fall back to all matches when nothing falls inside the window.
        matches = recent or matches
    by_release = sorted(matches, key=lambda row: row.release_date)
    return by_release[-1]
def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> DocumentRow | None:
    """Pick the search row most likely to be the allotment-results announcement.

    Candidates are PDF/HTML rows mentioning "allotment results" in the headline or
    title. The latest release wins, preferring rows released between 10 days before
    and 5 days after the listing date when that date is known.
    """

    def mentions_allotment(row: DocumentRow) -> bool:
        needle = "allotment results"
        return needle in row.headline.lower() or needle in row.title.lower()

    matches = [row for row in rows if archiveable_document(row) and mentions_allotment(row)]
    if not matches:
        return None
    listed = parse_iso_date(listing_date)
    if listed:
        near_listing = [
            row
            for row in matches
            if -5 <= (listed - date.fromisoformat(row.release_date)).days <= 10
        ]
        # Fall back to all matches when nothing falls inside the window.
        matches = near_listing or matches
    by_release = sorted(matches, key=lambda row: row.release_date)
    return by_release[-1]
def sha256_bytes(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of *data*."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def download_document(ticker: str, source_type: str, row: DocumentRow) -> ArchivedSource:
    """Download the row's document into ``data/raw/<ticker>/`` and describe it.

    The file is only (re)written when it is missing or its bytes differ from the
    download. The returned record carries the SHA-256 of the downloaded bytes.
    """
    payload = fetch_bytes(row.url)
    href_path = Path(row.href)
    doc_id = href_path.stem
    # Links without an extension are stored as PDFs.
    extension = href_path.suffix.lower() or ".pdf"
    target = Path("data/raw") / ticker / f"{source_type}_{row.release_date}_{doc_id}{extension}"
    target.parent.mkdir(parents=True, exist_ok=True)
    already_current = target.exists() and target.read_bytes() == payload
    if not already_current:
        target.write_bytes(payload)
    date_slug = row.release_date.replace("-", "_")
    return ArchivedSource(
        source_id=f"{ticker}_{source_type}_{date_slug}_{doc_id}",
        ticker=ticker,
        source_type=source_type,
        title=row.title,
        local_path=target.as_posix(),
        url=row.url,
        file_sha256=sha256_bytes(payload),
        source_date=row.release_date,
        notes=f"HKEXnews {row.headline}.",
    )
def first_pdf_text(local_path: str, max_pages: int) -> str:
    """Extract text from the first *max_pages* pages as one whitespace-collapsed line."""
    reader = PdfReader(local_path)
    page_count = min(max_pages, len(reader.pages))
    # Pages where extract_text() returns nothing contribute an empty string.
    page_texts = [page.extract_text() or "" for page in reader.pages[:page_count]]
    combined = " ".join(page_texts)
    return " ".join(combined.split())
def normalize_pdf_text(text: str) -> str:
    """Rejoin known letter-spaced phrases so the date patterns can match them."""
    # Applied in this order: whole phrases first, then the generic "on"/"from" fixes.
    spaced_phrases = (
        ("H o n g K o n g P u b l i c O f f e r i n g c o m m e n c e s", "Hong Kong Public Offering commences"),
        ("a t o r b e f o r e", "at or before"),
        ("n o l a t e r", "no later"),
        ("o n o r b e f o r e", "on or before"),
    )
    for spaced, joined in spaced_phrases:
        text = text.replace(spaced, joined)
    text = re.sub(r"\bo\s+n\b", "on", text)
    return re.sub(r"\bf\s+r\s+o\s+m\b", "from", text)
def integer_after(pattern: str, text: str) -> int | None:
    """Return group 1 of the first case-insensitive match of *pattern* as an int.

    Commas and spaces inside the captured text are ignored. ``None`` is returned
    when nothing matches or nothing is left after that clean-up.
    """
    found = re.search(pattern, text, flags=re.I)
    if found is None:
        return None
    digits = found.group(1).replace(",", "").replace(" ", "")
    return int(digits) if digits else None
def float_after(pattern: str, text: str) -> float | None:
    """Return group 1 of the first case-insensitive match of *pattern* as a float.

    Thousands separators are removed. Trailing periods are dropped as well, because
    character classes such as ``[\\d,.]+`` also swallow the full stop that ends a
    sentence ("HK$12.50."), which ``float()`` would reject.

    Returns:
        The parsed number, or ``None`` when nothing matches or the capture holds
        no digits once separators are removed.

    Raises:
        ValueError: If the cleaned capture is still not a valid number.
    """
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    cleaned = match.group(1).replace(",", "").rstrip(".")
    if not cleaned:
        return None
    return float(cleaned)
def money_m_after(pattern: str, text: str) -> float | None:
    """Return the amount matched by *pattern*, scaling "billion" to millions.

    *pattern* must define two groups: the number and an optional unit word. Only a
    unit starting with "b" changes the value (multiplied by 1000); any other unit,
    or none, returns the number as captured.

    Thousands separators are removed. Trailing periods are dropped as well, because
    character classes such as ``[\\d,.]+`` also swallow the full stop that ends a
    sentence, which ``float()`` would reject.

    Returns:
        The amount, or ``None`` when nothing matches or the capture holds no digits
        once separators are removed.

    Raises:
        ValueError: If the cleaned capture is still not a valid number.
    """
    match = re.search(pattern, text, flags=re.I)
    if not match:
        return None
    cleaned = match.group(1).replace(",", "").rstrip(".")
    if not cleaned:
        return None
    amount = float(cleaned)
    unit = (match.group(2) or "").lower()
    # NOTE(review): an amount without a unit is returned unscaled; confirm that
    # unit-less figures in the source documents are already expressed in millions.
    if unit.startswith("b"):
        return amount * 1000
    return amount
def date_after(label_pattern: str, text: str) -> str | None:
    """Find the first date introduced by "on", "from" or "at or before" after a label.

    The date must start within roughly 600 characters of the label and may be
    written "Month D, YYYY" or "D Month YYYY", optionally preceded by a weekday and
    comma. Returns the ISO date, or ``None`` if nothing matches or parses.
    """
    lead_in = r".{0,600}?(?:on|from|at or before)\s+(?:[A-Z][a-z]+,\s+)?"
    date_group = r"([A-Z][a-z]+ \d{1,2}, \d{4}|\d{1,2} [A-Z][a-z]+ \d{4})"
    found = re.search(label_pattern + lead_in + date_group, text, flags=re.I)
    if found is None:
        return None
    raw_date = found.group(1)
    for date_format in ("%B %d, %Y", "%d %B %Y"):
        try:
            parsed = datetime.strptime(raw_date, date_format)
        except ValueError:
            # Wrong layout (or not a real month name): try the next format.
            continue
        return parsed.date().isoformat()
    return None
def parse_prospectus_facts(local_path: str) -> ProspectusFacts:
    """Extract offering terms and key dates from the first 8 pages of a prospectus."""
    text = normalize_pdf_text(first_pdf_text(local_path, 8))

    board_lot = integer_after(r"minimum\s*of\s*([\d][\d,\s]*)\s*Hong\s*Kong\s*Offer\s*Shares", text)

    # The minimum payment is read as the amount that follows the board-lot figure,
    # looking for the lot written with thousands separators first, then without.
    min_amount = None
    if board_lot:
        lot_patterns = (
            rf"\b{board_lot:,}\b\s+([\d,]+\.\d{{2}})",
            rf"\b{board_lot}\b\s+([\d,]+\.\d{{2}})",
        )
        for lot_pattern in lot_patterns:
            min_amount = float_after(lot_pattern, text)
            if min_amount is not None:
                break

    global_shares = integer_after(
        r"Number of Offer Shares (?:under|in) the Global Offering\s*:?\s+([\d][\d,\s]*)", text
    )
    if global_shares is None:
        global_shares = integer_after(r"Number of Offer Shares\s*:?\s+([\d][\d,\s]*)\s+(?:H\s+)?Shares", text)
    hk_shares = integer_after(r"Number of Hong Kong Offer Shares\s*:?\s+([\d][\d,\s]*)", text)
    intl_shares = integer_after(r"Number of International Offer Shares\s*:?\s+([\d][\d,\s]*)", text)

    # Not read from the document: a flat 15% of the global offer size.
    over_allotment = round(global_shares * 0.15) if global_shares else None
    public_pct = round(hk_shares / global_shares, 4) if global_shares and hk_shares else None

    # Try progressively looser labels until one yields a date.
    allotment_date = None
    for label in (r"Announcement of.*?Offer Price", r"Announcement of", r"The results of allocations"):
        allotment_date = date_after(label, text)
        if allotment_date:
            break

    return ProspectusFacts(
        application_start_date=date_after(r"Hong Kong Public Offering commences", text),
        application_end_date=date_after(r"Application lists.*?close", text),
        allotment_results_expected_date=allotment_date,
        listing_date=date_after(r"Dealings in the (?:H\s+)?Shares.*?expected to commence", text),
        board_lot=board_lot,
        min_subscription_amount_hkd=min_amount,
        global_offer_shares=global_shares,
        hk_offer_shares_initial=hk_shares,
        international_offer_shares_initial=intl_shares,
        public_offer_pct_initial=public_pct,
        over_allotment_offer_shares=over_allotment,
    )
def section_between(text: str, start: str, end: str | None, use_last_start: bool = False) -> str:
    """Return the text after a *start* match, up to the next *end* match.

    Both patterns are matched case-insensitively. The first *start* match is used
    unless *use_last_start* is set. Without *end*, or when *end* never matches, the
    section runs to the end of *text*. Returns "" when *start* does not match.
    """
    starts = list(re.finditer(start, text, flags=re.I))
    if not starts:
        return ""
    anchor = starts[-1] if use_last_start else starts[0]
    begin = anchor.end()
    remainder = text[begin:]
    if not end:
        return remainder
    stop = re.search(end, remainder, flags=re.I)
    if stop is None:
        return remainder
    return text[begin : begin + stop.start()]
def allotment_detail_sections(text: str) -> tuple[str, str]:
    """Split out the Hong Kong public offering and international offering details.

    Each returned section starts with its own label ("No. of valid applications" /
    "No. of placees"); a section that cannot be located is returned as "".
    """

    def labelled(label: str, found: re.Match | None) -> str:
        # The label is consumed by the pattern, so put it back in front.
        return label + found.group(1) if found else ""

    hk_found = re.search(
        r"HONG KONG PUBLIC OFFERING\s+No\. of valid applications(.*?)INTERNATIONAL OFFERING\s+No\. of placees",
        text,
        flags=re.I,
    )
    intl_found = re.search(
        r"INTERNATIONAL OFFERING\s+No\. of placees(.*?)(?:The Directors|LOCK-UP|Allottees with|$)",
        text,
        flags=re.I,
    )
    return (
        labelled("No. of valid applications", hk_found),
        labelled("No. of placees", intl_found),
    )
def parse_allotment_facts(local_path: str) -> AllotmentFacts:
    """Extract pricing, proceeds and demand figures from an allotment-results PDF.

    Only the first 8 pages are read. Pricing and proceeds are searched in the whole
    text; application and placee figures only within their respective sections.
    """
    text = first_pdf_text(local_path, 8)
    hk_section, intl_section = allotment_detail_sections(text)

    pricing = {
        "final_offer_price_hkd": float_after(r"Final Offer Price\s+HK\$([\d,.]+)", text),
        "gross_proceeds_hkd_m": money_m_after(r"Gross proceeds.*?HK\$([\d,.]+)\s*(million|billion)?", text),
        "net_proceeds_hkd_m": money_m_after(r"Net proceeds\s+HK\$([\d,.]+)\s*(million|billion)?", text),
        "issued_shares_upon_listing": integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", text),
    }
    public_demand = {
        "valid_applications": integer_after(r"No\. of valid applications\s+([\d,]+)", hk_section),
        "successful_applications": integer_after(r"No\. of successful applications\s+([\d,]+)", hk_section),
        "public_oversubscription_times": float_after(r"Subscription level\s+([\d,.]+)\s+times", hk_section),
    }
    international_demand = {
        "international_placees": integer_after(r"No\. of placees\s+([\d,]+)", intl_section),
        "international_oversubscription_times": float_after(
            r"Subscription level.*?([\d,.]+)\s+times", intl_section
        ),
    }
    final_shares = {
        "final_hk_offer_shares": integer_after(
            r"Final no\. of Offer Shares under the Hong Kong Public Offering.*?([\d][\d,\s]*)",
            hk_section,
        ),
        "final_international_offer_shares": integer_after(
            r"Final no\. of Offer Shares under the International Offering.*?([\d][\d,\s]*)",
            intl_section,
        ),
    }
    return AllotmentFacts(**pricing, **public_demand, **international_demand, **final_shares)
def select_tickers(conn: sqlite3.Connection, limit: int | None, tickers: str | None) -> list[str]:
    """Decide which tickers to process.

    An explicit comma-separated *tickers* list wins: entries are trimmed, blanks are
    dropped and each is zero-padded to 5 characters (*limit* is not applied).
    Otherwise tickers with open T0/T1 sync tasks are read from the database, newest
    listing first, optionally capped at *limit*.
    """
    if tickers:
        trimmed = (part.strip() for part in tickers.split(","))
        return [code.zfill(5) for code in trimmed if code]

    query = """
SELECT DISTINCT m.ticker
FROM sync_tasks t
JOIN ipo_master m ON m.ticker = t.ticker
WHERE t.task_status = 'open'
AND t.stage IN ('T0_prospectus', 'T1_allotment')
ORDER BY m.listing_date DESC, m.ticker
"""
    bindings: tuple[object, ...] = ()
    if limit is not None:
        query += " LIMIT ?"
        bindings = (limit,)
    return [record[0] for record in conn.execute(query, bindings).fetchall()]
def ticker_dates(conn: sqlite3.Connection, ticker: str) -> tuple[str | None, str | None]:
    """Return ``(listing_date, prospectus_date)`` for *ticker*.

    The prospectus date comes from the ticker's new-listing report entry with the
    highest report year, if any. An unknown ticker yields ``(None, None)``.
    """
    query = """
SELECT m.listing_date, r.prospectus_date
FROM ipo_master m
LEFT JOIN new_listing_report_entries r ON r.ticker = m.ticker
WHERE m.ticker = ?
ORDER BY r.report_year DESC
LIMIT 1
"""
    record = conn.execute(query, (ticker,)).fetchone()
    if record is None:
        return None, None
    listing_date, prospectus_date = record[0], record[1]
    return listing_date, prospectus_date
def upsert_source_refs(conn: sqlite3.Connection, sources: list[ArchivedSource], as_of: str) -> None:
    """Insert or refresh one ``source_refs`` row per archived source.

    New rows get ``path_base = 'repo_root'``. On a ``source_id`` conflict every
    column except ``ticker``, ``source_type`` and ``path_base`` is overwritten.
    *as_of* is stored as ``archived_at``.
    """
    statement = """
INSERT INTO source_refs (
source_id, ticker, source_type, title, path_base, local_path, url,
file_sha256, source_date, archived_at, notes
)
VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
ON CONFLICT(source_id) DO UPDATE SET
title = excluded.title,
local_path = excluded.local_path,
url = excluded.url,
file_sha256 = excluded.file_sha256,
source_date = excluded.source_date,
archived_at = excluded.archived_at,
notes = excluded.notes
"""
    records = []
    for item in sources:
        records.append(
            (
                item.source_id,
                item.ticker,
                item.source_type,
                item.title,
                item.local_path,
                item.url,
                item.file_sha256,
                item.source_date,
                as_of,
                item.notes,
            )
        )
    conn.executemany(statement, records)
def update_master_from_prospectus(conn: sqlite3.Connection, ticker: str, facts: ProspectusFacts, as_of: str) -> None:
    """Copy prospectus dates onto the ticker's ``ipo_master`` row.

    Application and allotment dates from *facts* replace stored values when
    present. ``listing_date`` is the other way round: a stored value is kept and
    *facts* only fills a gap. ``data_as_of`` is always set to *as_of*.
    """
    statement = """
UPDATE ipo_master
SET application_start_date = COALESCE(?, application_start_date),
application_end_date = COALESCE(?, application_end_date),
allotment_results_expected_date = COALESCE(?, allotment_results_expected_date),
listing_date = COALESCE(listing_date, ?),
data_as_of = ?
WHERE ticker = ?
"""
    bindings = (
        facts.application_start_date,
        facts.application_end_date,
        facts.allotment_results_expected_date,
        facts.listing_date,
        as_of,
        ticker,
    )
    conn.execute(statement, bindings)
def update_terms_from_prospectus(
    conn: sqlite3.Connection,
    ticker: str,
    source_id: str,
    source_date: str,
    facts: ProspectusFacts,
    as_of: str,
) -> None:
    """Insert or merge the prospectus terms into the ticker's ``offering_terms`` row.

    On a ticker conflict, each term column takes the new value when the stored row
    came from the same source or from another ``<ticker>_prospectus_`` source.
    Otherwise a stored non-NULL value is kept and the new value only fills gaps.
    ``source_id`` is additionally replaced when the stored one contains
    ``_new_listing_report_``. ``data_as_of`` is always overwritten.
    """
    statement = """
INSERT INTO offering_terms (
ticker, source_id, prospectus_date, board_lot, min_subscription_amount_hkd,
global_offer_shares, hk_offer_shares_initial, international_offer_shares_initial,
public_offer_pct_initial, over_allotment_offer_shares, data_as_of
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker) DO UPDATE SET
source_id = CASE
WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
OR offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.source_id
ELSE offering_terms.source_id
END,
prospectus_date = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.prospectus_date
ELSE COALESCE(offering_terms.prospectus_date, excluded.prospectus_date)
END,
board_lot = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.board_lot
ELSE COALESCE(offering_terms.board_lot, excluded.board_lot)
END,
min_subscription_amount_hkd = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.min_subscription_amount_hkd
ELSE COALESCE(offering_terms.min_subscription_amount_hkd, excluded.min_subscription_amount_hkd)
END,
global_offer_shares = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.global_offer_shares
ELSE COALESCE(offering_terms.global_offer_shares, excluded.global_offer_shares)
END,
hk_offer_shares_initial = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.hk_offer_shares_initial
ELSE COALESCE(offering_terms.hk_offer_shares_initial, excluded.hk_offer_shares_initial)
END,
international_offer_shares_initial = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.international_offer_shares_initial
ELSE COALESCE(
offering_terms.international_offer_shares_initial,
excluded.international_offer_shares_initial
)
END,
public_offer_pct_initial = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.public_offer_pct_initial
ELSE COALESCE(offering_terms.public_offer_pct_initial, excluded.public_offer_pct_initial)
END,
over_allotment_offer_shares = CASE
WHEN offering_terms.source_id = excluded.source_id
OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
THEN excluded.over_allotment_offer_shares
ELSE COALESCE(offering_terms.over_allotment_offer_shares, excluded.over_allotment_offer_shares)
END,
data_as_of = excluded.data_as_of
"""
    # Binding order mirrors the INSERT column list; source_date fills prospectus_date.
    bindings = (
        ticker,
        source_id,
        source_date,
        facts.board_lot,
        facts.min_subscription_amount_hkd,
        facts.global_offer_shares,
        facts.hk_offer_shares_initial,
        facts.international_offer_shares_initial,
        facts.public_offer_pct_initial,
        facts.over_allotment_offer_shares,
        as_of,
    )
    conn.execute(statement, bindings)
def update_terms_from_allotment(conn: sqlite3.Connection, ticker: str, facts: AllotmentFacts, as_of: str) -> None:
    """Fill missing price and proceeds columns of the ticker's ``offering_terms`` row.

    Stored non-NULL values are kept; *facts* only fills gaps. ``data_as_of`` is
    always set to *as_of*. Nothing happens when the ticker has no row.
    """
    statement = """
UPDATE offering_terms
SET offer_price_hkd = COALESCE(offer_price_hkd, ?),
gross_proceeds_hkd_m = COALESCE(gross_proceeds_hkd_m, ?),
net_proceeds_hkd_m = COALESCE(net_proceeds_hkd_m, ?),
issued_shares_upon_listing = COALESCE(issued_shares_upon_listing, ?),
data_as_of = ?
WHERE ticker = ?
"""
    bindings = (
        facts.final_offer_price_hkd,
        facts.gross_proceeds_hkd_m,
        facts.net_proceeds_hkd_m,
        facts.issued_shares_upon_listing,
        as_of,
        ticker,
    )
    conn.execute(statement, bindings)
def upsert_demand(conn: sqlite3.Connection, ticker: str, source_id: str, source_date: str, facts: AllotmentFacts, as_of: str) -> None:
    """Insert or overwrite the ``ipo_demand`` row derived from an allotment source.

    Nothing is written when all application, placee and subscription-level figures
    are missing or zero. The demand id is *source_id* with ``_allotment_results_``
    shortened to ``_allotment_``.
    """
    demand_signals = (
        facts.valid_applications,
        facts.successful_applications,
        facts.public_oversubscription_times,
        facts.international_placees,
        facts.international_oversubscription_times,
    )
    if not any(demand_signals):
        return

    statement = """
INSERT INTO ipo_demand (
demand_id, ticker, source_id, stage_date, valid_applications, successful_applications,
public_oversubscription_times, international_placees, international_oversubscription_times,
final_hk_offer_shares, final_international_offer_shares, data_as_of, notes
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(demand_id) DO UPDATE SET
source_id = excluded.source_id,
stage_date = excluded.stage_date,
valid_applications = excluded.valid_applications,
successful_applications = excluded.successful_applications,
public_oversubscription_times = excluded.public_oversubscription_times,
international_placees = excluded.international_placees,
international_oversubscription_times = excluded.international_oversubscription_times,
final_hk_offer_shares = excluded.final_hk_offer_shares,
final_international_offer_shares = excluded.final_international_offer_shares,
data_as_of = excluded.data_as_of,
notes = excluded.notes
"""
    bindings = (
        source_id.replace("_allotment_results_", "_allotment_"),
        ticker,
        source_id,
        source_date,
        *demand_signals,
        facts.final_hk_offer_shares,
        facts.final_international_offer_shares,
        as_of,
        "Parsed from HKEXnews allotment results announcement.",
    )
    conn.execute(statement, bindings)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump *table* to ``<SNAPSHOT_DIR>/<table>.csv`` with a header row.

    *table* and *order_by* are interpolated into the SQL text unescaped, so they
    must be trusted identifiers. The default ordering is by the first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in cursor.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    # newline="" plus an explicit "\n" terminator keeps line endings identical on every platform.
    with target.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(cursor.fetchall())
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` in ``hkex_document_archive`` mode.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "hkex_document_archive",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, value in options.items():
        command += [flag, value]
    subprocess.run(command, check=True)
def refresh_extracted_text(db_path: str, sources: list[ArchivedSource]) -> None:
    """Run ``scripts/extract_pdf_text.py`` for the PDF sources in *sources*.

    Each distinct PDF source id is passed once, in sorted order, as a
    ``--source-id`` argument. The script is not started when there are no PDFs.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    pdf_ids = {
        item.source_id
        for item in sources
        if Path(item.local_path).suffix.lower() == ".pdf"
    }
    if not pdf_ids:
        return
    command = [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path]
    for source_id in sorted(pdf_ids):
        command += ["--source-id", source_id]
    subprocess.run(command, check=True)
def main() -> int:
    """Archive prospectus and allotment documents for the selected tickers.

    For each ticker the HKEXnews stock id is looked up, matching documents are
    searched and downloaded, facts are parsed into the database and the sources are
    recorded. Afterwards CSV snapshots are exported and, unless disabled by flags,
    the sync-state and PDF text-extraction scripts are run.

    Returns:
        Always 0. Per-ticker failures are collected and printed, not raised.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    stock_ids = load_stock_ids()
    archived_sources: list[ArchivedSource] = []
    processed = 0
    missing_stock_ids: list[str] = []
    missing_docs: list[str] = []
    failed_tickers: list[tuple[str, str]] = []
    conn = sqlite3.connect(args.db)
    try:
        # "with conn" only commits or rolls back the transaction; closing the
        # connection is handled by the surrounding try/finally.
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            tickers = select_tickers(conn, args.limit, args.tickers)
            for index, ticker in enumerate(tickers, start=1):
                print(f"[{index}/{len(tickers)}] {ticker}", flush=True)
                try:
                    stock_id = stock_ids.get(ticker)
                    if stock_id is None:
                        missing_stock_ids.append(ticker)
                        continue
                    listing_date, prospectus_date = ticker_dates(conn, ticker)
                    rows = title_search_rows(stock_id, listing_date, prospectus_date)
                    prospectus_row = choose_prospectus(rows, prospectus_date, listing_date)
                    allotment_row = choose_allotment(rows, listing_date)
                    if not prospectus_row and not allotment_row:
                        missing_docs.append(ticker)
                        continue
                    sources_for_ticker: list[ArchivedSource] = []
                    if prospectus_row:
                        prospectus_source = download_document(ticker, "prospectus", prospectus_row)
                        sources_for_ticker.append(prospectus_source)
                        prospectus_facts = parse_prospectus_facts(prospectus_source.local_path)
                        update_master_from_prospectus(conn, ticker, prospectus_facts, as_of)
                        update_terms_from_prospectus(
                            conn,
                            ticker,
                            prospectus_source.source_id,
                            prospectus_source.source_date,
                            prospectus_facts,
                            as_of,
                        )
                    if allotment_row:
                        allotment_source = download_document(ticker, "allotment_results", allotment_row)
                        sources_for_ticker.append(allotment_source)
                        # Facts are only parsed from PDFs; HTML notices are archived as-is.
                        if Path(allotment_source.local_path).suffix.lower() == ".pdf":
                            allotment_facts = parse_allotment_facts(allotment_source.local_path)
                            update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
                            upsert_demand(
                                conn,
                                ticker,
                                allotment_source.source_id,
                                allotment_source.source_date,
                                allotment_facts,
                                as_of,
                            )
                    # NOTE(review): if a later step above raises, rows already written
                    # for this ticker are kept while its source_refs rows are not
                    # inserted; confirm whether that partial state is acceptable.
                    upsert_source_refs(conn, sources_for_ticker, as_of)
                    archived_sources.extend(sources_for_ticker)
                    processed += 1
                except Exception as exc:  # Keep full refreshes moving; report failures at the end.
                    # Include the exception type: str(exc) alone can be empty or
                    # cryptic (a KeyError prints as just the quoted key).
                    failed_tickers.append((ticker, f"{type(exc).__name__}: {exc}"))
        for table in [
            "ipo_master",
            "offering_terms",
            "ipo_demand",
            "source_refs",
            "data_gaps",
        ]:
            export_snapshot(conn, table)
    finally:
        conn.close()
    # The helper scripts open the database themselves, so they run only after the
    # transaction above has been committed and the connection released.
    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)
    if not args.skip_text_extraction:
        refresh_extracted_text(args.db, archived_sources)
    print("hkex documents archived")
    print(f"tickers selected: {len(tickers)}")
    print(f"tickers processed: {processed}")
    print(f"sources archived: {len(archived_sources)}")
    if missing_stock_ids:
        print("missing stock ids: " + ", ".join(missing_stock_ids))
    if missing_docs:
        print("missing target docs: " + ", ".join(missing_docs))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0
# Run the archiver when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    sys.exit(main())