Files
hk-ipo/scripts/generate_ipo_report.py
T
geometrybase 77b405e4f3 Add T0 analyst reports for active IPOs
Request:
- Analyze HK IPO ticker 01392 with the analyst skill.
- Preserve the in-flight 06132 archive/report work already created for the prior request.

Changes:
- Archived official HKEX prospectus PDFs and extracted text for 01392 and 06132.
- Seeded structured T0 facts into the SQLite archive and refreshed CSV snapshots and sync state.
- Rebuilt the v0 analysis dataset and model calibration report.
- Generated Simplified Chinese T0 prospectus-stage analyst reports for 01392 and 06132.
- Adjusted report stage calendars so T2 uses the previous business day before D1 when listing is separated from allocation by a weekend.

Verification:
- Compiled modified Python scripts with in-memory syntax checks.
- Ran SQLite quick_check and foreign_key_check.
- Confirmed DB row counts match CSV snapshots for key tables.
- Verified 01392/06132 source paths are repo-relative, raw files exist, hashes match, and PDF text manifest rows are ok.
- Ran git diff --cached --check.

Next useful context:
- 01392 T1 is due on 2026-06-18; rerun analyst after allotment results are archived.
- 06132 T1 is due on 2026-06-22; rerun analyst after allotment results are archived.
2026-06-15 14:51:44 +00:00

507 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""
from __future__ import annotations
import argparse
import csv
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any
# Repo-relative path of the scoring-rule file; build_report cites it in the report summary.
MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
# Dataset CSV used when --dataset is not supplied on the command line.
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
# Directory under which output_path_for places default report files.
DEFAULT_OUTPUT_DIR = Path("reports")
# Stage identifiers accepted by --stage. AUTO_STAGE defers the choice to
# determine_stage, which picks T1 when the record has structured T1 data.
T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"
@dataclass(frozen=True)
class BucketMetric:
    """Immutable summary of historical D1 returns for one score bucket.

    Produced by ``bucket_metric``. When no sample matched, ``sample_size`` is 0
    and every other field is ``None``.
    """

    # Number of rows that contributed a D1 return.
    sample_size: int
    # Fraction (0-1) of samples whose D1 return was above zero.
    d1_positive_rate: float | None
    # Fraction (0-1) of samples whose D1 return was at least 10.
    d1_strong_rate: float | None
    # Arithmetic mean of the sampled D1 returns.
    average_d1_return_pct: float | None
    # Median of the sampled D1 returns.
    median_d1_return_pct: float | None
@dataclass(frozen=True)
class ScoreComponent:
    """One immutable line item of a score breakdown."""

    # Machine name of the scoring item (mapped to a display label elsewhere).
    name: str
    # Signed points this item contributed to the score.
    points: int
    # Reason code explaining why the points were awarded or deducted.
    reason: str
