Add single IPO analyst report generator
Request: - Let the analyst skill generate a Markdown report directly when a new IPO ticker is provided. Changes: - Add scripts/generate_ipo_report.py for stage-safe single-ticker reports from the v0 analysis dataset. - Auto-select T1 reports when structured allotment demand exists and otherwise use T0 prospectus-stage reporting. - Keep post-listing D1/D5/D20/D60 outcomes out of prediction reports while using historical buckets for calibration. - Document the workflow in the analyst skill and README. Verification: - Ran py_compile for scripts/generate_ipo_report.py. - Generated stdout dry-run reports for 06106 and 06658. - Wrote temporary Markdown reports under /tmp for output-path validation. - Ran git diff --check. Next useful context: - Before generating a report for a ticker absent from the analysis dataset, run archivist updates and rebuild scripts/build_analysis_dataset.py.
This commit is contained in:
@@ -0,0 +1,439 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from statistics import mean, median
|
||||
from typing import Any
|
||||
|
||||
|
||||
# Rule file path printed in the summary section of every report.
MODEL_RULE_PATH = Path("rules") / "ipo_score_v0.yaml"
# Dataset read when --dataset is not supplied on the command line.
DEFAULT_DATASET_PATH = Path("data") / "snapshots" / "analysis_model_v0_dataset.csv"
# Directory that receives reports when --output is not supplied.
DEFAULT_OUTPUT_DIR = Path("reports")

# Values accepted by --stage; "auto" is resolved to T0 or T1 per record.
T0_STAGE = "T0_prospectus"
T1_STAGE = "T1_allotment"
AUTO_STAGE = "auto"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class BucketMetric:
    """Immutable summary of the D1 returns observed in one score bucket.

    Every field except ``sample_size`` is ``None`` when the bucket holds no
    labelled D1 return.
    """

    # Number of rows in the bucket that carried a D1 return.
    sample_size: int
    # Fraction of those returns that were strictly greater than zero.
    d1_positive_rate: float | None
    # Fraction of those returns that were at least 10.
    d1_strong_rate: float | None
    # Arithmetic mean of the bucket's D1 returns.
    average_d1_return_pct: float | None
    # Median of the bucket's D1 returns.
    median_d1_return_pct: float | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ScoreComponent:
    """One immutable line item of a score breakdown."""

    # Machine name of the component, for example "offer_size".
    name: str
    # Signed contribution of this component to the score.
    points: int
    # Rule text explaining why the points were awarded.
    reason: str
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        Namespace with ``ticker``, ``stage``, ``dataset``, ``output``,
        ``as_of`` and ``stdout`` attributes.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Positional: the IPO to report on.
    cli.add_argument("ticker", help="HK ticker, for example 06658 or 6658.")

    # Optional switches, registered in the order they appear in --help.
    stage_choices = [AUTO_STAGE, T0_STAGE, T1_STAGE]
    cli.add_argument(
        "--stage",
        default=AUTO_STAGE,
        choices=stage_choices,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )
    cli.add_argument(
        "--dataset",
        default=str(DEFAULT_DATASET_PATH),
        help="Repo-relative model dataset path.",
    )
    cli.add_argument(
        "--output",
        help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.",
    )
    cli.add_argument("--as-of", help="Report timestamp. Defaults to current UTC time.")
    cli.add_argument(
        "--stdout",
        action="store_true",
        help="Print the report instead of writing a file.",
    )

    namespace = cli.parse_args()
    return namespace
