6d05056609
Request:
- Use archivist to close the 137 T1 ipo_demand source-only gaps using extracted PDF text.

Changes:
- Add an incremental T1 demand text backfill script.
- Parse existing allotment-result extracted text into ipo_demand.
- Archive linked Summary PDFs from old HKEX HTML allotment-result pages.
- Correct allotment-result selection to prefer primary result announcements over clarification or supplemental notices.
- Add robust line-aware allotment parsing and document the workflow in archivist and README.
- Record the backfill result in a report.

Execution:
- Selected 137 source-only T1 demand gaps.
- Wrote 137 ipo_demand rows, increasing ipo_demand from 154 to 291 rows.
- Archived 38 new HKEX allotment-result PDFs and extracted their text.
- Confirmed an incremental rerun selects 0 gaps and writes 0 rows.

Verification:
- Ran git diff --cached --check.
- Ran py_compile for archive_hkex_documents.py and backfill_t1_demand_from_text.py.
- Checked SQLite integrity and foreign keys.
- Confirmed DB row counts match CSV snapshots.
- Verified no T1 complete row is missing ipo_demand.
- Verified source_refs paths/files/hashes and PDF extracted-text manifest hashes.

Next useful context:
- T1 demand structure is complete for listed rows; 06106 and 06675 remain pending_not_due.
- T2 grey-market and due price-performance gaps remain separate archivist priorities.
- Analyst output should be regenerated before using the new T1 demand facts for scoring.
290 lines
10 KiB
Python
290 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Backfill structured T1 allotment demand facts from archived text evidence."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import hashlib
|
|
import html
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin
|
|
|
|
from archive_hkex_documents import (
|
|
ArchivedSource,
|
|
AllotmentFacts,
|
|
choose_allotment,
|
|
download_document,
|
|
export_snapshot,
|
|
fetch_bytes,
|
|
first_pdf_text_with_lines,
|
|
load_stock_ids,
|
|
parse_allotment_facts_from_text,
|
|
ticker_dates,
|
|
title_search_rows,
|
|
upsert_demand,
|
|
upsert_source_refs,
|
|
)
|
|
|
|
|
|
# Repo-relative defaults; DB_PATH and SCHEMA_PATH back the --db / --schema options.
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
# CSV manifest of extracted PDF text; load_manifest() indexes its rows by source_id.
EXTRACTED_TEXT_MANIFEST = SNAPSHOT_DIR.joinpath("extracted_text_manifest.csv")
|
|
|
|
|
|
@dataclass(frozen=True)
class DemandGapSource:
    """Source reference attached to a ticker that still lacks an ipo_demand row.

    The field names match the column names returned by the query in
    ``select_source_only_gaps``, which expands each result row into this
    class by keyword.
    """

    # Listing identity (ticker_sync_state.ticker, ipo_master.company_name_en).
    ticker: str
    company_name_en: str

    # Columns of the source_refs row referenced by the sync state.
    source_id: str
    source_type: str
    title: str
    local_path: str
    url: str
    file_sha256: str
    source_date: str
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Registers ``--db``, ``--schema``, ``--as-of``, ``--tickers`` and the
    boolean ``--skip-sync-state`` flag; the module docstring is used as the
    parser description.
    """
    option_specs = (
        ("--db", {"default": str(DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        (
            "--tickers",
            {"help": "Comma-separated tickers to process instead of all source-only T1 gaps."},
        ),
        (
            "--skip-sync-state",
            {"action": "store_true", "help": "Do not refresh sync state after updating facts."},
        ),
    )
    cli = argparse.ArgumentParser(description=__doc__)
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
|
|
|
|
|
|
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: Optional ISO-8601 timestamp (for example from ``--as-of``).
            A trailing ``Z`` is accepted. A timestamp without an offset is
            taken to be UTC, and a timestamp with another offset is converted
            to UTC, so the result always carries the same ``Z`` suffix as the
            default branch.

    Returns:
        The normalised timestamp, or the current UTC time truncated to whole
        seconds when ``value`` is empty or ``None``.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        # Older ``fromisoformat`` versions reject a literal "Z", so spell the
        # offset out before parsing.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # Naive input: interpret as UTC instead of emitting a zone-less string.
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
|
|
def load_manifest() -> dict[str, dict[str, str]]:
    """Load the extracted-text manifest CSV, indexed by ``source_id``.

    Returns:
        A mapping from ``source_id`` to the full CSV row. The mapping is empty
        when the manifest file does not exist; if a ``source_id`` appears more
        than once, the last row wins.
    """
    by_source_id: dict[str, dict[str, str]] = {}
    if EXTRACTED_TEXT_MANIFEST.exists():
        with EXTRACTED_TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle:
            for record in csv.DictReader(handle):
                by_source_id[record["source_id"]] = record
    return by_source_id
|
|
|
|
|
|
def select_source_only_gaps(conn: sqlite3.Connection, tickers: str | None) -> list[DemandGapSource]:
    """Select tickers whose T1 allotment stage is complete but has no demand row.

    Args:
        conn: Open SQLite connection. Each result row is passed through
            ``dict(row)``, so the connection needs a mapping-style row factory
            such as ``sqlite3.Row``.
        tickers: Optional comma-separated ticker list. Entries are stripped,
            blank entries are dropped, and the rest are left-padded with zeros
            to five characters. When omitted, every matching ticker is returned.

    Returns:
        One ``DemandGapSource`` per matching ticker, ordered by ticker.
    """
    ticker_clause = ""
    bindings: list[object] = []
    if tickers:
        for raw in tickers.split(","):
            cleaned = raw.strip()
            if cleaned:
                bindings.append(cleaned.zfill(5))
        # One "?" placeholder per requested ticker keeps the values parameterised.
        placeholders = ", ".join("?" * len(bindings))
        ticker_clause = f" AND s.ticker IN ({placeholders})"

    query = f"""
        SELECT
            s.ticker,
            m.company_name_en,
            sr.source_id,
            sr.source_type,
            sr.title,
            sr.local_path,
            sr.url,
            sr.file_sha256,
            sr.source_date
        FROM ticker_sync_state s
        JOIN ipo_master m ON m.ticker = s.ticker
        JOIN source_refs sr ON sr.source_id = s.last_source_id
        LEFT JOIN ipo_demand d ON d.ticker = s.ticker
        WHERE s.stage = 'T1_allotment'
            AND s.status = 'complete'
            AND d.ticker IS NULL
            AND sr.source_type = 'allotment_results'
            {ticker_clause}
        ORDER BY s.ticker
        """
    cursor = conn.execute(query, bindings)
    return [DemandGapSource(**dict(record)) for record in cursor.fetchall()]
|
|
|
|
|
|
def local_text_for_pdf(source: ArchivedSource | DemandGapSource, manifest: dict[str, dict[str, str]]) -> str:
    """Return the text of an archived PDF, preferring already-extracted text.

    Args:
        source: Source whose ``source_id`` is looked up in ``manifest`` and
            whose ``local_path`` is parsed when no extracted text is usable.
        manifest: Extracted-text manifest rows keyed by ``source_id``; the
            ``text_local_path`` column names the extracted text file.

    Returns:
        The UTF-8 contents of the extracted text file when the manifest names
        an existing regular file, otherwise the result of
        ``first_pdf_text_with_lines(source.local_path, 24)``.
    """
    manifest_row = manifest.get(source.source_id)
    recorded_path = manifest_row.get("text_local_path") if manifest_row else None
    if recorded_path:
        text_path = Path(recorded_path)
        # is_file() rather than exists(): a blank or directory path must fall
        # through to PDF parsing instead of failing inside read_text().
        if text_path.is_file():
            return text_path.read_text(encoding="utf-8")
    # NOTE(review): 24 is presumably a page limit for the helper - confirm.
    return first_pdf_text_with_lines(source.local_path, 24)
|
|
|
|
|
|
def has_core_demand(facts: AllotmentFacts) -> bool:
    """Report whether ``facts`` holds the minimum data for an ipo_demand row.

    Both ``valid_applications`` and ``public_oversubscription_times`` must be
    truthy, together with at least one of ``successful_applications``,
    ``international_placees`` or ``international_oversubscription_times``.
    """
    if not facts.valid_applications:
        return False
    if not facts.public_oversubscription_times:
        return False
    supporting_fields = (
        "successful_applications",
        "international_placees",
        "international_oversubscription_times",
    )
    return any(getattr(facts, name) for name in supporting_fields)
|
|
|
|
|
|
def is_secondary_allotment_notice(source: DemandGapSource) -> bool:
    """Return True when the source title marks a clarification or supplemental notice.

    The match is a case-insensitive substring test on ``source.title``.
    """
    lowered = source.title.lower()
    return any(marker in lowered for marker in ("clarification", "supplemental"))
|
|
|
|
|
|
def primary_allotment_source(
    conn: sqlite3.Connection,
    stock_ids: dict[str, int],
    source: DemandGapSource,
    as_of: str,
) -> ArchivedSource | None:
    """Find and archive a different allotment-results document for the ticker.

    Args:
        conn: Open SQLite connection used to read the ticker dates and to
            record the newly archived document in source_refs.
        stock_ids: Mapping from ticker to the stock id passed to the title search.
        source: The source currently attached to the ticker.
        as_of: Archive timestamp stored with the new source reference.

    Returns:
        The archived document, or ``None`` when the ticker has no stock id,
        no allotment announcement is chosen, or the chosen announcement has
        the same URL as ``source``.
    """
    hkex_stock_id = stock_ids.get(source.ticker)
    if hkex_stock_id is None:
        return None

    listing_date, prospectus_date = ticker_dates(conn, source.ticker)
    candidates = title_search_rows(hkex_stock_id, listing_date, prospectus_date)
    best = choose_allotment(candidates, listing_date)
    # Nothing found, or the search picked the document already on file.
    if best is None or best.url == source.url:
        return None

    document = download_document(source.ticker, "allotment_results", best)
    upsert_source_refs(conn, [document], as_of)
    return document
|
|
|
|
|
|
def summary_pdf_source(source: DemandGapSource, as_of: str) -> ArchivedSource | None:
    """Archive the first "summary" link found on an archived HTML results page.

    Args:
        source: Source whose ``local_path`` is the saved HTML page and whose
            ``url`` is the base for resolving relative links.
        as_of: Accepted for signature compatibility; not used in this function.

    Returns:
        An ``ArchivedSource`` describing the downloaded file, or ``None`` when
        the page has no anchor whose visible text contains "summary".
    """
    markup = Path(source.local_path).read_text(encoding="utf-8", errors="replace")

    # Take the first anchor whose tag-stripped, whitespace-collapsed label
    # mentions "summary" (case-insensitive).
    chosen: tuple[str, str] | None = None
    for anchor in re.finditer(r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', markup, flags=re.I | re.S):
        raw_href, raw_label = anchor.group(1), anchor.group(2)
        visible = re.sub(r"<.*?>", " ", html.unescape(raw_label))
        visible = " ".join(visible.split())
        if "summary" in visible.lower():
            chosen = (visible, html.unescape(raw_href))
            break
    if chosen is None:
        return None

    label, href = chosen
    url = urljoin(source.url, href)
    data = fetch_bytes(url)
    doc_id = Path(href).stem

    local_path = Path("data/raw") / source.ticker / f"allotment_results_summary_{source.source_date}_{doc_id}.pdf"
    local_path.parent.mkdir(parents=True, exist_ok=True)
    # Rewrite only when the file is missing or its bytes differ from the download.
    already_current = local_path.exists() and local_path.read_bytes() == data
    if not already_current:
        local_path.write_bytes(data)

    return ArchivedSource(
        source_id=f"{source.ticker}_allotment_results_summary_{source.source_date.replace('-', '_')}_{doc_id}",
        ticker=source.ticker,
        source_type="allotment_results",
        title=f"{source.title} - {label}",
        local_path=local_path.as_posix(),
        url=url,
        file_sha256=sha256_bytes(data),
        source_date=source.source_date,
        notes=f"HKEXnews allotment-results summary PDF linked from {source.source_id}.",
    )
|
|
|
|
|
|
def extract_text_for_sources(db_path: str, source_ids: list[str]) -> None:
    """Run ``scripts/extract_pdf_text.py`` for the given source ids.

    Does nothing when ``source_ids`` is empty. Otherwise the ids are
    de-duplicated and sorted, each is passed as its own ``--source-id``
    option, and a non-zero exit status raises ``subprocess.CalledProcessError``.
    """
    if not source_ids:
        return
    id_options = [
        token
        for source_id in sorted(set(source_ids))
        for token in ("--source-id", source_id)
    ]
    subprocess.run(
        [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path, *id_options],
        check=True,
    )
|
|
|
|
|
|
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` in ``t1_demand_text_backfill`` mode.

    The child process is started with the current interpreter; a non-zero
    exit status raises ``subprocess.CalledProcessError``.
    """
    # Insertion order of this dict is the order of the options on the command line.
    option_values = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "t1_demand_text_backfill",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, value in option_values.items():
        command += [flag, value]
    subprocess.run(command, check=True)
