Improve IPO archive gap handling

Request:
- Rework archivist handling for stubborn T0/T1 HKEX document gaps and unresolved T2 grey-market gaps.

Changes:
- Query HKEXnews titleSearchServlet with IPO-date windows instead of only the latest title-search page.
- Recognize SHARE OFFER listing documents and archive official HTML allotment-result notices when no PDF is published.
- Mark source-only allotment completion clearly when structured demand parsing is not yet covered.
- Add a reusable grey-market gap marker and archivist source policy for T2 data.
- Archive newly discovered HKEX raw sources, update SQLite, and refresh CSV snapshots.
- Treat raw evidence files as binary in Git attributes.

Verification:
- Ran py_compile for archive_hkex_documents.py, update_sync_state.py, and mark_grey_market_gaps.py.
- Ran HKEX document archive backfill and grey-market gap marker.
- Checked SQLite integrity, foreign keys, source paths, source hashes, and DB-vs-snapshot row counts.
- Ran git diff --cached --check after marking raw archives binary.

Next useful context:
- T0 is now complete for 293 tickers.
- T1 has 291 complete and 2 pending_not_due tickers.
- T2 has 291 blocked gaps pending an approved grey-market source strategy.
This commit is contained in:
2026-06-15 09:47:36 +00:00
parent 078f56998b
commit 5f9546b16c
184 changed files with 1204626 additions and 2857 deletions
+85 -9
View File
@@ -14,9 +14,9 @@ import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timezone
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.parse import urljoin
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from pypdf import PdfReader
@@ -28,6 +28,7 @@ BASE_URL = "https://www1.hkexnews.hk"
ACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/activestock_sehk_e.json"
INACTIVE_STOCK_URL = f"{BASE_URL}/ncms/script/eds/inactivestock_sehk_e.json"
TITLE_SEARCH_URL = f"{BASE_URL}/search/titlesearch.xhtml"
TITLE_SEARCH_SERVLET_URL = f"{BASE_URL}/search/titleSearchServlet.do"
DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
@@ -130,7 +131,11 @@ def parse_release_date(value: str) -> str:
return datetime.strptime(value.split()[0], "%d/%m/%Y").date().isoformat()
def title_search_rows(stock_id: int) -> list[DocumentRow]:
def parse_release_datetime(value: str) -> str:
    """Convert a ``DD/MM/YYYY HH:MM`` release timestamp into an ISO date string.

    The time-of-day component is parsed but discarded; only the calendar date
    is returned (``YYYY-MM-DD``).

    Raises:
        ValueError: if *value* does not match ``%d/%m/%Y %H:%M`` exactly.
    """
    released_at = datetime.strptime(value, "%d/%m/%Y %H:%M")
    return released_at.date().isoformat()
def latest_title_search_rows(stock_id: int) -> list[DocumentRow]:
url = f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}"
page = fetch_bytes(url).decode("utf-8", "replace")
rows: list[DocumentRow] = []
@@ -155,6 +160,65 @@ def title_search_rows(stock_id: int) -> list[DocumentRow]:
return rows
def window_title_search_rows(stock_id: int, from_date: date, to_date: date) -> list[DocumentRow]:
    """Fetch title-search rows for one stock, restricted to a release-date window.

    Sends a GET request to ``TITLE_SEARCH_SERVLET_URL`` with ``from_date`` and
    ``to_date`` formatted as ``YYYYMMDD`` and converts every usable item of the
    JSON response into a ``DocumentRow``.

    Args:
        stock_id: Value sent as the ``stockId`` query parameter.
        from_date: Start of the window, sent as ``fromDate``.
        to_date: End of the window, sent as ``toDate``.

    Returns:
        One ``DocumentRow`` per response item that has both a file link and a
        release time; an empty list when the servlet reports no result.

    Raises:
        urllib.error.URLError: on network / HTTP failure (from ``urlopen``).
        json.JSONDecodeError: if the response or its ``result`` field is not JSON.
        ValueError: if an item's ``DATE_TIME`` is not ``DD/MM/YYYY HH:MM``.
    """
    params = {
        "sortDir": "0",
        "sortByOptions": "DateTime",
        "category": "0",
        "market": "SEHK",
        "stockId": str(stock_id),
        "documentType": "-1",
        "fromDate": from_date.strftime("%Y%m%d"),
        "toDate": to_date.strftime("%Y%m%d"),
        "title": "",
        "searchType": "0",
        "t1code": "-2",
        "t2Gcode": "-2",
        "t2code": "-2",
        # NOTE(review): presumably the maximum number of rows returned; no
        # paging is done here, so larger result sets would be truncated.
        "rowRange": "500",
        "lang": "en",
    }
    url = f"{TITLE_SEARCH_SERVLET_URL}?{urlencode(params)}"
    request = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0",
            # Same URL the regular title-search page uses for this stock.
            "Referer": f"{TITLE_SEARCH_URL}?category=0&market=SEHK&stockId={stock_id}",
        },
    )
    with urlopen(request, timeout=60) as response:
        payload = response.read().decode("utf-8", "replace")
    response_data = json.loads(payload)
    # "result" is itself a JSON-encoded string. It may be missing/empty, and a
    # JSON "null" decodes to None, which must not reach the loop below.
    result = json.loads(response_data.get("result") or "[]") or []

    rows: list[DocumentRow] = []
    for item in result:
        href = html.unescape(item.get("FILE_LINK") or "")
        # Collapse any run of whitespace so the timestamp matches the strict
        # format expected by parse_release_datetime.
        release_time = " ".join((item.get("DATE_TIME") or "").split())
        if not href or not release_time:
            continue
        rows.append(
            DocumentRow(
                release_time=release_time,
                release_date=parse_release_datetime(release_time),
                headline=clean_html(item.get("SHORT_TEXT") or ""),
                title=clean_html(item.get("TITLE") or ""),
                href=href,
                url=urljoin(BASE_URL, href),
            )
        )
    return rows
