Files
hk-ipo/scripts/archive_a_share_mappings.py
geometrybase 7cbdd533b0 Add A/H share-class mapping workflow
Request:
- Add a repeatable mechanism so HK IPO reports detect issuers that already have Mainland A shares.
- Include a third internet/official-exchange cross-check layer beyond structured history and prospectus scans.

Changes:
- Add listed_share_classes schema support for same-issuer A-share mappings and evidence links.
- Add scripts/archive_a_share_mappings.py to scan prospectus extracted text, reject sponsor/portfolio/cornerstone false positives, archive optional official web evidence and A-share/FX quote evidence, and export snapshots on write.
- Surface a_share_* fields in the analysis dataset and single-ticker report output.
- Update hk-ipo analyst/archivist skill rules and scheduled refresh prompt to require the three-layer A/H mapping check.

Verification:
- python3 -m py_compile scripts/archive_a_share_mappings.py scripts/build_analysis_dataset.py scripts/generate_ipo_report.py
- .venv/bin/python scripts/archive_a_share_mappings.py --as-of 2026-06-24T00:00:00Z --tickers 00668,01688,03661,09630 --dry-run
- .venv/bin/python scripts/build_analysis_dataset.py --db /tmp/hk_ipo_ah_dataset_test.sqlite --dataset /tmp/hk_ipo_ah_dataset_test.csv --report /tmp/hk_ipo_ah_model_test.md --as-of 2026-06-24T00:00:00Z
- .venv/bin/python scripts/generate_ipo_report.py 09630 --dataset /tmp/hk_ipo_ah_dataset_test.csv --stdout --as-of 2026-06-24T00:00:00Z
- git diff --check

Next useful context:
- Dry-run detected 00668->300866.SZ, 01688->002600.SZ, 03661->300661.SZ, and 09630->688630.SH.
- A false positive 01688->300476.SZ from a cornerstone investor parent was rejected by the issuer-context filter.
2026-06-24 07:21:21 +00:00

