1227f2c7c4
Request: - Let the analyst skill generate a Markdown report directly when a new IPO ticker is provided. Changes: - Add scripts/generate_ipo_report.py for stage-safe single-ticker reports from the v0 analysis dataset. - Auto-select T1 reports when structured allotment demand exists and otherwise use T0 prospectus-stage reporting. - Keep post-listing D1/D5/D20/D60 outcomes out of prediction reports while using historical buckets for calibration. - Document the workflow in the analyst skill and README. Verification: - Ran py_compile for scripts/generate_ipo_report.py. - Generated stdout dry-run reports for 06106 and 06658. - Wrote temporary Markdown reports under /tmp for output-path validation. - Ran git diff --check. Next useful context: - Before generating a report for a ticker absent from the analysis dataset, run archivist updates and rebuild scripts/build_analysis_dataset.py.
440 lines
16 KiB
Python
440 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""Generate a stage-safe Markdown analyst report for one Hong Kong IPO."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from statistics import mean, median
|
|
from typing import Any
|
|
|
|
|
|
# Repo-relative path of the scoring rule file; printed in the report's "Rule path" line.
MODEL_RULE_PATH = Path("rules/ipo_score_v0.yaml")
|
|
# Default value of the --dataset option (the CSV read by load_dataset).
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
|
|
# Directory used for the generated file name when --output is not given.
DEFAULT_OUTPUT_DIR = Path("reports")
|
|
|
|
# Stage label for prospectus-stage (T0) reports.
T0_STAGE = "T0_prospectus"
|
|
# Stage label for allotment-stage (T1) reports.
T1_STAGE = "T1_allotment"
|
|
# CLI sentinel: resolve to T1 when the record has structured T1 demand, otherwise T0.
AUTO_STAGE = "auto"
|
|
|
|
|
|
@dataclass(frozen=True)
class BucketMetric:
    """Immutable summary of the D1 returns collected for one score bucket.

    Every statistic is ``None`` when ``sample_size`` is zero.
    """

    # Number of D1 returns that went into the statistics below.
    sample_size: int
    # Fraction of those returns that are strictly greater than zero.
    d1_positive_rate: float | None
    # Fraction of those returns that are at least 10 (percentage points).
    d1_strong_rate: float | None
    # Arithmetic mean of the returns, in percent.
    average_d1_return_pct: float | None
    # Median of the returns, in percent.
    median_d1_return_pct: float | None
|
|
|
|
|
|
@dataclass(frozen=True)
class ScoreComponent:
    """One immutable entry of a score breakdown."""

    # Machine name of the scoring factor, for example "offer_size".
    name: str
    # Signed contribution of this factor to the score.
    points: int
    # Free-text rule explanation carried alongside the points.
    reason: str
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        Namespace with ``ticker``, ``stage``, ``dataset``, ``output``,
        ``as_of`` and ``stdout`` attributes.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    add = cli.add_argument

    add("ticker", help="HK ticker, for example 06658 or 6658.")

    stage_choices = [AUTO_STAGE, T0_STAGE, T1_STAGE]
    add(
        "--stage",
        choices=stage_choices,
        default=AUTO_STAGE,
        help="Prediction stage to generate. Auto uses T1 when structured T1 demand is available.",
    )
    add("--dataset", default=str(DEFAULT_DATASET_PATH), help="Repo-relative model dataset path.")
    add("--output", help="Output Markdown path. Defaults to reports/{date}_{ticker}_{stage}_analysis.md.")
    add("--as-of", help="Report timestamp. Defaults to current UTC time.")
    add("--stdout", action="store_true", help="Print the report instead of writing a file.")

    return cli.parse_args()
