#!/usr/bin/env python3
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""

from __future__ import annotations

import argparse
import csv
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any

MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
DEFAULT_OUTPUT_DIR = Path("reports")
T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"


@dataclass(frozen=True)
class BucketMetric:
    """Aggregate D1 return statistics for the rows of one score bucket."""

    # Number of rows in the bucket that carry a D1 return.
    sample_size: int
    # Share of those returns that are > 0 (None when the bucket is empty).
    d1_positive_rate: float | None
    # Share of those returns that are >= 10 (None when the bucket is empty).
    d1_strong_rate: float | None
    # Arithmetic mean of the D1 returns (None when the bucket is empty).
    average_d1_return_pct: float | None
    # Median of the D1 returns (None when the bucket is empty).
    median_d1_return_pct: float | None


@dataclass(frozen=True)
class ScoreComponent:
    """One ``name:points:reason`` entry parsed from a score breakdown string."""

    name: str
    points: int
    reason: str


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the report generator."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")
    parser.add_argument(
        "--stage",
        choices=[AUTO_STAGE, T0_STAGE, T1_STAGE],
        default=AUTO_STAGE,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )
    parser.add_argument("--dataset", default=str(DEFAULT_DATASET_PATH), help="Repo-relative model dataset path.")
    parser.add_argument("--output", help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.")
    parser.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    parser.add_argument("--stdout", action="store_true", help="Print the report instead of writing a file.")
    return parser.parse_args()


def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO 8601 string.

    A supplied value is parsed with ``datetime.fromisoformat`` (a ``Z`` suffix
    is accepted) and re-emitted, with a ``+00:00`` offset rendered as ``Z``.
    Without a value the current UTC time, truncated to whole seconds, is used.

    Raises:
        ValueError: if ``value`` is not a valid ISO 8601 timestamp.
    """
    if value:
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def normalize_ticker(value: str) -> str:
    """Upper-case a ticker, drop a ``.HK`` suffix and zero-pad digits to 5 places."""
    ticker = value.strip().upper()
    if ticker.endswith(".HK"):
        ticker = ticker[:-3]
    if ticker.isdigit():
        return ticker.zfill(5)
    return ticker


def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dictionaries.

    Raises:
        SystemExit: if ``path`` does not exist.
    """
    if not path.exists():
        raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
    # "utf-8-sig" reads plain UTF-8 unchanged but strips a leading BOM, which
    # would otherwise be glued onto the first column name.
    with path.open(newline="", encoding="utf-8-sig") as handle:
        return list(csv.DictReader(handle))


def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first row whose normalized ticker equals ``ticker``.

    Raises:
        SystemExit: if no row matches.
    """
    for row in rows:
        # A short CSV row yields None for its missing cells; treat it as blank.
        if normalize_ticker(row["ticker"] or "") == ticker:
            return row
    raise SystemExit(
        f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
        "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
    )


def _is_blank(value: Any) -> bool:
    """Return True for ``None`` and the empty string, without hashing ``value``."""
    return value is None or value == ""


def as_float(value: Any) -> float | None:
    """Convert a dataset cell to ``float``; blank cells become ``None``."""
    if _is_blank(value):
        return None
    return float(value)


def as_int(value: Any) -> int | None:
    """Convert a dataset cell to ``int`` (via ``float``); blank cells become ``None``."""
    if _is_blank(value):
        return None
    return int(float(value))


def as_bool(value: Any) -> bool:
    """Return True when the cell reads ``1`` or ``true`` in any letter case."""
    return str(value).strip().lower() in {"1", "true"}


def fmt_value(value: Any) -> str:
    """Render a cell as text, showing ``n/a`` for blank cells."""
    if _is_blank(value):
        return "n/a"
    return str(value)


def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators, fixed decimals and a suffix."""
    if value is None:
        return "n/a"
    return f"{value:,.{decimals}f}{suffix}"


def fmt_pct_rate(value: float | None) -> str:
    """Format a fraction (``0.25``) as a percentage (``25.0%``)."""
    if value is None:
        return "n/a"
    return f"{value * 100:.1f}%"


def fmt_pct_points(value: float | None) -> str:
    """Format a value already expressed in percent points (``25`` -> ``25.0%``)."""
    if value is None:
        return "n/a"
    return f"{value:.1f}%"


def fmt_money_m(value: float | None) -> str:
    """Format an amount with an ``HK$`` prefix and an ``m`` suffix."""
    if value is None:
        return "n/a"
    return f"HK${value:,.1f}m"


def fmt_hkd(value: float | None) -> str:
    """Format an amount with an ``HK$`` prefix and two decimals."""
    if value is None:
        return "n/a"
    return f"HK${value:,.2f}"


def fmt_times(value: float | None) -> str:
    """Format a multiple with two decimals and an ``x`` suffix."""
    if value is None:
        return "n/a"
    return f"{value:,.2f}x"


def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators."""
    if value is None:
        return "n/a"
    return f"{value:,}"


def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on.

    ``auto`` picks T1 when the record's ``has_structured_t1`` flag is set and
    T0 otherwise; any other requested stage is returned as given.

    Raises:
        SystemExit: if T1 is requested but the record has no structured T1 data.
    """
    has_t1 = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if has_t1 else T0_STAGE
    if requested_stage == T1_STAGE and not has_t1:
        raise SystemExit(
            f"{record['ticker']} has no structured T1 demand yet. "
            f"Generate a {T0_STAGE} report or update the archive first."
        )
    return requested_stage


def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Build the default report path from the date prefix of ``as_of``."""
    date_part = as_of[:10]
    return DEFAULT_OUTPUT_DIR / f"{date_part}_{ticker}_{stage}_analysis.md"


def parse_components(text: str) -> list[ScoreComponent]:
    """Parse a ``name:points:reason|name:points:reason`` breakdown string.

    Empty items are skipped; the reason may itself contain ``:``. A blank or
    ``None`` breakdown yields an empty list.

    Raises:
        ValueError: if an item lacks the three fields or its points are not an
            integer.
    """
    components: list[ScoreComponent] = []
    for item in (text or "").split("|"):
        if not item:
            continue
        parts = item.split(":", 2)
        if len(parts) != 3:
            raise ValueError(f"Malformed score component {item!r}; expected 'name:points:reason'.")
        name, points, reason = parts
        components.append(ScoreComponent(name=name, points=int(points), reason=reason))
    return components


def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Summarise D1 returns of the rows whose ``bucket_key`` equals ``bucket_value``.

    Rows without a ``d1_return_pct`` are ignored. With ``require_t1`` only
    rows flagged ``has_structured_t1`` are considered. An empty selection
    yields a metric with ``sample_size`` 0 and ``None`` statistics.
    """
    returns: list[float] = []
    for row in rows:
        if require_t1 and not as_bool(row["has_structured_t1"]):
            continue
        if row.get(bucket_key) != bucket_value:
            continue
        d1_return = as_float(row.get("d1_return_pct"))
        if d1_return is not None:
            returns.append(d1_return)
    if not returns:
        return BucketMetric(0, None, None, None, None)
    count = len(returns)
    return BucketMetric(
        sample_size=count,
        d1_positive_rate=sum(value > 0 for value in returns) / count,
        d1_strong_rate=sum(value >= 10 for value in returns) / count,
        average_d1_return_pct=mean(returns),
        median_d1_return_pct=median(returns),
    )


def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band (<1, 1-4, 5-7, >7)."""
    if score < 1:
        return "weak_or_avoid"
    if score <= 4:
        return "neutral"
    if score <= 7:
        return "positive_watch"
    return "strong_watch"


def action_for_decision(decision: str) -> str:
    """Return the PM action sentence for a decision band.

    Raises:
        KeyError: if ``decision`` is not a known band.
    """
    actions = {
        "weak_or_avoid": "Avoid at T0 unless later T1 demand changes the setup.",
        "neutral": "Wait for T1 allotment demand before subscribing.",
        "positive_watch": "Watch positively, but wait for T1 confirmation before sizing for a T2/D1 exit.",
        "strong_watch": "Strong watch at T0, still pending T1 demand confirmation for a T2/D1 exit.",
        "avoid": "Avoid subscription.",
        "avoid_or_wait": "Avoid or wait; do not size without a stronger catalyst.",
        "watch_or_small": "Small subscription only if execution constraints support a T2/D1 exit.",
        "selective_subscribe": "Selective subscription with disciplined T2/D1 sell sizing.",
        "high_conviction_subscribe": "Subscribe, subject to allocation, liquidity, and T2/D1 sell discipline.",
    }
    try:
        return actions[decision]
    except KeyError:
        # Same exception type as a bare lookup, but it names the problem.
        raise KeyError(f"Unknown decision band: {decision!r}") from None


def component_label(name: str) -> str:
    """Return the display label of a score component.

    Unknown names fall back to the name with underscores replaced by spaces,
    in title case.
    """
    labels = {
        "offer_size": "Offer size",
        "public_pct": "Initial public offer percentage",
        "min_subscription": "Minimum subscription",
        "offer_price": "Offer price",
        "over_allotment": "Over-allotment option",
        "public_os": "Public oversubscription",
        "international_os": "International oversubscription",
        "valid_applications": "Valid applications",
        "success_rate": "Application success rate",
        "hk_reallocation": "HK public offer reallocation",
    }
    return labels.get(name, name.replace("_", " ").title())


def _md_cell(text: str) -> str:
    """Make ``text`` safe for a Markdown table cell.

    A literal ``|`` would start a new column and a line break would end the
    row, so pipes are backslash-escaped and line breaks become spaces.
    """
    return text.replace("|", "\\|").replace("\r\n", " ").replace("\n", " ").replace("\r", " ")


def components_table(components: list[ScoreComponent]) -> str:
    """Render score components as a Markdown table of label, points and reason."""
    lines = ["| Component | Points | Reason |", "| --- | ---: | --- |"]
    for component in components:
        label = _md_cell(component_label(component.name))
        reason = _md_cell(component.reason)
        lines.append(f"| {label} | {component.points} | `{reason}` |")
    return "\n".join(lines)


def _fraction_as_pct_points(raw: str | None) -> float | None:
    """Scale a fraction cell (``0.1``) to percent points; blank cells give ``None``."""
    return as_float(raw) * 100 if raw else None


def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the record's facts as a two-column Markdown table.

    The demand fields (oversubscription, applications, placees, reallocation)
    are added only for the T1 stage, so a T0 report never shows them.
    """
    rows = [
        ("Board", fmt_value(record["board"])),
        ("Status", fmt_value(record["status"])),
        ("Listing date", fmt_value(record["listing_date"])),
        (
            "Application period",
            f"{fmt_value(record['application_start_date'])} to {fmt_value(record['application_end_date'])}",
        ),
        ("Allotment result date", fmt_value(record["allotment_results_expected_date"])),
        ("Listing method", fmt_value(record["listing_method"])),
        ("Industry", fmt_value(record["industry_label"])),
        ("Sponsors", fmt_value(record["sponsors"])),
        ("Offer price", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("Offer size", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("Market cap", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("Board lot", fmt_int(as_int(record["board_lot"]))),
        ("Minimum subscription", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        (
            "Initial public offer percentage",
            fmt_pct_points(_fraction_as_pct_points(record["public_offer_pct_initial"])),
        ),
        ("Over-allotment shares", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("Public oversubscription", fmt_times(as_float(record["public_oversubscription_times"]))),
                (
                    "International oversubscription",
                    fmt_times(as_float(record["international_oversubscription_times"])),
                ),
                ("Valid applications", fmt_int(as_int(record["valid_applications"]))),
                ("Successful applications", fmt_int(as_int(record["successful_applications"]))),
                (
                    "Application success rate",
                    fmt_pct_points(_fraction_as_pct_points(record["application_success_rate"])),
                ),
                ("International placees", fmt_int(as_int(record["international_placees"]))),
                ("HK offer reallocation multiple", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )
    lines = ["| Field | Value |", "| --- | --- |"]
    for label, value in rows:
        # Free-text cells (sponsors, industry, ...) may contain "|".
        lines.append(f"| {label} | {_md_cell(value)} |")
    return "\n".join(lines)


def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """Return the recorded source paths; the allotment path only for T1."""
    paths = []
    if record["prospectus_source_path"]:
        paths.append(record["prospectus_source_path"])
    if stage == T1_STAGE and record["allotment_source_path"]:
        paths.append(record["allotment_source_path"])
    return paths


def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Return up to five bullet lines for the positive or negative components.

    Positive components are ordered from highest points down, negative ones
    from most negative up. A placeholder bullet is returned when none match.
    """
    filtered = [
        component
        for component in components
        if (component.points > 0 if positive else component.points < 0)
    ]
    filtered.sort(key=lambda component: component.points, reverse=positive)
    if not filtered:
        return ["- No material positive scoring component." if positive else "- No material negative scoring component."]
    return [
        f"- {component_label(component.name)}: {component.points:+d} (`{component.reason}`)."
        for component in filtered[:5]
    ]


def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Return one bullet naming the required fields that are blank for ``stage``."""
    required = [
        ("industry_label", "industry label"),
        ("market_cap_hkd_m", "market cap"),
        ("min_subscription_amount_hkd", "minimum subscription"),
    ]
    if stage == T1_STAGE:
        required.extend(
            [
                ("public_oversubscription_times", "public oversubscription"),
                ("international_oversubscription_times", "international oversubscription"),
                ("valid_applications", "valid applications"),
                ("successful_applications", "successful applications"),
            ]
        )
    missing = [label for key, label in required if not record.get(key)]
    if not missing:
        return ["- No required report field is blank for this stage."]
    return [f"- Missing or blank: {', '.join(missing)}."]


def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the Markdown report for ``record`` at the given stage.

    For T0 the score, bucket and components come from the ``t0_*`` columns and
    the decision band is derived from the score. For any other stage the
    total score columns and the dataset's ``decision_band`` are used, and the
    bucket statistics are restricted to rows with structured T1 data.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]
    if stage == T0_STAGE:
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(rows, "t0_score_bucket", bucket, require_t1=False)
        score_label = "T0 score"
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(rows, "total_score_bucket", bucket, require_t1=True)
        score_label = "Total score"

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- No source path recorded for this stage."]
    lines = [
        f"# {ticker} IPO Analyst Report",
        "",
        "## Summary",
        "",
        f"- Ticker: `{ticker}`",
        f"- Company: {fmt_value(record['company_name_en'])}",
        f"- Stage: `{stage}`",
        f"- Report as of: `{as_of}`",
        f"- Model dataset as of: `{dataset_as_of}`",
        f"- Rule version: `{model_version}`",
        f"- Rule path: `{MODEL_RULE_PATH.as_posix()}`",
        "- Strategy horizon: short IPO subscription trade; intended exit is T2 grey market if reliable, otherwise D1.",
        f"- Decision: `{decision}`",
        f"- PM action: {action_for_decision(decision)}",
        f"- {score_label}: `{score}`",
        f"- Score bucket: `{bucket}`",
        f"- Calibrated D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)} from {metric.sample_size} historical D1 labels",
        "",
        "## Facts",
        "",
        facts_table(record, stage),
        "",
        "## Short-Exit Model Inference",
        "",
        f"- D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 >= 10% probability: {fmt_pct_rate(metric.d1_strong_rate)}",
        f"- Historical average D1 return for bucket: {fmt_num(metric.average_d1_return_pct, '%')}",
        f"- Historical median D1 return for bucket: {fmt_num(metric.median_d1_return_pct, '%')}",
        "- T2 sell return is not modeled until an approved grey-market data source exists.",
        "- D5/D20/D60 outcomes are review labels only, not holding targets.",
        "",
        "## Score Breakdown",
        "",
        components_table(components),
        "",
        "## Bull Points",
        "",
        *reason_lines(components, positive=True),
        "",
        "## Risks And Gaps",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 grey-market signal is not used yet because the project has no approved reproducible source.",
        "- Post-listing D5/D20/D60 outcomes are labels for later review only and are not holding-period targets.",
        "",
        "## Triggers",
        "",
        "- Upgrade: stronger verified T1 demand, better allocation scarcity, or a new rule-backed positive catalyst.",
        "- Downgrade: weak public or international demand, oversized supply, low-quality missing fields, or adverse market window.",
        "",
        "## Exit Plan",
        "",
        "- If subscribed and allocated, plan to sell in T2 grey market when reliable executable data is available.",
        "- If T2 is unavailable or unreliable, use D1 as the default exit window.",
        "- Do not treat D5/D20/D60 as planned holding periods for this model.",
        "- Record D1/D5/D20/D60 outcomes later as review labels, not as retroactive prediction inputs.",
        "",
        "## Source Paths",
        "",
        *source_lines,
        "",
    ]
    return "\n".join(lines)


def main() -> int:
    """Run the CLI: load the dataset, build the report, then print or write it.

    Returns:
        0 on success. User-facing failures are raised as ``SystemExit``.
    """
    args = parse_args()
    ticker = normalize_ticker(args.ticker)
    try:
        as_of = parse_as_of(args.as_of)
    except ValueError:
        # Report a bad --as-of like the other user errors instead of a traceback.
        raise SystemExit(
            f"Invalid --as-of value {args.as_of!r}; expected an ISO 8601 timestamp such as 2024-06-01T08:00:00Z."
        ) from None
    dataset_path = Path(args.dataset)
    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, args.stage)
    report = build_report(record, rows, stage, as_of)

    if args.stdout:
        print(report)
        return 0

    output_path = Path(args.output) if args.output else output_path_for(ticker, stage, as_of)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Mode "x" creates the file atomically and fails if it already exists,
        # so a report can never be overwritten between a check and the write.
        with output_path.open("x", encoding="utf-8") as handle:
            handle.write(report + "\n")
    except FileExistsError:
        raise SystemExit(f"Refusing to overwrite existing report: {output_path.as_posix()}") from None
    print(f"report written: {output_path.as_posix()}")
    return 0


if __name__ == "__main__":
    sys.exit(main())