568 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Detect and archive A/H or onshore share-class mappings from prospectus text."""
from __future__ import annotations
import argparse
import csv
import hashlib
import html
import re
import sqlite3
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Default value of the --db command-line option.
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
# Default value of the --schema command-line option.
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that receives the per-table CSV snapshots.
SNAPSHOT_DIR = Path("data/snapshots")
# CSV manifest read by load_manifest(); it is read for its "source_id" and
# "text_local_path" columns.
TEXT_MANIFEST = SNAPSHOT_DIR / "extracted_text_manifest.csv"
# Directory for raw Yahoo chart responses (A-share quotes and HKD/CNY FX).
RAW_QUOTE_DIR = Path("data/raw/a_share_quotes")
# Directory for raw exchange web pages archived as cross-check evidence.
RAW_WEB_DIR = Path("data/raw/a_share_mapping_web")
# Base URL to which yahoo_chart_url() appends "/<symbol>?<query>".
YAHOO_CHART_BASE = "https://query1.finance.yahoo.com/v8/finance/chart"
@dataclass(frozen=True)
class ProspectusText:
    """Extracted prospectus text for one HK ticker, plus where it came from."""

    # HK ticker the prospectus belongs to.
    ticker: str
    # source_refs.source_id of the prospectus row.
    source_id: str
    # source_refs.local_path of the prospectus row.
    local_path: str
    # Path of the extracted-text file, as listed in the text manifest.
    text_path: Path
    # Full contents of the extracted-text file.
    text: str
@dataclass(frozen=True)
class ShareClassMapping:
    """One detected link between an HK ticker and a related onshore ticker."""

    # HK ticker whose prospectus produced the mapping.
    ticker: str
    # Related ticker in "<code>.<SH|SZ>" form.
    related_ticker: str
    # Exchange label, "SSE" or "SZSE".
    exchange: str
    # Board label ("STAR Market" / "ChiNext") when one was recognised.
    board: str | None
    # Company name pulled from the surrounding text, when found.
    company_name: str | None
    # ISO listing date pulled from the surrounding text, when found.
    listed_date: str | None
    # source_id of the prospectus that contained the evidence.
    prospectus_source_id: str
    # Snippet of prospectus text that triggered the detection.
    evidence_text: str
    # "high" or "medium".
    confidence: str
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments."""
    cli = argparse.ArgumentParser(description=__doc__)

    # Options that carry a value: (flag, keyword arguments for add_argument).
    valued_options = (
        ("--db", {"default": str(DEFAULT_DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(DEFAULT_SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--tickers", {"help": "Comma-separated HK tickers to scan. Defaults to current prospectus rows."}),
    )
    for flag, options in valued_options:
        cli.add_argument(flag, **options)

    # Boolean switches: (flag, help text).
    switches = (
        ("--archive-quotes", "Archive Yahoo A-share and HKD/CNY chart evidence."),
        ("--web-cross-check", "Archive supported public web cross-check pages."),
        ("--dry-run", "Print detected mappings without writing DB or files."),
    )
    for flag, description in switches:
        cli.add_argument(flag, action="store_true", help=description)

    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the archive timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: ISO-8601 timestamp from the command line, or ``None``/empty to
            use the current UTC time truncated to whole seconds.

    Returns:
        The timestamp expressed in UTC with a ``Z`` suffix.

    Raises:
        ValueError: if ``value`` is not parseable by ``datetime.fromisoformat``.
    """
    if value:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # A naive timestamp (e.g. "2026-06-24") is taken to be UTC so the
            # result always carries the "Z" suffix.
            parsed = parsed.replace(tzinfo=timezone.utc)
        else:
            # Convert other offsets to UTC instead of passing them through.
            parsed = parsed.astimezone(timezone.utc)
    else:
        parsed = datetime.now(timezone.utc).replace(microsecond=0)
    return parsed.isoformat().replace("+00:00", "Z")
def compact_timestamp(value: str) -> str:
    """Return ``value`` with dashes and colons removed, for file names and ids.

    A ``+00:00`` offset is rewritten to ``Z``. That rewrite must happen before
    the colons are stripped, otherwise ``+00:00`` has already become ``+0000``
    and is never matched.
    """
    normalized = value.replace("+00:00", "Z")
    return normalized.replace("-", "").replace(":", "")
def source_date(value: str) -> str:
    """Return the calendar-date part (``YYYY-MM-DD``) of an ISO timestamp."""
    iso_text = value.replace("Z", "+00:00")
    moment = datetime.fromisoformat(iso_text)
    return str(moment.date())
def selected_tickers(value: str | None) -> set[str] | None:
    """Parse a comma-separated ticker list into zero-padded 5-character tickers.

    Returns ``None`` when no value was supplied. Blank entries are dropped.
    """
    if value:
        chosen: set[str] = set()
        for piece in value.split(","):
            trimmed = piece.strip()
            if trimmed:
                chosen.add(trimmed.zfill(5))
        return chosen
    return None
def load_manifest() -> dict[str, Path]:
    """Read the extracted-text manifest into a ``source_id -> text path`` map.

    Returns an empty mapping when the manifest file does not exist.
    """
    lookup: dict[str, Path] = {}
    if TEXT_MANIFEST.exists():
        with TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle:
            for record in csv.DictReader(handle):
                # Later rows with the same source_id replace earlier ones.
                lookup[record["source_id"]] = Path(record["text_local_path"])
    return lookup
def load_prospectus_texts(conn: sqlite3.Connection, tickers: set[str] | None) -> list[ProspectusText]:
    """Load the extracted text of one prospectus per ticker.

    Prospectus rows are read from ``source_refs`` ordered by ticker, then by
    ``source_date`` and ``source_id`` descending. For each ticker the first row
    whose extracted-text file is listed in the manifest and exists on disk is
    used. Rows are accessed by column name, so the connection must use a
    name-addressable row factory.

    Args:
        conn: Open database connection.
        tickers: Optional set of tickers to restrict the query to; a falsy
            value means no restriction.
    """
    where_extra = ""
    bind_values: list[object] = []
    if tickers:
        placeholders = ",".join("?" for _ in tickers)
        where_extra = f"AND s.ticker IN ({placeholders})"
        bind_values.extend(sorted(tickers))

    query = f"""
        SELECT s.ticker, s.source_id, s.local_path
        FROM source_refs s
        WHERE s.source_type = 'prospectus'
        {where_extra}
        ORDER BY s.ticker, s.source_date DESC, s.source_id DESC
        """
    candidates = conn.execute(query, bind_values).fetchall()
    manifest = load_manifest()

    # Insertion-ordered: the first usable row per ticker wins.
    chosen: dict[str, ProspectusText] = {}
    for record in candidates:
        ticker = record["ticker"]
        if ticker in chosen:
            continue
        text_path = manifest.get(record["source_id"])
        if text_path is not None and text_path.exists():
            chosen[ticker] = ProspectusText(
                ticker=ticker,
                source_id=record["source_id"],
                local_path=record["local_path"],
                text_path=text_path,
                text=text_path.read_text(encoding="utf-8", errors="replace"),
            )
    return list(chosen.values())
# Matches a six-digit code starting with 0, 3 or 6 (group 1), optionally
# preceded by "stock code[:]" and/or "(" and optionally followed by a
# ".SH" / ".SZ" / ".SS" suffix (group 2) and ")". Matching is case-insensitive.
CODE_RE = re.compile(
r"(?:stock\s+code\s*[:]?\s*)?\(?\b([036]\d{5})(?:\.(SH|SZ|SS))?\b\)?",
flags=re.I,
)
def clean_context(value: str) -> str:
    """Decode HTML entities and collapse every whitespace run to one space."""
    decoded = html.unescape(value)
    words = decoded.split()
    return " ".join(words)
def has_issuer_context(context: str) -> bool:
    """Decide whether a text window describes the issuer's own A shares.

    The window is rejected when it contains a phrase tied to cornerstone
    investors, portfolio companies or parent ownership; when it mentions a
    sponsor without "our company" / "the company"; or when it says
    "a shares of which" without a quoted "Company" defined term. Otherwise it
    is accepted only if one of the issuer phrases is present.
    """
    text = context.lower()

    disqualifiers = (
        "cornerstone investment",
        "cornerstone investor",
        "portfolio companies",
        "portfolio company",
        "will subscribe for and hold",
        "wholly owned by",
    )
    for phrase in disqualifiers:
        if phrase in text:
            return False

    mentions_company = "our company" in text or "the company" in text
    if "sponsor" in text and not mentions_company:
        return False

    if "a shares of which" in text:
        # "... the A shares of which ..." only counts when the window also
        # holds a quoted defined term such as "Company".
        defined_term = re.search(
            r"[\"“]\s*(?:company|our company|the company)\s*[\"”]",
            context,
            flags=re.I,
        )
        if defined_term is None:
            return False

    issuer_markers = (
        "the a shares of which",
        "a shares of which",
        "our a shares",
        "the company's a shares",
        "the companys a shares",
        "our company has been listed",
        "our company became listed",
        "our company was listed",
        "we completed the listing of our a shares",
        "prior to the listing, our share capital comprises entirely a shares",
        "a shares listed on",
    )
    for marker in issuer_markers:
        if marker in text:
            return True
    return False
def exchange_from_context(code: str, suffix: str | None, context: str) -> tuple[str, str | None, str]:
    """Resolve the exchange, board and suffixed ticker for a six-digit code.

    Evidence is applied from most to least reliable:

    1. an explicit ``.SH`` / ``.SS`` / ``.SZ`` suffix captured with the code;
    2. the code prefix (``6`` -> SSE, ``0`` / ``3`` -> SZSE);
    3. an exchange name found in the surrounding text, defaulting to SZSE.

    The surrounding text comes last because a window around the code can
    mention both exchanges, and letting it win produced tickers such as
    ``300866.SH``.

    Args:
        code: Six-digit security code.
        suffix: Optional exchange suffix captured next to the code.
        context: Text surrounding the code.

    Returns:
        ``(exchange, board, related_ticker)`` where ``exchange`` is ``"SSE"``
        or ``"SZSE"``, ``board`` is ``"STAR Market"``, ``"ChiNext"`` or
        ``None``, and ``related_ticker`` is ``"<code>.SH"`` or ``"<code>.SZ"``.
    """
    lowered = context.lower()
    normalized_suffix = (suffix or "").upper()
    mentions_sse = "shanghai stock exchange" in lowered or "上海证券交易所" in context

    if normalized_suffix in {"SH", "SS"}:
        exchange = "SSE"
    elif normalized_suffix == "SZ":
        exchange = "SZSE"
    elif code.startswith("6"):
        exchange = "SSE"
    elif code.startswith(("0", "3")):
        exchange = "SZSE"
    elif mentions_sse:
        exchange = "SSE"
    else:
        # Covers an explicit Shenzhen mention as well as the no-evidence case.
        exchange = "SZSE"
    ticker = f"{code}.SH" if exchange == "SSE" else f"{code}.SZ"

    # The code range identifies the board more reliably than nearby wording,
    # so the text is only consulted for codes outside those ranges.
    board: str | None = None
    if exchange == "SSE" and code.startswith(("688", "689")):
        board = "STAR Market"
    elif exchange == "SZSE" and code.startswith(("300", "301")):
        board = "ChiNext"
    elif "star market" in lowered or "science and technology innovation board" in lowered:
        board = "STAR Market"
    elif "chinext" in lowered:
        board = "ChiNext"
    return exchange, board, ticker
def company_name_from_context(context: str) -> str | None:
    """Extract a company name from a text window, or return ``None``.

    Two patterns are tried in order: the text following a quoted
    "Company" / "the Company" defined term up to the next comma, then a
    capitalised run containing ``Co.``, ``Company``, ``Corp`` or ``Inc.``.
    The captured text has HTML entities decoded and whitespace collapsed.
    """
    patterns = (
        (r"[\"“](?:the\s+Company|Company)[\"”]\s+([^,]+),", re.I),
        (r"([A-Z][A-Za-z0-9&.,'() -]+(?:Co\.|Company|Corp|Inc\.)[^,]*)", 0),
    )
    for pattern, flags in patterns:
        found = re.search(pattern, context, flags=flags)
        if found:
            return " ".join(html.unescape(found.group(1)).split())
    return None
def listed_date_from_context(context: str) -> str | None:
    """Return the first "since/on <Month> <day>, <year>" date as ISO text.

    Only the first match is examined. ``None`` is returned when nothing
    matches or the matched text is not a valid "%B %d, %Y" date.
    """
    found = re.search(
        r"(?:since|on)\s+([A-Z][a-z]+\s+\d{1,2},\s+\d{4})",
        context,
    )
    if found is None:
        return None
    try:
        parsed = datetime.strptime(found.group(1), "%B %d, %Y")
    except ValueError:
        return None
    return parsed.date().isoformat()
def detect_mappings(item: ProspectusText) -> list[ShareClassMapping]:
    """Scan one prospectus for A-share codes that belong to the issuer.

    Every ``CODE_RE`` match is examined with up to 500 characters of text on
    each side. Windows that fail ``has_issuer_context`` are skipped. Matches
    that resolve to the same related ticker are merged into one mapping.

    Merge rule: a later match replaces the stored mapping when it is stronger
    (raises the confidence to "high") or fills a missing board, company name
    or listing date. When the stored mapping is already "high" and the later
    match is not, the stored evidence and fields are kept and the later match
    only fills gaps. This stops weaker text from replacing the evidence that
    justified the "high" rating.

    Args:
        item: Prospectus text together with its ticker and source id.

    Returns:
        One mapping per distinct related ticker, in first-seen order.
    """
    mappings: dict[str, ShareClassMapping] = {}
    for match in CODE_RE.finditer(item.text):
        code, suffix = match.group(1), match.group(2)
        start = max(0, match.start() - 500)
        end = min(len(item.text), match.end() + 500)
        context = clean_context(item.text[start:end])
        if not has_issuer_context(context):
            continue

        exchange, board, related_ticker = exchange_from_context(code, suffix, context)
        lowered = context.lower()
        is_high = "a shares of which" in lowered or "our a shares" in lowered
        candidate = ShareClassMapping(
            ticker=item.ticker,
            related_ticker=related_ticker,
            exchange=exchange,
            board=board,
            company_name=company_name_from_context(context),
            listed_date=listed_date_from_context(context),
            prospectus_source_id=item.source_id,
            evidence_text=context[:700],
            confidence="high" if is_high else "medium",
        )

        existing = mappings.get(related_ticker)
        if existing is None:
            mappings[related_ticker] = candidate
            continue

        stronger = existing.confidence != "high" and candidate.confidence == "high"
        more_complete = bool(
            (not existing.board and candidate.board)
            or (not existing.company_name and candidate.company_name)
            or (not existing.listed_date and candidate.listed_date)
        )
        if not (stronger or more_complete):
            continue

        # A weaker candidate may fill gaps but must not displace the
        # evidence or fields of a high-confidence mapping.
        keep_existing = existing.confidence == "high" and candidate.confidence != "high"
        primary, secondary = (existing, candidate) if keep_existing else (candidate, existing)
        mappings[related_ticker] = ShareClassMapping(
            ticker=existing.ticker,
            related_ticker=existing.related_ticker,
            exchange=primary.exchange,
            board=primary.board or secondary.board,
            company_name=primary.company_name or secondary.company_name,
            listed_date=primary.listed_date or secondary.listed_date,
            prospectus_source_id=primary.prospectus_source_id,
            evidence_text=primary.evidence_text,
            confidence="high" if stronger or existing.confidence == "high" else candidate.confidence,
        )
    return list(mappings.values())
def fetch_bytes(url: str) -> bytes:
    """Fetch ``url`` with a browser-like User-Agent and return the raw body.

    The request uses a 60-second timeout. Errors raised by ``urlopen``
    propagate to the caller.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    outgoing = Request(url, headers=headers)
    with urlopen(outgoing, timeout=60) as reply:
        body = reply.read()
    return body
def sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of ``payload`` as lowercase hex."""
    digest = hashlib.sha256()
    digest.update(payload)
    return digest.hexdigest()
def epoch(day: date) -> int:
    """Return the Unix timestamp, in whole seconds, of 00:00 UTC on ``day``."""
    midnight_utc = datetime(year=day.year, month=day.month, day=day.day, tzinfo=timezone.utc)
    unix_origin = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (midnight_utc - unix_origin) // timedelta(seconds=1)
def yahoo_symbol(related_ticker: str) -> str:
    """Convert a ``<code>.<suffix>`` ticker into the symbol used for Yahoo.

    An ``SH`` suffix becomes ``SS``; every other suffix becomes ``SZ``.
    A ticker without a dot raises ``ValueError``.
    """
    code, suffix = related_ticker.split(".", 1)
    yahoo_suffix = {"SH": "SS"}.get(suffix, "SZ")
    return f"{code}.{yahoo_suffix}"
def yahoo_chart_url(symbol: str, start: date, end: date) -> str:
    """Build the daily-history chart URL for ``symbol``.

    ``period1`` is midnight UTC on ``start``; ``period2`` is midnight UTC on
    the day after ``end``, so ``end`` itself falls inside the window.
    """
    window_close = end + timedelta(days=1)
    query = {
        "period1": epoch(start),
        "period2": epoch(window_close),
        "interval": "1d",
        "events": "history",
        "includeAdjustedClose": "true",
    }
    encoded = urlencode(query)
    return f"{YAHOO_CHART_BASE}/{symbol}?{encoded}"
def source_row(
    source_id: str,
    ticker: str,
    source_type: str,
    title: str,
    local_path: str,
    url: str,
    payload: bytes,
    as_of: str,
    notes: str,
) -> dict[str, object]:
    """Assemble one ``source_refs`` row for an archived payload.

    The row records the SHA-256 of ``payload``, uses the date part of
    ``as_of`` as ``source_date`` and ``as_of`` itself as ``archived_at``.
    ``path_base`` is always ``"repo_root"``.
    """
    checksum = sha256_bytes(payload)
    archived_day = source_date(as_of)
    return dict(
        source_id=source_id,
        ticker=ticker,
        source_type=source_type,
        title=title,
        path_base="repo_root",
        local_path=local_path,
        url=url,
        file_sha256=checksum,
        source_date=archived_day,
        archived_at=as_of,
        notes=notes,
    )
def archive_quote_sources(mappings: list[ShareClassMapping], as_of: str) -> list[dict[str, object]]:
    """Archive Yahoo chart responses for each mapping plus the HKD/CNY rate.

    The chart window covers the 30 days up to the ``as_of`` date. For every
    mapping the A-share chart is fetched and written under ``RAW_QUOTE_DIR``
    (only when the file is missing or its content differs). A failed fetch is
    reported on stderr and skipped. The FX chart is then fetched once and one
    FX row is added per mapping. If the FX fetch fails, the quote rows
    gathered so far are returned.

    Returns:
        ``source_refs`` rows describing everything that was archived.
    """
    rows: list[dict[str, object]] = []
    if not mappings:
        return rows

    RAW_QUOTE_DIR.mkdir(parents=True, exist_ok=True)
    window_end = datetime.fromisoformat(as_of.replace("Z", "+00:00")).date()
    window_start = window_end - timedelta(days=30)
    stamp = compact_timestamp(as_of)
    fetch_errors = (HTTPError, URLError, TimeoutError, OSError)

    for mapping in mappings:
        symbol = yahoo_symbol(mapping.related_ticker)
        slug = mapping.related_ticker.lower().replace(".", "_")
        url = yahoo_chart_url(symbol, window_start, window_end)
        try:
            payload = fetch_bytes(url)
        except fetch_errors as exc:
            print(f"warning: quote archive failed for {mapping.related_ticker}: {exc}", file=sys.stderr)
            continue
        path = RAW_QUOTE_DIR / f"{slug}_yahoo_chart_{stamp}.json"
        already_archived = path.exists() and path.read_bytes() == payload
        if not already_archived:
            path.write_bytes(payload)
        rows.append(
            source_row(
                source_id=f"{mapping.ticker}_a_share_yahoo_chart_{slug}_{stamp}",
                ticker=mapping.ticker,
                source_type="a_share_price_history",
                title=f"Yahoo Finance daily chart for {mapping.related_ticker} A shares",
                local_path=path.as_posix(),
                url=url,
                payload=payload,
                as_of=as_of,
                notes="Raw Yahoo Finance chart response archived for A/H dual-listed valuation overlay.",
            )
        )

    fx_symbol = "HKDCNY=X"
    fx_url = yahoo_chart_url(fx_symbol, window_start, window_end)
    try:
        fx_payload = fetch_bytes(fx_url)
    except fetch_errors as exc:
        print(f"warning: FX archive failed for {fx_symbol}: {exc}", file=sys.stderr)
        return rows

    fx_path = RAW_QUOTE_DIR / f"hkdcny_x_yahoo_chart_{stamp}.json"
    fx_already_archived = fx_path.exists() and fx_path.read_bytes() == fx_payload
    if not fx_already_archived:
        fx_path.write_bytes(fx_payload)

    # The same FX payload is referenced once per mapping, keyed by HK ticker.
    for mapping in mappings:
        rows.append(
            source_row(
                source_id=f"{mapping.ticker}_fx_hkdcny_yahoo_chart_{stamp}",
                ticker=mapping.ticker,
                source_type="fx_price_history",
                title="Yahoo Finance daily chart for HKD/CNY exchange rate",
                local_path=fx_path.as_posix(),
                url=fx_url,
                payload=fx_payload,
                as_of=as_of,
                notes="Raw Yahoo Finance chart response archived to convert H-share offer prices into RMB for A/H discount checks.",
            )
        )
    return rows
def official_web_url(mapping: ShareClassMapping) -> str | None:
    """Return the exchange company-page URL for a mapping, if one is known.

    SZSE mappings use the SZSE lookup page. SSE mappings use the STAR Market
    page when the board is "STAR Market" and the main stock-list page
    otherwise. Any other exchange yields ``None``.
    """
    code = mapping.related_ticker.split(".", 1)[0]
    if mapping.exchange == "SZSE":
        return f"http://www.szse.cn/English/siteMarketData/siteMarketDatas/lookup/index.html?code={code}"
    if mapping.exchange != "SSE":
        return None
    if mapping.board == "STAR Market":
        return f"https://www.sse.com.cn/star/market/stocklist/info/company/index.shtml?COMPANY_CODE={code}"
    return f"https://www.sse.com.cn/assortment/stock/list/info/company/index.shtml?COMPANY_CODE={code}"
def archive_web_sources(mappings: list[ShareClassMapping], as_of: str) -> tuple[list[dict[str, object]], dict[str, str]]:
    """Archive the exchange web page for each mapping as cross-check evidence.

    Mappings without a known URL are skipped. A failed fetch is reported on
    stderr and skipped. Pages are written under ``RAW_WEB_DIR`` only when the
    file is missing or its content differs.

    Returns:
        ``(rows, source_ids)`` where ``rows`` are ``source_refs`` rows and
        ``source_ids`` maps ``"<ticker>|<related_ticker>"`` to the source id
        of the archived page.
    """
    RAW_WEB_DIR.mkdir(parents=True, exist_ok=True)
    stamp = compact_timestamp(as_of)
    archived_rows: list[dict[str, object]] = []
    ids_by_pair: dict[str, str] = {}

    for mapping in mappings:
        page_url = official_web_url(mapping)
        if not page_url:
            continue
        try:
            page = fetch_bytes(page_url)
        except (HTTPError, URLError, TimeoutError, OSError) as exc:
            print(f"warning: web cross-check failed for {mapping.related_ticker}: {exc}", file=sys.stderr)
            continue

        slug = mapping.related_ticker.lower().replace(".", "_")
        target = RAW_WEB_DIR / f"{mapping.ticker}_{slug}_official_{stamp}.html"
        already_archived = target.exists() and target.read_bytes() == page
        if not already_archived:
            target.write_bytes(page)

        source_id = f"{mapping.ticker}_a_share_mapping_web_{slug}_{stamp}"
        archived_rows.append(
            source_row(
                source_id=source_id,
                ticker=mapping.ticker,
                source_type="a_share_mapping_web_evidence",
                title=f"Official exchange company page for {mapping.related_ticker}",
                local_path=target.as_posix(),
                url=page_url,
                payload=page,
                as_of=as_of,
                notes="Public internet cross-check for A/H share-class mapping. Prospectus remains the primary source.",
            )
        )
        ids_by_pair[mapping.ticker + "|" + mapping.related_ticker] = source_id
    return archived_rows, ids_by_pair
def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``source_refs`` rows, updating existing rows on ``source_id``.

    On conflict every column except ``source_id`` and ``ticker`` is
    overwritten with the new value. Nothing is executed for an empty list.
    """
    if rows:
        statement = """
            INSERT INTO source_refs (
                source_id, ticker, source_type, title, path_base, local_path, url,
                file_sha256, source_date, archived_at, notes
            )
            VALUES (
                :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url,
                :file_sha256, :source_date, :archived_at, :notes
            )
            ON CONFLICT(source_id) DO UPDATE SET
                source_type = excluded.source_type,
                title = excluded.title,
                path_base = excluded.path_base,
                local_path = excluded.local_path,
                url = excluded.url,
                file_sha256 = excluded.file_sha256,
                source_date = excluded.source_date,
                archived_at = excluded.archived_at,
                notes = excluded.notes
            """
        conn.executemany(statement, rows)
def upsert_mappings(
    conn: sqlite3.Connection,
    mappings: list[ShareClassMapping],
    web_source_ids: dict[str, str],
    as_of: str,
) -> None:
    """Insert or update ``listed_share_classes`` rows for detected mappings.

    Rows are keyed on ``(ticker, share_class_type, related_ticker)``. On
    conflict, ``company_name``, ``listed_date`` and ``web_source_id`` keep
    their stored value when the new value is NULL; the other listed columns
    are overwritten.

    Args:
        conn: Open database connection.
        mappings: Mappings to store.
        web_source_ids: ``"<ticker>|<related_ticker>"`` -> web evidence
            source id; a hit marks the row as ``prospectus_text_plus_web``.
        as_of: Timestamp stored in ``data_as_of``.
    """
    statement = """
        INSERT INTO listed_share_classes (
            share_class_id, ticker, share_class_type, related_ticker, exchange, board,
            relationship, company_name, listed_date, detection_method, confidence,
            prospectus_source_id, web_source_id, evidence_text, data_as_of, notes
        )
        VALUES (?, ?, 'A_share', ?, ?, ?, 'same_issuer', ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker, share_class_type, related_ticker) DO UPDATE SET
            exchange = excluded.exchange,
            board = excluded.board,
            relationship = excluded.relationship,
            company_name = COALESCE(excluded.company_name, listed_share_classes.company_name),
            listed_date = COALESCE(excluded.listed_date, listed_share_classes.listed_date),
            detection_method = excluded.detection_method,
            confidence = excluded.confidence,
            prospectus_source_id = excluded.prospectus_source_id,
            web_source_id = COALESCE(excluded.web_source_id, listed_share_classes.web_source_id),
            evidence_text = excluded.evidence_text,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """

    parameters = []
    for mapping in mappings:
        web_source_id = web_source_ids.get(mapping.ticker + "|" + mapping.related_ticker)
        if web_source_id:
            detection_method = "prospectus_text_plus_web"
        else:
            detection_method = "prospectus_text"
        slug = mapping.related_ticker.lower().replace(".", "_")
        parameters.append(
            (
                f"{mapping.ticker}_a_share_{slug}",
                mapping.ticker,
                mapping.related_ticker,
                mapping.exchange,
                mapping.board,
                mapping.company_name,
                mapping.listed_date,
                detection_method,
                mapping.confidence,
                mapping.prospectus_source_id,
                web_source_id,
                mapping.evidence_text,
                as_of,
                "Detected from issuer prospectus text. Internet cross-check is supporting evidence when web_source_id is present.",
            )
        )
    conn.executemany(statement, parameters)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    ``table`` and ``order_by`` are interpolated directly into the SQL text,
    so they must come from trusted code, not from user input. The CSV starts
    with a header row of column names and uses ``\\n`` line endings.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    records = result.fetchall()
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerow(header)
        out.writerows(records)
def main() -> int:
    """Run the A/H mapping scan and, unless ``--dry-run``, archive the results.

    Steps: parse arguments, open the database, apply the schema (skipped on a
    dry run), load prospectus texts, detect mappings, then either print them
    (dry run) or archive optional web/quote evidence, upsert the rows and
    export CSV snapshots.

    Returns:
        Process exit code; ``0`` on success.
    """
    # Local import so this function does not depend on the module's import block.
    from contextlib import closing

    args = parse_args()
    as_of = parse_as_of(args.as_of)
    tickers = selected_tickers(args.tickers)

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it does not close itself. ``closing`` releases the handle on exit.
    with closing(sqlite3.connect(args.db)) as conn, conn:
        conn.row_factory = sqlite3.Row
        if not args.dry_run:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
        texts = load_prospectus_texts(conn, tickers)
        mappings = [mapping for item in texts for mapping in detect_mappings(item)]

        if args.dry_run:
            for mapping in mappings:
                print(
                    f"{mapping.ticker}: {mapping.related_ticker} {mapping.exchange} "
                    f"{mapping.board or ''} confidence={mapping.confidence} source={mapping.prospectus_source_id}"
                )
            print(f"detected mappings: {len(mappings)}")
            return 0

        web_rows: list[dict[str, object]] = []
        web_source_ids: dict[str, str] = {}
        if args.web_cross_check:
            web_rows, web_source_ids = archive_web_sources(mappings, as_of)
        quote_rows = archive_quote_sources(mappings, as_of) if args.archive_quotes else []

        upsert_source_refs(conn, web_rows + quote_rows)
        upsert_mappings(conn, mappings, web_source_ids, as_of)
        # Snapshots are read on the same connection, so they include the
        # rows written just above.
        export_snapshot(conn, "listed_share_classes", "ticker, related_ticker")
        export_snapshot(conn, "source_refs", "source_id")

    print("A/H share-class mappings archived")
    print(f"as_of: {as_of}")
    print(f"prospectuses scanned: {len(texts)}")
    print(f"mappings detected: {len(mappings)}")
    print(f"web sources archived: {len(web_rows)}")
    print(f"quote/fx sources archived: {len(quote_rows)}")
    return 0
# Run the archiver when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())