|
|
|
|
|
|
def parse_as_of(value: str | None) -> str:
    """Return the report timestamp as an ISO 8601 string.

    Args:
        value: Text supplied through ``--as-of``, or ``None``/empty to use the
            current UTC time truncated to whole seconds.

    Returns:
        ISO 8601 timestamp. A UTC offset is rendered as a trailing ``Z``;
        any other explicit offset is kept exactly as supplied.

    Raises:
        SystemExit: If ``value`` is not a valid ISO 8601 date or datetime.
    """
    if not value:
        now = datetime.now(timezone.utc).replace(microsecond=0)
        return now.isoformat().replace("+00:00", "Z")

    try:
        # Older ``fromisoformat`` versions reject the "Z" suffix, so map it to
        # an explicit UTC offset before parsing.
        parsed = datetime.fromisoformat(value.strip().replace("Z", "+00:00"))
    except ValueError as err:
        # Fail like the other user-input errors in this script rather than
        # with a traceback.
        raise SystemExit(
            f"Invalid --as-of timestamp {value!r}: expected ISO 8601, "
            "for example 2024-01-31T09:30:00Z."
        ) from err

    if parsed.tzinfo is None:
        # The default timestamp is UTC, so a value without an offset is
        # assumed to be UTC as well; otherwise the report would carry an
        # ambiguous timestamp without the "Z" marker.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def normalize_ticker(value: str) -> str:
    """Return the canonical form of a Hong Kong ticker.

    The text is stripped and upper-cased, a trailing ``.HK`` is removed, and
    an all-digit code is left-padded with zeros to five characters. Anything
    that is not purely digits is returned without padding.
    """
    suffix = ".HK"
    symbol = value.strip().upper()
    if symbol.endswith(suffix):
        symbol = symbol[: -len(suffix)]
    return symbol.zfill(5) if symbol.isdigit() else symbol
|
|
|
|
|
|
def load_dataset(path: Path) -> list[dict[str, str]]:
    """Read the analysis dataset CSV into a list of row dictionaries.

    Args:
        path: Location of the CSV file; its first line is the header.

    Returns:
        One ``dict`` per data row, keyed by column name.

    Raises:
        SystemExit: If ``path`` does not point to an existing regular file.
    """
    # ``is_file`` also rejects a directory, which would otherwise surface as
    # an unhandled IsADirectoryError when opened.
    if not path.is_file():
        raise SystemExit(f"Dataset not found: {path.as_posix()}. Run scripts/build_analysis_dataset.py first.")
    # "utf-8-sig" reads plain UTF-8 unchanged and drops a leading byte-order
    # mark, so a BOM cannot end up glued to the first column name.
    with path.open(newline="", encoding="utf-8-sig") as handle:
        return list(csv.DictReader(handle))
|
|
|
|
|
|
def find_record(rows: list[dict[str, str]], ticker: str, dataset_path: Path) -> dict[str, str]:
    """Return the first dataset row whose normalized ticker equals ``ticker``.

    Args:
        rows: Dataset rows; each must contain a ``"ticker"`` key.
        ticker: Ticker to look for, already normalized by the caller.
        dataset_path: Dataset location, used only in the error message.

    Raises:
        SystemExit: If no row matches.
    """
    matches = (row for row in rows if normalize_ticker(row["ticker"]) == ticker)
    found = next(matches, None)
    if found is None:
        raise SystemExit(
            f"Ticker {ticker} is not in {dataset_path.as_posix()}. "
            "Use archivist to update archived facts, then rerun scripts/build_analysis_dataset.py."
        )
    return found
|
|
|
|
|
|
def as_float(value: Any) -> float | None:
    """Convert a dataset cell to ``float``, treating blanks as missing.

    Args:
        value: Raw cell value, normally a string from the CSV reader.

    Returns:
        ``None`` for ``None``, an empty string or a whitespace-only string;
        otherwise ``float(value)``.

    Raises:
        ValueError: If a non-blank value is not a valid number.
    """
    if value is None:
        return None
    if isinstance(value, str):
        # A cell holding only spaces is as blank as an empty one; converting
        # it would raise instead of reporting the field as missing.
        value = value.strip()
        if not value:
            return None
    return float(value)
|
|
|
|
|
|
def as_int(value: Any) -> int | None:
    """Convert a dataset cell to ``int``, treating blanks as missing.

    Args:
        value: Raw cell value, normally a string from the CSV reader.

    Returns:
        ``None`` for ``None``, an empty string or a whitespace-only string.
        Integer text is parsed exactly; decimal text such as ``"12.0"`` is
        truncated toward zero.

    Raises:
        ValueError: If a non-blank value is not a valid number.
    """
    if value is None:
        return None
    if isinstance(value, int):
        # Already integral: avoid the float round trip, which loses precision
        # above 2**53.
        return int(value)
    if isinstance(value, str):
        value = value.strip()
        if not value:
            return None
        try:
            # Exact parse for plain integer text.
            return int(value)
        except ValueError:
            # Decimal or exponent notation: handled by the float path below,
            # which also raises ValueError for genuinely invalid text.
            pass
    return int(float(value))
