Backfill structured T1 demand from archived text

Request:
- Use archivist to close the 137 T1 ipo_demand source-only gaps using extracted PDF text.

Changes:
- Add an incremental T1 demand text backfill script.
- Parse existing allotment-result extracted text into ipo_demand.
- Archive linked Summary PDFs from old HKEX HTML allotment-result pages.
- Correct allotment-result selection to prefer primary result announcements over clarification or supplemental notices.
- Add robust line-aware allotment parsing and document the workflow in archivist and README.
- Record the backfill result in a report.

Execution:
- Selected 137 source-only T1 demand gaps.
- Wrote 137 ipo_demand rows, increasing ipo_demand from 154 to 291 rows.
- Archived 38 new HKEX allotment-result PDFs and extracted their text.
- Confirmed an incremental rerun selects 0 gaps and writes 0 rows.

Verification:
- Ran git diff --cached --check.
- Ran py_compile for archive_hkex_documents.py and backfill_t1_demand_from_text.py.
- Checked SQLite integrity and foreign keys.
- Confirmed DB row counts match CSV snapshots.
- Verified no T1 complete row is missing ipo_demand.
- Verified source_refs paths/files/hashes and PDF extracted-text manifest hashes.

Next useful context:
- T1 demand structure is complete for listed rows; 06106 and 06675 remain pending_not_due.
- T2 grey-market and due price-performance gaps remain separate archivist priorities.
- Analyst output should be regenerated before using the new T1 demand facts for scoring.
This commit is contained in:
2026-06-15 13:59:06 +00:00
parent 33d0bc056e
commit 6d05056609
88 changed files with 55785 additions and 2456 deletions
+241 -1
View File
@@ -279,7 +279,19 @@ def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> Docum
]
if windowed:
candidates = windowed
return sorted(candidates, key=lambda row: row.release_date)[-1]
def allotment_rank(row: DocumentRow) -> tuple[int, int, str]:
    """Build the sort key used to pick the best allotment-results document.

    The key is ``(quality, -distance, release_date)``; the caller keeps the
    candidate with the greatest key. Quality tiers:

    * 0 - title mentions "clarification" or "supplemental";
    * 2 - title mentions "allotment results" and either "final offer price"
      or starts with "announcement of allotment";
    * 1 - anything else.
    """
    lowered = row.title.lower()
    is_secondary = "clarification" in lowered or "supplemental" in lowered
    is_primary = "allotment results" in lowered and (
        "final offer price" in lowered or lowered.startswith("announcement of allotment")
    )
    if is_secondary:
        quality = 0
    elif is_primary:
        quality = 2
    else:
        quality = 1
    # NOTE(review): date_distance presumably returns a day count; negating it
    # makes releases closer to the listing date rank higher - confirm.
    distance = date_distance(row.release_date, listing_date) if listing_date else 0
    return quality, -distance, row.release_date
return sorted(candidates, key=allotment_rank)[-1]
def sha256_bytes(data: bytes) -> str:
@@ -315,6 +327,14 @@ def first_pdf_text(local_path: str, max_pages: int) -> str:
return " ".join(" ".join(chunks).split())
def first_pdf_text_with_lines(local_path: str, max_pages: int) -> str:
    """Return the text of the leading pages of a PDF with line breaks kept.

    At most ``max_pages`` pages are read. Each page's extracted text is left
    untouched and the pages are joined with a newline.
    """
    pdf = PdfReader(local_path)
    page_limit = min(max_pages, len(pdf.pages))
    # extract_text() may give back a falsy value; fall back to "" so the
    # join below always receives strings.
    page_texts = [(page.extract_text() or "") for page in pdf.pages[:page_limit]]
    return "\n".join(page_texts)