|
||||
|
||||
|
||||
def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO 8601 string.

    Args:
        value: Text supplied via ``--as-of``. When empty or ``None`` the
            current UTC time, truncated to whole seconds, is used.

    Returns:
        The ISO 8601 rendering of the timestamp, with a ``+00:00`` offset
        written as ``Z``. Other offsets, and naive timestamps, are returned
        as given.

    Raises:
        SystemExit: If ``value`` is not a valid ISO 8601 timestamp.
    """
    if not value:
        now = datetime.now(timezone.utc).replace(microsecond=0)
        return now.isoformat().replace("+00:00", "Z")
    try:
        # "Z" is swapped for an explicit offset before parsing, because
        # fromisoformat() does not accept the suffix on every Python version.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError as error:
        # Report a bad timestamp the way the rest of the script reports user
        # errors, instead of surfacing a traceback.
        raise SystemExit(
            f"Invalid --as-of timestamp {value!r}: expected ISO 8601, for example 2024-01-02T03:04:05Z."
        ) from error
    return parsed.isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def normalize_ticker(value: str) -> str:
    """Return ``value`` in the ticker form used for dataset lookups.

    The text is stripped and upper-cased, a trailing ``.HK`` is removed, and
    an all-digit code is left-padded with zeros to five characters. Codes
    that are not purely digits are returned without padding.
    """
    suffix = ".HK"
    code = value.strip().upper()
    if code.endswith(suffix):
        code = code[: len(code) - len(suffix)]
    return code.zfill(5) if code.isdigit() else code
|
||||
|
||||
|
||||
def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the model dataset CSV into a list of row dictionaries.

    Args:
        path: Location of the CSV file.

    Returns:
        One dictionary per data row, keyed by the CSV header.

    Raises:
        SystemExit: If ``path`` does not exist.
    """
    if path.exists():
        with path.open(newline="", encoding="utf-8") as source:
            reader = csv.DictReader(source)
            records = list(reader)
        return records
    raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
|
||||
|
||||
|
||||
def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first dataset row whose normalized ticker equals ``ticker``.

    Args:
        rows: Dataset rows to search, in order.
        ticker: Ticker to look for, already normalized.
        dataset_path: Dataset location, used only in the error message.

    Raises:
        SystemExit: If no row matches.
    """
    hit = next(
        (candidate for candidate in rows if normalize_ticker(candidate["ticker"]) == ticker),
        None,
    )
    if hit is not None:
        return hit
    raise SystemExit(
        f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
        "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
    )
|
||||
|
||||
|
||||
def as_float(value: Any) -> float | None:
    """Convert a dataset cell to ``float``; ``None`` and ``""`` give ``None``."""
    return None if value in {None, ""} else float(value)
|
||||
|
||||
|
||||
def as_int(value: Any) -> int | None:
    """Convert a dataset cell to ``int``; ``None`` and ``""`` give ``None``.

    Integer text is parsed exactly. Text with a fractional or exponent part
    (for example ``"12.0"`` or ``"1e3"``) is parsed as a float and truncated
    toward zero.

    Raises:
        ValueError: If the value is not numeric.
    """
    if value in {None, ""}:
        return None
    try:
        # Direct conversion first: going through float would silently round
        # integers above 2**53.
        return int(value)
    except (TypeError, ValueError):
        return int(float(value))
|
||||
|
||||
|
||||
def as_bool(value: Any) -> bool:
    """Interpret a dataset cell as a boolean flag.

    Returns ``True`` when the stripped text of ``value`` is ``1`` or any
    casing of ``true`` (so ``TRUE`` is accepted too). Everything else,
    including ``None`` and the empty string, is ``False``.
    """
    return str(value).strip().lower() in {"1", "true"}
|
||||
|
||||
|
||||
def fmt_value(value: Any) -> str:
    """Render a cell as text, showing ``n/a`` for ``None`` or an empty string."""
    return "n/a" if value in {None, ""} else str(value)
|
||||
|
||||
|
||||
def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators and fixed decimals.

    Args:
        value: Number to format; ``None`` renders as ``n/a``.
        suffix: Text appended directly after the number.
        decimals: Digits kept after the decimal point.
    """
    if value is not None:
        spec = f",.{decimals}f"
        return format(value, spec) + suffix
    return "n/a"
|
||||
|
||||
|
||||
def fmt_pct_rate(value: float | None) -> str:
    """Format a 0-1 fraction as a percentage with one decimal, or ``n/a``."""
    if value is not None:
        return "{:.1f}%".format(value * 100)
    return "n/a"
|
||||
|
||||
|
||||
def fmt_pct_points(value: float | None) -> str:
    """Format a value already in percentage points with one decimal, or ``n/a``."""
    if value is not None:
        return format(value, ".1f") + "%"
    return "n/a"