|
|
|
|
|
|
def as_bool(value: Any) -> bool:
    """Interpret a dataset cell as a boolean flag.

    Returns:
        ``True`` when the stripped text form of ``value`` is ``"1"`` or any
        capitalization of ``"true"``; ``False`` for everything else,
        including ``None`` and blank cells.
    """
    # Compare case-insensitively: spreadsheet tools commonly write "TRUE",
    # which would otherwise be read as false without any warning.
    return str(value).strip().lower() in {"1", "true"}
|
|
|
|
|
|
def fmt_value(value: Any) -> str:
    """Render a raw field for the report, showing ``n/a`` for ``None`` or ``""``."""
    is_blank = value in {None, ""}
    return "n/a" if is_blank else str(value)
|
|
|
|
|
|
def fmt_num(value: float | None, suffix: str = "", decimals: int = 1) -> str:
    """Format a number with thousands separators and a fixed number of decimals.

    Args:
        value: Number to format, or ``None`` for a missing value.
        suffix: Text appended directly after the number.
        decimals: Digits after the decimal point.

    Returns:
        The formatted text, or ``"n/a"`` when ``value`` is ``None``.
    """
    if value is not None:
        return format(value, f",.{decimals}f") + suffix
    return "n/a"
|
|
|
|
|
|
def fmt_pct_rate(value: float | None) -> str:
    """Format a 0-1 rate as a percentage with one decimal, or ``n/a`` for ``None``."""
    if value is not None:
        percent = value * 100
        return f"{percent:.1f}%"
    return "n/a"
|
|
|
|
|
|
def fmt_pct_points(value: float | None) -> str:
    """Format a value already in percentage points with one decimal, or ``n/a``."""
    return "n/a" if value is None else format(value, ".1f") + "%"
|
|
|
|
|
|
def fmt_money_m(value: float | None) -> str:
    """Format an amount in HK$ millions with one decimal, or ``n/a`` for ``None``."""
    if value is not None:
        amount = format(value, ",.1f")
        return f"HK${amount}m"
    return "n/a"
|
|
|
|
|
|
def fmt_hkd(value: float | None) -> str:
    """Format an HK$ amount with two decimals and separators, or ``n/a`` for ``None``."""
    return "n/a" if value is None else "HK$" + format(value, ",.2f")
|
|
|
|
|
|
def fmt_times(value: float | None) -> str:
    """Format a multiple such as ``12.50x`` with two decimals, or ``n/a`` for ``None``."""
    if value is not None:
        return format(value, ",.2f") + "x"
    return "n/a"
|
|
|
|
|
|
def fmt_int(value: int | None) -> str:
    """Format an integer with thousands separators, or ``n/a`` for ``None``."""
    return "n/a" if value is None else format(value, ",")
|
|
|
|
|
|
def determine_stage(record: dict[str, str], requested_stage: str) -> str:
    """Resolve the stage to report on for ``record``.

    Args:
        record: Dataset row; ``has_structured_t1`` is read only when the
            request is ``AUTO_STAGE`` or ``T1_STAGE``.
        requested_stage: Stage requested on the command line.

    Returns:
        For ``AUTO_STAGE``: ``T1_STAGE`` when the record has structured T1
        demand, otherwise ``T0_STAGE``. Any other request is returned as is.

    Raises:
        SystemExit: If ``T1_STAGE`` is requested but the record has no
            structured T1 demand.
    """
    # Requests that do not depend on the T1 flag pass straight through.
    if requested_stage not in (AUTO_STAGE, T1_STAGE):
        return requested_stage

    has_t1_demand = as_bool(record["has_structured_t1"])
    if requested_stage == AUTO_STAGE:
        return T1_STAGE if has_t1_demand else T0_STAGE

    if not has_t1_demand:
        raise SystemExit(
            f"{record['ticker']} has no structured T1 demand yet. "
            f"Generate a {T0_STAGE} report or update the archive first."
        )
    return requested_stage
