Files
hk-ipo/scripts/generate_ipo_report.py
T
geometrybase 7cbdd533b0 Add A/H share-class mapping workflow
Request:
- Add a repeatable mechanism so HK IPO reports detect issuers that already have Mainland A shares.
- Include a third internet/official-exchange cross-check layer beyond structured history and prospectus scans.

Changes:
- Add listed_share_classes schema support for same-issuer A-share mappings and evidence links.
- Add scripts/archive_a_share_mappings.py to scan prospectus extracted text, reject sponsor/portfolio/cornerstone false positives, archive optional official web evidence and A-share/FX quote evidence, and export snapshots on write.
- Surface a_share_* fields in the analysis dataset and single-ticker report output.
- Update hk-ipo analyst/archivist skill rules and scheduled refresh prompt to require the three-layer A/H mapping check.

Verification:
- python3 -m py_compile scripts/archive_a_share_mappings.py scripts/build_analysis_dataset.py scripts/generate_ipo_report.py
- .venv/bin/python scripts/archive_a_share_mappings.py --as-of 2026-06-24T00:00:00Z --tickers 00668,01688,03661,09630 --dry-run
- .venv/bin/python scripts/build_analysis_dataset.py --db /tmp/hk_ipo_ah_dataset_test.sqlite --dataset /tmp/hk_ipo_ah_dataset_test.csv --report /tmp/hk_ipo_ah_model_test.md --as-of 2026-06-24T00:00:00Z
- .venv/bin/python scripts/generate_ipo_report.py 09630 --dataset /tmp/hk_ipo_ah_dataset_test.csv --stdout --as-of 2026-06-24T00:00:00Z
- git diff --check

Next useful context:
- Dry-run detected 00668->300866.SZ, 01688->002600.SZ, 03661->300661.SZ, and 09630->688630.SH.
- A false positive 01688->300476.SZ from a cornerstone investor parent was rejected by the issuer-context filter.
2026-06-24 07:21:21 +00:00

566 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""
from __future__ import annotations
import argparse
import csv
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any
# Scoring-rule file whose path is printed in every report.
MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
# Dataset read when --dataset is not given.
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
# Directory that receives reports when --output is not given.
DEFAULT_OUTPUT_DIR = Path("reports")
# Stage identifiers accepted by --stage; AUTO_STAGE is resolved to T0 or T1
# by determine_stage().
T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"
@dataclass(frozen=True)
class BucketMetric:
    """Day-one (D1) return statistics for the dataset rows of one score bucket.

    As built by ``bucket_metric``, every statistic is ``None`` and
    ``sample_size`` is 0 when no row of the bucket has a recorded D1 return.
    """

    # Number of rows that contributed a D1 return.
    sample_size: int
    # Share (0-1) of sampled returns that are strictly positive.
    d1_positive_rate: float | None
    # Share (0-1) of sampled returns that are at least 10.
    d1_strong_rate: float | None
    # Arithmetic mean of the sampled D1 returns.
    average_d1_return_pct: float | None
    # Median of the sampled D1 returns.
    median_d1_return_pct: float | None
@dataclass(frozen=True)
class ScoreComponent:
    """One entry of a score breakdown, as produced by ``parse_components``."""

    # Component key, e.g. "offer_size"; mapped to a label by component_label().
    name: str
    # Signed contribution of this component to the score.
    points: int
    # Reason code shown verbatim in the report tables.
    reason: str
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the report generator.

    Returns:
        Namespace with the attributes ``ticker``, ``stage``, ``dataset``,
        ``output``, ``as_of`` and ``stdout``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")

    stage_choices = [AUTO_STAGE, T0_STAGE, T1_STAGE]
    cli.add_argument(
        "--stage",
        default=AUTO_STAGE,
        choices=stage_choices,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )

    default_dataset = str(DEFAULT_DATASET_PATH)
    cli.add_argument("--dataset", default=default_dataset, help="Repo-relative model dataset path.")
    cli.add_argument(
        "--output",
        help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.",
    )
    cli.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    cli.add_argument(
        "--stdout",
        action="store_true",
        help="Print the report instead of writing a file.",
    )

    namespace = cli.parse_args()
    return namespace
