From 1227f2c7c47950c36be216e96e3165211af622eb Mon Sep 17 00:00:00 2001 From: geometrybase Date: Mon, 15 Jun 2026 14:11:18 +0000 Subject: [PATCH] Add single IPO analyst report generator Request: - Let the analyst skill generate a Markdown report directly when a new IPO ticker is provided. Changes: - Add scripts/generate_ipo_report.py for stage-safe single-ticker reports from the v0 analysis dataset. - Auto-select T1 reports when structured allotment demand exists and otherwise use T0 prospectus-stage reporting. - Keep post-listing D1/D5/D20/D60 outcomes out of prediction reports while using historical buckets for calibration. - Document the workflow in the analyst skill and README. Verification: - Ran py_compile for scripts/generate_ipo_report.py. - Generated stdout dry-run reports for 06106 and 06658. - Wrote temporary Markdown reports under /tmp for output-path validation. - Ran git diff --check. Next useful context: - Before generating a report for a ticker absent from the analysis dataset, run archivist updates and rebuild scripts/build_analysis_dataset.py. --- .codex/skills/analyst/SKILL.md | 19 ++ README.md | 13 + scripts/generate_ipo_report.py | 439 +++++++++++++++++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 scripts/generate_ipo_report.py diff --git a/.codex/skills/analyst/SKILL.md b/.codex/skills/analyst/SKILL.md index adc0877..82e52cf 100644 --- a/.codex/skills/analyst/SKILL.md +++ b/.codex/skills/analyst/SKILL.md @@ -60,6 +60,25 @@ Do not overwrite prediction cards. If a view changes, write a new stage card or 8. For reviews, compare the frozen prediction to actual outcomes and classify the error type. 9. Commit only the related memo/report/rule changes after verification. 
+## Single-Ticker Markdown Report + +When the user gives a single IPO ticker and asks for an analyst report, use the report generator after archived facts and the analysis dataset are current: + +```bash +.venv/bin/python scripts/build_analysis_dataset.py --as-of YYYY-MM-DDTHH:MM:SSZ +.venv/bin/python scripts/generate_ipo_report.py 06658 --stage auto +``` + +The generator writes `reports/{date}_{ticker}_{stage}_analysis.md` by default. Use `--stdout` for a dry run, `--stage T0_prospectus` to force a prospectus-stage report, or `--stage T1_allotment` only when structured T1 demand exists. + +If the ticker is absent from `data/snapshots/analysis_model_v0_dataset.csv`, use `archivist` first to archive the IPO facts and rebuild the analysis dataset before generating the report. + +Generated prediction reports must remain stage-safe: + +- T0 reports use only prospectus-stage fields and T0 calibration. +- T1 reports may add allotment demand fields and T1 calibration. +- D1/D5/D20/D60 returns are never shown as prediction inputs; they are reserved for later review cards. + ## Output Standards Every prediction card should include: diff --git a/README.md b/README.md index ee6dbea..3f057e3 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,19 @@ The v0 model is documented in `rules/ipo_score_v0.yaml`. It writes `data/snapsho The model separates T0 prospectus inputs from T1 allotment inputs. D1/D5/D20/D60 returns are labels for calibration and review, not prediction inputs. +## Single IPO Markdown Report + +Use the analyst report generator after the archive and model dataset are current: + +```bash +.venv/bin/python scripts/build_analysis_dataset.py --as-of 2026-06-15T15:00:00Z +.venv/bin/python scripts/generate_ipo_report.py 06106 --stage auto +``` + +The generator writes `reports/{date}_{ticker}_{stage}_analysis.md` by default. It auto-selects `T1_allotment` when structured allotment-demand facts exist; otherwise it generates a `T0_prospectus` report. 
# --- Patch context that preceded the script in the collapsed diff (kept verbatim) ---
# +Use `--stdout` for a dry run or `--output` to choose a specific Markdown path.
# +
# +Prediction reports are stage-safe: T0 reports use only prospectus-stage facts and T0 calibration, while T1 reports add allotment demand and T1 calibration. Post-listing D1/D5/D20/D60 performance stays out of prediction reports and is reserved for review cards.
# +
#  ## Incremental Archive Sync
#
#  The archivist keeps a per-ticker sync ledger so repeated updates can focus on missing stages:
# diff --git a/scripts/generate_ipo_report.py b/scripts/generate_ipo_report.py
# new file mode 100644
# index 0000000..85a65bf
# --- /dev/null
# +++ b/scripts/generate_ipo_report.py
# @@ -0,0 +1,439 @@
# --- End of patch context; contents of scripts/generate_ipo_report.py follow ---
#!/usr/bin/env python3
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""

