Files
hk-ipo/scripts/backfill_t1_demand_from_text.py
geometrybase 6d05056609 Backfill structured T1 demand from archived text
Request:
- Use archivist to close the 137 T1 ipo_demand source-only gaps using extracted PDF text.

Changes:
- Add an incremental T1 demand text backfill script.
- Parse existing allotment-result extracted text into ipo_demand.
- Archive linked Summary PDFs from old HKEX HTML allotment-result pages.
- Correct allotment-result selection to prefer primary result announcements over clarification or supplemental notices.
- Add robust line-aware allotment parsing and document the workflow in archivist and README.
- Record the backfill result in a report.

Execution:
- Selected 137 source-only T1 demand gaps.
- Wrote 137 ipo_demand rows, increasing ipo_demand from 154 to 291 rows.
- Archived 38 new HKEX allotment-result PDFs and extracted their text.
- Confirmed an incremental rerun selects 0 gaps and writes 0 rows.

Verification:
- Ran git diff --cached --check.
- Ran py_compile for archive_hkex_documents.py and backfill_t1_demand_from_text.py.
- Checked SQLite integrity and foreign keys.
- Confirmed DB row counts match CSV snapshots.
- Verified no T1 complete row is missing ipo_demand.
- Verified source_refs paths/files/hashes and PDF extracted-text manifest hashes.

Next useful context:
- T1 demand structure is complete for listed rows; 06106 and 06675 remain pending_not_due.
- T2 grey-market and due price-performance gaps remain separate archivist priorities.
- Analyst output should be regenerated before using the new T1 demand facts for scoring.
2026-06-15 13:59:06 +00:00

290 lines
10 KiB
Python

#!/usr/bin/env python3
"""Backfill structured T1 allotment demand facts from archived text evidence."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
from archive_hkex_documents import (
ArchivedSource,
AllotmentFacts,
choose_allotment,
download_document,
export_snapshot,
fetch_bytes,
first_pdf_text_with_lines,
load_stock_ids,
parse_allotment_facts_from_text,
ticker_dates,
title_search_rows,
upsert_demand,
upsert_source_refs,
)
# Repo-relative default locations. The database and schema paths are only
# defaults; the command line can override them via --db and --schema.
DB_PATH = Path("data", "hk_ipo.sqlite")
SCHEMA_PATH = Path("schema", "hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data", "snapshots")
# CSV manifest that maps a source_id to the location of its extracted text.
EXTRACTED_TEXT_MANIFEST = SNAPSHOT_DIR.joinpath("extracted_text_manifest.csv")
@dataclass(frozen=True)
class DemandGapSource:
    """Archived allotment-results source for a ticker that has no ipo_demand row yet.

    Instances are built by keyword from the rows returned by the gap-selection
    query (``DemandGapSource(**dict(row))``), so every field name must match a
    selected column name exactly.
    """

    ticker: str  # ticker_sync_state.ticker
    company_name_en: str  # ipo_master.company_name_en
    source_id: str  # source_refs.source_id; also the key into the extracted-text manifest
    source_type: str  # source_refs.source_type
    title: str  # source_refs.title; inspected for "clarification" / "supplemental"
    local_path: str  # source_refs.local_path; its suffix decides HTML vs PDF handling
    url: str  # source_refs.url; base URL for resolving links found in an HTML page
    file_sha256: str  # source_refs.file_sha256
    source_date: str  # source_refs.source_date; embedded in derived file names and IDs
def parse_args() -> argparse.Namespace:
    """Parse the script's command-line options.

    The parser description is the module docstring. ``--db`` and ``--schema``
    default to the module-level path constants; the remaining options are
    optional and default to ``None`` / ``False``.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Path-valued options share the same shape: a flag, a default and help text.
    path_options = (
        ("--db", DB_PATH, "Repo-relative SQLite database path."),
        ("--schema", SCHEMA_PATH, "Repo-relative schema path."),
    )
    for flag, default_path, help_text in path_options:
        cli.add_argument(flag, default=str(default_path), help=help_text)

    cli.add_argument(
        "--as-of",
        help="Archive timestamp. Defaults to current UTC time.",
    )
    cli.add_argument(
        "--tickers",
        help="Comma-separated tickers to process instead of all source-only T1 gaps.",
    )
    cli.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh sync state after updating facts.",
    )
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: An ISO-8601 timestamp (a trailing ``Z`` is accepted), or
            ``None``/empty to use the current UTC time truncated to seconds.

    Returns:
        The timestamp normalised to UTC with a ``Z`` suffix. A value with a
        non-UTC offset is converted to UTC; a value without any offset is
        interpreted as already being UTC, so the output format is the same
        regardless of how the caller spelled the input.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.
    """
    if value:
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # No offset supplied: treat the wall-clock value as UTC rather
            # than emitting a timestamp that lacks the "Z" marker.
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    else:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    return moment.isoformat().replace("+00:00", "Z")
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def load_manifest() -> dict[str, dict[str, str]]:
    """Load the extracted-text manifest CSV, keyed by ``source_id``.

    Returns an empty mapping when the manifest file does not exist. When a
    ``source_id`` appears more than once, the last row wins.
    """
    manifest: dict[str, dict[str, str]] = {}
    if EXTRACTED_TEXT_MANIFEST.exists():
        with EXTRACTED_TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle:
            for record in csv.DictReader(handle):
                manifest[record["source_id"]] = record
    return manifest
def select_source_only_gaps(conn: sqlite3.Connection, tickers: str | None) -> list[DemandGapSource]:
    """Select tickers whose T1 allotment stage is complete but lack ipo_demand.

    Args:
        conn: Open SQLite connection. Rows are expanded with ``dict(row)``, so
            the connection needs a mapping-style row factory such as
            ``sqlite3.Row``.
        tickers: Optional comma-separated ticker list. Each entry is stripped
            and left-padded with zeros to five characters; blank entries are
            ignored. When falsy, no ticker restriction is applied.

    Returns:
        One ``DemandGapSource`` per matching ticker, ordered by ticker. Only
        rows whose last recorded source is of type ``allotment_results`` are
        returned.
    """
    ticker_filter = ""
    params: list[object] = []
    if tickers:
        for piece in tickers.split(","):
            cleaned = piece.strip()
            if cleaned:
                params.append(cleaned.zfill(5))
        # One "?" per ticker keeps the values bound rather than interpolated.
        ticker_filter = " AND s.ticker IN ({})".format(", ".join("?" * len(params)))

    query = f"""