def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO-8601 UTC string ending in ``Z``.

    Args:
        value: ISO-8601 timestamp from ``--as-of``; a trailing ``Z`` is
            accepted. A value without an offset is taken to be UTC and a
            value with another offset is converted to UTC, so the result is
            always UTC and its first ten characters are the UTC date.
            ``None`` or an empty string selects the current UTC time,
            truncated to whole seconds.

    Returns:
        The timestamp formatted with ``datetime.isoformat`` and a ``Z`` suffix.

    Raises:
        ValueError: If ``value`` is not a parseable ISO-8601 timestamp.
    """
    if value:
        # ``Z`` is rewritten because older ``fromisoformat`` versions reject it.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    else:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    return moment.isoformat().replace("+00:00", "Z")
def normalize_ticker(value: str) -> str:
    """Return the canonical form of a ticker.

    Surrounding whitespace is removed, the text is upper-cased and a trailing
    ``.HK`` is dropped. An all-digit result is left-padded with zeros to five
    characters; anything else is returned as is.
    """
    suffix = ".HK"
    code = value.strip().upper()
    if code.endswith(suffix):
        code = code[: -len(suffix)]
    return code.zfill(5) if code.isdigit() else code
def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dictionaries.

    Args:
        path: Location of the UTF-8 encoded CSV file.

    Returns:
        One dictionary per data row, keyed by the CSV header.

    Raises:
        SystemExit: If ``path`` does not exist.
    """
    if path.exists():
        with path.open(newline="", encoding="utf-8") as handle:
            reader = csv.DictReader(handle)
            return [row for row in reader]
    raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first dataset row whose normalized ticker equals ``ticker``.

    Args:
        rows: Dataset rows; each must carry a ``ticker`` key.
        ticker: Ticker to look for, already normalized.
        dataset_path: Dataset location, used only in the error message.

    Raises:
        SystemExit: If no row matches.
    """
    match = next((row for row in rows if normalize_ticker(row["ticker"]) == ticker), None)
    if match is None:
        raise SystemExit(
            f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
            "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
        )
    return match
def as_float(value: Any) -> float | None:
    """Convert a dataset cell to ``float``; ``None`` and ``""`` become ``None``."""
    is_blank = value is None or value == ""
    return None if is_blank else float(value)
def as_int(value: Any) -> int | None:
    """Convert a dataset cell to ``int``; ``None`` and ``""`` become ``None``.

    The value is parsed as a float first, so ``"3.0"`` is accepted and any
    fractional part is truncated toward zero.
    """
    is_blank = value is None or value == ""
    if is_blank:
        return None
    as_number = float(value)
    return int(as_number)
def as_bool(value: Any) -> bool:
    """Interpret a dataset cell as a boolean flag.

    Returns ``True`` when the stripped string form of ``value`` is ``"1"`` or
    ``"true"`` in any letter case, and ``False`` for everything else
    (including ``None`` and the empty string). The comparison ignores case so
    that spellings such as ``TRUE`` are not silently read as false.
    """
    return str(value).strip().casefold() in {"1", "true"}
def fmt_value(value: Any) -> str:
    """Return ``str(value)``, or the placeholder ``未记录`` for ``None`` and ``""``."""
    is_blank = value is None or value == ""
    return "未记录" if is_blank else str(value)
def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators and a fixed number of decimals.

    Args:
        value: Number to format; ``None`` yields the placeholder ``未记录``.
        suffix: Text appended directly after the number.
        decimals: Digits kept after the decimal point.
    """
    if value is not None:
        spec = f",.{decimals}f"
        return format(value, spec) + suffix
    return "未记录"
def fmt_pct_rate(value: float | None) -> str:
    """Format a 0-1 rate as a percentage with one decimal (``0.5`` -> ``50.0%``).

    ``None`` yields the placeholder ``未记录``.
    """
    if value is not None:
        percent = value * 100
        return "{:.1f}%".format(percent)
    return "未记录"
def fmt_pct_points(value: float | None) -> str:
    """Format a value already in percentage points with one decimal and ``%``.

    ``None`` yields the placeholder ``未记录``.
    """
    if value is not None:
        return "{:.1f}%".format(value)
    return "未记录"
def fmt_money_m(value: float | None) -> str:
    """Format an amount as ``HK$<n>m`` with thousands separators and one decimal.

    ``None`` yields the placeholder ``未记录``.
    """
    if value is not None:
        amount = format(value, ",.1f")
        return "HK$" + amount + "m"
    return "未记录"