|
|
|
|
|
|
def output_path_for(ticker: str, stage: str, as_of: str) -> Path:
    """Return the default report path under ``DEFAULT_OUTPUT_DIR``.

    The file name is built from the first ten characters of ``as_of`` (the
    date part of an ISO timestamp), the ticker and the stage.
    """
    file_name = f"{as_of[:10]}_{ticker}_{stage}_analysis.md"
    return DEFAULT_OUTPUT_DIR.joinpath(file_name)
|
|
|
|
|
|
def parse_components(text: str) -> list[ScoreComponent]:
    """Parse a ``|``-separated score breakdown into components.

    Each segment has the form ``name:points:reason``. Only the first two
    colons split the segment, so the reason may itself contain colons.

    Args:
        text: Breakdown text from the dataset. ``None``, an empty string and
            blank segments produce no components.

    Returns:
        Components in the order they appear in ``text``.

    Raises:
        ValueError: If a non-blank segment does not have three parts or its
            points are not an integer.
    """
    components: list[ScoreComponent] = []
    # ``text or ""`` tolerates a missing cell (None) the same way as an empty one.
    for item in (text or "").split("|"):
        if not item.strip():
            continue
        parts = item.split(":", 2)
        if len(parts) != 3:
            # Name the offending segment instead of failing with an opaque
            # tuple-unpacking error.
            raise ValueError(f"Malformed score component {item!r}: expected 'name:points:reason'.")
        name, points, reason = parts
        try:
            value = int(points)
        except ValueError as err:
            raise ValueError(
                f"Malformed score component {item!r}: points {points!r} is not an integer."
            ) from err
        components.append(ScoreComponent(name=name, points=value, reason=reason))
    return components
|
|
|
|
|
|
def bucket_metric(
    rows: list[dict[str, str]],
    bucket_key: str,
    bucket_value: str,
    require_t1: bool,
) -> BucketMetric:
    """Summarize the D1 returns of the rows that fall into one score bucket.

    Args:
        rows: Dataset rows to scan.
        bucket_key: Column holding the bucket label.
        bucket_value: Bucket label a row must carry to be counted.
        require_t1: When true, skip rows without structured T1 demand.

    Returns:
        Statistics over the non-blank ``d1_return_pct`` values of the
        selected rows; an all-``None`` metric with ``sample_size`` 0 when no
        row contributes a return.
    """
    selected = (
        row
        for row in rows
        if (not require_t1 or as_bool(row["has_structured_t1"]))
        and row.get(bucket_key) == bucket_value
    )
    parsed = (as_float(row.get("d1_return_pct")) for row in selected)
    # Rows without a D1 label yet contribute nothing to the sample.
    labels = [value for value in parsed if value is not None]

    count = len(labels)
    if count == 0:
        return BucketMetric(0, None, None, None, None)

    positive_hits = sum(1 for value in labels if value > 0)
    strong_hits = sum(1 for value in labels if value >= 10)
    return BucketMetric(
        sample_size=count,
        d1_positive_rate=positive_hits / count,
        d1_strong_rate=strong_hits / count,
        average_d1_return_pct=mean(labels),
        median_d1_return_pct=median(labels),
    )
|
|
|
|
|
|
def t0_decision_band(score: int) -> str:
    """Map a T0 score to its decision band.

    Bands: below 1 is ``weak_or_avoid``, 1-4 is ``neutral``, 5-7 is
    ``positive_watch``, and anything above 7 is ``strong_watch``.
    """
    # Checked from the top band down; equivalent to the ascending thresholds.
    if score > 7:
        return "strong_watch"
    if score > 4:
        return "positive_watch"
    if score >= 1:
        return "neutral"
    return "weak_or_avoid"
|
|
|
|
|
|
def action_for_decision(decision: str) -> str:
    """Return the PM action sentence for a decision band.

    Raises:
        KeyError: If ``decision`` is not one of the known bands.
    """
    action_by_band = dict(
        # Bands produced for T0 (prospectus-stage) scores.
        weak_or_avoid="Avoid at T0 unless later T1 demand changes the setup.",
        neutral="Wait for T1 allotment demand before subscribing.",
        positive_watch="Watch positively, but wait for T1 confirmation before sizing.",
        strong_watch="Strong watch at T0, still pending T1 demand confirmation.",
        # Remaining bands; in T1 reports the band is read from the dataset.
        avoid="Avoid subscription.",
        avoid_or_wait="Avoid or wait; do not size without a stronger catalyst.",
        watch_or_small="Small subscription only if execution constraints are favorable.",
        selective_subscribe="Selective subscription with disciplined sizing.",
        high_conviction_subscribe="Subscribe, subject to allocation and liquidity discipline.",
    )
    return action_by_band[decision]