from __future__ import annotations

import argparse
import csv
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any


MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
DEFAULT_OUTPUT_DIR = Path("reports")

T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"


@dataclass(frozen=True)
class BucketMetric:
    """Historical D1 statistics for the dataset rows that share one score bucket."""

    sample_size: int  # number of rows that contributed a D1 return
    d1_positive_rate: float | None  # share of returns > 0, as a 0..1 fraction
    d1_strong_rate: float | None  # share of returns >= 10, as a 0..1 fraction
    average_d1_return_pct: float | None
    median_d1_return_pct: float | None


@dataclass(frozen=True)
class ScoreComponent:
    """One ``name:points:reason`` entry parsed from a score-breakdown column."""

    name: str
    points: int
    reason: str


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options; ``argv=None`` reads ``sys.argv`` as before."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")
    parser.add_argument(
        "--stage",
        choices=[AUTO_STAGE, T0_STAGE, T1_STAGE],
        default=AUTO_STAGE,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )
    parser.add_argument("--dataset", default=str(DEFAULT_DATASET_PATH), help="Repo-relative model dataset path.")
    parser.add_argument("--output", help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.")
    parser.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    parser.add_argument("--stdout", action="store_true", help="Print the report instead of writing a file.")
    return parser.parse_args(argv)


def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO 8601 UTC string ending in ``Z``.

    A missing value means "now" (UTC, whole seconds). A supplied value is parsed
    as ISO 8601; naive timestamps are taken to be UTC and aware ones are
    converted to UTC, so the date prefix used for the output filename is always
    the UTC date.

    Raises:
        SystemExit: if ``value`` is not a valid ISO 8601 timestamp.
    """
    if not value:
        moment = datetime.now(timezone.utc).replace(microsecond=0)
    else:
        try:
            # ``fromisoformat`` only accepts a trailing "Z" on newer Pythons.
            moment = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
        except ValueError as exc:
            raise SystemExit(
                f"Invalid --as-of timestamp: {value!r}. Use ISO 8601, for example 2026-06-15T15:00:00Z."
            ) from exc
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        else:
            moment = moment.astimezone(timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")


def normalize_ticker(value: str) -> str:
    """Upper-case the ticker, drop a ``.HK`` suffix and zero-pad digits to five."""
    ticker = value.strip().upper()
    if ticker.endswith(".HK"):
        ticker = ticker[:-3]
    if ticker.isdigit():
        return ticker.zfill(5)
    return ticker


def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dicts.

    Raises:
        SystemExit: if ``path`` does not exist.
    """
    if not path.exists():
        raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
    with path.open(newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))


def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first row whose normalized ticker equals ``ticker``.

    Raises:
        SystemExit: if no row matches; ``dataset_path`` is only used in the message.
    """
    for row in rows:
        # ``or ""`` tolerates short rows where DictReader fills the cell with None.
        if normalize_ticker(row.get("ticker") or "") == ticker:
            return row
    raise SystemExit(
        f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
        "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
    )


def _is_blank(value: Any) -> bool:
    """Return True for ``None`` and for empty or whitespace-only strings."""
    return value is None or (isinstance(value, str) and not value.strip())


def as_float(value: Any) -> float | None:
    """Convert a CSV cell to ``float``; blank cells become ``None``."""
    if _is_blank(value):
        return None
    return float(value)


def as_int(value: Any) -> int | None:
    """Convert a CSV cell to ``int`` (via ``float``, so ``"12.0"`` works); blank is ``None``."""
    if _is_blank(value):
        return None
    return int(float(value))


def as_bool(value: Any) -> bool:
    """Return True for ``1`` or any casing of ``true``; everything else is False."""
    return str(value).strip().casefold() in {"1", "true"}


def fmt_value(value: Any) -> str:
    """Render a raw cell as text, showing ``n/a`` for blank cells."""
    if _is_blank(value):
        return "n/a"
    return str(value)


def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators, fixed decimals and an optional suffix."""
    if value is None:
        return "n/a"
    return f"{value:,.{decimals}f}{suffix}"


def fmt_pct_rate(value: float | None) -> str:
    """Format a 0..1 fraction as a percentage with one decimal."""
    if value is None:
        return "n/a"
    return f"{value * 100:.1f}%"


def fmt_pct_points(value: float | None) -> str:
    """Format a value that is already in percentage points."""
    if value is None:
        return "n/a"
    return f"{value:.1f}%"


def fmt_money_m(value: float | None) -> str:
    """Format an amount expressed in HKD millions."""
    if value is None:
        return "n/a"
    return f"HK${value:,.1f}m"