def fmt_hkd(value: float | None) -> str:
    """Format an amount as ``HK$<n>`` with thousands separators and two decimals.

    ``None`` yields the placeholder ``未记录``.
    """
    if value is not None:
        amount = format(value, ",.2f")
        return "HK$" + amount
    return "未记录"
def fmt_times(value: float | None) -> str:
    """Format a multiple as ``<n>x`` with thousands separators and two decimals.

    ``None`` yields the placeholder ``未记录``.
    """
    if value is not None:
        multiple = format(value, ",.2f")
        return multiple + "x"
    return "未记录"
def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators; ``None`` yields ``未记录``."""
    if value is not None:
        return format(value, ",")
    return "未记录"
def stock_display_name(record: dict[str, str]) -> str:
    """Return the name used in the report title.

    The first non-empty value among ``stock_short_name``, ``company_name_zh``
    and ``company_name_en`` is used; an empty string is returned when all
    three are missing or empty.
    """
    for key in ("stock_short_name", "company_name_zh", "company_name_en"):
        candidate = record.get(key)
        if candidate:
            return candidate
    return ""
def company_display_name(record: dict[str, str]) -> str:
    """Return the company name shown in the report summary.

    Rules, in order:
      1. Short name and a different Chinese name: the short name followed by
         the Chinese legal name in parentheses.
      2. Short name only (or identical to the Chinese name): the short name.
      3. Chinese and English names: both, separated by `` / ``.
      4. Whichever single name exists, else the placeholder ``未记录``.

    The closing parenthesis in rule 1 and the separator in rule 3 were
    missing, which produced an unbalanced or run-together name.
    """
    short_name = record.get("stock_short_name") or ""
    chinese_name = record.get("company_name_zh") or ""
    english_name = record.get("company_name_en") or ""
    if short_name and chinese_name and short_name != chinese_name:
        return f"{short_name}(法定中文名:{chinese_name})"
    if short_name:
        return short_name
    if chinese_name and english_name:
        return f"{chinese_name} / {english_name}"
    return chinese_name or english_name or "未记录"
def parse_date(value: str) -> datetime | None:
    """Parse an ISO-8601 date or datetime string.

    Returns:
        The parsed ``datetime``, or ``None`` when ``value`` is empty or is
        not valid ISO-8601.
    """
    parsed = None
    if value:
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            # Unparseable text is treated the same as a missing date.
            parsed = None
    return parsed
def previous_business_date(value: datetime) -> datetime:
    """Return the closest earlier day that is not a Saturday or Sunday.

    Only weekends are skipped; public holidays are not taken into account.
    """
    one_day = timedelta(days=1)
    candidate = value - one_day
    # weekday(): Monday is 0, so 5 and 6 are Saturday and Sunday.
    while candidate.weekday() in (5, 6):
        candidate = candidate - one_day
    return candidate
def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on.

    Args:
        record: Dataset row of the ticker; ``has_structured_t1`` is read only
            when the request is ``auto`` or T1.
        requested_stage: Value of ``--stage``.

    Returns:
        For ``auto``: the T1 stage when the row has structured T1 data,
        otherwise the T0 stage. For any other request: the request itself.

    Raises:
        SystemExit: If T1 is requested but the row has no structured T1 data.
    """
    if requested_stage not in (AUTO_STAGE, T1_STAGE):
        return requested_stage

    t1_available = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if t1_available else T0_STAGE
    if t1_available:
        return requested_stage
    raise SystemExit(
        f"{record['ticker']} has no structured T1 demand yet. "
        f"Generate a {T0_STAGE} report or update the archive first."
    )