|
|
|
|
|
|
def component_label(name: str) -> str:
    """Return the display label for a score component name.

    Known names map to fixed labels; any other name is shown with
    underscores replaced by spaces and in title case.
    """
    display_names = dict(
        offer_size="Offer size",
        public_pct="Initial public offer percentage",
        min_subscription="Minimum subscription",
        offer_price="Offer price",
        over_allotment="Over-allotment option",
        public_os="Public oversubscription",
        international_os="International oversubscription",
        valid_applications="Valid applications",
        success_rate="Application success rate",
        hk_reallocation="HK public offer reallocation",
    )
    fallback = name.replace("_", " ").title()
    return display_names.get(name, fallback)
|
|
|
|
|
|
def components_table(components: list[ScoreComponent]) -> str:
    """Render the score components as a Markdown table.

    The table has one row per component, in input order, with the points
    column right-aligned and the reason shown as inline code.
    """
    header = ["| Component | Points | Reason |", "| --- | ---: | --- |"]
    body = [
        f"| {component_label(entry.name)} | {entry.points} | `{entry.reason}` |"
        for entry in components
    ]
    return "\n".join(header + body)
|
|
|
|
|
|
def _md_cell(text: str) -> str:
    """Make ``text`` safe to place inside a single Markdown table cell."""
    # An unescaped pipe starts a new cell and a line break ends the row, so
    # either one in a dataset value would corrupt the table.
    return " ".join(text.replace("|", "\\|").splitlines())


def _ratio_as_pct_points(record: dict[str, str], key: str) -> str:
    """Format the 0-1 ratio stored under ``key`` as percentage points."""
    ratio = as_float(record[key])
    return fmt_pct_points(None if ratio is None else ratio * 100)


def facts_table(record: dict[str, str], stage: str) -> str:
    """Render the archived facts of one IPO as a two-column Markdown table.

    Args:
        record: Dataset row for the IPO.
        stage: Report stage; the allotment-result rows are added only when
            it equals ``T1_STAGE``.

    Returns:
        Markdown table text. Missing values are shown as ``n/a``, and cell
        values are escaped so they cannot break the table layout.
    """
    application_period = (
        f"{fmt_value(record['application_start_date'])} to {fmt_value(record['application_end_date'])}"
    )
    rows = [
        ("Board", fmt_value(record["board"])),
        ("Status", fmt_value(record["status"])),
        ("Listing date", fmt_value(record["listing_date"])),
        ("Application period", application_period),
        ("Allotment result date", fmt_value(record["allotment_results_expected_date"])),
        ("Listing method", fmt_value(record["listing_method"])),
        ("Industry", fmt_value(record["industry_label"])),
        ("Sponsors", fmt_value(record["sponsors"])),
        ("Offer price", fmt_hkd(as_float(record["offer_price_hkd"]))),
        ("Offer size", fmt_money_m(as_float(record["offer_size_hkd_m"]))),
        ("Market cap", fmt_money_m(as_float(record["market_cap_hkd_m"]))),
        ("Board lot", fmt_int(as_int(record["board_lot"]))),
        ("Minimum subscription", fmt_hkd(as_float(record["min_subscription_amount_hkd"]))),
        ("Initial public offer percentage", _ratio_as_pct_points(record, "public_offer_pct_initial")),
        ("Over-allotment shares", fmt_int(as_int(record["over_allotment_offer_shares"]))),
    ]
    if stage == T1_STAGE:
        # Allotment-result facts are only read for T1 reports.
        rows.extend(
            [
                ("Public oversubscription", fmt_times(as_float(record["public_oversubscription_times"]))),
                ("International oversubscription", fmt_times(as_float(record["international_oversubscription_times"]))),
                ("Valid applications", fmt_int(as_int(record["valid_applications"]))),
                ("Successful applications", fmt_int(as_int(record["successful_applications"]))),
                ("Application success rate", _ratio_as_pct_points(record, "application_success_rate")),
                ("International placees", fmt_int(as_int(record["international_placees"]))),
                ("HK offer reallocation multiple", fmt_times(as_float(record["hk_offer_reallocation_multiple"]))),
            ]
        )

    lines = ["| Field | Value |", "| --- | --- |"]
    lines.extend(f"| {label} | {_md_cell(value)} |" for label, value in rows)
    return "\n".join(lines)