def parse_args() -> argparse.Namespace:
    """Parse the command line of the report generator.

    Returns:
        The namespace with ``ticker``, ``stage``, ``dataset``, ``output``,
        ``as_of`` and ``stdout`` attributes.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Positional: which IPO to report on.
    cli.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")

    # Optional: which prediction stage to render.
    stage_choices = [AUTO_STAGE, T0_STAGE, T1_STAGE]
    cli.add_argument(
        "--stage",
        choices=stage_choices,
        default=AUTO_STAGE,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )

    # Optional: input dataset, output destination and report timestamp.
    cli.add_argument(
        "--dataset",
        default=str(DEFAULT_DATASET_PATH),
        help="Repo-relative model dataset path.",
    )
    cli.add_argument(
        "--output",
        help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.",
    )
    cli.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    cli.add_argument(
        "--stdout",
        action="store_true",
        help="Print the report instead of writing a file.",
    )
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO-8601 string.

    Args:
        value: ISO-8601 text (a trailing ``Z`` is accepted), or a falsy value
            to use the current UTC time truncated to whole seconds.

    Returns:
        The ISO string with a ``+00:00`` offset rewritten as ``Z``. Input
        without an offset, or with a non-UTC offset, is echoed back as parsed.

    Raises:
        ValueError: If ``value`` is not valid ISO-8601 text.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        # fromisoformat on older Pythons rejects "Z", so spell the offset out.
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return moment.isoformat().replace("+00:00", "Z")
def normalize_ticker(value: str) -> str:
    """Canonicalise a ticker: trim, upper-case, drop ``.HK``, pad digits to 5.

    Non-numeric tickers are returned upper-cased but otherwise unpadded.
    """
    suffix = ".HK"
    code = value.strip().upper()
    if code.endswith(suffix):
        code = code[: -len(suffix)]
    # Only purely numeric codes are zero-padded to five digits.
    return code.zfill(5) if code.isdigit() else code
def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dictionaries.

    Args:
        path: Location of the UTF-8 encoded CSV file.

    Returns:
        One dict per data row, keyed by the CSV header.

    Raises:
        SystemExit: If ``path`` does not exist.
    """
    if path.exists():
        with path.open(newline="", encoding="utf-8") as handle:
            reader = csv.DictReader(handle)
            return list(reader)
    raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first dataset row whose normalised ticker equals ``ticker``.

    Args:
        rows: Dataset rows, each carrying a ``ticker`` column.
        ticker: Already-normalised ticker to look up.
        dataset_path: Only used to build the error message.

    Raises:
        SystemExit: If no row matches.
    """
    matches = (row for row in rows if normalize_ticker(row["ticker"]) == ticker)
    found = next(matches, None)
    if found is None:
        raise SystemExit(
            f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
            "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
        )
    return found
def as_float(value: Any) -> float | None:
    """Convert a CSV cell to ``float``; ``None`` and ``""`` map to ``None``.

    Raises:
        ValueError: If a non-empty value is not a valid float literal.
    """
    return None if value in {None, ""} else float(value)
def as_int(value: Any) -> int | None:
    """Convert a CSV cell to ``int``; ``None`` and ``""`` map to ``None``.

    The value goes through ``float`` first, so text such as ``"12.0"`` is
    accepted and any fractional part is truncated toward zero.
    """
    return None if value in {None, ""} else int(float(value))
def as_bool(value: Any) -> bool:
    """Return True only when the stringified, stripped value is 1/true/True."""
    truthy_spellings = ("1", "true", "True")
    text = str(value).strip()
    return text in truthy_spellings
def fmt_value(value: Any) -> str:
    """Return ``str(value)``, or the "not recorded" placeholder for None/empty."""
    return "未记录" if value in {None, ""} else str(value)
def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators and a fixed decimal count.

    Args:
        value: Number to format, or ``None`` for the placeholder.
        suffix: Text appended directly after the number.
        decimals: Digits after the decimal point.
    """
    if value is not None:
        return format(value, f",.{decimals}f") + suffix
    return "未记录"
def fmt_pct_rate(value: float | None) -> str:
    """Format a 0-1 fraction as a percentage with one decimal place."""
    if value is not None:
        return format(value * 100, ".1f") + "%"
    return "未记录"
def fmt_pct_points(value: float | None) -> str:
    """Format a value already in percentage points with one decimal place."""
    if value is not None:
        return format(value, ".1f") + "%"
    return "未记录"
def fmt_money_m(value: float | None) -> str:
    """Format an amount as ``HK$<n>m`` with separators and one decimal."""
    if value is not None:
        return "HK$" + format(value, ",.1f") + "m"
    return "未记录"
def fmt_hkd(value: float | None) -> str:
    """Format an amount as ``HK$<n>`` with separators and two decimals."""
    if value is not None:
        return "HK$" + format(value, ",.2f")
    return "未记录"
def fmt_times(value: float | None) -> str:
    """Format a multiple as ``<n>x`` with separators and two decimals."""
    if value is not None:
        return format(value, ",.2f") + "x"
    return "未记录"
def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators, or the placeholder."""
    if value is not None:
        return format(value, ",")
    return "未记录"
def parse_date(value: str) -> datetime | None:
    """Parse ISO-8601 text into a ``datetime``.

    Returns:
        The parsed value, or ``None`` when ``value`` is empty or not valid
        ISO-8601 text.
    """
    try:
        return datetime.fromisoformat(value) if value else None
    except ValueError:
        # Malformed dates are treated the same as missing ones.
        return None
def previous_business_date(value: datetime) -> datetime:
    """Return the closest earlier day that is not a Saturday or Sunday.

    Only weekends are skipped; public holidays are not taken into account.
    """
    one_day = timedelta(days=1)
    candidate = value - one_day
    # weekday(): Monday is 0, so 5 and 6 are Saturday and Sunday.
    while candidate.weekday() in (5, 6):
        candidate = candidate - one_day
    return candidate
def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to render for ``record``.

    Args:
        record: Dataset row; ``has_structured_t1`` is read for auto and T1.
        requested_stage: The ``--stage`` value.

    Returns:
        For auto, T1 when the record has structured T1 data and T0 otherwise;
        for any other request, the requested stage itself.

    Raises:
        SystemExit: If T1 is requested but the record has no structured T1 data.
    """
    if requested_stage == AUTO_STAGE:
        if as_bool(record["has_structured_t1"]):
            return T1_STAGE
        return T0_STAGE
    # The T1 flag is only consulted when T1 was explicitly requested.
    if requested_stage != T1_STAGE or as_bool(record["has_structured_t1"]):
        return requested_stage
    raise SystemExit(
        f"{record['ticker']} has no structured T1 demand yet. "
        f"Generate a {T0_STAGE} report or update the archive first."
    )