def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Build the default report path ``reports/{date}_{ticker}_{stage}_analysis.md``.

    The date is the first ten characters of ``as_of``.
    """
    report_date = as_of[:10]
    filename = "_".join((report_date, ticker, stage, "analysis.md"))
    return DEFAULT_OUTPUT_DIR / filename
def parse_components(text: str) -> list[ScoreComponent]:
    """Decode a score breakdown string into its components.

    The text is a ``|``-separated list of ``name:points:reason`` items. Empty
    items are skipped, and only the first two colons split an item, so the
    reason may itself contain colons.

    Raises:
        ValueError: If an item has fewer than three fields or its points are
            not an integer.
    """
    fields_per_item = (item.split(":", 2) for item in text.split("|") if item)
    return [
        ScoreComponent(name=name, points=int(points), reason=reason)
        for name, points, reason in fields_per_item
    ]
def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Summarise the recorded D1 returns of one score bucket.

    Args:
        rows: All dataset rows.
        bucket_key: Column holding the bucket label.
        bucket_value: Bucket label to select.
        require_t1: When true, rows without structured T1 data are ignored.

    Returns:
        Statistics over the selected rows that have a ``d1_return_pct``; an
        all-``None`` metric with sample size 0 when there are none.
    """

    def in_bucket(row: dict[str, str]) -> bool:
        if require_t1 and not as_bool(row["has_structured_t1"]):
            return False
        return row.get(bucket_key) == bucket_value

    candidates = (as_float(row.get("d1_return_pct")) for row in rows if in_bucket(row))
    sample = [value for value in candidates if value is not None]
    count = len(sample)
    if count == 0:
        return BucketMetric(
            sample_size=0,
            d1_positive_rate=None,
            d1_strong_rate=None,
            average_d1_return_pct=None,
            median_d1_return_pct=None,
        )

    positive_count = sum(1 for value in sample if value > 0)
    strong_count = sum(1 for value in sample if value >= 10)
    return BucketMetric(
        count,
        positive_count / count,
        strong_count / count,
        mean(sample),
        median(sample),
    )