def normalize_pdf_text(text: str) -> str:
replacements = {
"H o n g K o n g P u b l i c O f f e r i n g c o m m e n c e s": "Hong Kong Public Offering commences",
@@ -357,6 +377,17 @@ def money_m_after(pattern: str, text: str) -> float | None:
return amount
def strict_money_m_after(pattern: str, text: str) -> float | None:
    """Return the amount matched by *pattern* in *text*, expressed in millions.

    *pattern* must define two capture groups: group 1 is the numeric amount
    (thousands separators allowed) and group 2 is the unit word. A unit that
    starts with "b" (case-insensitive) is treated as billions and multiplied
    by 1000; any other unit is returned unchanged.

    Returns ``None`` when the pattern does not match or when the captured
    amount is not a valid number.
    """
    match = re.search(pattern, text, flags=re.I)
    if match is None:
        return None
    try:
        amount = float(match.group(1).replace(",", ""))
    except ValueError:
        # Permissive capture classes such as [\d,.]+ can grab fragments like
        # "1.2.3" or ","; treat those as "not found" instead of crashing.
        return None
    if match.group(2).lower().startswith("b"):
        return amount * 1000
    return amount
def date_after(label_pattern: str, text: str) -> str | None:
match = re.search(
label_pattern
@@ -444,7 +475,216 @@ def allotment_detail_sections(text: str) -> tuple[str, str]:
return hk_section, intl_section
def normalized_lines(text: str) -> list[str]:
    """Split *text* into non-blank lines with whitespace runs collapsed.

    Non-breaking spaces are treated as ordinary spaces, and every run of
    whitespace inside a line becomes a single space.
    """
    cleaned: list[str] = []
    for raw_line in text.splitlines():
        if not raw_line.strip():
            continue
        words = raw_line.replace("\xa0", " ").split()
        cleaned.append(" ".join(words))
    return cleaned
def number_from_text(value: str) -> int | None:
    """Return the first integer found in *value*, ignoring thousands commas.

    Returns ``None`` when *value* contains no digit.
    """
    found = re.search(r"([\d][\d,]*)", value)
    return int(found.group(1).replace(",", "")) if found else None
def float_from_text(value: str) -> float | None:
    """Return the first decimal number found in *value* as a float.

    Thousands commas are ignored. Returns ``None`` when *value* contains no
    digit.
    """
    found = re.search(r"([\d][\d,]*(?:\.\d+)?)", value)
    return float(found.group(1).replace(",", "")) if found else None
def value_after_line_label(
    lines: list[str],
    label_patterns: list[str],
    *,
    value_type: str = "int",
    max_lines: int = 6,
) -> int | float | None:
    """Find a label in *lines* and return the number that follows it.

    A window of ``max_lines`` consecutive lines is joined with spaces and
    slid over *lines*; each label pattern is searched case-insensitively
    inside the window, so labels and values may be split across lines.

    With ``value_type="times"`` a number directly followed by the word
    "times" is preferred, falling back to the first decimal number after the
    label; the result is a float. For any other ``value_type`` the first
    integer after the label is returned.

    Returns ``None`` when no window yields a value.
    """
    for index in range(len(lines)):
        window = " ".join(lines[index : index + max_lines])
        for label_pattern in label_patterns:
            match = re.search(label_pattern, window, flags=re.I)
            if match is None:
                continue
            tail = window[match.end() :]
            value: int | float | None
            if value_type == "times":
                times_match = re.search(r"([\d][\d,]*(?:\.\d+)?)\s*times", tail, flags=re.I)
                if times_match:
                    value = float(times_match.group(1).replace(",", ""))
                else:
                    value = float_from_text(tail)
            else:
                value = number_from_text(tail)
            if value is not None:
                return value
            # The first window that contains a label has it near the window's
            # end, so the text after it may be cut off before the value.
            # Keep scanning: a later window starts closer to the label and
            # exposes the lines that follow it.
    return None
def section_lines(
    lines: list[str],
    start_patterns: list[str],
    end_patterns: list[str],
    *,
    start_index: int = 0,
) -> list[str]:
    """Return the slice of *lines* from a start heading up to an end heading.

    The section begins at the first line at or after ``start_index`` that
    matches any start pattern, and stops just before the first later line
    that matches any end pattern (or at the end of *lines*). All patterns
    are matched case-insensitively. Returns ``[]`` when no start is found.
    """

    def first_hit(patterns: list[str], begin: int) -> int | None:
        # Position of the first line from `begin` onwards matching any pattern.
        for position in range(begin, len(lines)):
            candidate = lines[position]
            if any(re.search(pattern, candidate, flags=re.I) for pattern in patterns):
                return position
        return None

    opening = first_hit(start_patterns, start_index)
    if opening is None:
        return []
    closing = first_hit(end_patterns, opening + 1)
    if closing is None:
        closing = len(lines)
    return lines[opening:closing]
def allotment_detail_line_sections(text: str) -> tuple[list[str], list[str], list[str]]:
    """Split allotment-results text into detail, public and international lines.

    The detail window starts at the first heading (within the first 700
    normalized lines) mentioning the allotment details or applications, or
    at line 0 when none is found, and spans at most 320 lines. The public
    and international sections are cut from that window by heading; when a
    heading is missing, a window around the corresponding "valid
    applications" / "placees" label is used instead.

    Returns ``(detail_lines, public_lines, international_lines)``; either
    section may be empty.
    """
    all_lines = normalized_lines(text)

    heading = re.compile(r"ALLOTMENT RESULTS DETAILS|APPLICATIONS AND INDICATIONS", flags=re.I)
    detail_start = next(
        (position for position, line in enumerate(all_lines[:700]) if heading.search(line)),
        0,
    )
    detail_lines = all_lines[detail_start : detail_start + 320]

    def window_around(label_pattern: str, lines_after: int) -> list[str]:
        # Fallback: 10 lines before the first label hit through `lines_after`
        # lines past it; empty when the label never appears.
        for position, line in enumerate(detail_lines):
            if re.search(label_pattern, line, flags=re.I):
                return detail_lines[max(0, position - 10) : position + lines_after]
        return []

    public_lines = section_lines(
        detail_lines,
        [r"^(HONG KONG )?PUBLIC OFFER", r"PUBLIC OFFER SHARES"],
        [r"^INTERNATIONAL OFFER", r"^PLACING$", r"^EMPLOYEE PREFERENTIAL OFFERING"],
    )
    international_lines = section_lines(
        detail_lines,
        [r"^INTERNATIONAL OFFER", r"^PLACING$"],
        [
            r"^EMPLOYEE PREFERENTIAL OFFERING",
            r"^THE DIRECTORS CONFIRM",
            r"^THE PLACEES",
            r"^LOCK-UP",
            r"^BASIS OF ALLOCATION",
        ],
    )
    if not public_lines:
        public_lines = window_around(r"No\. of valid applications|Number of valid applications", 70)
    if not international_lines:
        international_lines = window_around(r"No\. of placees|Number of placees", 90)
    return detail_lines, public_lines, international_lines
def parse_allotment_facts_from_text(text: str) -> AllotmentFacts:
    """Parse allotment-results facts from line-preserving document text.

    Each demand figure is first looked up with the line-aware label search
    inside the public or international section; when that yields nothing, a
    sentence-style regular expression is tried. Price, proceeds and issued
    share count are read from the whitespace-flattened text.

    Fields whose parsed value is missing or of the wrong numeric type are
    set to ``None`` on the returned ``AllotmentFacts``.
    """
    flat_text = " ".join(text.split())
    # Only the two sections are needed here; the full detail window is unused.
    _, public_lines, international_lines = allotment_detail_line_sections(text)
    international_text = " ".join(international_lines) or flat_text

    def int_or_none(value: object) -> int | None:
        return value if isinstance(value, int) else None

    def float_or_none(value: object) -> float | None:
        return value if isinstance(value, float) else None

    # NOTE(review): the public-offer regex fallbacks search the whole
    # flattened document, not just the public section - confirm intended.
    valid_applications = value_after_line_label(
        public_lines,
        [r"No\. of valid applications", r"Number of valid applications"],
    )
    if valid_applications is None:
        valid_applications = integer_after(r"A total of\s+([\d,]+)\s+valid applications", flat_text)

    successful_applications = value_after_line_label(
        public_lines,
        [r"No\. of successful applications", r"Number of successful applications"],
    )
    if successful_applications is None:
        successful_applications = integer_after(r"allocated to\s+([\d,]+)\s+successful applicants", flat_text)

    public_oversubscription_times = value_after_line_label(
        public_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if public_oversubscription_times is None:
        public_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Public Offer|Hong Kong Public Offering)",
            flat_text,
        )

    international_placees = value_after_line_label(
        international_lines,
        [r"No\. of placees", r"Number of placees"],
    )
    if international_placees is None:
        international_placees = integer_after(
            r"(?:There (?:are|is) a total of|total of)\s+([\d,]+)\s+placees",
            international_text,
        )

    international_oversubscription_times = value_after_line_label(
        international_lines,
        [r"Subscription [Ll]evel"],
        value_type="times",
    )
    if international_oversubscription_times is None:
        international_oversubscription_times = float_after(
            r"representing approximately\s+([\d,.]+)\s+times.*?(?:Placing|International Offer)",
            international_text,
        )

    final_hk_offer_shares = value_after_line_label(
        public_lines,
        [r"Final no\. of Offer Shares under the (?:Hong Kong )?Public Offer(?:ing)?"],
    )
    if final_hk_offer_shares is None:
        final_hk_offer_shares = integer_after(
            r"final number of Offer Shares under the (?:Public Offer|Hong Kong Public Offering) is\s+([\d,]+)",
            flat_text,
        )

    final_international_offer_shares = value_after_line_label(
        international_lines,
        [r"Final no\. of Offer Shares under the International Offer(?:ing)?", r"Final no\. of Offer Shares under the Placing"],
    )
    if final_international_offer_shares is None:
        final_international_offer_shares = integer_after(
            r"final number of Offer Shares under (?:the )?(?:Placing|International Offer(?:ing)?) is\s+([\d,]+)",
            international_text,
        )

    return AllotmentFacts(
        final_offer_price_hkd=float_after(r"Final Offer Price\s*:?\s*HK\$?([\d,.]+)", flat_text),
        gross_proceeds_hkd_m=strict_money_m_after(r"Gross proceeds.{0,300}?HK\$([\d,.]+)\s*(million|billion)", flat_text),
        net_proceeds_hkd_m=strict_money_m_after(r"Net proceeds.{0,500}?HK\$([\d,.]+)\s*(million|billion)", flat_text),
        issued_shares_upon_listing=integer_after(r"Number of issued shares upon Listing.*?([\d,]+)", flat_text),
        valid_applications=int_or_none(valid_applications),
        successful_applications=int_or_none(successful_applications),
        public_oversubscription_times=float_or_none(public_oversubscription_times),
        international_placees=int_or_none(international_placees),
        international_oversubscription_times=float_or_none(international_oversubscription_times),
        final_hk_offer_shares=int_or_none(final_hk_offer_shares),
        final_international_offer_shares=int_or_none(final_international_offer_shares),
    )
def parse_allotment_facts(local_path: str) -> AllotmentFacts:
text_with_lines = first_pdf_text_with_lines(local_path, 12)
facts = parse_allotment_facts_from_text(text_with_lines)
if any(
[
facts.valid_applications,
facts.successful_applications,
facts.public_oversubscription_times,
facts.international_placees,
facts.international_oversubscription_times,
]
):
return facts
text = first_pdf_text(local_path, 8)
hk_section, intl_section = allotment_detail_sections(text)
return AllotmentFacts(
+289
View File
@@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""Backfill structured T1 allotment demand facts from archived text evidence."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urljoin
from archive_hkex_documents import (
ArchivedSource,
AllotmentFacts,
choose_allotment,
download_document,
export_snapshot,
fetch_bytes,
first_pdf_text_with_lines,
load_stock_ids,
parse_allotment_facts_from_text,
ticker_dates,
title_search_rows,
upsert_demand,
upsert_source_refs,
)
# Default SQLite database path; can be overridden with --db.
DB_PATH = Path("data/hk_ipo.sqlite")
# Default schema script path; can be overridden with --schema.
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Snapshot directory; the extracted-text manifest lives here.
SNAPSHOT_DIR = Path("data/snapshots")
# CSV manifest read by load_manifest(); its rows are keyed by `source_id`
# and carry a `text_local_path` column pointing at the extracted text file.
EXTRACTED_TEXT_MANIFEST = SNAPSHOT_DIR / "extracted_text_manifest.csv"
@dataclass(frozen=True)
class DemandGapSource:
    """Archived allotment-results source for a ticker with no ipo_demand row.

    Instances are built from the rows returned by select_source_only_gaps(),
    so the field names mirror the columns of that query.
    """

    # Ticker taken from ticker_sync_state.
    ticker: str
    # English company name taken from ipo_master.
    company_name_en: str
    # source_refs key; equals the sync state's last_source_id.
    source_id: str
    # source_refs type; the selecting query only keeps 'allotment_results'.
    source_type: str
    # Announcement title; inspected for "clarification" / "supplemental".
    title: str
    # Path of the archived file; its suffix (.htm/.html/.pdf) drives parsing.
    local_path: str
    # Original URL; used as the base when resolving links found in the page.
    url: str
    # Hash recorded in source_refs for the archived file.
    file_sha256: str
    # Source date as stored in source_refs; reused in derived file names.
    source_date: str
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the backfill script."""
    cli = argparse.ArgumentParser(description=__doc__)
    option_specs = (
        ("--db", {"default": str(DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--tickers", {"help": "Comma-separated tickers to process instead of all source-only T1 gaps."}),
        (
            "--skip-sync-state",
            {"action": "store_true", "help": "Do not refresh sync state after updating facts."},
        ),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 string.

    A supplied *value* is parsed (a trailing "Z" is accepted) and re-emitted
    with a "+00:00" offset rewritten as "Z". Without a value, the current
    UTC time truncated to whole seconds is used.

    NOTE: a value without a timezone, or with a non-UTC offset, is passed
    through as parsed - it is not converted to UTC.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return moment.isoformat().replace("+00:00", "Z")
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of *data*."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def load_manifest() -> dict[str, dict[str, str]]:
    """Load the extracted-text manifest CSV, keyed by ``source_id``.

    Returns an empty mapping when the manifest file does not exist. When a
    ``source_id`` appears more than once, the last row wins.
    """
    manifest: dict[str, dict[str, str]] = {}
    if not EXTRACTED_TEXT_MANIFEST.exists():
        return manifest
    with EXTRACTED_TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle:
        for record in csv.DictReader(handle):
            manifest[record["source_id"]] = record
    return manifest
def select_source_only_gaps(conn: sqlite3.Connection, tickers: str | None) -> list[DemandGapSource]:
    """Select completed T1 allotment rows that have a source but no demand row.

    A gap is a ``ticker_sync_state`` row with stage ``T1_allotment`` and
    status ``complete`` whose last source is of type ``allotment_results``
    and for which no ``ipo_demand`` row exists. Results are ordered by
    ticker.

    *tickers* optionally restricts the selection to a comma-separated list;
    each entry is stripped and left-padded with zeros to five characters.
    The connection must use ``sqlite3.Row`` (or another mapping-like row
    factory) because rows are expanded into keyword arguments.
    """
    ticker_filter = ""
    params: list[object] = []
    if tickers:
        params = [piece.strip().zfill(5) for piece in tickers.split(",") if piece.strip()]
        # Only "?" placeholders are interpolated into the SQL text; the
        # ticker values themselves are always bound as parameters.
        ticker_filter = " AND s.ticker IN (" + ", ".join("?" * len(params)) + ")"

    query = f"""
        SELECT
            s.ticker,
            m.company_name_en,
            sr.source_id,
            sr.source_type,
            sr.title,
            sr.local_path,
            sr.url,
            sr.file_sha256,
            sr.source_date
        FROM ticker_sync_state s
        JOIN ipo_master m ON m.ticker = s.ticker
        JOIN source_refs sr ON sr.source_id = s.last_source_id
        LEFT JOIN ipo_demand d ON d.ticker = s.ticker
        WHERE s.stage = 'T1_allotment'
          AND s.status = 'complete'
          AND d.ticker IS NULL
          AND sr.source_type = 'allotment_results'
          {ticker_filter}
        ORDER BY s.ticker
        """
    gaps: list[DemandGapSource] = []
    for record in conn.execute(query, params).fetchall():
        gaps.append(DemandGapSource(**dict(record)))
    return gaps
def local_text_for_pdf(source: ArchivedSource | DemandGapSource, manifest: dict[str, dict[str, str]]) -> str:
    """Return the text of a PDF source, preferring already-extracted text.

    When the manifest lists an extracted-text file for ``source.source_id``
    and that file exists, its contents are returned. Otherwise the text of
    up to the first 24 pages is extracted directly from the PDF.
    """
    entry = manifest.get(source.source_id)
    text_path = Path(entry["text_local_path"]) if entry else None
    if text_path is not None and text_path.exists():
        return text_path.read_text(encoding="utf-8")
    return first_pdf_text_with_lines(source.local_path, 24)
def has_core_demand(facts: AllotmentFacts) -> bool:
    """Tell whether *facts* carries the minimum demand figures.

    Requires a truthy valid-application count and public oversubscription
    multiple, plus at least one of: successful applications, international
    placees, or international oversubscription multiple.
    """
    if not (facts.valid_applications and facts.public_oversubscription_times):
        return False
    return bool(
        facts.successful_applications
        or facts.international_placees
        or facts.international_oversubscription_times
    )
def is_secondary_allotment_notice(source: DemandGapSource) -> bool:
    """Tell whether the source title marks a clarification or supplemental notice."""
    lowered = source.title.lower()
    return any(marker in lowered for marker in ("clarification", "supplemental"))
def primary_allotment_source(
    conn: sqlite3.Connection,
    stock_ids: dict[str, int],
    source: DemandGapSource,
    as_of: str,
) -> ArchivedSource | None:
    """Locate, download and register the primary allotment announcement.

    Returns ``None`` when the ticker has no known stock id, when no
    allotment document is chosen, or when the chosen document is the one
    already referenced by *source* (same URL). Otherwise the document is
    downloaded, recorded in ``source_refs`` and returned.
    """
    stock_id = stock_ids.get(source.ticker)
    if stock_id is None:
        return None
    listing_date, prospectus_date = ticker_dates(conn, source.ticker)
    candidates = title_search_rows(stock_id, listing_date, prospectus_date)
    best = choose_allotment(candidates, listing_date)
    if best is None or best.url == source.url:
        return None
    archived = download_document(source.ticker, "allotment_results", best)
    upsert_source_refs(conn, [archived], as_of)
    return archived
def summary_pdf_source(source: DemandGapSource, as_of: str) -> ArchivedSource | None:
    """Archive the "Summary" PDF linked from an archived HTML results page.

    The archived page at ``source.local_path`` is scanned for anchors; the
    first one whose visible label contains "summary" (case-insensitive) is
    resolved against ``source.url``, fetched, and stored under
    ``data/raw/<ticker>/``. The file is only rewritten when its bytes
    differ. Returns ``None`` when the page has no such link.

    *as_of* is accepted for signature symmetry and is not used here.
    """
    page = Path(source.local_path).read_text(encoding="utf-8", errors="replace")
    anchor = re.compile(r'<a[^>]+href="([^"]+)"[^>]*>(.*?)</a>', flags=re.I | re.S)

    label = None
    href = None
    for found in anchor.finditer(page):
        # Drop nested markup and collapse whitespace to get the visible label.
        visible = " ".join(re.sub(r"<.*?>", " ", html.unescape(found.group(2))).split())
        if "summary" in visible.lower():
            label = visible
            href = html.unescape(found.group(1))
            break
    if href is None:
        return None

    url = urljoin(source.url, href)
    data = fetch_bytes(url)
    doc_id = Path(href).stem
    local_path = Path("data/raw") / source.ticker / f"allotment_results_summary_{source.source_date}_{doc_id}.pdf"
    local_path.parent.mkdir(parents=True, exist_ok=True)
    already_current = local_path.exists() and local_path.read_bytes() == data
    if not already_current:
        local_path.write_bytes(data)
    return ArchivedSource(
        source_id=f"{source.ticker}_allotment_results_summary_{source.source_date.replace('-', '_')}_{doc_id}",
        ticker=source.ticker,
        source_type="allotment_results",
        title=f"{source.title} - {label}",
        local_path=local_path.as_posix(),
        url=url,
        file_sha256=sha256_bytes(data),
        source_date=source.source_date,
        notes=f"HKEXnews allotment-results summary PDF linked from {source.source_id}.",
    )
def extract_text_for_sources(db_path: str, source_ids: list[str]) -> None:
    """Run the PDF text-extraction script for the given source ids.

    Does nothing when *source_ids* is empty. Ids are de-duplicated and
    sorted, and each is passed as its own ``--source-id`` option. A failing
    subprocess raises ``subprocess.CalledProcessError``.
    """
    if not source_ids:
        return
    id_options = [
        token
        for source_id in sorted(set(source_ids))
        for token in ("--source-id", source_id)
    ]
    subprocess.run(
        [sys.executable, "scripts/extract_pdf_text.py", "--db", db_path, *id_options],
        check=True,
    )
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run the sync-state update script in ``t1_demand_text_backfill`` mode.

    A failing subprocess raises ``subprocess.CalledProcessError``.
    """
    command = [sys.executable, "scripts/update_sync_state.py"]
    command += ["--db", db_path]
    command += ["--schema", schema_path]
    command += ["--as-of", as_of]
    command += ["--mode", "t1_demand_text_backfill"]
    command += ["--summary-limit", "25"]
    subprocess.run(command, check=True)
def main() -> int:
    """Backfill ipo_demand rows for source-only T1 gaps and report the result.

    For every selected gap the demand facts are parsed from the archived
    source: HTML result pages are first resolved to their linked Summary
    PDF, and clarification/supplemental notices that lack core demand
    figures fall back to the primary allotment announcement. Parsed facts
    are upserted, the ``ipo_demand`` and ``source_refs`` snapshots are
    exported, text extraction is requested for newly archived PDFs, and the
    sync state is refreshed unless ``--skip-sync-state`` is given.

    Returns 0; failures surface as exceptions.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    manifest = load_manifest()
    stock_ids: dict[str, int] = {}
    selected = 0
    parsed = 0
    downloaded_sources: list[ArchivedSource] = []
    extracted_source_ids: list[str] = []
    unparsed: list[tuple[str, str]] = []

    conn = sqlite3.connect(args.db)
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # not close the connection; the surrounding try/finally does that
        # before the helper subprocesses open the same database.
        with conn:
            conn.row_factory = sqlite3.Row
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            gaps = select_source_only_gaps(conn, args.tickers)
            selected = len(gaps)
            for index, source in enumerate(gaps, start=1):
                print(f"[{index}/{len(gaps)}] {source.ticker}", flush=True)
                parse_source: ArchivedSource | DemandGapSource | None = source
                if Path(source.local_path).suffix.lower() in {".htm", ".html"}:
                    # Old HTML result pages carry no figures themselves; use
                    # the Summary PDF they link to, when there is one.
                    parse_source = summary_pdf_source(source, as_of)
                    if parse_source is not None:
                        upsert_source_refs(conn, [parse_source], as_of)
                        downloaded_sources.append(parse_source)
                        extracted_source_ids.append(parse_source.source_id)

                text = ""
                if parse_source is not None and Path(parse_source.local_path).suffix.lower() == ".pdf":
                    text = local_text_for_pdf(parse_source, manifest)
                facts = parse_allotment_facts_from_text(text) if text else AllotmentFacts()

                if not has_core_demand(facts) and is_secondary_allotment_notice(source):
                    # Stock ids are loaded lazily, only once a fallback to
                    # the primary announcement is actually needed.
                    if not stock_ids:
                        stock_ids = load_stock_ids()
                    primary = primary_allotment_source(conn, stock_ids, source, as_of)
                    if primary is not None:
                        downloaded_sources.append(primary)
                        extracted_source_ids.append(primary.source_id)
                        parse_source = primary
                        facts = parse_allotment_facts_from_text(first_pdf_text_with_lines(primary.local_path, 24))

                if parse_source is None or not has_core_demand(facts):
                    unparsed.append((source.ticker, source.source_id))
                    continue
                upsert_demand(conn, source.ticker, parse_source.source_id, parse_source.source_date, facts, as_of)
                parsed += 1

            for table in ["ipo_demand", "source_refs"]:
                export_snapshot(conn, table, "ticker" if table != "source_refs" else "source_id")
    finally:
        conn.close()

    extract_text_for_sources(args.db, extracted_source_ids)
    if not args.skip_sync_state and (parsed or downloaded_sources):
        refresh_sync_state(args.db, args.schema, as_of)
    elif not parsed and not downloaded_sources:
        print("sync state unchanged: no source-only T1 demand gaps were updated")

    print("t1 demand backfill complete")
    print(f"source-only gaps selected: {selected}")
    print(f"ipo_demand rows written: {parsed}")
    print(f"new linked/primary PDFs archived: {len(downloaded_sources)}")
    print(f"new PDF text extractions requested: {len(set(extracted_source_ids))}")
    if unparsed:
        print("unparsed source-only gaps:")
        for ticker, source_id in unparsed:
            print(f"- {ticker}: {source_id}")
    return 0
# Run the backfill only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())