def fmt_hkd(value: float | None) -> str:
    """Format an HKD amount with two decimals."""
    if value is None:
        return "n/a"
    return f"HK${value:,.2f}"


def fmt_times(value: float | None) -> str:
    """Format a multiple such as an oversubscription ratio."""
    if value is None:
        return "n/a"
    return f"{value:,.2f}x"


def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators."""
    if value is None:
        return "n/a"
    return f"{value:,}"


def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on.

    ``auto`` picks T1 when the record's ``has_structured_t1`` flag is set and T0
    otherwise. An explicit T1 request is rejected when the flag is not set.

    Raises:
        SystemExit: if T1 is requested for a record without structured T1 demand.
    """
    has_t1 = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if has_t1 else T0_STAGE
    if requested_stage == T1_STAGE and not has_t1:
        raise SystemExit(
            f"{record['ticker']} has no structured T1 demand yet. "
            f"Generate a {T0_STAGE} report or update the archive first."
        )
    return requested_stage


def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Build the default report path from the date prefix of ``as_of``."""
    date_part = as_of[:10]
    return DEFAULT_OUTPUT_DIR / f"{date_part}_{ticker}_{stage}_analysis.md"


def parse_components(text: str) -> list[ScoreComponent]:
    """Parse a ``name:points:reason|name:points:reason`` breakdown string.

    Empty segments are skipped and a missing (``None``) breakdown yields an
    empty list. The reason may itself contain ``:``.

    Raises:
        ValueError: if a segment lacks the three fields or its points are not numeric.
    """
    components: list[ScoreComponent] = []
    for raw_item in (text or "").split("|"):
        item = raw_item.strip()
        if not item:
            continue
        parts = item.split(":", 2)
        if len(parts) != 3:
            raise ValueError(f"Malformed score component {item!r}; expected name:points:reason.")
        name, points, reason = parts
        try:
            parsed_points = int(float(points))
        except ValueError as exc:
            raise ValueError(f"Score component {item!r} has non-numeric points {points!r}.") from exc
        components.append(ScoreComponent(name=name.strip(), points=parsed_points, reason=reason))
    return components


def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
    exclude_ticker: str | None = None,
) -> BucketMetric:
    """Summarize D1 returns of the rows whose ``bucket_key`` equals ``bucket_value``.

    Args:
        rows: All dataset rows.
        bucket_key: Column holding the score bucket to match.
        bucket_value: Bucket of the IPO being reported on.
        require_t1: When True, only rows flagged ``has_structured_t1`` count.
        exclude_ticker: Ticker to leave out of the sample. Pass the subject IPO
            so its own D1 outcome, if already recorded, cannot leak into its
            prediction report. ``None`` keeps every row.

    Returns:
        A ``BucketMetric``; all rates are ``None`` when no row has a D1 return.
    """
    excluded = normalize_ticker(exclude_ticker) if exclude_ticker else None
    returns: list[float] = []
    for row in rows:
        if excluded is not None and normalize_ticker(row.get("ticker") or "") == excluded:
            continue
        if require_t1 and not as_bool(row["has_structured_t1"]):
            continue
        if row.get(bucket_key) != bucket_value:
            continue
        d1_return = as_float(row.get("d1_return_pct"))
        if d1_return is not None:
            returns.append(d1_return)
    # NOTE(review): rows are not filtered by listing date, so IPOs listed after
    # the subject still contribute to the calibration sample - confirm intended.
    if not returns:
        return BucketMetric(0, None, None, None, None)
    return BucketMetric(
        sample_size=len(returns),
        d1_positive_rate=sum(value > 0 for value in returns) / len(returns),
        d1_strong_rate=sum(value >= 10 for value in returns) / len(returns),
        average_d1_return_pct=mean(returns),
        median_d1_return_pct=median(returns),
    )


def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band (<1, 1-4, 5-7, 8+)."""
    if score < 1:
        return "weak_or_avoid"
    if score <= 4:
        return "neutral"
    if score <= 7:
        return "positive_watch"
    return "strong_watch"


def action_for_decision(decision: str) -> str:
    """Return the PM action sentence for a decision band.

    T1 bands come straight from the dataset, so an unrecognized band yields a
    manual-review line instead of raising ``KeyError``.
    """
    actions = {
        "weak_or_avoid": "Avoid at T0 unless later T1 demand changes the setup.",
        "neutral": "Wait for T1 allotment demand before subscribing.",
        "positive_watch": "Watch positively, but wait for T1 confirmation before sizing.",
        "strong_watch": "Strong watch at T0, still pending T1 demand confirmation.",
        "avoid": "Avoid subscription.",
        "avoid_or_wait": "Avoid or wait; do not size without a stronger catalyst.",
        "watch_or_small": "Small subscription only if execution constraints are favorable.",
        "selective_subscribe": "Selective subscription with disciplined sizing.",
        "high_conviction_subscribe": "Subscribe, subject to allocation and liquidity discipline.",
    }
    fallback = f"No predefined action for decision band `{fmt_value(decision)}`; review manually."
    return actions.get(decision, fallback)


