Backfill first HKEX IPO document batch

Request:
Start progressively filling detailed information for recent HK IPO targets.

Changes:
- Add scripts/archive_hkex_documents.py to map tickers to HKEXnews stock IDs, select official prospectus and allotment-results PDFs, archive them under data/raw/{ticker}, parse high-confidence T0/T1 facts, export snapshots, and refresh sync state.
- Document the small-batch HKEX document backfill workflow in README.md and the archivist skill.
- Archive prospectus and allotment-results PDFs for 00901, 01081, 01779, 02290, 02553, and 03388.
- Fill T0 details including application dates, expected allotment date, board lot, minimum subscription amount, and offer-share counts for the six tickers.
- Fill T1 allotment-demand details including valid/successful applications, public subscription level, international placees, international subscription level, and final offer-share allocations.
- Refresh source_refs, ipo_master, offering_terms, ipo_demand, ticker_sync_state, and sync_tasks snapshots.

Verification:
- Ran archive_hkex_documents.py in a first small batch and re-ran corrected tickers after parser hardening.
- Parsed project Python scripts with ast.parse.
- Checked SQLite integrity and DB-to-snapshot row counts.
- Verified source_refs paths are repo-relative, source files exist, and SHA-256 hashes match.
- Confirmed batch field completeness for the six processed tickers.
- Ran git diff --check and git diff --cached --check.
- Checked for Python cache and SQLite transient files.