def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Build the default report path ``reports/{date}_{ticker}_{stage}_analysis.md``.

    The date is the first ten characters of ``as_of``.
    """
    report_date = as_of[:10]
    filename = f"{report_date}_{ticker}_{stage}_analysis.md"
    return DEFAULT_OUTPUT_DIR / filename
def parse_components(text: str) -> list[ScoreComponent]:
    """Decode a ``name:points:reason|name:points:reason`` breakdown string.

    Empty segments between ``|`` separators are skipped. The reason may itself
    contain ``:`` because each segment is split at most twice.

    Raises:
        ValueError: If a segment has fewer than three ``:``-separated parts or
            its points are not an integer literal.
    """
    parsed: list[ScoreComponent] = []
    for segment in filter(None, text.split("|")):
        label, raw_points, why = segment.split(":", 2)
        parsed.append(ScoreComponent(name=label, points=int(raw_points), reason=why))
    return parsed
def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Summarise historical D1 returns of the rows in one score bucket.

    Args:
        rows: Dataset rows.
        bucket_key: Column holding the bucket label.
        bucket_value: Bucket label a row must carry to be sampled.
        require_t1: When true, rows without structured T1 data are ignored.

    Returns:
        Sample count, share of positive returns, share of returns at or above
        10, mean and median. All statistics are ``None`` when nothing matched.
    """

    def in_bucket(row: dict[str, str]) -> bool:
        # The T1 flag is only read when the caller asked for T1-only rows.
        if require_t1 and not as_bool(row["has_structured_t1"]):
            return False
        return row.get(bucket_key) == bucket_value

    observed = (as_float(row.get("d1_return_pct")) for row in rows if in_bucket(row))
    samples: list[float] = [value for value in observed if value is not None]

    count = len(samples)
    if count == 0:
        return BucketMetric(0, None, None, None, None)

    positive_hits = sum(1 for value in samples if value > 0)
    strong_hits = sum(1 for value in samples if value >= 10)
    return BucketMetric(
        sample_size=count,
        d1_positive_rate=positive_hits / count,
        d1_strong_rate=strong_hits / count,
        average_d1_return_pct=mean(samples),
        median_d1_return_pct=median(samples),
    )