|
||||
|
||||
|
||||
def fmt_money_m(value: float | None) -> str:
    """Format an amount as ``HK$<value>m`` with one decimal, or ``n/a``."""
    if value is not None:
        return "HK${:,.1f}m".format(value)
    return "n/a"
|
||||
|
||||
|
||||
def fmt_hkd(value: float | None) -> str:
    """Format an amount as ``HK$<value>`` with two decimals, or ``n/a``."""
    if value is not None:
        return "HK$" + format(value, ",.2f")
    return "n/a"
|
||||
|
||||
|
||||
def fmt_times(value: float | None) -> str:
    """Format a multiple as ``<value>x`` with two decimals, or ``n/a``."""
    if value is not None:
        return format(value, ",.2f") + "x"
    return "n/a"
|
||||
|
||||
|
||||
def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators, or ``n/a`` for ``None``."""
    if value is not None:
        return format(value, ",")
    return "n/a"
|
||||
|
||||
|
||||
def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on for ``record``.

    Args:
        record: Dataset row for the IPO.
        requested_stage: Stage requested on the command line.

    Returns:
        For the auto stage, T1 when the record's ``has_structured_t1`` flag
        is set and T0 otherwise. Any other request is returned unchanged.

    Raises:
        SystemExit: If T1 is requested but ``has_structured_t1`` is not set.
    """
    # The T1 flag is consulted only for the auto and explicit T1 requests.
    if requested_stage not in (AUTO_STAGE, T1_STAGE):
        return requested_stage

    has_t1 = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if has_t1 else T0_STAGE
    if has_t1:
        return requested_stage
    raise SystemExit(
        f"{record['ticker']} has no structured T1 demand yet. "
        f"Generate a {T0_STAGE} report or update the archive first."
    )
|
||||
|
||||
|
||||
def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Return the default report path under the default output directory.

    The file name is ``{date}_{ticker}_{stage}_analysis.md``, where the date
    is the first ten characters of ``as_of``.
    """
    filename = "{}_{}_{}_analysis.md".format(as_of[:10], ticker, stage)
    return DEFAULT_OUTPUT_DIR / filename
|
||||
|
||||
|
||||
def parse_components(text: str) -> list[ScoreComponent]:
    """Parse a ``name:points:reason|name:points:reason`` breakdown string.

    Empty segments between ``|`` separators are skipped. Only the first two
    colons of a segment split it, so the reason may itself contain colons.

    Raises:
        ValueError: If a segment has fewer than three colon-separated parts
            or its points are not an integer.
    """
    parsed: list[ScoreComponent] = []
    for segment in filter(None, text.split("|")):
        label, raw_points, why = segment.split(":", 2)
        parsed.append(ScoreComponent(name=label, points=int(raw_points), reason=why))
    return parsed
|
||||
|
||||
|
||||
def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Summarize the D1 returns of the rows that fall in one score bucket.

    Args:
        rows: Dataset rows to scan.
        bucket_key: Column holding the bucket label.
        bucket_value: Bucket label a row must carry to be counted.
        require_t1: When true, rows without the ``has_structured_t1`` flag
            are ignored.

    Returns:
        Metrics over the matching rows that have a ``d1_return_pct`` value;
        a zero-sample metric with ``None`` statistics when there are none.
    """

    def in_bucket(row: dict[str, str]) -> bool:
        # The T1 flag is read only when the caller asks for it.
        if require_t1 and not as_bool(row["has_structured_t1"]):
            return False
        return row.get(bucket_key) == bucket_value

    candidates = (as_float(row.get("d1_return_pct")) for row in rows if in_bucket(row))
    observed = [d1 for d1 in candidates if d1 is not None]

    count = len(observed)
    if count == 0:
        return BucketMetric(0, None, None, None, None)

    positives = sum(1 for d1 in observed if d1 > 0)
    strong = sum(1 for d1 in observed if d1 >= 10)
    return BucketMetric(
        sample_size=count,
        d1_positive_rate=positives / count,
        d1_strong_rate=strong / count,
        average_d1_return_pct=mean(observed),
        median_d1_return_pct=median(observed),
    )
