#!/usr/bin/env python3 """Backfill structured T1 allotment demand facts from archived text evidence.""" from __future__ import annotations import argparse import csv import hashlib import html import re import sqlite3 import subprocess import sys from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from urllib.parse import urljoin from archive_hkex_documents import ( ArchivedSource, AllotmentFacts, choose_allotment, download_document, export_snapshot, fetch_bytes, first_pdf_text_with_lines, load_stock_ids, parse_allotment_facts_from_text, ticker_dates, title_search_rows, upsert_demand, upsert_source_refs, ) DB_PATH = Path("data/hk_ipo.sqlite") SCHEMA_PATH = Path("schema/hk_ipo.schema.sql") SNAPSHOT_DIR = Path("data/snapshots") EXTRACTED_TEXT_MANIFEST = SNAPSHOT_DIR / "extracted_text_manifest.csv" @dataclass(frozen=True) class DemandGapSource: ticker: str company_name_en: str source_id: str source_type: str title: str local_path: str url: str file_sha256: str source_date: str def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.") parser.add_argument("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.") parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.") parser.add_argument("--tickers", help="Comma-separated tickers to process instead of all source-only T1 gaps.") parser.add_argument("--skip-sync-state", action="store_true", help="Do not refresh sync state after updating facts.") return parser.parse_args() def parse_as_of(value: str | None) -> str: if value: return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z") return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") def sha256_bytes(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def load_manifest() -> dict[str, dict[str, str]]: if not EXTRACTED_TEXT_MANIFEST.exists(): return {} with EXTRACTED_TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle: return {row["source_id"]: row for row in csv.DictReader(handle)} def select_source_only_gaps(conn: sqlite3.Connection, tickers: str | None) -> list[DemandGapSource]: ticker_filter = "" params: list[object] = [] if tickers: selected = [ticker.strip().zfill(5) for ticker in tickers.split(",") if ticker.strip()] placeholders = ", ".join("?" for _ in selected) ticker_filter = f" AND s.ticker IN ({placeholders})" params.extend(selected) rows = conn.execute( f""" SELECT s.ticker, m.company_name_en, sr.source_id, sr.source_type, sr.title, sr.local_path, sr.url, sr.file_sha256, sr.source_date FROM ticker_sync_state s JOIN ipo_master m ON m.ticker = s.ticker JOIN source_refs sr ON sr.source_id = s.last_source_id LEFT JOIN ipo_demand d ON d.ticker = s.ticker WHERE s.stage = 'T1_allotment' AND s.status = 'complete' AND d.ticker IS NULL AND sr.source_type = 'allotment_results' {ticker_filter} ORDER BY s.ticker """, params, ).fetchall() return [DemandGapSource(**dict(row)) for row in rows] def local_text_for_pdf(source: ArchivedSource | DemandGapSource, manifest: dict[str, dict[str, str]]) -> str: manifest_row = manifest.get(source.source_id) if manifest_row: text_path = Path(manifest_row["text_local_path"]) if text_path.exists(): return text_path.read_text(encoding="utf-8") return first_pdf_text_with_lines(source.local_path, 24) def has_core_demand(facts: AllotmentFacts) -> bool: return bool( facts.valid_applications and facts.public_oversubscription_times and (facts.successful_applications or facts.international_placees or facts.international_oversubscription_times) ) def is_secondary_allotment_notice(source: DemandGapSource) -> bool: title = source.title.lower() return "clarification" in title or "supplemental" in title def primary_allotment_source( conn: sqlite3.Connection, stock_ids: dict[str, int], source: DemandGapSource, as_of: str, ) -> ArchivedSource | None: stock_id = stock_ids.get(source.ticker) if stock_id is None: return None listing_date, prospectus_date = ticker_dates(conn, source.ticker) row = choose_allotment(title_search_rows(stock_id, listing_date, prospectus_date), listing_date) if row is None: return None if row.url == source.url: return None archived = download_document(source.ticker, "allotment_results", row) upsert_source_refs(conn, [archived], as_of) return archived def summary_pdf_source(source: DemandGapSource, as_of: str) -> ArchivedSource | None: page = Path(source.local_path).read_text(encoding="utf-8", errors="replace") links: list[tuple[str, str]] = [] for match in re.finditer(r']+href="([^"]+)"[^>]*>(.*?)', page, flags=re.I | re.S): label = " ".join(re.sub(r"<.*?>", " ", html.unescape(match.group(2))).split()) href = html.unescape(match.group(1)) if "summary" in label.lower(): links.append((label, href)) if not links: return None label, href = links[0] url = urljoin(source.url, href) data = fetch_bytes(url) doc_id = Path(href).stem local_path = Path("data/raw") / source.ticker / f"allotment_results_summary_{source.source_date}_{doc_id}.pdf" local_path.parent.mkdir(parents=True, exist_ok=True) if not local_path.exists() or local_path.read_bytes() != data: local_path.write_bytes(data) return ArchivedSource( source_id=f"{source.ticker}_allotment_results_summary_{source.source_date.replace('-', '_')}_{doc_id}", ticker=source.ticker, source_type="allotment_results", title=f"{source.title} - {label}", local_path=local_path.as_posix(), url=url, file_sha256=sha256_bytes(data), source_date=source.source_date, notes=f"HKEXnews allotment-results summary PDF linked from {source.source_id}.", ) def extract_text_for_sources(db_path: str, source_ids: list[str]) -> None: if not source_ids: return command = [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path] for source_id in sorted(set(source_ids)): command.extend(["--source-id", source_id]) subprocess.run(command, check=True) def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None: subprocess.run( [ sys.executable, "scripts/update_sync_state.py", "--db", db_path, "--schema", schema_path, "--as-of", as_of, "--mode", "t1_demand_text_backfill", "--summary-limit", "25", ], check=True, ) def main() -> int: args = parse_args() as_of = parse_as_of(args.as_of) manifest = load_manifest() stock_ids: dict[str, int] = {} selected = 0 parsed = 0 downloaded_sources: list[ArchivedSource] = [] extracted_source_ids: list[str] = [] unparsed: list[tuple[str, str]] = [] with sqlite3.connect(args.db) as conn: conn.row_factory = sqlite3.Row conn.executescript(Path(args.schema).read_text(encoding="utf-8")) gaps = select_source_only_gaps(conn, args.tickers) selected = len(gaps) for index, source in enumerate(gaps, start=1): print(f"[{index}/{len(gaps)}] {source.ticker}", flush=True) parse_source: ArchivedSource | DemandGapSource | None = source if Path(source.local_path).suffix.lower() in {".htm", ".html"}: parse_source = summary_pdf_source(source, as_of) if parse_source is not None: upsert_source_refs(conn, [parse_source], as_of) downloaded_sources.append(parse_source) extracted_source_ids.append(parse_source.source_id) text = "" if parse_source is not None and Path(parse_source.local_path).suffix.lower() == ".pdf": text = local_text_for_pdf(parse_source, manifest) facts = parse_allotment_facts_from_text(text) if text else AllotmentFacts() if not has_core_demand(facts) and is_secondary_allotment_notice(source): if not stock_ids: stock_ids = load_stock_ids() primary = primary_allotment_source(conn, stock_ids, source, as_of) if primary is not None: downloaded_sources.append(primary) extracted_source_ids.append(primary.source_id) parse_source = primary facts = parse_allotment_facts_from_text(first_pdf_text_with_lines(primary.local_path, 24)) if parse_source is None or not has_core_demand(facts): unparsed.append((source.ticker, source.source_id)) continue upsert_demand(conn, source.ticker, parse_source.source_id, parse_source.source_date, facts, as_of) parsed += 1 for table in ["ipo_demand", "source_refs"]: export_snapshot(conn, table, "ticker" if table != "source_refs" else "source_id") extract_text_for_sources(args.db, extracted_source_ids) if not args.skip_sync_state and (parsed or downloaded_sources): refresh_sync_state(args.db, args.schema, as_of) elif not parsed and not downloaded_sources: print("sync state unchanged: no source-only T1 demand gaps were updated") print("t1 demand backfill complete") print(f"source-only gaps selected: {selected}") print(f"ipo_demand rows written: {parsed}") print(f"new linked/primary PDFs archived: {len(downloaded_sources)}") print(f"new PDF text extractions requested: {len(set(extracted_source_ids))}") if unparsed: print("unparsed source-only gaps:") for ticker, source_id in unparsed: print(f"- {ticker}: {source_id}") return 0 if __name__ == "__main__": raise SystemExit(main())