#!/usr/bin/env python3
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""

from __future__ import annotations

import argparse
import csv
import math
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any

MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
DEFAULT_OUTPUT_DIR = Path("reports")

T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"

# Placeholder rendered in the report wherever a fact is absent from the dataset.
MISSING_TEXT = "未记录"

# Decision band code -> action sentence printed in the report summary.
_DECISION_ACTIONS = {
    "weak_or_avoid": "T0 阶段回避,除非后续 T1 认购热度明显改变格局。",
    "neutral": "暂等 T1 分配结果,不在 T0 阶段主动下重注。",
    "positive_watch": "正面观察,但需要等 T1 确认后再决定 T2/D1 退出仓位。",
    "strong_watch": "T0 强关注,仍需等待 T1 认购热度确认后执行 T2/D1 退出纪律。",
    "avoid": "回避申购。",
    "avoid_or_wait": "回避或等待;没有更强催化前不放大仓位。",
    "watch_or_small": "仅在执行条件支持 T2/D1 退出时小额参与。",
    "selective_subscribe": "选择性申购,并严格按 T2/D1 卖出纪律控制仓位。",
    "high_conviction_subscribe": "积极申购,但仍受分配、流动性和 T2/D1 卖出纪律约束。",
}

# Score component name -> display label used in the breakdown table.
_COMPONENT_LABELS = {
    "offer_size": "发行规模",
    "public_pct": "初始公开发售比例",
    "min_subscription": "最低认购金额",
    "offer_price": "发行价",
    "over_allotment": "超额配股权",
    "public_os": "公开认购倍数",
    "international_os": "国际配售认购倍数",
    "valid_applications": "有效申请数",
    "success_rate": "申请成功率",
    "hk_reallocation": "香港公开发售回拨",
}


@dataclass(frozen=True)
class BucketMetric:
    """Aggregated D1 return statistics for the rows that share one score bucket."""

    sample_size: int
    d1_positive_rate: float | None
    d1_strong_rate: float | None
    average_d1_return_pct: float | None
    median_d1_return_pct: float | None