|
|
|
|
|
|
def source_paths(record: dict[str, str], stage: str) -> list[str]:
    """List the non-blank source paths that back a report at ``stage``.

    The prospectus path is always considered; the allotment path is read
    only when ``stage`` equals ``T1_STAGE``.
    """
    candidates = (
        ("prospectus_source_path", True),
        ("allotment_source_path", stage == T1_STAGE),
    )
    # ``wanted`` is tested first so an unused key is never looked up.
    return [record[key] for key, wanted in candidates if wanted and record[key]]
|
|
|
|
|
|
def reason_lines(components: list[ScoreComponent], positive: bool) -> list[str]:
    """Build bullet lines for the strongest positive or negative components.

    Args:
        components: Parsed score components.
        positive: Select components with points above zero when true, below
            zero when false.

    Returns:
        Up to five bullets, largest magnitude first; a single placeholder
        bullet when no component qualifies.
    """
    if positive:
        picked = [entry for entry in components if entry.points > 0]
        placeholder = "- No material positive scoring component."
    else:
        picked = [entry for entry in components if entry.points < 0]
        placeholder = "- No material negative scoring component."

    if not picked:
        return [placeholder]

    # Descending for positives and ascending for negatives puts the largest
    # magnitude first in both cases.
    ranked = sorted(picked, key=lambda entry: entry.points, reverse=positive)
    return [
        f"- {component_label(entry.name)}: {entry.points:+d} (`{entry.reason}`)."
        for entry in ranked[:5]
    ]
|
|
|
|
|
|
def missing_field_lines(record: dict[str, str], stage: str) -> list[str]:
    """Report which fields required at ``stage`` are absent or blank.

    Returns:
        A one-element list: either a bullet naming the missing fields or a
        bullet stating that none is blank.
    """
    always_required = (
        ("industry_label", "industry label"),
        ("market_cap_hkd_m", "market cap"),
        ("min_subscription_amount_hkd", "minimum subscription"),
    )
    t1_required = (
        ("public_oversubscription_times", "public oversubscription"),
        ("international_oversubscription_times", "international oversubscription"),
        ("valid_applications", "valid applications"),
        ("successful_applications", "successful applications"),
    )
    checked = always_required + t1_required if stage == T1_STAGE else always_required

    blank_labels = [label for key, label in checked if not record.get(key)]
    if blank_labels:
        return [f"- Missing or blank: {', '.join(blank_labels)}."]
    return ["- No required report field is blank for this stage."]