SELECT
s.ticker,
m.company_name_en,
sr.source_id,
sr.source_type,
sr.title,
sr.local_path,
sr.url,
sr.file_sha256,
sr.source_date
FROM ticker_sync_state s
JOIN ipo_master m ON m.ticker = s.ticker
JOIN source_refs sr ON sr.source_id = s.last_source_id
LEFT JOIN ipo_demand d ON d.ticker = s.ticker
WHERE s.stage = 'T1_allotment'
AND s.status = 'complete'
AND d.ticker IS NULL
AND sr.source_type = 'allotment_results'
{ticker_filter}
ORDER BY s.ticker
"""
    cursor = conn.execute(query, params)
    return [DemandGapSource(**dict(record)) for record in cursor.fetchall()]
def local_text_for_pdf(source: ArchivedSource | DemandGapSource, manifest: dict[str, dict[str, str]]) -> str:
    """Return text for an archived PDF, preferring the already-extracted text file.

    Args:
        source: The archived source; only ``source_id`` and ``local_path`` are
            read.
        manifest: Extracted-text manifest rows keyed by ``source_id``. A row's
            ``text_local_path`` column names the cached text file.

    Returns:
        The cached text when the manifest points at an existing regular file,
        otherwise the result of ``first_pdf_text_with_lines`` on the PDF
        itself.
    """
    manifest_row = manifest.get(source.source_id) or {}
    # A short or partially filled CSV row can leave this column missing, None
    # or blank; none of those should abort the backfill.
    cached_path = manifest_row.get("text_local_path") or ""
    if cached_path:
        text_path = Path(cached_path)
        # is_file() instead of exists(): a directory path would pass exists()
        # and then fail inside read_text().
        if text_path.is_file():
            return text_path.read_text(encoding="utf-8")
    # 24 is forwarded unchanged as the helper's second argument
    # (presumably a page limit; confirm against archive_hkex_documents).
    return first_pdf_text_with_lines(source.local_path, 24)
def has_core_demand(facts: AllotmentFacts) -> bool:
    """Report whether ``facts`` carries the minimum demand fields worth storing.

    Both ``valid_applications`` and ``public_oversubscription_times`` must be
    truthy, together with at least one of ``successful_applications``,
    ``international_placees`` or ``international_oversubscription_times``.
    """
    if not facts.valid_applications:
        return False
    if not facts.public_oversubscription_times:
        return False
    supporting_fields = (
        "successful_applications",
        "international_placees",
        "international_oversubscription_times",
    )
    # Generator keeps the lookups lazy, stopping at the first truthy field.
    return any(getattr(facts, field_name) for field_name in supporting_fields)
def is_secondary_allotment_notice(source: DemandGapSource) -> bool:
    """Report whether the source title marks a clarification or supplemental notice.

    The match is a case-insensitive substring test on ``source.title``.
    """
    lowered_title = source.title.lower()
    return any(marker in lowered_title for marker in ("clarification", "supplemental"))
def primary_allotment_source(
    conn: sqlite3.Connection,
    stock_ids: dict[str, int],
    source: DemandGapSource,
    as_of: str,
) -> ArchivedSource | None:
    """Find, download and register a different allotment document for the ticker.

    Args:
        conn: Open SQLite connection used for the date lookup and the
            source_refs upsert.
        stock_ids: Mapping from ticker to the stock id used by the title search.
        source: The currently recorded source for the ticker.
        as_of: Archive timestamp recorded with the upsert.

    Returns:
        The newly archived source, or ``None`` when the ticker has no stock
        id, the search selects nothing, or the selected document has the same
        URL as ``source``.
    """
    ticker = source.ticker
    stock_id = stock_ids.get(ticker)
    if stock_id is None:
        return None

    listing_date, prospectus_date = ticker_dates(conn, ticker)
    candidates = title_search_rows(stock_id, listing_date, prospectus_date)
    chosen = choose_allotment(candidates, listing_date)
    # Nothing chosen, or the search chose the document that is already on record.
    if chosen is None or chosen.url == source.url:
        return None

    archived = download_document(ticker, "allotment_results", chosen)
    upsert_source_refs(conn, [archived], as_of)
    return archived
def summary_pdf_source(source: DemandGapSource, as_of: str) -> ArchivedSource | None:
    """Download the first "summary" PDF linked from an archived HTML results page.

    Args:
        source: Source whose ``local_path`` is the archived HTML page and whose
            ``url`` is the base for resolving relative links.
        as_of: Accepted for signature compatibility; not read by this function.

    Returns:
        An ``ArchivedSource`` describing the downloaded PDF, or ``None`` when
        no anchor with a double-quoted ``href`` has a label containing
        "summary" (case-insensitive).
    """
    page_html = Path(source.local_path).read_text(encoding="utf-8", errors="replace")
    anchor_pattern = re.compile(r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', flags=re.I | re.S)

    first_summary: tuple[str, str] | None = None
    for anchor in anchor_pattern.finditer(page_html):
        # Drop nested tags and collapse whitespace to get the visible label.
        inner_text = re.sub(r"<.*?>", " ", html.unescape(anchor.group(2)))
        label = " ".join(inner_text.split())
        if "summary" in label.lower():
            first_summary = (label, html.unescape(anchor.group(1)))
            break
    if first_summary is None:
        return None

    label, href = first_summary
    url = urljoin(source.url, href)
    data = fetch_bytes(url)
    doc_id = Path(href).stem

    target_dir = Path("data/raw") / source.ticker
    local_path = target_dir / f"allotment_results_summary_{source.source_date}_{doc_id}.pdf"
    target_dir.mkdir(parents=True, exist_ok=True)
    # Skip the write when an identical copy is already on disk.
    already_current = local_path.exists() and local_path.read_bytes() == data
    if not already_current:
        local_path.write_bytes(data)

    date_token = source.source_date.replace('-', '_')
    return ArchivedSource(
        source_id=f"{source.ticker}_allotment_results_summary_{date_token}_{doc_id}",
        ticker=source.ticker,
        source_type="allotment_results",
        title=f"{source.title} - {label}",
        local_path=local_path.as_posix(),
        url=url,
        file_sha256=sha256_bytes(data),
        source_date=source.source_date,
        notes=f"HKEXnews allotment-results summary PDF linked from {source.source_id}.",
    )
def extract_text_for_sources(db_path: str, source_ids: list[str]) -> None:
    """Run the PDF text extraction script once for the given source ids.

    Does nothing when ``source_ids`` is empty. Ids are de-duplicated and
    sorted before being passed as repeated ``--source-id`` options.

    Raises:
        subprocess.CalledProcessError: If the extraction script exits non-zero.
    """
    if not source_ids:
        return
    id_options = [
        token
        for source_id in sorted(set(source_ids))
        for token in ("--source-id", source_id)
    ]
    command = [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path, *id_options]
    subprocess.run(command, check=True)
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run the sync-state update script in ``t1_demand_text_backfill`` mode.

    Raises:
        subprocess.CalledProcessError: If the update script exits non-zero.
    """
    # Insertion order of the dict is the order of options on the command line.
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "t1_demand_text_backfill",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, value in options.items():
        command.extend((flag, value))
    subprocess.run(command, check=True)