|
|
|
|
|
|
def main() -> int:
    """Backfill ipo_demand rows for source-only T1 gaps and print a summary.

    For every selected gap the function:
      1. swaps an archived HTML page for its linked summary PDF, if one is found;
      2. parses allotment facts from the PDF text;
      3. when the facts are incomplete and the title marks a clarification or
         supplemental notice, archives the primary announcement and parses that;
      4. writes an ipo_demand row, or records the gap as unparsed.

    Afterwards the ``ipo_demand`` and ``source_refs`` snapshots are exported,
    text extraction is requested for newly archived PDFs, and the sync state
    is refreshed unless ``--skip-sync-state`` was given or nothing changed.

    Returns:
        Always ``0``; failures surface as exceptions.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    manifest = load_manifest()
    stock_ids: dict[str, int] = {}
    selected = 0
    parsed = 0
    downloaded_sources: list[ArchivedSource] = []
    extracted_source_ids: list[str] = []
    unparsed: list[tuple[str, str]] = []

    conn = sqlite3.connect(args.db)
    try:
        # ``with conn`` commits on success and rolls back on error, but does
        # not close the connection; the ``finally`` below does that before the
        # helper scripts open the same database.
        with conn:
            conn.row_factory = sqlite3.Row
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            gaps = select_source_only_gaps(conn, args.tickers)
            selected = len(gaps)
            for index, source in enumerate(gaps, start=1):
                print(f"[{index}/{len(gaps)}] {source.ticker}", flush=True)
                parse_source: ArchivedSource | DemandGapSource | None = source

                # Archived HTML pages carry no figures themselves; use the
                # linked summary PDF instead (None when no such link exists).
                if Path(source.local_path).suffix.lower() in {".htm", ".html"}:
                    parse_source = summary_pdf_source(source, as_of)
                    if parse_source is not None:
                        upsert_source_refs(conn, [parse_source], as_of)
                        downloaded_sources.append(parse_source)
                        extracted_source_ids.append(parse_source.source_id)

                text = ""
                if parse_source is not None and Path(parse_source.local_path).suffix.lower() == ".pdf":
                    text = local_text_for_pdf(parse_source, manifest)
                facts = parse_allotment_facts_from_text(text) if text else AllotmentFacts()

                # A clarification/supplemental notice without usable figures:
                # fall back to the primary results announcement.
                if not has_core_demand(facts) and is_secondary_allotment_notice(source):
                    if not stock_ids:
                        # Loaded lazily, only once a fallback is actually needed.
                        stock_ids = load_stock_ids()
                    primary = primary_allotment_source(conn, stock_ids, source, as_of)
                    if primary is not None:
                        downloaded_sources.append(primary)
                        extracted_source_ids.append(primary.source_id)
                        parse_source = primary
                        facts = parse_allotment_facts_from_text(first_pdf_text_with_lines(primary.local_path, 24))

                if parse_source is None or not has_core_demand(facts):
                    unparsed.append((source.ticker, source.source_id))
                    continue

                upsert_demand(conn, source.ticker, parse_source.source_id, parse_source.source_date, facts, as_of)
                parsed += 1

            for table in ["ipo_demand", "source_refs"]:
                export_snapshot(conn, table, "ticker" if table != "source_refs" else "source_id")
    finally:
        conn.close()

    extract_text_for_sources(args.db, extracted_source_ids)
    if not args.skip_sync_state and (parsed or downloaded_sources):
        refresh_sync_state(args.db, args.schema, as_of)
    elif not parsed and not downloaded_sources:
        print("sync state unchanged: no source-only T1 demand gaps were updated")

    print("t1 demand backfill complete")
    print(f"source-only gaps selected: {selected}")
    print(f"ipo_demand rows written: {parsed}")
    print(f"new linked/primary PDFs archived: {len(downloaded_sources)}")
    print(f"new PDF text extractions requested: {len(set(extracted_source_ids))}")
    if unparsed:
        print("unparsed source-only gaps:")
        for ticker, source_id in unparsed:
            print(f"- {ticker}: {source_id}")
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with main()'s return value.
if __name__ == "__main__":
    sys.exit(main())
|