def component_label(name: str) -> str:
    """Return the display label for a score component, title-casing unknown names."""
    labels = {
        "offer_size": "Offer size",
        "public_pct": "Initial public offer percentage",
        "min_subscription": "Minimum subscription",
        "offer_price": "Offer price",
        "over_allotment": "Over-allotment option",
        "public_os": "Public oversubscription",
        "international_os": "International oversubscription",
        "valid_applications": "Valid applications",
        "success_rate": "Application success rate",
        "hk_reallocation": "HK public offer reallocation",
    }
    return labels.get(name, name.replace("_", " ").title())


def _md_cell(text: str) -> str:
    """Make text safe for a Markdown table cell: flatten newlines and escape pipes."""
    return text.replace("\r\n", " ").replace("\n", " ").replace("|", "\\|")


def components_table(components: list[ScoreComponent]) -> str:
    """Render the score components as a Markdown table."""
    lines = ["| Component | Points | Reason |", "| --- | ---: | --- |"]
    for component in components:
        label = _md_cell(component_label(component.name))
        lines.append(f"| {label} | {component.points} | `{_md_cell(component.reason)}` |")
    return "\n".join(lines)


def _fraction_as_pct_points(raw: Any) -> float | None:
    """Convert a 0..1 fraction cell to percentage points; blank cells stay ``None``."""
    fraction = as_float(raw)
    return None if fraction is None else fraction * 100


def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the factual fields as a Markdown table; T1 demand rows only at T1 stage."""
    rows = [
        ("Board", fmt_value(record["board"])),
        ("Status", fmt_value(record["status"])),
        ("Listing date", fmt_value(record["listing_date"])),
        ("Application period", f"{fmt_value(record['application_start_date'])} to {fmt_value(record['application_end_date'])}"),
        ("Allotment result date", fmt_value(record["allotment_results_expected_date"])),
        ("Listing method", fmt_value(record["listing_method"])),
        ("Industry", fmt_value(record["industry_label"])),
        ("Sponsors", fmt_value(record["sponsors"])),
        ("Offer price", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("Offer size", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("Market cap", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("Board lot", fmt_int(as_int(record["board_lot"]))),
        ("Minimum subscription", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("Initial public offer percentage", fmt_pct_points(_fraction_as_pct_points(record["public_offer_pct_initial"]))),
        ("Over-allotment shares", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("Public oversubscription", fmt_times(as_float(record["public_oversubscription_times"]))),
                ("International oversubscription", fmt_times(as_float(record["international_oversubscription_times"]))),
                ("Valid applications", fmt_int(as_int(record["valid_applications"]))),
                ("Successful applications", fmt_int(as_int(record["successful_applications"]))),
                ("Application success rate", fmt_pct_points(_fraction_as_pct_points(record["application_success_rate"]))),
                ("International placees", fmt_int(as_int(record["international_placees"]))),
                ("HK offer reallocation multiple", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )

    lines = ["| Field | Value |", "| --- | --- |"]
    for label, value in rows:
        # Free-text cells such as sponsors may contain "|", which would split the row.
        lines.append(f"| {label} | {_md_cell(value)} |")
    return "\n".join(lines)


def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """List the recorded source paths; the allotment source is included only at T1."""
    paths = []
    if record["prospectus_source_path"]:
        paths.append(record["prospectus_source_path"])
    if stage == T1_STAGE and record["allotment_source_path"]:
        paths.append(record["allotment_source_path"])
    return paths


def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Return up to five bullets for the strongest positive (or negative) components."""
    if positive:
        filtered = [component for component in components if component.points > 0]
    else:
        filtered = [component for component in components if component.points < 0]
    # Largest magnitude first: descending for positives, ascending for negatives.
    filtered.sort(key=lambda component: component.points, reverse=positive)
    if not filtered:
        return ["- No material positive scoring component." if positive else "- No material negative scoring component."]
    return [f"- {component_label(component.name)}: {component.points:+d} (`{component.reason}`)." for component in filtered[:5]]