Next useful context:
- This batch added about 55MB of official HKEXnews PDFs.
- Sync state now has 16 complete stages, 1993 pending_due stages, and 42 pending_not_due stages.
- Continue with small --limit batches because HKEXnews title search can include historical or postponed offering documents for the same stock code.
This commit is contained in:
2026-06-15 07:07:46 +00:00
parent c65b20a1c4
commit 993d7b26fa
23 changed files with 4908 additions and 4110 deletions
+761
View File
@@ -0,0 +1,761 @@
#!/usr/bin/env python3
"""Archive HKEXnews prospectus and allotment-result documents for open sync tasks."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import json
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
from urllib.request import Request, urlopen
from pypdf import PdfReader
# Root of the HKEXnews site; document links scraped from search pages are
# resolved against it.
BASE_URL = "https://www1.hkexnews.hk"
# JSON feeds read by load_stock_ids() to map stock codes to HKEXnews stock IDs.
ACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/activestock_sehk_e.json"
INACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/inactivestock_sehk_e.json"
# Title-search page queried once per stock ID by title_search_rows().
TITLE_SEARCH_URL = f"{BASE_URL}/search/titlesearch.xhtml"
# Defaults for the --db / --schema options and the snapshot export directory.
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
@dataclass(frozen=True)
class DocumentRow:
    """One document entry scraped from an HKEXnews title-search result page."""

    release_time: str  # release-time cell text with whitespace collapsed
    release_date: str  # ISO date derived from the first token of release_time
    headline: str  # tag-stripped headline text; "" when the row has no headline
    title: str  # tag-stripped text of the document link
    href: str  # link target as found in the page, HTML-unescaped
    url: str  # href resolved against BASE_URL
@dataclass(frozen=True)
class ArchivedSource:
    """A downloaded document together with the metadata stored in source_refs."""

    source_id: str  # "{ticker}_{source_type}_{release date with underscores}_{doc id}"
    ticker: str
    source_type: str  # main() passes "prospectus" or "allotment_results"
    title: str  # link title of the HKEXnews row
    local_path: str  # POSIX-style path under data/raw/{ticker}
    url: str  # absolute download URL
    file_sha256: str  # hex SHA-256 of the downloaded bytes
    source_date: str  # ISO release date of the HKEXnews row
    notes: str  # free text; download_document() records the row headline here
@dataclass(frozen=True)
class ProspectusFacts:
    """Facts extracted from a prospectus PDF; a field is None when not found."""

    # Dates are ISO strings (YYYY-MM-DD) as produced by date_after().
    application_start_date: str | None = None
    application_end_date: str | None = None
    allotment_results_expected_date: str | None = None
    listing_date: str | None = None
    board_lot: int | None = None
    min_subscription_amount_hkd: float | None = None
    global_offer_shares: int | None = None
    hk_offer_shares_initial: int | None = None
    international_offer_shares_initial: int | None = None
    # hk_offer_shares_initial / global_offer_shares as a fraction, 4 decimals.
    public_offer_pct_initial: float | None = None
    # parse_prospectus_facts() derives this as 15% of global_offer_shares
    # rather than reading it from the document.
    over_allotment_offer_shares: int | None = None
@dataclass(frozen=True)
class AllotmentFacts:
    """Facts extracted from an allotment-results PDF; None when not found."""

    final_offer_price_hkd: float | None = None
    # Proceeds are parsed by money_m_after(), i.e. intended as HK$ millions.
    gross_proceeds_hkd_m: float | None = None
    net_proceeds_hkd_m: float | None = None
    issued_shares_upon_listing: int | None = None
    # Fields below come from the "HONG KONG PUBLIC OFFERING" detail section.
    valid_applications: int | None = None
    successful_applications: int | None = None
    public_oversubscription_times: float | None = None
    # Fields below come from the "INTERNATIONAL OFFERING" detail section,
    # except final_hk_offer_shares which is read from the Hong Kong section.
    international_placees: int | None = None
    international_oversubscription_times: float | None = None
    final_hk_offer_shares: int | None = None
    final_international_offer_shares: int | None = None
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the archive script.

    Returns:
        The namespace with ``db``, ``schema``, ``as_of``, ``limit``,
        ``tickers`` and ``skip_sync_state`` attributes.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Declared in help-output order; each entry is (flag, add_argument kwargs).
    option_specs = (
        ("--db", {"default": str(DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--limit", {"type": int, "default": 5, "help": "Maximum tickers to process."}),
        (
            "--tickers",
            {"help": "Comma-separated tickers to process instead of selecting from sync_tasks."},
        ),
        (
            "--skip-sync-state",
            {"action": "store_true", "help": "Do not refresh sync state after updating facts."},
        ),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def fetch_bytes(url: str) -> bytes:
    """Download *url* and return the raw response body.

    The request carries a browser-like User-Agent header and uses a
    60-second timeout. Errors raised by ``urlopen`` propagate to the caller.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    response = urlopen(Request(url, headers=browser_headers), timeout=60)
    with response:
        return response.read()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 string.

    Args:
        value: Optional ISO timestamp; a trailing ``Z`` is accepted.

    Returns:
        The parsed *value* re-serialised, or the current UTC time truncated
        to whole seconds when *value* is empty. A ``+00:00`` offset is
        rendered as ``Z``.
    """
    if value:
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
    else:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    return moment.isoformat().replace("+00:00", "Z")
def load_stock_ids() -> dict[str, int]:
    """Build a stock-code to HKEXnews stock-ID mapping from the JSON feeds.

    The active feed is read before the inactive one and the first ID seen
    for a code is kept, so active listings take precedence on duplicates.
    Entries missing either the ``c`` (code) or ``i`` (ID) key are ignored.
    """
    mapping: dict[str, int] = {}
    for feed_url in (ACTIVE_STOCK_URL, INACTIVE_STOCK_URL):
        entries = json.loads(fetch_bytes(feed_url).decode("utf-8-sig"))
        for entry in entries:
            code, raw_id = entry.get("c"), entry.get("i")
            if not (code and raw_id):
                continue
            numeric_id = int(raw_id)
            if code not in mapping:
                mapping[code] = numeric_id
    return mapping
def clean_html(value: str) -> str:
    """Strip tags from an HTML fragment and collapse whitespace.

    Each tag is replaced by a space, HTML entities are decoded, and runs of
    whitespace are reduced to single spaces with the ends trimmed.
    """
    without_tags = re.compile(r"<.*?>", re.S).sub(" ", value)
    words = html.unescape(without_tags).split()
    return " ".join(words)
def parse_release_date(value: str) -> str:
    """Convert a ``DD/MM/YYYY [time]`` release stamp to an ISO date string.

    Only the first whitespace-separated token is parsed; anything after it
    (such as the time of day) is ignored.
    """
    tokens = value.split()
    day_part = tokens[0]
    parsed = datetime.strptime(day_part, "%d/%m/%Y")
    return parsed.date().isoformat()
def title_search_rows(stock_id: int) -> list[DocumentRow]:
    """Fetch the HKEXnews title-search page for a stock and parse its rows.

    Args:
        stock_id: HKEXnews internal stock ID (not the ticker).

    Returns:
        One ``DocumentRow`` per table row that has both a release time and a
        link; rows missing either are skipped. A missing headline becomes "".
    """
    search_url = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
    markup = fetch_bytes(search_url).decode("utf-8", "replace")
    documents: list[DocumentRow] = []
    for row_match in re.finditer(r"<tr>(.*?)</tr>", markup, flags=re.S):
        cells = row_match.group(1)
        released = re.search(r"Release Time: </span>(.*?)</td>", cells, flags=re.S)
        headline = re.search(r'<div class="headline">(.*?)</div>', cells, flags=re.S)
        link = re.search(r'<a href="([^"]+)"[^>]*>(.*?)</a>', cells, flags=re.S)
        if released is None or link is None:
            continue
        release_time = " ".join(released.group(1).split())
        href = html.unescape(link.group(1))
        headline_text = clean_html(headline.group(1)) if headline else ""
        documents.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_date(release_time),
                headline=headline_text,
                title=clean_html(link.group(2)),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return documents
def parse_iso_date(value: str | None) -> date | None:
    """Parse an ISO ``YYYY-MM-DD`` string, passing empty values through as None."""
    return date.fromisoformat(value) if value else None
def date_distance(left: str, right: str) -> int:
    """Return the absolute number of days between two ISO date strings."""
    gap = date.fromisoformat(left) - date.fromisoformat(right)
    return abs(gap.days)
def choose_prospectus(rows: list[DocumentRow], prospectus_date: str | None, listing_date: str | None) -> DocumentRow | None:
    """Pick the title-search row most likely to be the IPO prospectus.

    Candidates are PDF links whose headline contains "listing documents" and
    whose title mentions "global offering" or "prospectus".

    Args:
        rows: Parsed title-search rows for one stock.
        prospectus_date: Known ISO prospectus date, if any. When given, the
            candidate released closest to it wins (earlier release on ties).
        listing_date: Known ISO listing date, if any. Used only without a
            prospectus date: candidates released 0-60 days before listing
            are preferred, and the most recent one wins.

    Returns:
        The chosen row, or None when no candidate exists.
    """

    def is_candidate(row: DocumentRow) -> bool:
        if not row.href.lower().endswith(".pdf"):
            return False
        if "listing documents" not in row.headline.lower():
            return False
        lowered_title = row.title.lower()
        return "global offering" in lowered_title or "prospectus" in lowered_title

    candidates = [row for row in rows if is_candidate(row)]
    if not candidates:
        return None

    if prospectus_date:
        # min() keeps the first of equal keys, matching a stable sort's head.
        return min(
            candidates,
            key=lambda row: (
                abs((date.fromisoformat(row.release_date) - date.fromisoformat(prospectus_date)).days),
                row.release_date,
            ),
        )

    listed = date.fromisoformat(listing_date) if listing_date else None
    if listed:
        in_window = []
        for row in candidates:
            days_before_listing = (listed - date.fromisoformat(row.release_date)).days
            if 0 <= days_before_listing <= 60:
                in_window.append(row)
        if in_window:
            candidates = in_window

    ordered = sorted(candidates, key=lambda row: row.release_date)
    return ordered[-1]
def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> DocumentRow | None:
    """Pick the title-search row most likely to be the allotment results.

    Candidates are PDF links whose headline or title contains
    "allotment results". With a known listing date, candidates released
    between 10 days before and 5 days after listing are preferred. The most
    recently released remaining candidate wins.

    Returns:
        The chosen row, or None when no candidate exists.
    """
    candidates = []
    for row in rows:
        if not row.href.lower().endswith(".pdf"):
            continue
        if "allotment results" in row.headline.lower() or "allotment results" in row.title.lower():
            candidates.append(row)
    if not candidates:
        return None

    listed = date.fromisoformat(listing_date) if listing_date else None
    if listed:
        in_window = []
        for row in candidates:
            days_before_listing = (listed - date.fromisoformat(row.release_date)).days
            if -5 <= days_before_listing <= 10:
                in_window.append(row)
        if in_window:
            candidates = in_window

    ordered = sorted(candidates, key=lambda row: row.release_date)
    return ordered[-1]
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of *data*."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def download_document(ticker: str, source_type: str, row: DocumentRow) -> ArchivedSource:
    """Download a document and archive it under ``data/raw/{ticker}``.

    The file is named ``{source_type}_{release_date}_{doc_id}{suffix}`` where
    ``doc_id`` and ``suffix`` come from the link's file name (suffix defaults
    to ``.pdf``). The file is only rewritten when missing or when its bytes
    differ from the fresh download.

    Returns:
        An ``ArchivedSource`` describing the archived file, including the
        SHA-256 of the downloaded bytes.
    """
    payload = fetch_bytes(row.url)
    link_path = Path(row.href)
    doc_id = link_path.stem
    extension = link_path.suffix.lower() or ".pdf"
    target = Path("data/raw") / ticker / f"{source_type}_{row.release_date}_{doc_id}{extension}"
    target.parent.mkdir(parents=True, exist_ok=True)

    already_current = target.exists() and target.read_bytes() == payload
    if not already_current:
        target.write_bytes(payload)

    date_tag = row.release_date.replace("-", "_")
    return ArchivedSource(
        source_id=f"{ticker}_{source_type}_{date_tag}_{doc_id}",
        ticker=ticker,
        source_type=source_type,
        title=row.title,
        local_path=target.as_posix(),
        url=row.url,
        file_sha256=sha256_bytes(payload),
        source_date=row.release_date,
        notes=f"HKEXnews {row.headline}.",
    )
def first_pdf_text(local_path: str, max_pages: int) -> str:
    """Extract text from the leading pages of a PDF as one normalised line.

    Args:
        local_path: Path of the PDF file to read.
        max_pages: Upper bound on the number of leading pages to extract.

    Returns:
        The page texts joined with spaces, with every run of whitespace
        collapsed to a single space. Pages yielding no text contribute "".
    """
    reader = PdfReader(local_path)
    page_limit = min(max_pages, len(reader.pages))
    page_texts = [page.extract_text() or "" for page in reader.pages[:page_limit]]
    return " ".join(" ".join(page_texts).split())
def normalize_pdf_text(text: str) -> str:
    """Repair letter-spaced phrases that PDF extraction splits apart.

    A fixed list of spaced-out phrases is rejoined first, then standalone
    "o n" and "f r o m" sequences are collapsed to "on" and "from" so the
    date patterns used elsewhere can match.
    """
    phrase_fixes = (
        ("H o n g K o n g P u b l i c O f f e r i n g c o m m e n c e s", "Hong Kong Public Offering commences"),
        ("a t o r b e f o r e", "at or before"),
        ("n o l a t e r", "no later"),
        ("o n o r b e f o r e", "on or before"),
    )
    for spaced, joined in phrase_fixes:
        text = text.replace(spaced, joined)

    word_fixes = (
        (r"\bo\s+n\b", "on"),
        (r"\bf\s+r\s+o\s+m\b", "from"),
    )
    for pattern, word in word_fixes:
        text = re.sub(pattern, word, text)
    return text
def integer_after(pattern: str, text: str) -> int | None:
    """Return the integer captured by group 1 of *pattern*, if any.

    The search is case-insensitive. Thousands separators and any whitespace
    inside the captured number are removed before conversion. All
    whitespace is stripped, not only literal spaces, because call sites
    capture with ``\\s`` and a tab or newline inside the capture would
    otherwise make ``int()`` raise.

    Args:
        pattern: Regular expression whose first group captures the number.
        text: Text to search.

    Returns:
        The parsed integer, or None when the pattern does not match or the
        capture holds no digits.
    """
    match = re.search(pattern, text, flags=re.I)
    if match is None:
        return None
    digits = re.sub(r"[,\s]", "", match.group(1))
    if not digits:
        return None
    return int(digits)
def float_after(pattern: str, text: str) -> float | None:
    """Return the number captured by group 1 of *pattern* as a float.

    The search is case-insensitive and thousands separators are removed.
    Call sites capture with classes such as ``[\\d,.]+``, which also swallow
    a sentence-ending period ("HK$12.50."); trailing periods are therefore
    stripped, and a capture that still is not a number yields None instead
    of raising.

    Args:
        pattern: Regular expression whose first group captures the number.
        text: Text to search.

    Returns:
        The parsed float, or None when the pattern does not match or the
        capture is not a valid number.
    """
    match = re.search(pattern, text, flags=re.I)
    if match is None:
        return None
    number = match.group(1).replace(",", "").rstrip(".")
    try:
        return float(number)
    except ValueError:
        return None
def money_m_after(pattern: str, text: str) -> float | None:
    """Return a captured money amount expressed in millions.

    Group 1 of *pattern* captures the number and group 2 the optional unit
    ("million" or "billion"); the search is case-insensitive.

    * A "billion" amount is multiplied by 1000.
    * A "million" amount is returned unchanged.
    * An amount without a unit is taken to be in whole currency units and is
      divided by 1,000,000, so the result is always in millions rather than
      a raw dollar figure.

    Trailing periods swallowed by captures such as ``[\\d,.]+`` are stripped,
    and a capture that is not a valid number yields None instead of raising.

    Returns:
        The amount in millions, or None when the pattern does not match or
        the capture is not a valid number.
    """
    match = re.search(pattern, text, flags=re.I)
    if match is None:
        return None
    number = match.group(1).replace(",", "").rstrip(".")
    try:
        amount = float(number)
    except ValueError:
        return None
    unit = (match.group(2) or "").lower()
    if unit.startswith("b"):
        return amount * 1000
    if unit.startswith("m"):
        return amount
    # No unit: the figure is in plain dollars, convert to millions.
    return amount / 1_000_000
def date_after(label_pattern: str, text: str) -> str | None:
    """Find the first date that follows a label and return it as ISO text.

    After *label_pattern*, up to 600 characters may precede one of the
    connectors "on", "from" or "at or before", optionally followed by a
    weekday and comma, and then a date written as "Month D, YYYY" or
    "D Month YYYY". Matching is case-insensitive.

    Returns:
        The date as ``YYYY-MM-DD``, or None when nothing matches or the
        matched text is not a real date with a full month name.
    """
    connector = r".{0,600}?(?:on|from|at or before)\s+(?:[A-Z][a-z]+,\s+)?"
    date_group = r"([A-Z][a-z]+ \d{1,2}, \d{4}|\d{1,2} [A-Z][a-z]+ \d{4})"
    found = re.search(label_pattern + connector + date_group, text, flags=re.I)
    if found is None:
        return None

    raw_date = found.group(1)
    for layout in ("%B %d, %Y", "%d %B %Y"):
        try:
            parsed = datetime.strptime(raw_date, layout)
        except ValueError:
            continue
        return parsed.date().isoformat()
    return None
def parse_prospectus_facts(local_path: str) -> ProspectusFacts:
    """Extract offering terms and key dates from a prospectus PDF.

    Only the first 8 pages are read. Every value is found by regular
    expression on the normalised text, so a field is None when its pattern
    does not match.

    Args:
        local_path: Path of the archived prospectus PDF.

    Returns:
        A ``ProspectusFacts`` instance with the values that could be parsed.
    """
    text = normalize_pdf_text(first_pdf_text(local_path, 8))

    board_lot = integer_after(r"minimum\s*of\s*([\d][\d,\s]*)\s*Hong\s*Kong\s*Offer\s*Shares", text)

    # The minimum amount is looked up as the money figure that follows the
    # board-lot number, first with thousands separators, then without.
    min_amount = None
    if board_lot:
        for lot_text in (f"{board_lot:,}", f"{board_lot}"):
            min_amount = float_after(rf"\b{lot_text}\b\s+([\d,]+\.\d{{2}})", text)
            if min_amount is not None:
                break

    global_shares = integer_after(
        r"Number of Offer Shares (?:under|in) the Global Offering\s*:?\s+([\d][\d,\s]*)", text
    )
    if global_shares is None:
        global_shares = integer_after(r"Number of Offer Shares\s*:?\s+([\d][\d,\s]*)\s+(?:H\s+)?Shares", text)
    hk_shares = integer_after(r"Number of Hong Kong Offer Shares\s*:?\s+([\d][\d,\s]*)", text)
    intl_shares = integer_after(r"Number of International Offer Shares\s*:?\s+([\d][\d,\s]*)", text)

    # NOTE(review): the over-allotment size is estimated as 15% of the global
    # offering rather than read from the document -- confirm this is intended.
    over_allotment = round(global_shares * 0.15) if global_shares else None

    public_pct = None
    if global_shares and hk_shares:
        public_pct = round(hk_shares / global_shares, 4)

    # Labels are tried from most to least specific; the first hit wins.
    allotment_date = None
    for label in (r"Announcement of.*?Offer Price", r"Announcement of", r"The results of allocations"):
        allotment_date = date_after(label, text)
        if allotment_date:
            break

    start_date = date_after(r"Hong Kong Public Offering commences", text)
    end_date = date_after(r"Application lists.*?close", text)
    listing_date = date_after(r"Dealings in the (?:H\s+)?Shares.*?expected to commence", text)

    return ProspectusFacts(
        application_start_date=start_date,
        application_end_date=end_date,
        allotment_results_expected_date=allotment_date,
        listing_date=listing_date,
        board_lot=board_lot,
        min_subscription_amount_hkd=min_amount,
        global_offer_shares=global_shares,
        hk_offer_shares_initial=hk_shares,
        international_offer_shares_initial=intl_shares,
        public_offer_pct_initial=public_pct,
        over_allotment_offer_shares=over_allotment,
    )
def section_between(text: str, start: str, end: str | None, use_last_start: bool = False) -> str:
    """Return the text between a start pattern and an optional end pattern.

    Args:
        text: Text to slice.
        start: Regex marking the start; the section begins after its match.
        end: Regex marking the end, searched after the start. When None or
            not found, the section runs to the end of *text*.
        use_last_start: Use the last match of *start* instead of the first.

    Returns:
        The section text, or "" when *start* does not match. Both patterns
        are matched case-insensitively.
    """
    starts = list(re.finditer(start, text, flags=re.I))
    if not starts:
        return ""
    chosen = starts[-1 if use_last_start else 0]
    tail = text[chosen.end():]
    if not end:
        return tail
    stop = re.search(end, tail, flags=re.I)
    if stop is None:
        return tail
    return tail[: stop.start()]
def allotment_detail_sections(text: str) -> tuple[str, str]:
    """Split allotment-results text into Hong Kong and international parts.

    Each returned section starts with its own leading label ("No. of valid
    applications" / "No. of placees") so label-based patterns still match.

    Returns:
        ``(hk_section, intl_section)``; a section is "" when its heading is
        not found.
    """
    hk_pattern = r"HONG KONG PUBLIC OFFERING\s+No\. of valid applications(.*?)INTERNATIONAL OFFERING\s+No\. of placees"
    intl_pattern = r"INTERNATIONAL OFFERING\s+No\. of placees(.*?)(?:The Directors|LOCK-UP|Allottees with|$)"

    def labelled_section(pattern: str, label: str) -> str:
        found = re.search(pattern, text, flags=re.I)
        if found is None:
            return ""
        return label + found.group(1)

    hk_section = labelled_section(hk_pattern, "No. of valid applications")
    intl_section = labelled_section(intl_pattern, "No. of placees")
    return hk_section, intl_section
def parse_allotment_facts(local_path: str) -> AllotmentFacts:
    """Extract pricing, proceeds and demand figures from an allotment PDF.

    Only the first 8 pages are read. Pricing and proceeds are searched in
    the whole text; demand figures are searched inside the Hong Kong and
    international detail sections so equally named labels do not collide.

    Args:
        local_path: Path of the archived allotment-results PDF.

    Returns:
        An ``AllotmentFacts`` instance; a field is None when not found.
    """
    text = first_pdf_text(local_path, 8)
    hk_section, intl_section = allotment_detail_sections(text)

    # Document-wide summary figures.
    offer_price = float_after(r"Final Offer Price\s+HK\$([\d,.]+)", text)
    gross = money_m_after(r"Gross proceeds.*?HK\$([\d,.]+)\s*(million|billion)?", text)
    net = money_m_after(r"Net proceeds\s+HK\$([\d,.]+)\s*(million|billion)?", text)
    issued = integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", text)

    # Hong Kong public offering figures.
    valid = integer_after(r"No\. of valid applications\s+([\d,]+)", hk_section)
    successful = integer_after(r"No\. of successful applications\s+([\d,]+)", hk_section)
    public_level = float_after(r"Subscription level\s+([\d,.]+)\s+times", hk_section)
    final_hk = integer_after(
        r"Final no\. of Offer Shares under the Hong Kong Public Offering.*?([\d][\d,\s]*)",
        hk_section,
    )

    # International offering figures.
    placees = integer_after(r"No\. of placees\s+([\d,]+)", intl_section)
    intl_level = float_after(r"Subscription level.*?([\d,.]+)\s+times", intl_section)
    final_intl = integer_after(
        r"Final no\. of Offer Shares under the International Offering.*?([\d][\d,\s]*)",
        intl_section,
    )

    return AllotmentFacts(
        final_offer_price_hkd=offer_price,
        gross_proceeds_hkd_m=gross,
        net_proceeds_hkd_m=net,
        issued_shares_upon_listing=issued,
        valid_applications=valid,
        successful_applications=successful,
        public_oversubscription_times=public_level,
        international_placees=placees,
        international_oversubscription_times=intl_level,
        final_hk_offer_shares=final_hk,
        final_international_offer_shares=final_intl,
    )
def select_tickers(conn: sqlite3.Connection, limit: int, tickers: str | None) -> list[str]:
    """Decide which tickers this run should process.

    Args:
        conn: Open database connection (unused when *tickers* is given).
        limit: Maximum number of tickers selected from ``sync_tasks``.
        tickers: Optional comma-separated override. Entries are trimmed,
            left-padded with zeros to 5 characters, and blanks are dropped;
            *limit* does not apply to the override.

    Returns:
        The override list, or tickers that have an open ``T0_prospectus`` or
        ``T1_allotment`` task, most recent listing date first.
    """
    if tickers:
        requested = (piece.strip() for piece in tickers.split(","))
        return [piece.zfill(5) for piece in requested if piece]

    query = """
        SELECT DISTINCT m.ticker
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        WHERE t.task_status = 'open'
        AND t.stage IN ('T0_prospectus', 'T1_allotment')
        ORDER BY m.listing_date DESC, m.ticker
        LIMIT ?
        """
    return [ticker for (ticker,) in conn.execute(query, (limit,))]
def ticker_dates(conn: sqlite3.Connection, ticker: str) -> tuple[str | None, str | None]:
    """Look up the known listing and prospectus dates for a ticker.

    The prospectus date comes from the ticker's new-listing report entry
    with the highest ``report_year``, when one exists.

    Returns:
        ``(listing_date, prospectus_date)``; both are None when the ticker
        is not in ``ipo_master``.
    """
    query = """
        SELECT m.listing_date, r.prospectus_date
        FROM ipo_master m
        LEFT JOIN new_listing_report_entries r ON r.ticker = m.ticker
        WHERE m.ticker = ?
        ORDER BY r.report_year DESC
        LIMIT 1
        """
    found = conn.execute(query, (ticker,)).fetchone()
    if found is None:
        return None, None
    listing_date, prospectus_date = found[0], found[1]
    return listing_date, prospectus_date
def upsert_source_refs(conn: sqlite3.Connection, sources: list[ArchivedSource], as_of: str) -> None:
    """Insert or refresh ``source_refs`` rows for archived documents.

    Rows are keyed by ``source_id``. On conflict the title, path, URL, hash,
    source date, archive time and notes are overwritten; ticker, source type
    and path base keep their stored values. New rows get the path base
    ``repo_root``.

    Args:
        conn: Open database connection; the caller commits.
        sources: Archived documents to record (may be empty).
        as_of: Timestamp stored as ``archived_at``.
    """
    statement = """
        INSERT INTO source_refs (
        source_id, ticker, source_type, title, path_base, local_path, url,
        file_sha256, source_date, archived_at, notes
        )
        VALUES (?, ?, ?, ?, 'repo_root', ?, ?, ?, ?, ?, ?)
        ON CONFLICT(source_id) DO UPDATE SET
        title = excluded.title,
        local_path = excluded.local_path,
        url = excluded.url,
        file_sha256 = excluded.file_sha256,
        source_date = excluded.source_date,
        archived_at = excluded.archived_at,
        notes = excluded.notes
        """
    parameter_rows = []
    for src in sources:
        parameter_rows.append(
            (
                src.source_id,
                src.ticker,
                src.source_type,
                src.title,
                src.local_path,
                src.url,
                src.file_sha256,
                src.source_date,
                as_of,
                src.notes,
            )
        )
    conn.executemany(statement, parameter_rows)
def update_master_from_prospectus(conn: sqlite3.Connection, ticker: str, facts: ProspectusFacts, as_of: str) -> None:
    """Copy prospectus dates into the ticker's ``ipo_master`` row.

    Parsed application and allotment dates replace stored values when they
    are not None. The listing date is only filled when the stored value is
    NULL. ``data_as_of`` is always set to *as_of*. Nothing happens when the
    ticker has no ``ipo_master`` row.
    """
    statement = """
        UPDATE ipo_master
        SET application_start_date = COALESCE(?, application_start_date),
        application_end_date = COALESCE(?, application_end_date),
        allotment_results_expected_date = COALESCE(?, allotment_results_expected_date),
        listing_date = COALESCE(listing_date, ?),
        data_as_of = ?
        WHERE ticker = ?
        """
    parameters = (
        facts.application_start_date,
        facts.application_end_date,
        facts.allotment_results_expected_date,
        facts.listing_date,
        as_of,
        ticker,
    )
    conn.execute(statement, parameters)
def update_terms_from_prospectus(
    conn: sqlite3.Connection,
    ticker: str,
    source_id: str,
    source_date: str,
    facts: ProspectusFacts,
    as_of: str,
) -> None:
    """Insert or merge prospectus-derived values into ``offering_terms``.

    There is one row per ticker. On conflict:

    * ``source_id`` is replaced when the stored source is a new-listing
      report, the same source, or another prospectus source of the ticker.
    * Each term column is overwritten when the stored row already comes from
      this source or from a prospectus source of the ticker; otherwise the
      stored value is kept and only NULLs are filled.
    * ``data_as_of`` is always set to *as_of*.

    Args:
        conn: Open database connection; the caller commits.
        ticker: Ticker owning the row.
        source_id: ``source_refs`` ID of the archived prospectus.
        source_date: Release date stored as ``prospectus_date``.
        facts: Values parsed from the prospectus.
        as_of: Timestamp stored as ``data_as_of``.
    """
    statement = """
        INSERT INTO offering_terms (
        ticker, source_id, prospectus_date, board_lot, min_subscription_amount_hkd,
        global_offer_shares, hk_offer_shares_initial, international_offer_shares_initial,
        public_offer_pct_initial, over_allotment_offer_shares, data_as_of
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker) DO UPDATE SET
        source_id = CASE
        WHEN offering_terms.source_id LIKE '%_new_listing_report_%'
        OR offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.source_id
        ELSE offering_terms.source_id
        END,
        prospectus_date = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.prospectus_date
        ELSE COALESCE(offering_terms.prospectus_date, excluded.prospectus_date)
        END,
        board_lot = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.board_lot
        ELSE COALESCE(offering_terms.board_lot, excluded.board_lot)
        END,
        min_subscription_amount_hkd = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.min_subscription_amount_hkd
        ELSE COALESCE(offering_terms.min_subscription_amount_hkd, excluded.min_subscription_amount_hkd)
        END,
        global_offer_shares = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.global_offer_shares
        ELSE COALESCE(offering_terms.global_offer_shares, excluded.global_offer_shares)
        END,
        hk_offer_shares_initial = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.hk_offer_shares_initial
        ELSE COALESCE(offering_terms.hk_offer_shares_initial, excluded.hk_offer_shares_initial)
        END,
        international_offer_shares_initial = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.international_offer_shares_initial
        ELSE COALESCE(
        offering_terms.international_offer_shares_initial,
        excluded.international_offer_shares_initial
        )
        END,
        public_offer_pct_initial = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.public_offer_pct_initial
        ELSE COALESCE(offering_terms.public_offer_pct_initial, excluded.public_offer_pct_initial)
        END,
        over_allotment_offer_shares = CASE
        WHEN offering_terms.source_id = excluded.source_id
        OR offering_terms.source_id LIKE offering_terms.ticker || '_prospectus_%'
        THEN excluded.over_allotment_offer_shares
        ELSE COALESCE(offering_terms.over_allotment_offer_shares, excluded.over_allotment_offer_shares)
        END,
        data_as_of = excluded.data_as_of
        """
    # Fact attributes in the column order of the INSERT above.
    term_fields = (
        "board_lot",
        "min_subscription_amount_hkd",
        "global_offer_shares",
        "hk_offer_shares_initial",
        "international_offer_shares_initial",
        "public_offer_pct_initial",
        "over_allotment_offer_shares",
    )
    term_values = tuple(getattr(facts, name) for name in term_fields)
    parameters = (ticker, source_id, source_date) + term_values + (as_of,)
    conn.execute(statement, parameters)
def update_terms_from_allotment(conn: sqlite3.Connection, ticker: str, facts: AllotmentFacts, as_of: str) -> None:
    """Fill missing pricing and proceeds columns of ``offering_terms``.

    Stored values win: the offer price, gross/net proceeds and issued share
    count are only written where the column is NULL. ``data_as_of`` is
    always set to *as_of*. Nothing happens when the ticker has no row.
    """
    statement = """
        UPDATE offering_terms
        SET offer_price_hkd = COALESCE(offer_price_hkd, ?),
        gross_proceeds_hkd_m = COALESCE(gross_proceeds_hkd_m, ?),
        net_proceeds_hkd_m = COALESCE(net_proceeds_hkd_m, ?),
        issued_shares_upon_listing = COALESCE(issued_shares_upon_listing, ?),
        data_as_of = ?
        WHERE ticker = ?
        """
    parameters = (
        facts.final_offer_price_hkd,
        facts.gross_proceeds_hkd_m,
        facts.net_proceeds_hkd_m,
        facts.issued_shares_upon_listing,
        as_of,
        ticker,
    )
    conn.execute(statement, parameters)
def upsert_demand(conn: sqlite3.Connection, ticker: str, source_id: str, source_date: str, facts: AllotmentFacts, as_of: str) -> None:
    """Insert or overwrite the ``ipo_demand`` row for an allotment document.

    Nothing is written unless at least one of the application counts,
    placee count or subscription levels is truthy. The row key is
    *source_id* with ``_allotment_results_`` shortened to ``_allotment_``;
    on conflict every non-key column except ``ticker`` is overwritten.

    Args:
        conn: Open database connection; the caller commits.
        ticker: Ticker owning the row.
        source_id: ``source_refs`` ID of the allotment document.
        source_date: Release date stored as ``stage_date``.
        facts: Values parsed from the allotment document.
        as_of: Timestamp stored as ``data_as_of``.
    """
    demand_signals = (
        facts.valid_applications,
        facts.successful_applications,
        facts.public_oversubscription_times,
        facts.international_placees,
        facts.international_oversubscription_times,
    )
    if not any(demand_signals):
        return

    demand_id = source_id.replace("_allotment_results_", "_allotment_")
    statement = """
        INSERT INTO ipo_demand (
        demand_id, ticker, source_id, stage_date, valid_applications, successful_applications,
        public_oversubscription_times, international_placees, international_oversubscription_times,
        final_hk_offer_shares, final_international_offer_shares, data_as_of, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(demand_id) DO UPDATE SET
        source_id = excluded.source_id,
        stage_date = excluded.stage_date,
        valid_applications = excluded.valid_applications,
        successful_applications = excluded.successful_applications,
        public_oversubscription_times = excluded.public_oversubscription_times,
        international_placees = excluded.international_placees,
        international_oversubscription_times = excluded.international_oversubscription_times,
        final_hk_offer_shares = excluded.final_hk_offer_shares,
        final_international_offer_shares = excluded.final_international_offer_shares,
        data_as_of = excluded.data_as_of,
        notes = excluded.notes
        """
    parameters = (
        (demand_id, ticker, source_id, source_date)
        + demand_signals
        + (
            facts.final_hk_offer_shares,
            facts.final_international_offer_shares,
            as_of,
            "Parsed from HKEXnews allotment results announcement.",
        )
    )
    conn.execute(statement, parameters)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write a whole table to ``SNAPSHOT_DIR/{table}.csv``.

    The file has a header row of column names, UTF-8 encoding and LF line
    endings. The snapshot directory is created when missing.

    Args:
        conn: Open database connection to read from.
        table: Table name. It is interpolated into the SQL text, so it must
            be a trusted identifier, never user input.
        order_by: ORDER BY expression, also interpolated; defaults to the
            first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerow(header)
        out.writerows(result)
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    The script is invoked in ``hkex_document_archive`` mode with a summary
    limit of 25.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status.
    """
    command = [sys.executable, "scripts/update_sync_state.py"]
    command += ["--db", db_path]
    command += ["--schema", schema_path]
    command += ["--as-of", as_of]
    command += ["--mode", "hkex_document_archive"]
    command += ["--summary-limit", "25"]
    subprocess.run(command, check=True)
def main() -> int:
    """Archive HKEXnews documents for the selected tickers and update the DB.

    For each ticker the title-search page is scraped, the prospectus and
    allotment-results PDFs are chosen, downloaded and parsed, and the parsed
    facts plus source references are written to the database. Snapshots of
    the touched tables are then exported and, unless ``--skip-sync-state``
    is given, the sync-state script is run.

    All database writes happen in one transaction: an exception while
    processing any ticker rolls the whole batch back (files already
    downloaded stay on disk).

    Returns:
        Process exit status, always 0 on normal completion.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    stock_ids = load_stock_ids()
    archived_sources: list[ArchivedSource] = []
    processed = 0
    missing_stock_ids: list[str] = []
    missing_docs: list[str] = []

    # sqlite3's connection context manager only commits or rolls back; it
    # does not close. Close explicitly so no handle on the database is left
    # open while the sync-state subprocess writes to the same file.
    conn = sqlite3.connect(args.db)
    try:
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            tickers = select_tickers(conn, args.limit, args.tickers)
            for ticker in tickers:
                stock_id = stock_ids.get(ticker)
                if stock_id is None:
                    missing_stock_ids.append(ticker)
                    continue
                rows = title_search_rows(stock_id)
                listing_date, prospectus_date = ticker_dates(conn, ticker)
                prospectus_row = choose_prospectus(rows, prospectus_date, listing_date)
                allotment_row = choose_allotment(rows, listing_date)
                if not prospectus_row and not allotment_row:
                    missing_docs.append(ticker)
                    continue
                sources_for_ticker: list[ArchivedSource] = []
                if prospectus_row:
                    prospectus_source = download_document(ticker, "prospectus", prospectus_row)
                    sources_for_ticker.append(prospectus_source)
                    prospectus_facts = parse_prospectus_facts(prospectus_source.local_path)
                    update_master_from_prospectus(conn, ticker, prospectus_facts, as_of)
                    update_terms_from_prospectus(
                        conn,
                        ticker,
                        prospectus_source.source_id,
                        prospectus_source.source_date,
                        prospectus_facts,
                        as_of,
                    )
                if allotment_row:
                    allotment_source = download_document(ticker, "allotment_results", allotment_row)
                    sources_for_ticker.append(allotment_source)
                    allotment_facts = parse_allotment_facts(allotment_source.local_path)
                    update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
                    upsert_demand(
                        conn,
                        ticker,
                        allotment_source.source_id,
                        allotment_source.source_date,
                        allotment_facts,
                        as_of,
                    )
                upsert_source_refs(conn, sources_for_ticker, as_of)
                archived_sources.extend(sources_for_ticker)
                processed += 1
            # Snapshots are read through the same connection, so they see
            # this run's writes before the transaction commits.
            for table in [
                "ipo_master",
                "offering_terms",
                "ipo_demand",
                "source_refs",
                "data_gaps",
            ]:
                export_snapshot(conn, table)
    finally:
        conn.close()

    # Runs after the commit so the subprocess sees the new rows.
    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)
    print("hkex documents archived")
    print(f"tickers selected: {len(tickers)}")
    print(f"tickers processed: {processed}")
    print(f"sources archived: {len(archived_sources)}")
    if missing_stock_ids:
        print("missing stock ids: " + ", ".join(missing_stock_ids))
    if missing_docs:
        print("missing target docs: " + ", ".join(missing_docs))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())