def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band.

    Below 1 is ``weak_or_avoid``, 1-4 ``neutral``, 5-7 ``positive_watch`` and
    anything above 7 ``strong_watch``.
    """
    if score < 1:
        return "weak_or_avoid"
    for ceiling, band in ((4, "neutral"), (7, "positive_watch")):
        if score <= ceiling:
            return band
    return "strong_watch"
def action_for_decision(decision: str) -> str:
    """Return the action sentence printed for a decision code.

    Raises:
        KeyError: If ``decision`` is not one of the known codes.
    """
    # Codes produced by t0_decision_band().
    t0_actions = {
        "weak_or_avoid": "T0 阶段回避,除非后续 T1 认购热度明显改变格局。",
        "neutral": "暂等 T1 分配结果,不在 T0 阶段主动下重注。",
        "positive_watch": "正面观察,但需要等 T1 确认后再决定 T2/D1 退出仓位。",
        "strong_watch": "T0 强关注,仍需等待 T1 认购热度确认后执行 T2/D1 退出纪律。",
    }
    # Remaining codes handled by this mapping.
    other_actions = {
        "avoid": "回避申购。",
        "avoid_or_wait": "回避或等待;没有更强催化前不放大仓位。",
        "watch_or_small": "仅在执行条件支持 T2/D1 退出时小额参与。",
        "selective_subscribe": "选择性申购,并严格按 T2/D1 卖出纪律控制仓位。",
        "high_conviction_subscribe": "积极申购,但仍受分配、流动性和 T2/D1 卖出纪律约束。",
    }
    catalogue = {**t0_actions, **other_actions}
    return catalogue[decision]
def component_label(name: str) -> str:
    """Return the display label of a score component.

    Known component keys map to fixed labels; an unknown key is shown with
    underscores replaced by spaces and title-cased.
    """
    known_labels = {
        "offer_size": "发行规模",
        "public_pct": "初始公开发售比例",
        "min_subscription": "最低认购金额",
        "offer_price": "发行价",
        "over_allotment": "超额配股权",
        "public_os": "公开认购倍数",
        "international_os": "国际配售认购倍数",
        "valid_applications": "有效申请数",
        "success_rate": "申请成功率",
        "hk_reallocation": "香港公开发售回拨",
    }
    if name in known_labels:
        return known_labels[name]
    return name.replace("_", " ").title()
def components_table(components: list[ScoreComponent]) -> str:
    """Render the score components as a Markdown table.

    The table has a header, an alignment row and one row per component with
    its label, points and reason code.
    """
    header = ["| 评分项 | 分数 | 原因代码 |", "| --- | ---: | --- |"]
    body = [
        f"| {component_label(item.name)} | {item.points} | `{item.reason}` |"
        for item in components
    ]
    return "\n".join(header + body)
def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the basic facts of the IPO as a two-column Markdown table.

    Args:
        record: Dataset row of the ticker; every key read here must exist.
        stage: Report stage; the T1 stage adds the subscription and allotment
            facts.

    Returns:
        The Markdown table. Empty cells are shown with the placeholder text
        produced by the ``fmt_*`` helpers.

    The subscription period is rendered as ``start 至 end``; the two dates
    used to be concatenated with nothing between them.
    """

    def fraction_as_percent(key: str) -> str:
        # The value is multiplied by 100 before percent formatting, so it is
        # presumably stored as a 0-1 fraction; verify against the dataset builder.
        raw = record[key]
        return fmt_pct_points(as_float(raw) * 100 if raw else None)

    rows = [
        ("板块", fmt_value(record["board"])),
        ("状态", fmt_value(record["status"])),
        ("上市日期", fmt_value(record["listing_date"])),
        (
            "申购期",
            f"{fmt_value(record['application_start_date'])} 至 {fmt_value(record['application_end_date'])}",
        ),
        ("分配结果日期", fmt_value(record["allotment_results_expected_date"])),
        ("上市方式", fmt_value(record["listing_method"])),
        ("行业", fmt_value(record["industry_label"])),
        ("保荐人", fmt_value(record["sponsors"])),
        ("发行价", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("发行规模", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("市值", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("每手股数", fmt_int(as_int(record["board_lot"]))),
        ("最低认购金额", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("初始公开发售比例", fraction_as_percent("public_offer_pct_initial")),
        ("超额配股权股数", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("公开认购倍数", fmt_times(as_float(record["public_oversubscription_times"]))),
                ("国际配售认购倍数", fmt_times(as_float(record["international_oversubscription_times"]))),
                ("有效申请数", fmt_int(as_int(record["valid_applications"]))),
                ("成功申请数", fmt_int(as_int(record["successful_applications"]))),
                ("申请成功率", fraction_as_percent("application_success_rate")),
                ("国际配售承配人数", fmt_int(as_int(record["international_placees"]))),
                ("香港公开发售回拨倍数", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )
    lines = ["| 字段 | 数值 |", "| --- | --- |"]
    lines.extend(f"| {label} | {value} |" for label, value in rows)
    return "\n".join(lines)
def ah_overlay(record: dict[str, str]) -> str:
    """Render the A/H (Mainland share class) section of the report.

    Returns:
        A single bullet stating that no mapping was identified when the row
        has no ``a_share_ticker``; otherwise a Markdown table of the
        ``a_share_*`` fields followed by two explanatory bullets. A missing
        evidence path is shown as ``data_gap``.
    """
    if not record.get("a_share_ticker"):
        return "- 未识别到同一发行人的 A 股或其他内地上市股本。"

    # Either the archived path or the literal marker, always in backticks.
    prospectus_evidence = record.get("a_share_prospectus_source_path") or "data_gap"
    web_evidence = record.get("a_share_web_source_path") or "data_gap"

    plain_fields = (
        ("A 股代码", "a_share_ticker"),
        ("交易所", "a_share_exchange"),
        ("板块", "a_share_board"),
        ("关系", "a_share_relationship"),
        ("A 股公司名", "a_share_company_name"),
        ("A 股上市日", "a_share_listed_date"),
        ("识别方法", "a_share_detection_method"),
        ("映射置信度", "a_share_mapping_confidence"),
    )
    cells = [(label, fmt_value(record.get(key))) for label, key in plain_fields]
    cells.append(("招股书证据", f"`{prospectus_evidence}`"))
    cells.append(("互联网交叉验证", f"`{web_evidence}`"))

    output = ["| 字段 | 数值 |", "| --- | --- |"]
    output += [f"| {label} | {value} |" for label, value in cells]
    output += [
        "",
        "- 这是 A/H 或内地上市股本定价场景,不应按纯首次上市 IPO 处理。",
        "- A 股价格可作为估值锚,但 A 股和 H 股通常不能互换或直接套利;短线收益仍取决于香港侧认购热度、流动性、供给和 T2/D1 出口。",
    ]
    return "\n".join(output)
def stage_calendar_table(record: dict[str, str]) -> str:
    """Render the stage calendar (T0, T1, T2, D1) as a Markdown table.

    Args:
        record: Dataset row; reads ``application_start_date``,
            ``application_end_date``, ``allotment_results_expected_date`` and
            ``listing_date``.

    Returns:
        A three-column Markdown table with one row per stage.

    The T2 date is derived: the weekday before the listing date when that
    date parses, otherwise the allotment date, otherwise the placeholder.
    The T0 window is rendered as ``start 至 end``; the two dates used to be
    concatenated with nothing between them.
    """
    application_start = fmt_value(record["application_start_date"])
    application_end = fmt_value(record["application_end_date"])
    allotment_date = fmt_value(record["allotment_results_expected_date"])
    listing_date = fmt_value(record["listing_date"])
    listed = parse_date(record["listing_date"])
    allotment = parse_date(record["allotment_results_expected_date"])
    if listed:
        previous = previous_business_date(listed)
        if allotment and previous.date() == allotment.date():
            t2_date = f"{previous.date().isoformat()} 分配结果公布后 / D1 前一交易日"
        else:
            t2_date = f"{previous.date().isoformat()},预计 D1 前一交易日"
    elif allotment:
        t2_date = f"{allotment.date().isoformat()} 分配结果公布后"
    else:
        t2_date = "未记录"
    rows = [
        (
            "T0_prospectus",
            f"{application_start} 至 {application_end}",
            "申购前/申购中阶段;只使用招股书和发行条款。",
        ),
        (
            "T1_allotment",
            allotment_date,
            "分配结果日;使用公开认购热度、国际配售热度和分配事实。",
        ),
        (
            "T2_grey_market",
            t2_date,
            "上市前暗盘窗口;只有存在可靠且可执行的数据源时才作为卖出依据。",
        ),
        (
            "D1",
            listing_date,
            "正式上市首日;T2 数据不可用或不可靠时的默认卖出窗口。",
        ),
    ]
    lines = ["| 阶段 | 本 IPO 对应日期 | 含义 |", "| --- | --- | --- |"]
    lines.extend(f"| `{stage}` | {date_text} | {meaning} |" for stage, date_text, meaning in rows)
    return "\n".join(lines)
def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """Collect the non-empty evidence paths cited by the report.

    Order: prospectus, allotment (T1 stage only), A-share prospectus
    evidence, A-share web evidence.
    """
    candidates = [record["prospectus_source_path"]]
    if stage == T1_STAGE:
        candidates.append(record["allotment_source_path"])
    # The A-share columns are optional, hence .get().
    candidates.append(record.get("a_share_prospectus_source_path"))
    candidates.append(record.get("a_share_web_source_path"))
    return [path for path in candidates if path]
def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Return up to five Markdown bullets for the strongest score components.

    Args:
        components: Score components of the report.
        positive: ``True`` selects components with points above zero, sorted
            from highest to lowest; ``False`` selects components with points
            below zero, sorted from most negative upward.

    Returns:
        One bullet per selected component, or a single bullet stating that
        there is none.

    The bullet now places ``:`` between the label and the signed points;
    previously the two ran together (for example ``发行规模+2``).
    """
    if positive:
        selected = [component for component in components if component.points > 0]
    else:
        selected = [component for component in components if component.points < 0]
    if not selected:
        return ["- 没有明显正向评分项。" if positive else "- 没有明显负向评分项。"]
    selected.sort(key=lambda component: component.points, reverse=positive)
    return [
        f"- {component_label(component.name)}:{component.points:+d} (`{component.reason}`)。"
        for component in selected[:5]
    ]
def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Report which fields required for the stage are missing or empty.

    Returns:
        A one-element list: either a bullet naming the missing fields or a
        bullet stating that nothing is missing.
    """
    checks = [
        ("industry_label", "行业"),
        ("market_cap_hkd_m", "市值"),
        ("min_subscription_amount_hkd", "最低认购金额"),
    ]
    if stage == T1_STAGE:
        checks += [
            ("public_oversubscription_times", "公开认购倍数"),
            ("international_oversubscription_times", "国际配售认购倍数"),
            ("valid_applications", "有效申请数"),
            ("successful_applications", "成功申请数"),
        ]

    gaps = [label for key, label in checks if not record.get(key)]
    if gaps:
        return [f"- 缺失或空白字段:{', '.join(gaps)}"]
    return ["- 本阶段必需字段没有明显空缺。"]
def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the complete Markdown report for one ticker.

    Args:
        record: Dataset row of the ticker.
        rows: All dataset rows, used for the historical bucket statistics.
        stage: Resolved report stage (T0 or T1).
        as_of: Report timestamp printed in the summary.

    Returns:
        The report as one string, without a trailing newline.

    For the T0 stage the score, bucket and breakdown come from the ``t0_*``
    columns and the decision from ``t0_decision_band``. For any other stage
    the total score columns and ``decision_band`` are used, the T0 and T1
    breakdowns are combined, and the bucket statistics are limited to rows
    with structured T1 data.

    The score bullet now carries the same ``:`` separator as the other
    summary bullets.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]
    if stage == T0_STAGE:
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(rows, "t0_score_bucket", bucket, require_t1=False)
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(rows, "total_score_bucket", bucket, require_t1=True)
    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- 本阶段没有记录来源路径。"]
    company_name = company_display_name(record)
    title_name = stock_display_name(record)
    title_prefix = f"{ticker} {title_name}" if title_name else ticker
    score_label = "T0 分数" if stage == T0_STAGE else "总分"
    lines = [
        f"# {title_prefix} IPO 分析报告",
        "",
        "## 摘要",
        "",
        f"- 股票代码:`{ticker}`",
        f"- 公司:{company_name}",
        f"- 分析阶段:`{stage}`",
        f"- 报告生成时间:`{as_of}`",
        f"- 模型数据时间:`{dataset_as_of}`",
        f"- 规则版本:`{model_version}`",
        f"- 规则路径:`{MODEL_RULE_PATH.as_posix()}`",
        "- 策略周期:短线 IPO 申购交易;优先在可靠 T2 暗盘卖出,否则默认 D1 卖出。",
        f"- 结论代码:`{decision}`",
        f"- 执行动作:{action_for_decision(decision)}",
        f"- {score_label}:`{score}`",
        f"- 分数分桶:`{bucket}`",
        f"- 历史校准 D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)},样本数 {metric.sample_size}",
        "",
        "## 阶段日期表",
        "",
        stage_calendar_table(record),
        "",
        "## 基础事实",
        "",
        facts_table(record, stage),
        "",
        "## A/H 或内地上市股本检查",
        "",
        ah_overlay(record),
        "",
        "## 短线退出模型推断",
        "",
        f"- D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 涨幅不低于 10% 概率:{fmt_pct_rate(metric.d1_strong_rate)}",
        f"- 同分桶历史 D1 平均收益:{fmt_num(metric.average_d1_return_pct, '%')}",
        f"- 同分桶历史 D1 中位收益:{fmt_num(metric.median_d1_return_pct, '%')}",
        "- T2 暗盘卖出收益暂未建模,直到项目确认可靠暗盘数据源。",
        "- D5/D20/D60 只作为复盘标签,不是持仓目标。",
        "",
        "## 评分拆解",
        "",
        components_table(components),
        "",
        "## 正面因素",
        "",
        *reason_lines(components, positive=True),
        "",
        "## 风险与缺口",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 暗盘信号暂未使用,因为项目还没有批准可复现的数据源。",
        "- 上市后的 D5/D20/D60 表现只用于后续复盘,不是本模型的持仓周期目标。",
        "",
        "## 触发条件",
        "",
        "- 上调:T1 认购热度显著更强、分配稀缺性更好,或出现有规则支持的新正面催化。",
        "- 下调:公开或国际需求偏弱、供给过大、关键字段质量不足,或市场窗口明显转差。",
        "",
        "## 退出计划",
        "",
        "- 如果申购并获配,且 T2 暗盘数据可靠且可执行,优先按 T2 暗盘卖出计划处理。",
        "- 如果 T2 不可用或不可靠,默认使用 D1 作为卖出窗口。",
        "- 不把 D5/D20/D60 作为本模型的计划持仓周期。",
        "- 后续记录 D1/D5/D20/D60 结果时,只作为复盘标签,不作为倒推预测输入。",
        "",
        "## 来源路径",
        "",
        *source_lines,
    ]
    return "\n".join(lines)
def main() -> int:
    """Run the command-line workflow: load, resolve stage, build, emit.

    The report is printed when ``--stdout`` is given; otherwise it is written
    to ``--output`` or the default report path, creating parent directories
    as needed.

    Returns:
        0 on success.

    Raises:
        SystemExit: If the target report file already exists (existing
            reports are never overwritten), or from the helpers when the
            dataset, ticker or stage cannot be resolved.
    """
    options = parse_args()
    ticker = normalize_ticker(options.ticker)
    as_of = parse_as_of(options.as_of)

    dataset_path = Path(options.dataset)
    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, options.stage)
    report = build_report(record, rows, stage, as_of)

    if not options.stdout:
        if options.output:
            target = Path(options.output)
        else:
            target = output_path_for(ticker, stage, as_of)
        if target.exists():
            raise SystemExit(f"Refusing to overwrite existing report: {target.as_posix()}")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(report + "\n", encoding="utf-8")
        print(f"report written: {target.as_posix()}")
    else:
        print(report)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())