def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band.

    Below 1 is ``weak_or_avoid``, up to 4 ``neutral``, up to 7
    ``positive_watch``, anything higher ``strong_watch``.
    """
    if score < 1:
        return "weak_or_avoid"
    # Remaining bands are checked in ascending order of their upper bound.
    for ceiling, band in ((4, "neutral"), (7, "positive_watch")):
        if score <= ceiling:
            return band
    return "strong_watch"
def action_for_decision(decision: str) -> str:
    """Return the action sentence shown in the report for a decision band.

    Raises:
        KeyError: If ``decision`` is not one of the known band codes.
    """
    # T0-stage bands.
    t0_guidance = {
        "weak_or_avoid": "T0 阶段回避,除非后续 T1 认购热度明显改变格局。",
        "neutral": "暂等 T1 分配结果,不在 T0 阶段主动下重注。",
        "positive_watch": "正面观察,但需要等 T1 确认后再决定 T2/D1 退出仓位。",
        "strong_watch": "T0 强关注,仍需等待 T1 认购热度确认后执行 T2/D1 退出纪律。",
    }
    # Remaining bands, used once a decision band is read from the dataset.
    other_guidance = {
        "avoid": "回避申购。",
        "avoid_or_wait": "回避或等待;没有更强催化前不放大仓位。",
        "watch_or_small": "仅在执行条件支持 T2/D1 退出时小额参与。",
        "selective_subscribe": "选择性申购,并严格按 T2/D1 卖出纪律控制仓位。",
        "high_conviction_subscribe": "积极申购,但仍受分配、流动性和 T2/D1 卖出纪律约束。",
    }
    guidance = {**t0_guidance, **other_guidance}
    return guidance[decision]
def component_label(name: str) -> str:
    """Return the display label for a score component name.

    Unknown names fall back to the name with underscores replaced by spaces
    and title-cased.
    """
    known = {
        "offer_size": "发行规模",
        "public_pct": "初始公开发售比例",
        "min_subscription": "最低认购金额",
        "offer_price": "发行价",
        "over_allotment": "超额配股权",
        "public_os": "公开认购倍数",
        "international_os": "国际配售认购倍数",
        "valid_applications": "有效申请数",
        "success_rate": "申请成功率",
        "hk_reallocation": "香港公开发售回拨",
    }
    if name in known:
        return known[name]
    return name.replace("_", " ").title()
def components_table(components: list[ScoreComponent]) -> str:
    """Render score components as a three-column Markdown table.

    Columns are the display label, the points, and the reason code in
    backticks. With no components only the header rows are returned.
    """
    header = ["| 评分项 | 分数 | 原因代码 |", "| --- | ---: | --- |"]
    body = [
        f"| {component_label(item.name)} | {item.points} | `{item.reason}` |"
        for item in components
    ]
    return "\n".join(header + body)
def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the IPO's basic facts as a two-column Markdown table.

    Args:
        record: Dataset row for the IPO. Every column referenced below must be
            present; empty cells render as the "not recorded" placeholder.
        stage: Report stage. T1-only rows (subscription multiples, application
            counts, reallocation) are appended only for the T1 stage, and the
            T1 columns are not read otherwise.

    Returns:
        The Markdown table as a single newline-joined string.
    """

    def pct_points(key: str) -> float | None:
        # These cells are multiplied by 100 to show percentage points;
        # an empty cell stays "not recorded".
        raw = record[key]
        return as_float(raw) * 100 if raw else None

    # Fix: the start and end dates used to be concatenated with nothing between
    # them (e.g. "2026-06-102026-06-15"); separate them explicitly.
    application_window = (
        f"{fmt_value(record['application_start_date'])} 至 {fmt_value(record['application_end_date'])}"
    )

    rows = [
        ("板块", fmt_value(record["board"])),
        ("状态", fmt_value(record["status"])),
        ("上市日期", fmt_value(record["listing_date"])),
        ("申购期", application_window),
        ("分配结果日期", fmt_value(record["allotment_results_expected_date"])),
        ("上市方式", fmt_value(record["listing_method"])),
        ("行业", fmt_value(record["industry_label"])),
        ("保荐人", fmt_value(record["sponsors"])),
        ("发行价", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("发行规模", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("市值", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("每手股数", fmt_int(as_int(record["board_lot"]))),
        ("最低认购金额", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("初始公开发售比例", fmt_pct_points(pct_points("public_offer_pct_initial"))),
        ("超额配股权股数", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("公开认购倍数", fmt_times(as_float(record["public_oversubscription_times"]))),
                ("国际配售认购倍数", fmt_times(as_float(record["international_oversubscription_times"]))),
                ("有效申请数", fmt_int(as_int(record["valid_applications"]))),
                ("成功申请数", fmt_int(as_int(record["successful_applications"]))),
                ("申请成功率", fmt_pct_points(pct_points("application_success_rate"))),
                ("国际配售承配人数", fmt_int(as_int(record["international_placees"]))),
                ("香港公开发售回拨倍数", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )

    lines = ["| 字段 | 数值 |", "| --- | --- |"]
    lines.extend(f"| {label} | {value} |" for label, value in rows)
    return "\n".join(lines)
def stage_calendar_table(record: dict[str, str]) -> str:
    """Render the T0/T1/T2/D1 stage calendar for one IPO as a Markdown table.

    The T2 (grey market) date is derived rather than read from the record:

    * with a parseable listing date, it is the previous non-weekend day before
      listing, annotated differently when that day is also the allotment date;
    * otherwise, with a parseable allotment date, it is the allotment date;
    * otherwise it is the "not recorded" placeholder.

    Args:
        record: Dataset row providing the application window, allotment date
            and listing date columns.

    Returns:
        The Markdown table as a single newline-joined string.
    """
    application_start = fmt_value(record["application_start_date"])
    application_end = fmt_value(record["application_end_date"])
    allotment_date = fmt_value(record["allotment_results_expected_date"])
    listing_date = fmt_value(record["listing_date"])

    listed = parse_date(record["listing_date"])
    allotment = parse_date(record["allotment_results_expected_date"])
    if listed:
        # previous_business_date skips weekends only, so a listing on Monday
        # yields the preceding Friday.
        previous = previous_business_date(listed)
        if allotment and previous.date() == allotment.date():
            t2_date = f"{previous.date().isoformat()} 分配结果公布后 / D1 前一交易日"
        else:
            t2_date = f"{previous.date().isoformat()},预计 D1 前一交易日"
    elif allotment:
        t2_date = f"{allotment.date().isoformat()} 分配结果公布后"
    else:
        t2_date = "未记录"

    rows = [
        (
            "T0_prospectus",
            # Fix: the start and end dates used to be concatenated with no
            # separator, which rendered as one unreadable run of digits.
            f"{application_start} 至 {application_end}",
            "申购前/申购中阶段;只使用招股书和发行条款。",
        ),
        (
            "T1_allotment",
            allotment_date,
            "分配结果日;使用公开认购热度、国际配售热度和分配事实。",
        ),
        (
            "T2_grey_market",
            t2_date,
            "上市前暗盘窗口;只有存在可靠且可执行的数据源时才作为卖出依据。",
        ),
        (
            "D1",
            listing_date,
            "正式上市首日;T2 数据不可用或不可靠时的默认卖出窗口。",
        ),
    ]

    lines = ["| 阶段 | 本 IPO 对应日期 | 含义 |", "| --- | --- | --- |"]
    lines.extend(f"| `{stage}` | {date_text} | {meaning} |" for stage, date_text, meaning in rows)
    return "\n".join(lines)
def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """Collect the recorded source paths relevant to ``stage``.

    The prospectus path is included whenever it is non-empty; the allotment
    path is added only for the T1 stage and is not read otherwise.
    """
    collected: list[str] = []
    prospectus = record["prospectus_source_path"]
    if prospectus:
        collected.append(prospectus)
    if stage != T1_STAGE:
        return collected
    allotment = record["allotment_source_path"]
    if allotment:
        collected.append(allotment)
    return collected
def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Build Markdown bullets for the strongest positive or negative components.

    Args:
        components: Parsed score components.
        positive: True to list components with points above zero, False to
            list components with points below zero.

    Returns:
        Up to five bullets ordered by magnitude (largest gain first, or largest
        deduction first), or a single placeholder bullet when none qualify.
    """
    if positive:
        selected = [component for component in components if component.points > 0]
    else:
        selected = [component for component in components if component.points < 0]
    if not selected:
        return ["- 没有明显正向评分项。" if positive else "- 没有明显负向评分项。"]

    # Descending for gains and ascending for deductions both put the
    # largest-magnitude item first.
    selected.sort(key=lambda component: component.points, reverse=positive)

    # Fix: the label used to run straight into the signed points
    # ("发行规模+2"); a colon now separates them, matching the label:value
    # convention used in the rest of the report.
    return [
        f"- {component_label(component.name)}:{component.points:+d} (`{component.reason}`)。"
        for component in selected[:5]
    ]
def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Report which stage-required fields are absent or blank in ``record``.

    Returns:
        A one-element list: either a bullet naming the missing fields, or a
        bullet stating that nothing required is missing.
    """
    base_fields = [
        ("industry_label", "行业"),
        ("market_cap_hkd_m", "市值"),
        ("min_subscription_amount_hkd", "最低认购金额"),
    ]
    # Demand and application facts are only required for the T1 stage.
    t1_fields = [
        ("public_oversubscription_times", "公开认购倍数"),
        ("international_oversubscription_times", "国际配售认购倍数"),
        ("valid_applications", "有效申请数"),
        ("successful_applications", "成功申请数"),
    ]
    checked = base_fields + t1_fields if stage == T1_STAGE else base_fields

    absent = [label for key, label in checked if not record.get(key)]
    if absent:
        return [f"- 缺失或空白字段:{', '.join(absent)}。"]
    return ["- 本阶段必需字段没有明显空缺。"]
def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the full Markdown analyst report for one IPO.

    Args:
        record: Dataset row of the IPO being reported.
        rows: The whole dataset, used to calibrate the record's score bucket
            against historical D1 returns.
        stage: Report stage. For T0 the score, bucket and components come
            from the ``t0_*`` columns and the decision band is derived from the
            score. For any other stage the total score, the stored decision
            band, and both T0 and T1 breakdowns are used, and calibration is
            limited to rows with structured T1 data.
        as_of: Report timestamp text, printed verbatim.

    Returns:
        The report as one newline-joined string (no trailing newline).

    Raises:
        KeyError: If a required column is missing, or the decision band has no
            configured action text.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]

    if stage == T0_STAGE:
        # A blank score cell is treated as 0.
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(rows, "t0_score_bucket", bucket, require_t1=False)
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(rows, "total_score_bucket", bucket, require_t1=True)

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- 本阶段没有记录来源路径。"]

    # Fix: this bullet used to print the label directly against the backticked
    # score with no colon, unlike every other summary bullet.
    score_label = "T0 分数" if stage == T0_STAGE else "总分"

    lines = [
        f"# {ticker} IPO 分析报告",
        "",
        "## 摘要",
        "",
        f"- 股票代码:`{ticker}`",
        f"- 公司:{fmt_value(record['company_name_en'])}",
        f"- 分析阶段:`{stage}`",
        f"- 报告生成时间:`{as_of}`",
        f"- 模型数据时间:`{dataset_as_of}`",
        f"- 规则版本:`{model_version}`",
        f"- 规则路径:`{MODEL_RULE_PATH.as_posix()}`",
        "- 策略周期:短线 IPO 申购交易;优先在可靠 T2 暗盘卖出,否则默认 D1 卖出。",
        f"- 结论代码:`{decision}`",
        f"- 执行动作:{action_for_decision(decision)}",
        f"- {score_label}:`{score}`",
        f"- 分数分桶:`{bucket}`",
        f"- 历史校准 D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)},样本数 {metric.sample_size}",
        "",
        "## 阶段日期表",
        "",
        stage_calendar_table(record),
        "",
        "## 基础事实",
        "",
        facts_table(record, stage),
        "",
        "## 短线退出模型推断",
        "",
        f"- D1 正收益概率:{fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 涨幅不低于 10% 概率:{fmt_pct_rate(metric.d1_strong_rate)}",
        f"- 同分桶历史 D1 平均收益:{fmt_num(metric.average_d1_return_pct, '%')}",
        f"- 同分桶历史 D1 中位收益:{fmt_num(metric.median_d1_return_pct, '%')}",
        "- T2 暗盘卖出收益暂未建模,直到项目确认可靠暗盘数据源。",
        "- D5/D20/D60 只作为复盘标签,不是持仓目标。",
        "",
        "## 评分拆解",
        "",
        components_table(components),
        "",
        "## 正面因素",
        "",
        *reason_lines(components, positive=True),
        "",
        "## 风险与缺口",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 暗盘信号暂未使用,因为项目还没有批准可复现的数据源。",
        "- 上市后的 D5/D20/D60 表现只用于后续复盘,不是本模型的持仓周期目标。",
        "",
        "## 触发条件",
        "",
        "- 上调:T1 认购热度显著更强、分配稀缺性更好,或出现有规则支持的新正面催化。",
        "- 下调:公开或国际需求偏弱、供给过大、关键字段质量不足,或市场窗口明显转差。",
        "",
        "## 退出计划",
        "",
        "- 如果申购并获配,且 T2 暗盘数据可靠且可执行,优先按 T2 暗盘卖出计划处理。",
        "- 如果 T2 不可用或不可靠,默认使用 D1 作为卖出窗口。",
        "- 不把 D5/D20/D60 作为本模型的计划持仓周期。",
        "- 后续记录 D1/D5/D20/D60 结果时,只作为复盘标签,不作为倒推预测输入。",
        "",
        "## 来源路径",
        "",
        *source_lines,
    ]
    return "\n".join(lines)
def main() -> int:
    """Run the report generator end to end.

    Loads the dataset, locates the requested ticker, resolves the stage, builds
    the report, then either prints it or writes it to a file.

    Returns:
        0 on success.

    Raises:
        SystemExit: If the destination report file already exists (helpers may
            also exit for a missing dataset, unknown ticker or missing T1 data).
    """
    args = parse_args()
    ticker = normalize_ticker(args.ticker)
    as_of = parse_as_of(args.as_of)

    dataset_path = Path(args.dataset)
    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, args.stage)
    report = build_report(record, rows, stage, as_of)

    if not args.stdout:
        if args.output:
            destination = Path(args.output)
        else:
            destination = output_path_for(ticker, stage, as_of)
        # Existing reports are never overwritten.
        if destination.exists():
            raise SystemExit(f"Refusing to overwrite existing report: {destination.as_posix()}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(report + "\n", encoding="utf-8")
        print(f"report written: {destination.as_posix()}")
    else:
        print(report)
    return 0


if __name__ == "__main__":
    sys.exit(main())