@dataclass(frozen=True)
class ScoreComponent:
    """One ``name:points:reason`` entry parsed from a score breakdown string."""

    name: str
    points: int
    reason: str


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the report generator."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")
    parser.add_argument(
        "--stage",
        choices=[AUTO_STAGE, T0_STAGE, T1_STAGE],
        default=AUTO_STAGE,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )
    parser.add_argument("--dataset", default=str(DEFAULT_DATASET_PATH), help="Repo-relative model dataset path.")
    parser.add_argument("--output", help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.")
    parser.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    parser.add_argument("--stdout", action="store_true", help="Print the report instead of writing a file.")
    return parser.parse_args()


def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO-8601 string.

    A supplied value is parsed with ``datetime.fromisoformat`` (a trailing
    ``Z`` is accepted) and re-serialized, with a ``+00:00`` offset written as
    ``Z``. Without a value the current UTC time, truncated to seconds, is used.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    if value:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        return parsed.isoformat().replace("+00:00", "Z")
    now = datetime.now(timezone.utc).replace(microsecond=0)
    return now.isoformat().replace("+00:00", "Z")


def normalize_ticker(value: str) -> str:
    """Upper-case a ticker, drop a ``.HK`` suffix and zero-pad digits to five."""
    ticker = value.strip().upper()
    if ticker.endswith(".HK"):
        ticker = ticker[:-3]
    if ticker.isdigit():
        return ticker.zfill(5)
    return ticker


def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dictionaries.

    Raises:
        SystemExit: if ``path`` does not exist.
    """
    if not path.exists():
        raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
    with path.open(newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first row whose normalized ticker equals ``ticker``.

    Rows whose ticker cell is missing (``csv.DictReader`` fills short rows
    with ``None``) are skipped instead of crashing the lookup.

    Raises:
        SystemExit: if no row matches.
    """
    for row in rows:
        if normalize_ticker(row.get("ticker") or "") == ticker:
            return row
    raise SystemExit(
        f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
        "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
    )


def _is_blank(value: Any) -> bool:
    """Return True for ``None`` and for empty or whitespace-only strings."""
    return value is None or (isinstance(value, str) and not value.strip())


def as_float(value: Any) -> float | None:
    """Convert a dataset cell to ``float``; blank or non-finite cells give ``None``.

    Raises:
        ValueError: if the cell holds text that is not a number.
    """
    if _is_blank(value):
        return None
    number = float(value)
    # "nan"/"inf" parse successfully but are not usable facts; report as missing.
    return number if math.isfinite(number) else None


def as_int(value: Any) -> int | None:
    """Convert a dataset cell to ``int`` via ``float`` (so ``"12.0"`` gives 12).

    Blank or non-finite cells give ``None``.
    """
    number = as_float(value)
    return None if number is None else int(number)


def as_bool(value: Any) -> bool:
    """Interpret ``1`` or any casing of ``true`` as True; everything else is False."""
    return str(value).strip().lower() in {"1", "true"}


def fmt_value(value: Any) -> str:
    """Render a raw cell as text, using the missing placeholder for blanks."""
    if _is_blank(value):
        return MISSING_TEXT
    return str(value)


def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators, fixed decimals and a suffix."""
    if value is None:
        return MISSING_TEXT
    return f"{value:,.{decimals}f}{suffix}"


def fmt_pct_rate(value: float | None) -> str:
    """Format a 0-1 rate as a percentage with one decimal."""
    if value is None:
        return MISSING_TEXT
    return f"{value * 100:.1f}%"


def fmt_pct_points(value: float | None) -> str:
    """Format a value already expressed in percentage points."""
    if value is None:
        return MISSING_TEXT
    return f"{value:.1f}%"


def fmt_money_m(value: float | None) -> str:
    """Format an amount in HKD millions."""
    if value is None:
        return MISSING_TEXT
    return f"HK${value:,.1f}m"


def fmt_hkd(value: float | None) -> str:
    """Format an HKD amount with two decimals."""
    if value is None:
        return MISSING_TEXT
    return f"HK${value:,.2f}"


def fmt_times(value: float | None) -> str:
    """Format a multiple such as an oversubscription ratio."""
    if value is None:
        return MISSING_TEXT
    return f"{value:,.2f}x"


def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators."""
    if value is None:
        return MISSING_TEXT
    return f"{value:,}"


def chinese_display_name(record: dict[str, str]) -> str:
    """Return the stock short name, else the Chinese company name, else ``""``."""
    return record.get("stock_short_name") or record.get("company_name_zh") or ""


def company_display_name(record: dict[str, str]) -> str:
    """Build the company label shown in the report summary."""
    short_name = record.get("stock_short_name") or ""
    chinese_name = record.get("company_name_zh") or ""
    english_name = record.get("company_name_en") or ""
    if short_name and chinese_name and short_name != chinese_name:
        return f"{short_name}(法定中文名:{chinese_name})"
    if short_name:
        return short_name
    if chinese_name and english_name:
        return f"{chinese_name}({english_name})"
    return chinese_name or english_name or MISSING_TEXT


def parse_date(value: str) -> datetime | None:
    """Parse an ISO date/datetime cell; blank or unparseable input gives ``None``."""
    if _is_blank(value):
        return None
    try:
        return datetime.fromisoformat(value.strip())
    except ValueError:
        return None


def previous_business_date(value: datetime) -> datetime:
    """Return the closest earlier weekday (Monday-Friday) before ``value``.

    Only weekends are skipped; public holidays are not taken into account.
    """
    day = value - timedelta(days=1)
    while day.weekday() >= 5:
        day -= timedelta(days=1)
    return day


def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on.

    ``auto`` picks T1 when the record has structured T1 data, otherwise T0.

    Raises:
        SystemExit: if T1 is requested explicitly but the record has no
            structured T1 data.
    """
    has_t1 = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if has_t1 else T0_STAGE
    if requested_stage == T1_STAGE and not has_t1:
        raise SystemExit(
            f"{record['ticker']} has no structured T1 demand yet. "
            f"Generate a {T0_STAGE} report or update the archive first."
        )
    return requested_stage


def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Return the default report path, dated by the first 10 chars of ``as_of``."""
    date_part = as_of[:10]
    return DEFAULT_OUTPUT_DIR / f"{date_part}_{ticker}_{stage}_analysis.md"


def parse_components(text: str | None) -> list[ScoreComponent]:
    """Parse a ``name:points:reason|name:points:reason`` breakdown string.

    Empty or missing input yields an empty list. The reason may itself
    contain ``:`` because only the first two colons are treated as separators.

    Raises:
        ValueError: if an item lacks the three parts or has non-integer points.
    """
    components: list[ScoreComponent] = []
    for raw_item in (text or "").split("|"):
        item = raw_item.strip()
        if not item:
            continue
        parts = item.split(":", 2)
        if len(parts) != 3:
            raise ValueError(f"Malformed score component {item!r}; expected 'name:points:reason'.")
        name, points, reason = parts
        components.append(ScoreComponent(name=name, points=int(points), reason=reason))
    return components


def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Aggregate recorded D1 returns over rows in the given score bucket.

    Rows are kept when ``row[bucket_key] == bucket_value`` and, if
    ``require_t1`` is set, when they carry structured T1 data. Rows without a
    recorded ``d1_return_pct`` are ignored. A "strong" D1 is a return of at
    least 10 percentage points.
    """
    returns: list[float] = []
    for row in rows:
        if require_t1 and not as_bool(row.get("has_structured_t1")):
            continue
        if row.get(bucket_key) != bucket_value:
            continue
        d1_return = as_float(row.get("d1_return_pct"))
        if d1_return is not None:
            returns.append(d1_return)
    if not returns:
        return BucketMetric(0, None, None, None, None)
    count = len(returns)
    return BucketMetric(
        sample_size=count,
        d1_positive_rate=sum(value > 0 for value in returns) / count,
        d1_strong_rate=sum(value >= 10 for value in returns) / count,
        average_d1_return_pct=mean(returns),
        median_d1_return_pct=median(returns),
    )


def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band code."""
    if score < 1:
        return "weak_or_avoid"
    if score <= 4:
        return "neutral"
    if score <= 7:
        return "positive_watch"
    return "strong_watch"


def action_for_decision(decision: str) -> str:
    """Return the action sentence for a decision band code.

    Raises:
        KeyError: if ``decision`` is not a known band.
    """
    try:
        return _DECISION_ACTIONS[decision]
    except KeyError:
        known = ", ".join(sorted(_DECISION_ACTIONS))
        raise KeyError(f"Unknown decision band {decision!r}; expected one of: {known}") from None


def component_label(name: str) -> str:
    """Return the display label for a component, title-casing unknown names."""
    return _COMPONENT_LABELS.get(name, name.replace("_", " ").title())


def _md_cell(text: str) -> str:
    """Make ``text`` safe inside a Markdown table cell (escape pipes, flatten newlines)."""
    flattened = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
    return flattened.replace("|", "\\|")


def _ratio_as_pct_points(value: Any) -> float | None:
    """Scale a 0-1 ratio cell to percentage points; blank cells give ``None``."""
    ratio = as_float(value)
    return None if ratio is None else ratio * 100


def components_table(components: list[ScoreComponent]) -> str:
    """Render the score components as a Markdown table."""
    lines = ["| 评分项 | 分数 | 原因代码 |", "| --- | ---: | --- |"]
    lines.extend(
        f"| {component_label(component.name)} | {component.points} | `{component.reason}` |"
        for component in components
    )
    return "\n".join(lines)


def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the basic-facts Markdown table; T1-only facts are added for T1."""
    rows = [
        ("板块", fmt_value(record["board"])),
        ("状态", fmt_value(record["status"])),
        ("上市日期", fmt_value(record["listing_date"])),
        ("申购期", f"{fmt_value(record['application_start_date'])} 至 {fmt_value(record['application_end_date'])}"),
        ("分配结果日期", fmt_value(record["allotment_results_expected_date"])),
        ("上市方式", fmt_value(record["listing_method"])),
        ("行业", fmt_value(record["industry_label"])),
        ("保荐人", fmt_value(record["sponsors"])),
        ("发行价", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("发行规模", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("市值", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("每手股数", fmt_int(as_int(record["board_lot"]))),
        ("最低认购金额", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("初始公开发售比例", fmt_pct_points(_ratio_as_pct_points(record["public_offer_pct_initial"]))),
        ("超额配股权股数", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("公开认购倍数", fmt_times(as_float(record["public_oversubscription_times"]))),
                ("国际配售认购倍数", fmt_times(as_float(record["international_oversubscription_times"]))),
                ("有效申请数", fmt_int(as_int(record["valid_applications"]))),
                ("成功申请数", fmt_int(as_int(record["successful_applications"]))),
                ("申请成功率", fmt_pct_points(_ratio_as_pct_points(record["application_success_rate"]))),
                ("国际配售承配人数", fmt_int(as_int(record["international_placees"]))),
                ("香港公开发售回拨倍数", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )
    lines = ["| 字段 | 数值 |", "| --- | --- |"]
    # Values come from free-text dataset cells, so escape them for the table.
    lines.extend(f"| {label} | {_md_cell(value)} |" for label, value in rows)
    return "\n".join(lines)


def stage_calendar_table(record: dict[str, str]) -> str:
    """Render the table mapping each stage (T0/T1/T2/D1) to this IPO's dates.

    The T2 date is the weekday before the listing date when that date is
    known, otherwise the allotment date, otherwise the missing placeholder.
    """
    application_start = fmt_value(record["application_start_date"])
    application_end = fmt_value(record["application_end_date"])
    allotment_date = fmt_value(record["allotment_results_expected_date"])
    listing_date = fmt_value(record["listing_date"])

    listed = parse_date(record["listing_date"])
    allotment = parse_date(record["allotment_results_expected_date"])
    if listed:
        previous = previous_business_date(listed)
        if allotment and previous.date() == allotment.date():
            t2_date = f"{previous.date().isoformat()} 分配结果公布后 / D1 前一交易日"
        else:
            t2_date = f"{previous.date().isoformat()},预计 D1 前一交易日"
    elif allotment:
        t2_date = f"{allotment.date().isoformat()} 分配结果公布后"
    else:
        t2_date = MISSING_TEXT

    rows = [
        (
            T0_STAGE,
            f"{application_start} 至 {application_end}",
            "申购前/申购中阶段;只使用招股书和发行条款。",
        ),
        (
            T1_STAGE,
            allotment_date,
            "分配结果日;使用公开认购热度、国际配售热度和分配事实。",
        ),
        (
            "T2_grey_market",
            t2_date,
            "上市前暗盘窗口;只有存在可靠且可执行的数据源时才作为卖出依据。",
        ),
        (
            "D1",
            listing_date,
            "正式上市首日;T2 数据不可用或不可靠时的默认卖出窗口。",
        ),
    ]
    lines = ["| 阶段 | 本 IPO 对应日期 | 含义 |", "| --- | --- | --- |"]
    lines.extend(f"| `{stage}` | {_md_cell(date_text)} | {meaning} |" for stage, date_text, meaning in rows)
    return "\n".join(lines)


def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """Return the recorded source paths relevant to ``stage``."""
    paths = []
    if record.get("prospectus_source_path"):
        paths.append(record["prospectus_source_path"])
    if stage == T1_STAGE and record.get("allotment_source_path"):
        paths.append(record["allotment_source_path"])
    return paths


def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Return up to five bullets for the strongest positive or negative components."""
    if positive:
        filtered = [component for component in components if component.points > 0]
    else:
        filtered = [component for component in components if component.points < 0]
    # Largest magnitude first: descending for positives, ascending for negatives.
    filtered.sort(key=lambda component: component.points, reverse=positive)
    if not filtered:
        return ["- 没有明显正向评分项。" if positive else "- 没有明显负向评分项。"]
    return [
        f"- {component_label(component.name)}:{component.points:+d} (`{component.reason}`)。"
        for component in filtered[:5]
    ]


def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Return one bullet listing the required fields that are blank for ``stage``."""
    required = [
        ("industry_label", "行业"),
        ("market_cap_hkd_m", "市值"),
        ("min_subscription_amount_hkd", "最低认购金额"),
    ]
    if stage == T1_STAGE:
        required.extend(
            [
                ("public_oversubscription_times", "公开认购倍数"),
                ("international_oversubscription_times", "国际配售认购倍数"),
                ("valid_applications", "有效申请数"),
                ("successful_applications", "成功申请数"),
            ]
        )
    missing = [label for key, label in required if _is_blank(record.get(key))]
    if not missing:
        return ["- 本阶段必需字段没有明显空缺。"]
    return [f"- 缺失或空白字段:{', '.join(missing)}。"]


def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the full Markdown report for ``record`` at ``stage``.

    For T0 only the T0 score, bucket and breakdown are used and the decision
    band is derived from the score. For any other stage the total score,
    its bucket, the stored decision band and both breakdowns are used, and
    the historical metric is restricted to rows with structured T1 data.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]

    if stage == T0_STAGE:
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(rows, "t0_score_bucket", bucket, require_t1=False)
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(rows, "total_score_bucket", bucket, require_t1=True)

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- 本阶段没有记录来源路径。"]
    company_name = company_display_name(record)
    title_name = chinese_display_name(record) or record.get("company_name_en") or ""
    title_prefix = f"{ticker} {title_name}" if title_name else ticker

    lines = [
        f"# {title_prefix} IPO 分析报告",
        "",
        "## 摘要",
        "",
        f"- 股票代码:`{ticker}`",
        f"- 公司:{company_name}",
        f"- 分析阶段:`{stage}`",
        f"- 报告生成时间:`{as_of}`",
        f"- 模型数据时间:`{dataset_as_of}`",
        f"- 规则版本:`{model_version}`",
        f"- 规则路径:`{MODEL_RULE_PATH.as_posix()}`",
        "- 策略周期:短线 IPO 申购交易;优先在可靠 T2 暗盘卖出,否则默认 D1 卖出。",
        f"- 结论代码:`{decision}`",
        f"- 执行动作:{action_for_decision(decision)}",
        f"- {'T0 分数' if stage == T0_STAGE else '总分'}:`{score}`",
        f"- 分数分桶:`{bucket}`",
        f"- 历史校准 D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)},样本数 {metric.sample_size}",
        "",
        "## 阶段日期表",
        "",
        stage_calendar_table(record),
        "",
        "## 基础事实",
        "",
        facts_table(record, stage),
        "",
        "## 短线退出模型推断",
        "",
        f"- D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 涨幅不低于 10% 概率:{fmt_pct_rate(metric.d1_strong_rate)}",
        f"- 同分桶历史 D1 平均收益:{fmt_num(metric.average_d1_return_pct, '%')}",
        f"- 同分桶历史 D1 中位收益:{fmt_num(metric.median_d1_return_pct, '%')}",
        "- T2 暗盘卖出收益暂未建模,直到项目确认可靠暗盘数据源。",
        "- D5/D20/D60 只作为复盘标签,不是持仓目标。",
        "",
        "## 评分拆解",
        "",
        components_table(components),
        "",
        "## 正面因素",
        "",
        *reason_lines(components, positive=True),
        "",
        "## 风险与缺口",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 暗盘信号暂未使用,因为项目还没有批准可复现的数据源。",
        "- 上市后的 D5/D20/D60 表现只用于后续复盘,不是本模型的持仓周期目标。",
        "",
        "## 触发条件",
        "",
        "- 上调:T1 认购热度显著更强、分配稀缺性更好,或出现有规则支持的新正面催化。",
        "- 下调:公开或国际需求偏弱、供给过大、关键字段质量不足,或市场窗口明显转差。",
        "",
        "## 退出计划",
        "",
        "- 如果申购并获配,且 T2 暗盘数据可靠且可执行,优先按 T2 暗盘卖出计划处理。",
        "- 如果 T2 不可用或不可靠,默认使用 D1 作为卖出窗口。",
        "- 不把 D5/D20/D60 作为本模型的计划持仓周期。",
        "- 后续记录 D1/D5/D20/D60 结果时,只作为复盘标签,不作为倒推预测输入。",
        "",
        "## 来源路径",
        "",
        *source_lines,
    ]
    return "\n".join(lines)


def main() -> int:
    """Run the CLI: build the report and print it or write it to a new file.

    Returns:
        Process exit code 0 on success.

    Raises:
        SystemExit: on invalid ``--as-of``, missing dataset/ticker, an
            unavailable stage, or when the output file already exists.
    """
    args = parse_args()
    ticker = normalize_ticker(args.ticker)
    try:
        as_of = parse_as_of(args.as_of)
    except ValueError as exc:
        raise SystemExit(f"Invalid --as-of value {args.as_of!r}: {exc}") from None
    dataset_path = Path(args.dataset)
    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, args.stage)
    report = build_report(record, rows, stage, as_of)

    if args.stdout:
        print(report)
        return 0

    output_path = Path(args.output) if args.output else output_path_for(ticker, stage, as_of)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Exclusive creation refuses to overwrite atomically, with no gap
        # between an existence check and the write.
        with output_path.open("x", encoding="utf-8") as handle:
            handle.write(report + "\n")
    except FileExistsError:
        raise SystemExit(f"Refusing to overwrite existing report: {output_path.as_posix()}") from None
    print(f"report written: {output_path.as_posix()}")
    return 0


if __name__ == "__main__":
    sys.exit(main())