|
||||
|
||||
|
||||
def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band.

    Scores below 1 are ``weak_or_avoid``, 1 to 4 ``neutral``, above 4 and up
    to 7 ``positive_watch``, and anything higher ``strong_watch``.
    """
    # Checked from the top band downward.
    if score > 7:
        return "strong_watch"
    if score > 4:
        return "positive_watch"
    if score >= 1:
        return "neutral"
    return "weak_or_avoid"
|
||||
|
||||
|
||||
def action_for_decision(decision: str) -> str:
    """Return the portfolio-manager action sentence for a decision band.

    Args:
        decision: One of the T0 bands (``weak_or_avoid``, ``neutral``,
            ``positive_watch``, ``strong_watch``) or one of the other bands
            listed in the table below.

    Raises:
        KeyError: If ``decision`` is not a known band. The message names the
            offending value and the accepted bands.
    """
    actions = {
        "weak_or_avoid": "Avoid at T0 unless later T1 demand changes the setup.",
        "neutral": "Wait for T1 allotment demand before subscribing.",
        "positive_watch": "Watch positively, but wait for T1 confirmation before sizing.",
        "strong_watch": "Strong watch at T0, still pending T1 demand confirmation.",
        "avoid": "Avoid subscription.",
        "avoid_or_wait": "Avoid or wait; do not size without a stronger catalyst.",
        "watch_or_small": "Small subscription only if execution constraints are favorable.",
        "selective_subscribe": "Selective subscription with disciplined sizing.",
        "high_conviction_subscribe": "Subscribe, subject to allocation and liquidity discipline.",
    }
    try:
        return actions[decision]
    except KeyError:
        # A blank or unexpected band would otherwise surface as a bare
        # KeyError('') with no hint of what went wrong.
        known = ", ".join(sorted(actions))
        raise KeyError(f"Unknown decision band {decision!r}; expected one of: {known}") from None
|
||||
|
||||
|
||||
def component_label(name: str) -> str:
    """Return the display label for a score component name.

    Names without a dedicated label are title-cased, with underscores
    replaced by spaces.
    """
    known = {
        "offer_size": "Offer size",
        "public_pct": "Initial public offer percentage",
        "min_subscription": "Minimum subscription",
        "offer_price": "Offer price",
        "over_allotment": "Over-allotment option",
        "public_os": "Public oversubscription",
        "international_os": "International oversubscription",
        "valid_applications": "Valid applications",
        "success_rate": "Application success rate",
        "hk_reallocation": "HK public offer reallocation",
    }
    if name in known:
        return known[name]
    return name.replace("_", " ").title()
|
||||
|
||||
|
||||
def components_table(components: list[ScoreComponent]) -> str:
    """Render score components as a Markdown table.

    The table has a Component / Points / Reason header and one row per
    component, with the reason shown in backticks.
    """
    header = "| Component | Points | Reason |"
    divider = "| --- | ---: | --- |"
    body = [
        "| {} | {} | `{}` |".format(component_label(entry.name), entry.points, entry.reason)
        for entry in components
    ]
    return "\n".join([header, divider, *body])
|
||||
|
||||
|
||||
def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the IPO fact sheet as a two-column Markdown table.

    Args:
        record: Dataset row for the IPO. All referenced columns must exist.
        stage: Report stage. The allotment-demand rows are added only for
            the T1 stage.

    Returns:
        Markdown table text with a Field / Value header. Blank cells are
        shown as ``n/a``, and any ``|`` inside a value is escaped so it
        cannot split the table row.

    Raises:
        KeyError: If ``record`` lacks one of the referenced columns.
    """

    def ratio_as_pct(key: str) -> str:
        # These columns hold 0-1 fractions; blank stays "n/a".
        raw = record[key]
        return fmt_pct_points(as_float(raw) * 100 if raw else None)

    def cell(text: str) -> str:
        # An unescaped pipe in a value would be read as a column separator.
        return text.replace("|", "\\|")

    rows = [
        ("Board", fmt_value(record["board"])),
        ("Status", fmt_value(record["status"])),
        ("Listing date", fmt_value(record["listing_date"])),
        (
            "Application period",
            f"{fmt_value(record['application_start_date'])} to {fmt_value(record['application_end_date'])}",
        ),
        ("Allotment result date", fmt_value(record["allotment_results_expected_date"])),
        ("Listing method", fmt_value(record["listing_method"])),
        ("Industry", fmt_value(record["industry_label"])),
        ("Sponsors", fmt_value(record["sponsors"])),
        ("Offer price", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("Offer size", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("Market cap", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("Board lot", fmt_int(as_int(record["board_lot"]))),
        ("Minimum subscription", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("Initial public offer percentage", ratio_as_pct("public_offer_pct_initial")),
        ("Over-allotment shares", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        rows.extend(
            [
                ("Public oversubscription", fmt_times(as_float(record["public_oversubscription_times"]))),
                (
                    "International oversubscription",
                    fmt_times(as_float(record["international_oversubscription_times"])),
                ),
                ("Valid applications", fmt_int(as_int(record["valid_applications"]))),
                ("Successful applications", fmt_int(as_int(record["successful_applications"]))),
                ("Application success rate", ratio_as_pct("application_success_rate")),
                ("International placees", fmt_int(as_int(record["international_placees"]))),
                ("HK offer reallocation multiple", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )

    lines = ["| Field | Value |", "| --- | --- |"]
    lines.extend(f"| {label} | {cell(value)} |" for label, value in rows)
    return "\n".join(lines)
|
||||
|
||||
|
||||
def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """List the non-blank source document paths relevant to ``stage``.

    The prospectus path is always considered. The allotment path is
    considered only for the T1 stage.
    """
    found = []
    prospectus = record["prospectus_source_path"]
    if prospectus:
        found.append(prospectus)
    if stage != T1_STAGE:
        return found
    allotment = record["allotment_source_path"]
    if allotment:
        found.append(allotment)
    return found
|
||||
|
||||
|
||||
def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Build bullet lines for the largest positive or negative components.

    Args:
        components: Score components to choose from.
        positive: Select components with points above zero when true,
            below zero when false.

    Returns:
        Up to five bullets ordered from the largest magnitude, or a single
        placeholder bullet when no component qualifies.
    """
    if positive:
        picked = [entry for entry in components if entry.points > 0]
    else:
        picked = [entry for entry in components if entry.points < 0]
    if not picked:
        placeholder = (
            "- No material positive scoring component."
            if positive
            else "- No material negative scoring component."
        )
        return [placeholder]

    # Descending for positives, ascending for negatives: strongest first.
    ranked = sorted(picked, key=lambda entry: entry.points, reverse=positive)
    bullets = []
    for entry in ranked[:5]:
        bullets.append(f"- {component_label(entry.name)}: {entry.points:+d} (`{entry.reason}`).")
    return bullets
|
||||
|
||||
|
||||
def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Return one bullet naming the required fields that are blank.

    Three fields are required at every stage. The T1 stage adds four
    allotment-demand fields. A field counts as blank when it is absent from
    ``record`` or falsy.
    """
    checks = [
        ("industry_label", "industry label"),
        ("market_cap_hkd_m", "market cap"),
        ("min_subscription_amount_hkd", "minimum subscription"),
    ]
    if stage == T1_STAGE:
        checks += [
            ("public_oversubscription_times", "public oversubscription"),
            ("international_oversubscription_times", "international oversubscription"),
            ("valid_applications", "valid applications"),
            ("successful_applications", "successful applications"),
        ]

    blanks = ", ".join(label for key, label in checks if not record.get(key))
    if blanks:
        return [f"- Missing or blank: {blanks}."]
    return ["- No required report field is blank for this stage."]
|
||||
|
||||
|
||||
def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the Markdown analyst report for one IPO.

    Args:
        record: Dataset row of the IPO being reported on.
        rows: All dataset rows, used as the calibration sample for the
            score bucket.
        stage: Resolved report stage. The T0 stage uses the T0 score,
            bucket and breakdown; any other value uses the total score and
            both the T0 and T1 breakdowns.
        as_of: Report timestamp text shown in the summary.

    Returns:
        The report as one string, with lines joined by newlines.

    Raises:
        KeyError: If ``record`` lacks a referenced column or its decision
            band has no PM action.
        ValueError: If a score breakdown string is malformed.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]

    # Leave the subject IPO out of its own calibration sample. Otherwise an
    # already-listed ticker's realised D1 return would feed the probability
    # quoted in its own prediction report.
    history = [row for row in rows if normalize_ticker(row.get("ticker") or "") != ticker]

    if stage == T0_STAGE:
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(history, "t0_score_bucket", bucket, require_t1=False)
        score_label = "T0 score"
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(history, "total_score_bucket", bucket, require_t1=True)
        score_label = "Total score"

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- No source path recorded for this stage."]

    lines = [
        f"# {ticker} IPO Analyst Report",
        "",
        "## Summary",
        "",
        f"- Ticker: `{ticker}`",
        f"- Company: {fmt_value(record['company_name_en'])}",
        f"- Stage: `{stage}`",
        f"- Report as of: `{as_of}`",
        f"- Model dataset as of: `{dataset_as_of}`",
        f"- Rule version: `{model_version}`",
        f"- Rule path: `{MODEL_RULE_PATH.as_posix()}`",
        f"- Decision: `{decision}`",
        f"- PM action: {action_for_decision(decision)}",
        f"- {score_label}: `{score}`",
        f"- Score bucket: `{bucket}`",
        f"- Calibrated D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)} from {metric.sample_size} historical D1 labels",
        "",
        "## Facts",
        "",
        facts_table(record, stage),
        "",
        "## Model Inference",
        "",
        f"- D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 >= 10% probability: {fmt_pct_rate(metric.d1_strong_rate)}",
        f"- Historical average D1 return for bucket: {fmt_num(metric.average_d1_return_pct, '%')}",
        f"- Historical median D1 return for bucket: {fmt_num(metric.median_d1_return_pct, '%')}",
        "",
        "## Score Breakdown",
        "",
        components_table(components),
        "",
        "## Bull Points",
        "",
        *reason_lines(components, positive=True),
        "",
        "## Risks And Gaps",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 grey-market signal is not used because the project has no approved reproducible source.",
        "- Post-listing D1/D5/D20/D60 outcomes are labels for model calibration only and are not shown as prediction inputs.",
        "",
        "## Triggers",
        "",
        "- Upgrade: stronger verified T1 demand, better allocation scarcity, or a new rule-backed positive catalyst.",
        "- Downgrade: weak public or international demand, oversized supply, low-quality missing fields, or adverse market window.",
        "",
        "## Exit Plan",
        "",
        "- If subscribed and allocated, reassess after allotment and before first trading session using only information available at that stage.",
        "- For T1 reports without approved T2 data, treat first-day liquidity and position sizing conservatively.",
        "- Record actual D1/D5/D20/D60 outcomes later as review labels, not as retroactive prediction inputs.",
        "",
        "## Source Paths",
        "",
        *source_lines,
        "",
    ]
    return "\n".join(lines)
|
||||
|
||||
|
||||
def main() -> int:
    """Run the command-line workflow: load, resolve the stage, render, emit.

    With ``--stdout`` the report is printed. Otherwise it is written to
    ``--output``, or to the default path under the reports directory, and an
    existing file is never overwritten.

    Returns:
        Process exit status ``0`` on success.

    Raises:
        SystemExit: If the output path already exists, or if a helper
            rejects the dataset, ticker or stage.
    """
    args = parse_args()
    ticker = normalize_ticker(args.ticker)
    as_of = parse_as_of(args.as_of)
    dataset_path = Path(args.dataset)

    rows = load_dataset(dataset_path)
    record = find_record(rows, ticker, dataset_path)
    stage = determine_stage(record, args.stage)
    report = build_report(record, rows, stage, as_of)

    if args.stdout:
        print(report)
        return 0

    output_path = Path(args.output) if args.output else output_path_for(ticker, stage, as_of)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        # Exclusive-create mode makes the no-overwrite guarantee atomic: the
        # open itself fails if the path exists, with no gap between a check
        # and the write.
        with output_path.open("x", encoding="utf-8") as handle:
            handle.write(report + "\n")
    except FileExistsError:
        raise SystemExit(f"Refusing to overwrite existing report: {output_path.as_posix()}") from None
    print(f"report written: {output_path.as_posix()}")
    return 0
|
||||
|
||||
|
||||
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user