diff --git a/.agents/skills/hk-ipo-analyst/SKILL.md b/.agents/skills/hk-ipo-analyst/SKILL.md index 7cf613b..0483cab 100644 --- a/.agents/skills/hk-ipo-analyst/SKILL.md +++ b/.agents/skills/hk-ipo-analyst/SKILL.md @@ -109,6 +109,12 @@ For all analyst-generated Markdown reports, prediction cards, review cards, and For scheduled runs, latest IPO list refreshes, and broad candidate reports, first use `hk-ipo-archivist` to refresh the latest internet-sourced IPO universe and archive updates before making analyst judgments. This includes the HKEX current new-listing page, newly available prospectuses, allotment-result announcements, listing-calendar changes, recent price-performance rows for review, and subscription-period market heat such as broker-aggregated margin subscription multiples. +Before rebuilding the analysis dataset or latest report, refresh A/H or other onshore share-class mappings with a three-layer check: + +1. Use the structured `listed_share_classes` table and `data/snapshots/listed_share_classes.csv` from prior archive runs. +2. Scan archived prospectus extracted text for issuer-context A-share evidence with `scripts/archive_a_share_mappings.py`; require same-issuer wording such as existing A Shares, SSE/SZSE, ChiNext, STAR Market, or an issuer stock code, and reject sponsor, portfolio-company, or unrelated shareholder contexts. +3. Use internet search and supported official exchange pages as a public cross-check for current or recently closed candidates. Archive reproducible official exchange evidence when supported; otherwise state that web cross-check evidence is a `data_gap` and rely on prospectus evidence as primary source. + Refresh the report content as a complete current snapshot, not as a partial patch. The dated report should be written to `reports/{date}_latest_ipo_candidates_analysis.md` and should update all relevant sections: actionable ranking, fundamentals, break-probability/risk-reward, capital efficiency, per-IPO notes, closed/waiting names, recent 30-day listed-IPO review with T2 grey-market context when available, data gaps, and sources. The break-probability, risk/reward, and capital-efficiency section must cover all current or recently closed IPOs that do not yet have confirmed D1 break/non-break information, including names whose subscription window has already closed and are waiting for T1/T2/D1. Do not drop a ticker from this section merely because the user can no longer subscribe; keep its probability/risk view visible until D1 outcome is archived or explicitly confirmed. Once D1 is confirmed, move it out of the probability table and into the recent listed-IPO review. @@ -130,6 +136,15 @@ When an issuer already has Mainland A Shares or another onshore listed share cla Detection cues include prospectus language such as existing `A Shares`, `Shenzhen Stock Exchange`, `Shanghai Stock Exchange`, `ChiNext`, `STAR Market`, an A-share stock code such as `300661.SZ` or `002600.SZ`, or HKEX waiver/pricing text that references the A-share closing price. +Do not rely on manual memory or a short hard-coded whitelist. Use archived structured mappings first, refresh them from prospectus text, and add internet/official-exchange search cross-checks for live candidates: + +```bash +.venv/bin/python scripts/archive_a_share_mappings.py --as-of YYYY-MM-DDTHH:MM:SSZ --web-cross-check --archive-quotes +.venv/bin/python scripts/build_analysis_dataset.py --as-of YYYY-MM-DDTHH:MM:SSZ +``` + +If the dataset has `a_share_ticker`, the report must include the A/H overlay. If the prospectus text suggests an A-share relation but the mapping is uncertain, include it as an explicit `data_gap` or low-confidence mapping rather than omitting it. + The report should include a compact A/H note or table covering: - A-share ticker, exchange, and whether it is the same issuer, parent, subsidiary, or only a comparable/affiliate. diff --git a/.agents/skills/hk-ipo-archivist/SKILL.md b/.agents/skills/hk-ipo-archivist/SKILL.md index ae8d979..106c8e8 100644 --- a/.agents/skills/hk-ipo-archivist/SKILL.md +++ b/.agents/skills/hk-ipo-archivist/SKILL.md @@ -157,6 +157,24 @@ The extractor is incremental: unchanged PDFs with matching manifest rows are ski Do not expect `data/extracted_text/` entries for Yahoo JSON market data or HKEX `.htm`/`.html` notices. Those are already text-like raw evidence files and are tracked under `data/raw/`. +## A/H Share-Class Mapping Archive + +When current or recently closed IPO candidates may already have Mainland A Shares or another onshore listed share class, refresh the structured mapping archive before analyst reports consume the dataset: + +```bash +.venv/bin/python scripts/archive_a_share_mappings.py --as-of YYYY-MM-DDTHH:MM:SSZ --web-cross-check --archive-quotes +``` + +The archive uses three evidence layers: + +1. Structured prior mappings in `listed_share_classes` and `data/snapshots/listed_share_classes.csv`. +2. Issuer-context prospectus text scans from `data/snapshots/extracted_text_manifest.csv`. +3. Internet search or supported official exchange pages as public cross-check evidence. + +Prospectus text remains the primary source for same-issuer identity. Public web evidence supports the mapping when it is reproducible and can be archived as a `source_refs` row. If a web cross-check is not supported for a market or exchange page, leave `web_source_id` blank and make the analyst report state the web-evidence gap rather than inventing a source. + +For detected same-issuer A-share mappings, archive recent A-share quote data and HKD/CNY FX evidence when `--archive-quotes` is used so the analyst can compute A/H discount or premium without relying on stale manual prices. Do not use sponsor, portfolio-company, shareholder, or comparable-company stock codes as same-issuer mappings. + ## T1 Demand Text Backfill When audit finds T1 rows where an allotment-results source is archived but `ipo_demand` is missing, use the text backfill script: diff --git a/schema/hk_ipo.schema.sql b/schema/hk_ipo.schema.sql index 9d57265..1c9c1cc 100644 --- a/schema/hk_ipo.schema.sql +++ b/schema/hk_ipo.schema.sql @@ -164,6 +164,29 @@ CREATE TABLE IF NOT EXISTS source_refs ( CHECK (local_path NOT LIKE '%\%') ); +CREATE TABLE IF NOT EXISTS listed_share_classes ( + share_class_id TEXT PRIMARY KEY, + ticker TEXT NOT NULL REFERENCES ipo_master(ticker), + share_class_type TEXT NOT NULL, + related_ticker TEXT NOT NULL, + exchange TEXT NOT NULL, + board TEXT, + relationship TEXT NOT NULL, + company_name TEXT, + listed_date TEXT, + detection_method TEXT NOT NULL, + confidence TEXT NOT NULL, + prospectus_source_id TEXT REFERENCES source_refs(source_id), + web_source_id TEXT REFERENCES source_refs(source_id), + evidence_text TEXT, + data_as_of TEXT NOT NULL, + notes TEXT, + UNIQUE (ticker, share_class_type, related_ticker), + CHECK (share_class_type IN ('A_share', 'other_onshore_share')), + CHECK (relationship IN ('same_issuer', 'parent', 'subsidiary', 'affiliate', 'comparable')), + CHECK (confidence IN ('high', 'medium', 'low')) +); + CREATE TABLE IF NOT EXISTS data_gaps ( gap_id TEXT PRIMARY KEY, ticker TEXT NOT NULL REFERENCES ipo_master(ticker), diff --git a/scripts/archive_a_share_mappings.py b/scripts/archive_a_share_mappings.py new file mode 100644 index 0000000..6a25eb5 --- /dev/null +++ b/scripts/archive_a_share_mappings.py @@ -0,0 +1,567 @@ +#!/usr/bin/env python3 +"""Detect and archive A/H or onshore share-class mappings from prospectus text.""" + +from __future__ import annotations + +import argparse +import csv +import hashlib +import html +import re +import sqlite3 +import sys +from dataclasses import dataclass +from datetime import date, datetime, timedelta, timezone +from pathlib import Path +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode +from urllib.request import Request, urlopen + + +DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite") +DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql") +SNAPSHOT_DIR = Path("data/snapshots") +TEXT_MANIFEST = SNAPSHOT_DIR / "extracted_text_manifest.csv" +RAW_QUOTE_DIR = Path("data/raw/a_share_quotes") +RAW_WEB_DIR = Path("data/raw/a_share_mapping_web") +YAHOO_CHART_BASE = "https://query1.finance.yahoo.com/v8/finance/chart" + + +@dataclass(frozen=True) +class ProspectusText: + ticker: str + source_id: str + local_path: str + text_path: Path + text: str + + +@dataclass(frozen=True) +class ShareClassMapping: + ticker: str + related_ticker: str + exchange: str + board: str | None + company_name: str | None + listed_date: str | None + prospectus_source_id: str + evidence_text: str + confidence: str + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.") + parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH), help="Repo-relative schema path.") + parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.") + parser.add_argument("--tickers", help="Comma-separated HK tickers to scan. Defaults to current prospectus rows.") + parser.add_argument("--archive-quotes", action="store_true", help="Archive Yahoo A-share and HKD/CNY chart evidence.") + parser.add_argument("--web-cross-check", action="store_true", help="Archive supported public web cross-check pages.") + parser.add_argument("--dry-run", action="store_true", help="Print detected mappings without writing DB or files.") + return parser.parse_args() + + +def parse_as_of(value: str | None) -> str: + if value: + return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z") + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def compact_timestamp(value: str) -> str: + return value.replace("-", "").replace(":", "").replace("+00:00", "Z") + + +def source_date(value: str) -> str: + return datetime.fromisoformat(value.replace("Z", "+00:00")).date().isoformat() + + +def selected_tickers(value: str | None) -> set[str] | None: + if not value: + return None + return {item.strip().zfill(5) for item in value.split(",") if item.strip()} + + +def load_manifest() -> dict[str, Path]: + if not TEXT_MANIFEST.exists(): + return {} + with TEXT_MANIFEST.open(newline="", encoding="utf-8") as handle: + return {row["source_id"]: Path(row["text_local_path"]) for row in csv.DictReader(handle)} + + +def load_prospectus_texts(conn: sqlite3.Connection, tickers: set[str] | None) -> list[ProspectusText]: + ticker_filter = "" + params: list[object] = [] + if tickers: + ticker_filter = f"AND s.ticker IN ({','.join('?' for _ in tickers)})" + params.extend(sorted(tickers)) + rows = conn.execute( + f""" + SELECT s.ticker, s.source_id, s.local_path + FROM source_refs s + WHERE s.source_type = 'prospectus' + {ticker_filter} + ORDER BY s.ticker, s.source_date DESC, s.source_id DESC + """, + params, + ).fetchall() + manifest = load_manifest() + texts: list[ProspectusText] = [] + seen: set[str] = set() + for row in rows: + ticker = row["ticker"] + if ticker in seen: + continue + text_path = manifest.get(row["source_id"]) + if text_path is None or not text_path.exists(): + continue + texts.append( + ProspectusText( + ticker=ticker, + source_id=row["source_id"], + local_path=row["local_path"], + text_path=text_path, + text=text_path.read_text(encoding="utf-8", errors="replace"), + ) + ) + seen.add(ticker) + return texts + + +CODE_RE = re.compile( + r"(?:stock\s+code\s*[::]?\s*)?\(?\b([036]\d{5})(?:\.(SH|SZ|SS))?\b\)?", + flags=re.I, +) + + +def clean_context(value: str) -> str: + return " ".join(html.unescape(value).split()) + + +def has_issuer_context(context: str) -> bool: + lowered = context.lower() + excluded_phrases = [ + "cornerstone investment", + "cornerstone investor", + "portfolio companies", + "portfolio company", + "will subscribe for and hold", + "wholly owned by", + ] + if any(phrase in lowered for phrase in excluded_phrases): + return False + if "sponsor" in lowered and "our company" not in lowered and "the company" not in lowered: + return False + if "a shares of which" in lowered and not re.search( + r"[\"“]\s*(?:company|our company|the company)\s*[\"”]", + context, + flags=re.I, + ): + return False + issuer_phrases = [ + "the a shares of which", + "a shares of which", + "our a shares", + "the company's a shares", + "the company’s a shares", + "our company has been listed", + "our company became listed", + "our company was listed", + "we completed the listing of our a shares", + "prior to the listing, our share capital comprises entirely a shares", + "a shares listed on", + ] + return any(phrase in lowered for phrase in issuer_phrases) + + +def exchange_from_context(code: str, suffix: str | None, context: str) -> tuple[str, str | None, str]: + lowered = context.lower() + suffix = (suffix or "").upper() + if suffix in {"SH", "SS"} or "shanghai stock exchange" in lowered or "上海证券交易所" in context: + exchange = "SSE" + ticker = f"{code}.SH" + elif suffix == "SZ" or "shenzhen stock exchange" in lowered or "深圳证券交易所" in context: + exchange = "SZSE" + ticker = f"{code}.SZ" + elif code.startswith("6"): + exchange = "SSE" + ticker = f"{code}.SH" + else: + exchange = "SZSE" + ticker = f"{code}.SZ" + + board = None + if "star market" in lowered or "science and technology innovation board" in lowered: + board = "STAR Market" + elif "chinext" in lowered: + board = "ChiNext" + return exchange, board, ticker + + +def company_name_from_context(context: str) -> str | None: + match = re.search(r"[\"“](?:the\s+Company|Company)[\"”]\s+([^,]+),", context, flags=re.I) + if match: + return clean_context(match.group(1)) + match = re.search(r"([A-Z][A-Za-z0-9&.,'() -]+(?:Co\.|Company|Corp|Inc\.)[^,]*)", context) + if match: + return clean_context(match.group(1)) + return None + + +def listed_date_from_context(context: str) -> str | None: + match = re.search( + r"(?:since|on)\s+([A-Z][a-z]+\s+\d{1,2},\s+\d{4})", + context, + ) + if not match: + return None + try: + return datetime.strptime(match.group(1), "%B %d, %Y").date().isoformat() + except ValueError: + return None + + +def detect_mappings(item: ProspectusText) -> list[ShareClassMapping]: + mappings: dict[str, ShareClassMapping] = {} + for match in CODE_RE.finditer(item.text): + code, suffix = match.group(1), match.group(2) + start = max(0, match.start() - 500) + end = min(len(item.text), match.end() + 500) + context = clean_context(item.text[start:end]) + if not has_issuer_context(context): + continue + exchange, board, related_ticker = exchange_from_context(code, suffix, context) + confidence = "high" if "a shares of which" in context.lower() or "our a shares" in context.lower() else "medium" + candidate = ShareClassMapping( + ticker=item.ticker, + related_ticker=related_ticker, + exchange=exchange, + board=board, + company_name=company_name_from_context(context), + listed_date=listed_date_from_context(context), + prospectus_source_id=item.source_id, + evidence_text=context[:700], + confidence=confidence, + ) + existing = mappings.get(related_ticker) + if existing: + stronger = existing.confidence != "high" and candidate.confidence == "high" + more_complete = ( + (not existing.board and candidate.board) + or (not existing.company_name and candidate.company_name) + or (not existing.listed_date and candidate.listed_date) + ) + if stronger or more_complete: + mappings[related_ticker] = ShareClassMapping( + ticker=existing.ticker, + related_ticker=existing.related_ticker, + exchange=candidate.exchange, + board=candidate.board or existing.board, + company_name=candidate.company_name or existing.company_name, + listed_date=candidate.listed_date or existing.listed_date, + prospectus_source_id=candidate.prospectus_source_id, + evidence_text=candidate.evidence_text, + confidence="high" if stronger or existing.confidence == "high" else candidate.confidence, + ) + continue + mappings[related_ticker] = candidate + return list(mappings.values()) + + +def fetch_bytes(url: str) -> bytes: + request = Request(url, headers={"User-Agent": "Mozilla/5.0"}) + with urlopen(request, timeout=60) as response: + return response.read() + + +def sha256_bytes(payload: bytes) -> str: + return hashlib.sha256(payload).hexdigest() + + +def epoch(day: date) -> int: + return int(datetime(day.year, day.month, day.day, tzinfo=timezone.utc).timestamp()) + + +def yahoo_symbol(related_ticker: str) -> str: + code, suffix = related_ticker.split(".", 1) + return f"{code}.SS" if suffix == "SH" else f"{code}.SZ" + + +def yahoo_chart_url(symbol: str, start: date, end: date) -> str: + params = urlencode( + { + "period1": epoch(start), + "period2": epoch(end + timedelta(days=1)), + "interval": "1d", + "events": "history", + "includeAdjustedClose": "true", + } + ) + return f"{YAHOO_CHART_BASE}/{symbol}?{params}" + + +def source_row( + source_id: str, + ticker: str, + source_type: str, + title: str, + local_path: str, + url: str, + payload: bytes, + as_of: str, + notes: str, +) -> dict[str, object]: + return { + "source_id": source_id, + "ticker": ticker, + "source_type": source_type, + "title": title, + "path_base": "repo_root", + "local_path": local_path, + "url": url, + "file_sha256": sha256_bytes(payload), + "source_date": source_date(as_of), + "archived_at": as_of, + "notes": notes, + } + + +def archive_quote_sources(mappings: list[ShareClassMapping], as_of: str) -> list[dict[str, object]]: + if not mappings: + return [] + RAW_QUOTE_DIR.mkdir(parents=True, exist_ok=True) + as_of_date = datetime.fromisoformat(as_of.replace("Z", "+00:00")).date() + start = as_of_date - timedelta(days=30) + compact = compact_timestamp(as_of) + rows: list[dict[str, object]] = [] + + for mapping in mappings: + symbol = yahoo_symbol(mapping.related_ticker) + slug = mapping.related_ticker.lower().replace(".", "_") + url = yahoo_chart_url(symbol, start, as_of_date) + try: + payload = fetch_bytes(url) + except (HTTPError, URLError, TimeoutError, OSError) as exc: + print(f"warning: quote archive failed for {mapping.related_ticker}: {exc}", file=sys.stderr) + continue + path = RAW_QUOTE_DIR / f"{slug}_yahoo_chart_{compact}.json" + if not path.exists() or path.read_bytes() != payload: + path.write_bytes(payload) + rows.append( + source_row( + f"{mapping.ticker}_a_share_yahoo_chart_{slug}_{compact}", + mapping.ticker, + "a_share_price_history", + f"Yahoo Finance daily chart for {mapping.related_ticker} A shares", + path.as_posix(), + url, + payload, + as_of, + "Raw Yahoo Finance chart response archived for A/H dual-listed valuation overlay.", + ) + ) + + fx_symbol = "HKDCNY=X" + fx_url = yahoo_chart_url(fx_symbol, start, as_of_date) + try: + fx_payload = fetch_bytes(fx_url) + except (HTTPError, URLError, TimeoutError, OSError) as exc: + print(f"warning: FX archive failed for {fx_symbol}: {exc}", file=sys.stderr) + return rows + fx_path = RAW_QUOTE_DIR / f"hkdcny_x_yahoo_chart_{compact}.json" + if not fx_path.exists() or fx_path.read_bytes() != fx_payload: + fx_path.write_bytes(fx_payload) + for mapping in mappings: + rows.append( + source_row( + f"{mapping.ticker}_fx_hkdcny_yahoo_chart_{compact}", + mapping.ticker, + "fx_price_history", + "Yahoo Finance daily chart for HKD/CNY exchange rate", + fx_path.as_posix(), + fx_url, + fx_payload, + as_of, + "Raw Yahoo Finance chart response archived to convert H-share offer prices into RMB for A/H discount checks.", + ) + ) + return rows + + +def official_web_url(mapping: ShareClassMapping) -> str | None: + code = mapping.related_ticker.split(".", 1)[0] + if mapping.exchange == "SSE" and mapping.board == "STAR Market": + return f"https://www.sse.com.cn/star/market/stocklist/info/company/index.shtml?COMPANY_CODE={code}" + if mapping.exchange == "SSE": + return f"https://www.sse.com.cn/assortment/stock/list/info/company/index.shtml?COMPANY_CODE={code}" + if mapping.exchange == "SZSE": + return f"http://www.szse.cn/English/siteMarketData/siteMarketDatas/lookup/index.html?code={code}" + return None + + +def archive_web_sources(mappings: list[ShareClassMapping], as_of: str) -> tuple[list[dict[str, object]], dict[str, str]]: + RAW_WEB_DIR.mkdir(parents=True, exist_ok=True) + compact = compact_timestamp(as_of) + rows: list[dict[str, object]] = [] + source_ids: dict[str, str] = {} + for mapping in mappings: + url = official_web_url(mapping) + if not url: + continue + try: + payload = fetch_bytes(url) + except (HTTPError, URLError, TimeoutError, OSError) as exc: + print(f"warning: web cross-check failed for {mapping.related_ticker}: {exc}", file=sys.stderr) + continue + slug = mapping.related_ticker.lower().replace(".", "_") + path = RAW_WEB_DIR / f"{mapping.ticker}_{slug}_official_{compact}.html" + if not path.exists() or path.read_bytes() != payload: + path.write_bytes(payload) + source_id = f"{mapping.ticker}_a_share_mapping_web_{slug}_{compact}" + rows.append( + source_row( + source_id, + mapping.ticker, + "a_share_mapping_web_evidence", + f"Official exchange company page for {mapping.related_ticker}", + path.as_posix(), + url, + payload, + as_of, + "Public internet cross-check for A/H share-class mapping. Prospectus remains the primary source.", + ) + ) + source_ids[mapping.ticker + "|" + mapping.related_ticker] = source_id + return rows, source_ids + + +def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None: + if not rows: + return + conn.executemany( + """ + INSERT INTO source_refs ( + source_id, ticker, source_type, title, path_base, local_path, url, + file_sha256, source_date, archived_at, notes + ) + VALUES ( + :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url, + :file_sha256, :source_date, :archived_at, :notes + ) + ON CONFLICT(source_id) DO UPDATE SET + source_type = excluded.source_type, + title = excluded.title, + path_base = excluded.path_base, + local_path = excluded.local_path, + url = excluded.url, + file_sha256 = excluded.file_sha256, + source_date = excluded.source_date, + archived_at = excluded.archived_at, + notes = excluded.notes + """, + rows, + ) + + +def upsert_mappings( + conn: sqlite3.Connection, + mappings: list[ShareClassMapping], + web_source_ids: dict[str, str], + as_of: str, +) -> None: + conn.executemany( + """ + INSERT INTO listed_share_classes ( + share_class_id, ticker, share_class_type, related_ticker, exchange, board, + relationship, company_name, listed_date, detection_method, confidence, + prospectus_source_id, web_source_id, evidence_text, data_as_of, notes + ) + VALUES (?, ?, 'A_share', ?, ?, ?, 'same_issuer', ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(ticker, share_class_type, related_ticker) DO UPDATE SET + exchange = excluded.exchange, + board = excluded.board, + relationship = excluded.relationship, + company_name = COALESCE(excluded.company_name, listed_share_classes.company_name), + listed_date = COALESCE(excluded.listed_date, listed_share_classes.listed_date), + detection_method = excluded.detection_method, + confidence = excluded.confidence, + prospectus_source_id = excluded.prospectus_source_id, + web_source_id = COALESCE(excluded.web_source_id, listed_share_classes.web_source_id), + evidence_text = excluded.evidence_text, + data_as_of = excluded.data_as_of, + notes = excluded.notes + """, + [ + ( + f"{mapping.ticker}_a_share_{mapping.related_ticker.lower().replace('.', '_')}", + mapping.ticker, + mapping.related_ticker, + mapping.exchange, + mapping.board, + mapping.company_name, + mapping.listed_date, + "prospectus_text_plus_web" if web_source_ids.get(mapping.ticker + "|" + mapping.related_ticker) else "prospectus_text", + mapping.confidence, + mapping.prospectus_source_id, + web_source_ids.get(mapping.ticker + "|" + mapping.related_ticker), + mapping.evidence_text, + as_of, + "Detected from issuer prospectus text. Internet cross-check is supporting evidence when web_source_id is present.", + ) + for mapping in mappings + ], + ) + + +def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None: + SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True) + cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}") + columns = [description[0] for description in cursor.description] + with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle: + writer = csv.writer(handle, lineterminator="\n") + writer.writerow(columns) + writer.writerows(cursor.fetchall()) + + +def main() -> int: + args = parse_args() + as_of = parse_as_of(args.as_of) + tickers = selected_tickers(args.tickers) + + with sqlite3.connect(args.db) as conn: + conn.row_factory = sqlite3.Row + if not args.dry_run: + conn.executescript(Path(args.schema).read_text(encoding="utf-8")) + texts = load_prospectus_texts(conn, tickers) + mappings = [mapping for item in texts for mapping in detect_mappings(item)] + + if args.dry_run: + for mapping in mappings: + print( + f"{mapping.ticker}: {mapping.related_ticker} {mapping.exchange} " + f"{mapping.board or ''} confidence={mapping.confidence} source={mapping.prospectus_source_id}" + ) + print(f"detected mappings: {len(mappings)}") + return 0 + + web_rows: list[dict[str, object]] = [] + web_source_ids: dict[str, str] = {} + if args.web_cross_check: + web_rows, web_source_ids = archive_web_sources(mappings, as_of) + quote_rows = archive_quote_sources(mappings, as_of) if args.archive_quotes else [] + + upsert_source_refs(conn, web_rows + quote_rows) + upsert_mappings(conn, mappings, web_source_ids, as_of) + export_snapshot(conn, "listed_share_classes", "ticker, related_ticker") + export_snapshot(conn, "source_refs", "source_id") + + print("A/H share-class mappings archived") + print(f"as_of: {as_of}") + print(f"prospectuses scanned: {len(texts)}") + print(f"mappings detected: {len(mappings)}") + print(f"web sources archived: {len(web_rows)}") + print(f"quote/fx sources archived: {len(quote_rows)}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/build_analysis_dataset.py b/scripts/build_analysis_dataset.py index 44b0e0e..3c53de1 100644 --- a/scripts/build_analysis_dataset.py +++ b/scripts/build_analysis_dataset.py @@ -16,6 +16,7 @@ from typing import Any MODEL_VERSION = "ipo_score_v0" RULE_PATH = Path("rules/ipo_score_v0.yaml") DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite") +DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql") DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv") DEFAULT_REPORT_PATH = Path("reports/2026-06-15_analysis_model_v0.md") @@ -33,6 +34,7 @@ class Metric: def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.") + parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH), help="Repo-relative schema path.") parser.add_argument("--dataset", default=str(DEFAULT_DATASET_PATH), help="Output CSV dataset path.") parser.add_argument("--report", default=str(DEFAULT_REPORT_PATH), help="Output Markdown report path.") parser.add_argument("--as-of", help="Analysis timestamp. Defaults to current UTC time.") @@ -435,6 +437,16 @@ def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]: eh.grey_market_return_pct AS external_grey_market_return_pct, eh.first_day_return_pct AS external_first_day_return_pct, eh.local_path AS external_history_source_path, + ah.related_ticker AS a_share_ticker, + ah.exchange AS a_share_exchange, + ah.board AS a_share_board, + ah.relationship AS a_share_relationship, + ah.company_name AS a_share_company_name, + ah.listed_date AS a_share_listed_date, + ah.detection_method AS a_share_detection_method, + ah.confidence AS a_share_mapping_confidence, + ahp.local_path AS a_share_prospectus_source_path, + ahw.local_path AS a_share_web_source_path, ( SELECT local_path FROM source_refs s @@ -456,6 +468,19 @@ def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]: LEFT JOIN performance p ON p.ticker = m.ticker LEFT JOIN latest_market_heat h ON h.ticker = m.ticker LEFT JOIN external_history eh ON eh.ticker = m.ticker + LEFT JOIN listed_share_classes ah + ON ah.share_class_id = ( + SELECT l.share_class_id + FROM listed_share_classes l + WHERE l.ticker = m.ticker AND l.share_class_type = 'A_share' + ORDER BY + l.data_as_of DESC, + CASE l.confidence WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, + l.related_ticker + LIMIT 1 + ) + LEFT JOIN source_refs ahp ON ahp.source_id = ah.prospectus_source_id + LEFT JOIN source_refs ahw ON ahw.source_id = ah.web_source_id ORDER BY m.listing_date, m.ticker """ ).fetchall() @@ -483,6 +508,14 @@ def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]: "company_name_en": row["company_name_en"], "company_name_zh": row["company_name_zh"], "stock_short_name": stock_short_name, + "a_share_ticker": row["a_share_ticker"], + "a_share_exchange": row["a_share_exchange"], + "a_share_board": row["a_share_board"], + "a_share_relationship": row["a_share_relationship"], + "a_share_company_name": row["a_share_company_name"], + "a_share_listed_date": row["a_share_listed_date"], + "a_share_detection_method": row["a_share_detection_method"], + "a_share_mapping_confidence": row["a_share_mapping_confidence"], "board": row["board"], "status": row["status"], "listing_date": row["listing_date"], @@ -539,6 +572,8 @@ def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]: "external_grey_market_return_pct": row["external_grey_market_return_pct"], "external_first_day_return_pct": row["external_first_day_return_pct"], "external_history_source_path": row["external_history_source_path"], + "a_share_prospectus_source_path": row["a_share_prospectus_source_path"], + "a_share_web_source_path": row["a_share_web_source_path"], "prospectus_source_path": row["prospectus_source_path"], "allotment_source_path": row["allotment_source_path"], } @@ -608,6 +643,14 @@ def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None: "company_name_en", "company_name_zh", "stock_short_name", + "a_share_ticker", + "a_share_exchange", + "a_share_board", + "a_share_relationship", + "a_share_company_name", + "a_share_listed_date", + "a_share_detection_method", + "a_share_mapping_confidence", "board", "status", "listing_date", @@ -662,6 +705,8 @@ def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None: "external_grey_market_return_pct", "external_first_day_return_pct", "external_history_source_path", + "a_share_prospectus_source_path", + "a_share_web_source_path", "prospectus_source_path", "allotment_source_path", "t0_score_breakdown", @@ -846,10 +891,12 @@ def main() -> int: args = parse_args() as_of = parse_as_of(args.as_of) db_path = Path(args.db) + schema_path = Path(args.schema) dataset_path = Path(args.dataset) report_path = Path(args.report) with sqlite3.connect(db_path) as conn: + conn.executescript(schema_path.read_text(encoding="utf-8")) rows = fetch_rows(conn) records = build_records(rows, as_of) diff --git a/scripts/generate_ipo_report.py b/scripts/generate_ipo_report.py index c241d3e..b7cd744 100644 --- a/scripts/generate_ipo_report.py +++ b/scripts/generate_ipo_report.py @@ -320,6 +320,37 @@ def facts_table(record: dict[str, str], stage: str) -> str: return "\n".join(lines) +def ah_overlay(record: dict[str, str]) -> str: + if not record.get("a_share_ticker"): + return "- 未识别到同一发行人的 A 股或其他内地上市股本。" + + prospectus_path = record.get("a_share_prospectus_source_path") or "data_gap" + web_path = record.get("a_share_web_source_path") or "data_gap" + rows = [ + ("A 股代码", fmt_value(record.get("a_share_ticker"))), + ("交易所", fmt_value(record.get("a_share_exchange"))), + ("板块", fmt_value(record.get("a_share_board"))), + ("关系", fmt_value(record.get("a_share_relationship"))), + ("A 股公司名", fmt_value(record.get("a_share_company_name"))), + ("A 股上市日", fmt_value(record.get("a_share_listed_date"))), + ("识别方法", fmt_value(record.get("a_share_detection_method"))), + ("映射置信度", fmt_value(record.get("a_share_mapping_confidence"))), + ("招股书证据", f"`{prospectus_path}`" if prospectus_path != "data_gap" else "`data_gap`"), + ("互联网交叉验证", f"`{web_path}`" if web_path != "data_gap" else "`data_gap`"), + ] + lines = ["| 字段 | 数值 |", "| --- | --- |"] + for label, value in rows: + lines.append(f"| {label} | {value} |") + lines.extend( + [ + "", + "- 这是 A/H 或内地上市股本定价场景,不应按纯首次上市 IPO 处理。", + "- A 股价格可作为估值锚,但 A 股和 H 股通常不能互换或直接套利;短线收益仍取决于香港侧认购热度、流动性、供给和 T2/D1 出口。", + ] + ) + return "\n".join(lines) + + def stage_calendar_table(record: dict[str, str]) -> str: application_start = fmt_value(record["application_start_date"]) application_end = fmt_value(record["application_end_date"]) @@ -372,6 +403,10 @@ def source_paths(record: dict[str, str], stage: str) -> list[str]: paths.append(record["prospectus_source_path"]) if stage == T1_STAGE and record["allotment_source_path"]: paths.append(record["allotment_source_path"]) + if record.get("a_share_prospectus_source_path"): + paths.append(record["a_share_prospectus_source_path"]) + if record.get("a_share_web_source_path"): + paths.append(record["a_share_web_source_path"]) return paths @@ -455,6 +490,10 @@ def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, "", facts_table(record, stage), "", + "## A/H 或内地上市股本检查", + "", + ah_overlay(record), + "", "## 短线退出模型推断", "", f"- D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)}", diff --git a/scripts/run_hk_ipo_analyst_once.sh b/scripts/run_hk_ipo_analyst_once.sh index faf4204..b07a42e 100755 --- a/scripts/run_hk_ipo_analyst_once.sh +++ b/scripts/run_hk_ipo_analyst_once.sh @@ -24,6 +24,7 @@ Goals: - Inspect the current worktree and recent git history first. - Refresh the latest IPO candidate universe from online sources through `hk-ipo-archivist` before analysis. - Update all relevant fresh network facts for the latest candidate report, especially live subscription-period market heat / margin subscription multiples, official T1 allotment demand when published, prospectus documents, listing calendars, and recent D1 review data. +- Refresh A/H or other onshore share-class mappings before rebuilding the report: use the structured `listed_share_classes` archive, scan prospectus extracted text with `scripts/archive_a_share_mappings.py`, and add internet / official-exchange cross-check evidence when supported. - Keep unofficial subscription multiples in `ipo_market_heat` with their provider and `observed_at`; do not copy them into official T1 public oversubscription fields. - Rebuild the analysis dataset after any archive refresh. - Produce a complete latest broad IPO candidate report for actionable subscriptions, including ranking, fundamentals, break-risk/risk-reward, per-IPO notes, closed/waiting names, recent 30-day review, guardrails, and sources.