def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Return one bullet naming the stage's required fields that are blank."""
    required = [
        ("industry_label", "industry label"),
        ("market_cap_hkd_m", "market cap"),
        ("min_subscription_amount_hkd", "minimum subscription"),
    ]
    if stage == T1_STAGE:
        required.extend(
            [
                ("public_oversubscription_times", "public oversubscription"),
                ("international_oversubscription_times", "international oversubscription"),
                ("valid_applications", "valid applications"),
                ("successful_applications", "successful applications"),
            ]
        )
    missing = [label for key, label in required if not record.get(key)]
    if not missing:
        return ["- No required report field is blank for this stage."]
    return [f"- Missing or blank: {', '.join(missing)}."]


def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the Markdown report for ``record`` at ``stage``.

    T0 reports use the T0 score, bucket and breakdown with calibration over all
    rows; T1 reports use the total score, the dataset's decision band and both
    breakdowns with calibration restricted to rows that have structured T1
    data. The subject ticker is excluded from its own calibration sample.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]

    if stage == T0_STAGE:
        # A blank score is reported as 0, as in the original best-effort behavior.
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(rows, "t0_score_bucket", bucket, require_t1=False, exclude_ticker=ticker)
        score_label = "T0 score"
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(rows, "total_score_bucket", bucket, require_t1=True, exclude_ticker=ticker)
        score_label = "Total score"

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- No source path recorded for this stage."]

    lines = [
        f"# {ticker} IPO Analyst Report",
        "",
        "## Summary",
        "",
        f"- Ticker: `{ticker}`",
        f"- Company: {fmt_value(record['company_name_en'])}",
        f"- Stage: `{stage}`",
        f"- Report as of: `{as_of}`",
        f"- Model dataset as of: `{dataset_as_of}`",
        f"- Rule version: `{model_version}`",
        f"- Rule path: `{MODEL_RULE_PATH.as_posix()}`",
        f"- Decision: `{decision}`",
        f"- PM action: {action_for_decision(decision)}",
        f"- {score_label}: `{score}`",
        f"- Score bucket: `{bucket}`",
        f"- Calibrated D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)} from {metric.sample_size} historical D1 labels",
        "",
        "## Facts",
        "",
        facts_table(record, stage),
        "",
        "## Model Inference",
        "",
        f"- D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 >= 10% probability: {fmt_pct_rate(metric.d1_strong_rate)}",
        f"- Historical average D1 return for bucket: {fmt_num(metric.average_d1_return_pct, '%')}",
        f"- Historical median D1 return for bucket: {fmt_num(metric.median_d1_return_pct, '%')}",
        "",
        "## Score Breakdown",
        "",
        components_table(components),
        "",
        "## Bull Points",
        "",
        *reason_lines(components, positive=True),
        "",
        "## Risks And Gaps",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 grey-market signal is not used because the project has no approved reproducible source.",
        "- Post-listing D1/D5/D20/D60 outcomes are labels for model calibration only and are not shown as prediction inputs.",
        "",
        "## Triggers",
        "",
        "- Upgrade: stronger verified T1 demand, better allocation scarcity, or a new rule-backed positive catalyst.",
        "- Downgrade: weak public or international demand, oversized supply, low-quality missing fields, or adverse market window.",
        "",
        "## Exit Plan",
        "",
        "- If subscribed and allocated, reassess after allotment and before first trading session using only information available at that stage.",
        "- For T1 reports without approved T2 data, treat first-day liquidity and position sizing conservatively.",
        "- Record actual D1/D5/D20/D60 outcomes later as review labels, not as retroactive prediction inputs.",
        "",
        "## Source Paths",
        "",
        *source_lines,
        "",
    ]
    return "\n".join(lines)


def main(argv: list[str] | None = None) -> int:
    """Run the CLI: load the dataset, build the report, then print or write it.

    Returns:
        0 on success. Failures exit through ``SystemExit`` with a message.
    """
    args = parse_args(argv)
    ticker = normalize_ticker(args.ticker)
    as_of = parse_as_of(args.as_of)
    dataset_path = Path(args.dataset)

    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, args.stage)
    report = build_report(record, rows, stage, as_of)

    if args.stdout:
        print(report)
        return 0

    output_path = Path(args.output) if args.output else output_path_for(ticker, stage, as_of)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Exclusive create makes the no-overwrite rule atomic (no exists/write race).
        with output_path.open("x", encoding="utf-8") as handle:
            handle.write(report + "\n")
    except FileExistsError:
        raise SystemExit(f"Refusing to overwrite existing report: {output_path.as_posix()}") from None
    print(f"report written: {output_path.as_posix()}")
    return 0


if __name__ == "__main__":
    sys.exit(main())