|
|
|
|
|
|
def build_report(record: dict[str, str], rows: list[dict[str, str]], stage: str, as_of: str) -> str:
    """Assemble the Markdown analyst report for one IPO.

    Args:
        record: Dataset row of the IPO being reported.
        rows: All dataset rows, used for historical bucket calibration.
        stage: ``T0_STAGE`` for a prospectus-stage report; any other value is
            handled as a T1 (allotment-stage) report.
        as_of: Report timestamp text shown in the summary.

    Returns:
        The full report as one newline-joined Markdown string.
    """
    ticker = normalize_ticker(record["ticker"])
    model_version = record["model_version"]
    dataset_as_of = record["analysis_as_of"]

    # Calibrate on other IPOs only. If the target row already carries a D1
    # label, counting it would feed the realised outcome into its own
    # prediction, which the report explicitly states is not done.
    history = [
        row
        for row in rows
        if row is not record and normalize_ticker(row.get("ticker") or "") != ticker
    ]

    if stage == T0_STAGE:
        # A blank score is treated as 0.
        score = as_int(record["t0_score"]) or 0
        bucket = record["t0_score_bucket"]
        decision = t0_decision_band(score)
        components = parse_components(record["t0_score_breakdown"])
        metric = bucket_metric(history, "t0_score_bucket", bucket, require_t1=False)
        score_label = "T0 score"
    else:
        score = as_int(record["total_score"]) or 0
        bucket = record["total_score_bucket"]
        decision = record["decision_band"]
        components = parse_components(record["t0_score_breakdown"]) + parse_components(record["t1_score_breakdown"])
        metric = bucket_metric(history, "total_score_bucket", bucket, require_t1=True)
        score_label = "Total score"

    paths = source_paths(record, stage)
    source_lines = [f"- `{path}`" for path in paths] or ["- No source path recorded for this stage."]

    lines = [
        f"# {ticker} IPO Analyst Report",
        "",
        "## Summary",
        "",
        f"- Ticker: `{ticker}`",
        f"- Company: {fmt_value(record['company_name_en'])}",
        f"- Stage: `{stage}`",
        f"- Report as of: `{as_of}`",
        f"- Model dataset as of: `{dataset_as_of}`",
        f"- Rule version: `{model_version}`",
        f"- Rule path: `{MODEL_RULE_PATH.as_posix()}`",
        f"- Decision: `{decision}`",
        f"- PM action: {action_for_decision(decision)}",
        f"- {score_label}: `{score}`",
        f"- Score bucket: `{bucket}`",
        f"- Calibrated D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)} from {metric.sample_size} historical D1 labels",
        "",
        "## Facts",
        "",
        facts_table(record, stage),
        "",
        "## Model Inference",
        "",
        f"- D1 positive probability: {fmt_pct_rate(metric.d1_positive_rate)}",
        f"- D1 >= 10% probability: {fmt_pct_rate(metric.d1_strong_rate)}",
        f"- Historical average D1 return for bucket: {fmt_num(metric.average_d1_return_pct, '%')}",
        f"- Historical median D1 return for bucket: {fmt_num(metric.median_d1_return_pct, '%')}",
        "",
        "## Score Breakdown",
        "",
        components_table(components),
        "",
        "## Bull Points",
        "",
        *reason_lines(components, positive=True),
        "",
        "## Risks And Gaps",
        "",
        *reason_lines(components, positive=False),
        *missing_field_lines(record, stage),
        "- T2 grey-market signal is not used because the project has no approved reproducible source.",
        "- Post-listing D1/D5/D20/D60 outcomes are labels for model calibration only and are not shown as prediction inputs.",
        "",
        "## Triggers",
        "",
        "- Upgrade: stronger verified T1 demand, better allocation scarcity, or a new rule-backed positive catalyst.",
        "- Downgrade: weak public or international demand, oversized supply, low-quality missing fields, or adverse market window.",
        "",
        "## Exit Plan",
        "",
        "- If subscribed and allocated, reassess after allotment and before first trading session using only information available at that stage.",
        "- For T1 reports without approved T2 data, treat first-day liquidity and position sizing conservatively.",
        "- Record actual D1/D5/D20/D60 outcomes later as review labels, not as retroactive prediction inputs.",
        "",
        "## Source Paths",
        "",
        *source_lines,
        "",
    ]
    return "\n".join(lines)
|
|
|
|
|
|
def main() -> int:
    """Run the command-line workflow: load, resolve the stage, render, emit.

    Returns:
        ``0`` after the report has been printed or written.

    Raises:
        SystemExit: If the chosen output file already exists. The helpers
            called here exit the same way on their own input errors.
    """
    options = parse_args()
    symbol = normalize_ticker(options.ticker)
    timestamp = parse_as_of(options.as_of)
    source = Path(options.dataset)

    dataset = load_dataset(source)
    target = find_record(dataset, symbol, source)
    resolved_stage = determine_stage(target, options.stage)
    markdown = build_report(target, dataset, resolved_stage, timestamp)

    if options.stdout:
        print(markdown)
        return 0

    if options.output:
        destination = Path(options.output)
    else:
        destination = output_path_for(symbol, resolved_stage, timestamp)

    # Existing reports are never replaced.
    if destination.exists():
        raise SystemExit(f"Refusing to overwrite existing report: {destination.as_posix()}")

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(f"{markdown}\n", encoding="utf-8")
    print(f"report written: {destination.as_posix()}")
    return 0
|
|
|
|
|
|
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|