def title_search_rows(stock_id: int, listing_date: str | None, prospectus_date: str | None) -> list[DocumentRow]:
    """Return title-search rows for a stock, using a date window when one is known.

    Window selection, in priority order:
      1. listing date known    -> 90 days before to 14 days after listing;
      2. prospectus date known -> 14 days before to 60 days after prospectus;
      3. neither known         -> fall back to ``latest_title_search_rows``.

    Both date strings are parsed up front via ``parse_iso_date``.
    """
    listed_on = parse_iso_date(listing_date)
    prospectus_on = parse_iso_date(prospectus_date)

    if listed_on:
        window = (listed_on - timedelta(days=90), listed_on + timedelta(days=14))
    elif prospectus_on:
        window = (prospectus_on - timedelta(days=14), prospectus_on + timedelta(days=60))
    else:
        # No usable anchor date: only the latest title-search page is available.
        return latest_title_search_rows(stock_id)

    window_start, window_end = window
    return window_title_search_rows(stock_id, window_start, window_end)
def parse_iso_date(value: str | None) -> date | None:
if not value:
return None
@@ -165,6 +229,10 @@ def date_distance(left: str, right: str) -> int:
return abs((date.fromisoformat(left) - date.fromisoformat(right)).days)
def archiveable_document(row: DocumentRow) -> bool:
    """Return True when the row's link ends in a PDF or HTML file extension.

    The extension of ``row.href`` is compared case-insensitively against
    ``.pdf``, ``.htm`` and ``.html``.
    """
    extension = Path(row.href.lower()).suffix
    return extension in (".pdf", ".htm", ".html")