def main() -> int:
    """Backfill ipo_demand rows for source-only T1 gaps and print a summary.

    For each selected gap the function picks a document to parse: the
    recorded PDF, the summary PDF linked from a recorded HTML page, or, for
    clarification/supplemental notices that yield no core demand facts, a
    different allotment document found via the title search. Parsed facts
    are upserted, the ``ipo_demand`` and ``source_refs`` snapshots are
    exported, text extraction is requested for newly archived PDFs, and
    sync state is refreshed unless ``--skip-sync-state`` is given.

    Returns:
        ``0`` as the process exit status, including when some gaps could not
        be parsed (those are listed in the printed summary).
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    manifest = load_manifest()
    stock_ids: dict[str, int] = {}
    selected = 0
    parsed = 0
    downloaded_sources: list[ArchivedSource] = []
    extracted_source_ids: list[str] = []
    unparsed: list[tuple[str, str]] = []

    conn = sqlite3.connect(args.db)
    try:
        # "with conn" only commits or rolls back the transaction; it does not
        # close the connection, hence the explicit close() in the finally.
        with conn:
            conn.row_factory = sqlite3.Row
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            gaps = select_source_only_gaps(conn, args.tickers)
            selected = len(gaps)
            for index, source in enumerate(gaps, start=1):
                print(f"[{index}/{len(gaps)}] {source.ticker}", flush=True)
                parse_source: ArchivedSource | DemandGapSource | None = source

                # HTML result pages carry no facts themselves; follow the
                # linked summary PDF and register it as a new source.
                if Path(source.local_path).suffix.lower() in {".htm", ".html"}:
                    parse_source = summary_pdf_source(source, as_of)
                    if parse_source is not None:
                        upsert_source_refs(conn, [parse_source], as_of)
                        downloaded_sources.append(parse_source)
                        extracted_source_ids.append(parse_source.source_id)

                text = ""
                if parse_source is not None and Path(parse_source.local_path).suffix.lower() == ".pdf":
                    text = local_text_for_pdf(parse_source, manifest)
                facts = parse_allotment_facts_from_text(text) if text else AllotmentFacts()

                # A clarification/supplemental notice without usable facts:
                # look for another allotment document for the same ticker.
                if not has_core_demand(facts) and is_secondary_allotment_notice(source):
                    if not stock_ids:
                        # Loaded lazily; only needed on this fallback path.
                        stock_ids = load_stock_ids()
                    primary = primary_allotment_source(conn, stock_ids, source, as_of)
                    if primary is not None:
                        downloaded_sources.append(primary)
                        extracted_source_ids.append(primary.source_id)
                        parse_source = primary
                        facts = parse_allotment_facts_from_text(first_pdf_text_with_lines(primary.local_path, 24))

                if parse_source is None or not has_core_demand(facts):
                    unparsed.append((source.ticker, source.source_id))
                    continue
                upsert_demand(conn, source.ticker, parse_source.source_id, parse_source.source_date, facts, as_of)
                parsed += 1

            for table in ["ipo_demand", "source_refs"]:
                export_snapshot(conn, table, "ticker" if table != "source_refs" else "source_id")
    finally:
        # Release the database handle before the helper scripts below open
        # the same database file.
        conn.close()

    extract_text_for_sources(args.db, extracted_source_ids)
    if not args.skip_sync_state and (parsed or downloaded_sources):
        refresh_sync_state(args.db, args.schema, as_of)
    elif not parsed and not downloaded_sources:
        print("sync state unchanged: no source-only T1 demand gaps were updated")

    print("t1 demand backfill complete")
    print(f"source-only gaps selected: {selected}")
    print(f"ipo_demand rows written: {parsed}")
    print(f"new linked/primary PDFs archived: {len(downloaded_sources)}")
    print(f"new PDF text extractions requested: {len(set(extracted_source_ids))}")
    if unparsed:
        print("unparsed source-only gaps:")
        for ticker, source_id in unparsed:
            print(f"- {ticker}: {source_id}")
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())