def choose_prospectus(rows: list[DocumentRow], prospectus_date: str | None, listing_date: str | None) -> DocumentRow | None:
candidates = []
for row in rows:
@@ -174,7 +242,7 @@ def choose_prospectus(rows: list[DocumentRow], prospectus_date: str | None, list
continue
if "listing documents" not in headline:
continue
if "global offering" in title or "prospectus" in title:
if "global offering" in title or "prospectus" in title or title in {"share offer", "public offer"}:
candidates.append(row)
if not candidates:
return None
@@ -196,7 +264,7 @@ def choose_allotment(rows: list[DocumentRow], listing_date: str | None) -> Docum
candidates = [
row
for row in rows
if row.href.lower().endswith(".pdf")
if archiveable_document(row)
and ("allotment results" in row.headline.lower() or "allotment results" in row.title.lower())
]
if not candidates:
@@ -708,8 +776,8 @@ def main() -> int:
if stock_id is None:
missing_stock_ids.append(ticker)
continue
rows = title_search_rows(stock_id)
listing_date, prospectus_date = ticker_dates(conn, ticker)
rows = title_search_rows(stock_id, listing_date, prospectus_date)
prospectus_row = choose_prospectus(rows, prospectus_date, listing_date)
allotment_row = choose_allotment(rows, listing_date)
if not prospectus_row and not allotment_row:
@@ -733,9 +801,17 @@ def main() -> int:
if allotment_row:
allotment_source = download_document(ticker, "allotment_results", allotment_row)
sources_for_ticker.append(allotment_source)
allotment_facts = parse_allotment_facts(allotment_source.local_path)
update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
upsert_demand(conn, ticker, allotment_source.source_id, allotment_source.source_date, allotment_facts, as_of)
if Path(allotment_source.local_path).suffix.lower() == ".pdf":
allotment_facts = parse_allotment_facts(allotment_source.local_path)
update_terms_from_allotment(conn, ticker, allotment_facts, as_of)
upsert_demand(
conn,
ticker,
allotment_source.source_id,
allotment_source.source_date,
allotment_facts,
as_of,
)
upsert_source_refs(conn, sources_for_ticker, as_of)
archived_sources.extend(sources_for_ticker)
+129
View File
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Mark due T2 grey-market tasks as blocked until a reliable source is approved."""
from __future__ import annotations
import argparse
import csv
import sqlite3
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Default SQLite database location; used as the --db default in main().
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
# Default schema script location; used as the --schema default in main().
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory into which export_snapshot() writes one CSV file per table.
SNAPSHOT_DIR = Path("data/snapshots")
def parse_as_of(value: str | None) -> str:
if value:
return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write every row of *table* to ``SNAPSHOT_DIR/<table>.csv``.

    The CSV starts with a header row of column names, is UTF-8 encoded and
    uses ``\\n`` line endings. ``SNAPSHOT_DIR`` is created when missing.

    Args:
        conn: Open SQLite connection to read from.
        table: Table name; interpolated directly into the SQL text, so it
            must be a trusted identifier.
        order_by: ``ORDER BY`` expression, likewise interpolated verbatim.
            Defaults to the first column.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as out:
        csv_out = csv.writer(out, lineterminator="\n")
        csv_out.writerow(header)
        for record in result.fetchall():
            csv_out.writerow(record)
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` in a child process.

    The script is started with the current interpreter in
    ``grey_market_gap_review`` mode and a summary limit of 25.

    Args:
        db_path: Passed through as ``--db``.
        schema_path: Passed through as ``--schema``.
        as_of: Passed through as ``--as-of``.

    Raises:
        subprocess.CalledProcessError: if the script exits with a non-zero status.
    """
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "grey_market_gap_review",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, setting in options.items():
        command.extend((flag, setting))
    subprocess.run(command, check=True)
def main() -> int:
    """Record a blocked data gap for every due T2 grey-market task.

    Steps:
      1. Apply the schema script to the database.
      2. Select tickers whose ``T2_grey_market`` stage is ``pending_due``.
      3. Upsert one ``data_gaps`` row per ticker with no expected resolution date.
      4. Export the ``data_gaps`` table as a CSV snapshot.
      5. Unless ``--skip-sync-state`` is given, refresh the sync state.

    Returns:
        0 on success; the entry guard uses it as the process exit status.
    """
    # Function-scope import: only this function needs it.
    from contextlib import closing

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH), help="Repo-relative schema path.")
    parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    parser.add_argument("--skip-sync-state", action="store_true", help="Do not refresh sync state after marking gaps.")
    args = parser.parse_args()
    as_of = parse_as_of(args.as_of)

    reason = (
        "No reproducible and redistribution-safe grey-market data source has been approved. "
        "HKEX does not publish an official grey-market feed; broker and third-party grey-market feeds "
        "are platform-specific or proprietary."
    )
    notes = (
        "Keep T2 blocked until the project has a licensed export, user-provided evidence file, "
        "or public historical source with clear reuse terms. Do not scrape proprietary grey-market "
        "feeds into the repo."
    )

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it does not close. closing() releases the handle before the
    # sync-state subprocess opens the same database. Exit order: the inner
    # ``conn`` commits first, then closing() closes.
    with closing(sqlite3.connect(args.db)) as conn, conn:
        conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
        tickers = [
            row[0]
            for row in conn.execute(
                """
                SELECT ticker
                FROM ticker_sync_state
                WHERE stage = 'T2_grey_market'
                  AND status = 'pending_due'
                ORDER BY ticker
                """
            )
        ]
        rows = [
            (
                f"{ticker}_T2_grey_market_source_strategy_required",
                ticker,
                "T2_grey_market",
                "grey_market_price_performance",
                reason,
                None,  # expected_resolution_date: none is set for these gaps
                as_of,
                notes,
            )
            for ticker in tickers
        ]
        # NOTE(review): on conflict created_at is overwritten with this run's
        # as_of, so re-running resets the creation time — confirm intended.
        conn.executemany(
            """
            INSERT INTO data_gaps (
                gap_id, ticker, stage, field_name, reason, expected_resolution_date, created_at, notes
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(gap_id) DO UPDATE SET
                field_name = excluded.field_name,
                reason = excluded.reason,
                expected_resolution_date = excluded.expected_resolution_date,
                created_at = excluded.created_at,
                notes = excluded.notes
            """,
            rows,
        )
        # Snapshot is read through the same connection, so it includes the
        # rows written above.
        export_snapshot(conn, "data_gaps")

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print("grey-market gaps marked")
    print(f"tickers marked: {len(rows)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+15 -3
View File
@@ -172,18 +172,30 @@ def allotment_state(conn: sqlite3.Connection, ticker: Ticker, as_of_date: date)
""",
(ticker.ticker,),
)
if demand or source:
if demand:
return StageState(
ticker.ticker,
"T1_allotment",
"complete",
1,
ticker.allotment_results_expected_date,
(demand or source)["stage_date" if demand else "source_date"],
(demand or source)["source_id"],
demand["stage_date"],
demand["source_id"],
None,
"Allotment result facts are archived.",
)
if source:
return StageState(
ticker.ticker,
"T1_allotment",
"complete",
1,
ticker.allotment_results_expected_date,
source["source_date"],
source["source_id"],
None,
"Allotment result source is archived; structured demand facts need parser coverage.",
)
gap = data_gap_for(conn, ticker.ticker, "T1_allotment")
status = "blocked" if gap and gap["expected_resolution_date"] is None else due_status(ticker.allotment_results_expected_date, as_of_date)
return StageState(