#!/usr/bin/env python3
"""Build the analyst v0 feature dataset and calibration report."""

from __future__ import annotations

import argparse
import csv
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean, median
from typing import Any

MODEL_VERSION = "ipo_score_v0"
RULE_PATH = Path("rules/ipo_score_v0.yaml")
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
DEFAULT_REPORT_PATH = Path("reports/2026-06-15_analysis_model_v0.md")


@dataclass(frozen=True)
class Metric:
    """D1 outcome statistics for one score bucket (see ``metric_for``)."""

    # Bucket label the statistics belong to.
    bucket: str
    # Number of D1 return observations in the bucket.
    sample_size: int
    # Share of observations with a D1 return > 0; None for an empty bucket.
    d1_positive_rate: float | None
    # Share of observations with a D1 return >= 10; None for an empty bucket.
    d1_strong_rate: float | None
    # Mean and median of the D1 returns; None for an empty bucket.
    average_d1_return_pct: float | None
    median_d1_return_pct: float | None


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (--db, --dataset, --report, --as-of)."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--dataset", default=str(DEFAULT_DATASET_PATH), help="Output CSV dataset path.")
    parser.add_argument("--report", default=str(DEFAULT_REPORT_PATH), help="Output Markdown report path.")
    parser.add_argument("--as-of", help="Analysis timestamp. Defaults to current UTC time.")
    return parser.parse_args()


def parse_as_of(value: str | None) -> str:
    """Return the analysis timestamp as an ISO-8601 string.

    A supplied value is round-tripped through ``datetime.fromisoformat`` (with a
    trailing ``Z`` accepted) and a ``+00:00`` offset is rendered as ``Z``. Without
    a value the current UTC time, truncated to whole seconds, is used.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    if value:
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def as_float(value: Any) -> float | None:
    """Convert ``value`` to float, passing ``None`` through unchanged."""
    if value is None:
        return None
    return float(value)


def as_int(value: Any) -> int | None:
    """Convert ``value`` to int, passing ``None`` through unchanged."""
    if value is None:
        return None
    return int(value)


def offer_size_hkd_m(row: sqlite3.Row) -> float | None:
    """Return the offer size in HKD millions, or None when it cannot be derived.

    Sources are tried in order: ``gross_proceeds_hkd_m``, then
    ``funds_raised_hkd`` / 1e6, then ``offer_price_hkd * global_offer_shares`` / 1e6.
    """
    gross = as_float(row["gross_proceeds_hkd_m"])
    if gross is not None:
        return gross
    funds = as_float(row["funds_raised_hkd"])
    if funds is not None:
        return funds / 1_000_000
    offer_price = as_float(row["offer_price_hkd"])
    global_shares = as_int(row["global_offer_shares"])
    if offer_price is not None and global_shares is not None:
        return offer_price * global_shares / 1_000_000
    return None


def success_rate(row: sqlite3.Row) -> float | None:
    """Return successful / valid applications, or None if either is unusable.

    A missing or zero ``valid_applications`` yields None (avoids dividing by zero).
    """
    valid = as_int(row["valid_applications"])
    successful = as_int(row["successful_applications"])
    if not valid or successful is None:
        return None
    return successful / valid


def reallocation_multiple(row: sqlite3.Row) -> float | None:
    """Return final / initial HK offer shares, or None if either is missing or zero."""
    final_hk = as_int(row["final_hk_offer_shares"])
    initial_hk = as_int(row["hk_offer_shares_initial"])
    if not final_hk or not initial_hk:
        return None
    return final_hk / initial_hk


def add_component(components: list[str], name: str, points: int, reason: str) -> int:
    """Append ``name:points:reason`` to ``components`` and return ``points``."""
    components.append(f"{name}:{points}:{reason}")
    return points


def score_t0(row: sqlite3.Row) -> tuple[int, str]:
    """Score the T0 terms of one row.

    Components: offer size, initial public offer percentage, minimum
    subscription amount, offer price band and over-allotment presence.

    Returns:
        ``(score, breakdown)`` where ``breakdown`` is the ``|``-joined list of
        ``name:points:reason`` components.
    """
    score = 0
    components: list[str] = []

    size = offer_size_hkd_m(row)
    if size is None:
        score += add_component(components, "offer_size", 0, "missing")
    elif size < 300:
        score += add_component(components, "offer_size", -2, "lt_300m")
    elif size < 800:
        score += add_component(components, "offer_size", 1, "300m_to_800m")
    elif size < 2000:
        score += add_component(components, "offer_size", 4, "800m_to_2000m")
    elif size < 5000:
        score += add_component(components, "offer_size", 3, "2000m_to_5000m")
    else:
        score += add_component(components, "offer_size", 2, "gte_5000m")

    public_pct = as_float(row["public_offer_pct_initial"])
    if public_pct is None:
        score += add_component(components, "public_pct", 0, "missing")
    elif public_pct <= 0.05:
        score += add_component(components, "public_pct", 3, "lte_5pct")
    elif public_pct <= 0.10:
        score += add_component(components, "public_pct", 1, "5pct_to_10pct")
    else:
        score += add_component(components, "public_pct", -1, "gt_10pct")

    min_sub = as_float(row["min_subscription_amount_hkd"])
    if min_sub is None:
        # Unlike the other components, a missing value is penalised here.
        score += add_component(components, "min_subscription", -1, "missing")
    elif min_sub < 3500:
        score += add_component(components, "min_subscription", -1, "lt_3500")
    elif min_sub < 10000:
        score += add_component(components, "min_subscription", 2, "3500_to_10000")
    else:
        score += add_component(components, "min_subscription", 1, "gte_10000")

    offer_price = as_float(row["offer_price_hkd"])
    if offer_price is None:
        score += add_component(components, "offer_price", 0, "missing")
    elif offer_price < 1:
        score += add_component(components, "offer_price", -2, "lt_1")
    elif offer_price < 5:
        score += add_component(components, "offer_price", 0, "1_to_5")
    elif offer_price < 30:
        score += add_component(components, "offer_price", 1, "5_to_30")
    elif offer_price < 100:
        score += add_component(components, "offer_price", 2, "30_to_100")
    else:
        score += add_component(components, "offer_price", 1, "gte_100")

    if as_int(row["over_allotment_offer_shares"]):
        score += add_component(components, "over_allotment", 1, "present")
    else:
        score += add_component(components, "over_allotment", 0, "missing_or_zero")

    return score, "|".join(components)


def score_t1(row: sqlite3.Row) -> tuple[int, str]:
    """Score the T1 demand fields of one row.

    Components: public and international oversubscription, valid application
    count, application success rate and HK offer reallocation multiple.

    Returns:
        ``(score, breakdown)`` in the same format as ``score_t0``.
    """
    score = 0
    components: list[str] = []

    public_os = as_float(row["public_oversubscription_times"])
    if public_os is None:
        score += add_component(components, "public_os", 0, "missing")
    elif public_os >= 5000:
        score += add_component(components, "public_os", 15, "gte_5000x")
    elif public_os >= 1000:
        score += add_component(components, "public_os", 13, "1000x_to_5000x")
    elif public_os >= 100:
        score += add_component(components, "public_os", 6, "100x_to_1000x")
    elif public_os >= 10:
        score += add_component(components, "public_os", -2, "10x_to_100x")
    else:
        score += add_component(components, "public_os", -4, "lt_10x")

    international_os = as_float(row["international_oversubscription_times"])
    if international_os is None:
        score += add_component(components, "international_os", 0, "missing")
    elif international_os >= 30:
        score += add_component(components, "international_os", 8, "gte_30x")
    elif international_os >= 10:
        score += add_component(components, "international_os", 6, "10x_to_30x")
    elif international_os >= 3:
        score += add_component(components, "international_os", 1, "3x_to_10x")
    elif international_os >= 1:
        score += add_component(components, "international_os", -1, "1x_to_3x")
    else:
        score += add_component(components, "international_os", -2, "lt_1x")

    valid = as_int(row["valid_applications"])
    if valid is None:
        score += add_component(components, "valid_applications", 0, "missing")
    elif valid >= 200000:
        score += add_component(components, "valid_applications", 5, "gte_200k")
    elif valid >= 100000:
        score += add_component(components, "valid_applications", 3, "100k_to_200k")
    elif valid >= 50000:
        score += add_component(components, "valid_applications", 1, "50k_to_100k")
    elif valid < 10000:
        score += add_component(components, "valid_applications", -2, "lt_10k")
    else:
        score += add_component(components, "valid_applications", 0, "10k_to_50k")

    rate = success_rate(row)
    if rate is None:
        score += add_component(components, "success_rate", 0, "missing")
    elif rate <= 0.10:
        score += add_component(components, "success_rate", 4, "lte_10pct")
    elif rate <= 0.30:
        score += add_component(components, "success_rate", 2, "10pct_to_30pct")
    elif rate > 0.80:
        score += add_component(components, "success_rate", -2, "gt_80pct")
    else:
        score += add_component(components, "success_rate", 0, "30pct_to_80pct")

    realloc = reallocation_multiple(row)
    if realloc is None:
        score += add_component(components, "hk_reallocation", 0, "missing")
    elif realloc >= 3:
        score += add_component(components, "hk_reallocation", 4, "gte_3x")
    elif realloc >= 2:
        score += add_component(components, "hk_reallocation", 2, "2x_to_3x")
    else:
        score += add_component(components, "hk_reallocation", 0, "lt_2x")

    return score, "|".join(components)


def score_t0_5_market_heat(row: sqlite3.Row) -> tuple[int | None, str]:
    """Score the margin subscription multiple of one row.

    Returns:
        ``(None, "")`` when ``t0_5_margin_subscription_multiple`` is missing,
        otherwise ``(score, breakdown)`` with a single ``margin_subscription``
        component.
    """
    margin = as_float(row["t0_5_margin_subscription_multiple"])
    if margin is None:
        return None, ""
    components: list[str] = []
    if margin >= 5000:
        score = add_component(components, "margin_subscription", 8, "gte_5000x")
    elif margin >= 1000:
        score = add_component(components, "margin_subscription", 6, "1000x_to_5000x")
    elif margin >= 100:
        score = add_component(components, "margin_subscription", 3, "100x_to_1000x")
    elif margin >= 10:
        score = add_component(components, "margin_subscription", 0, "10x_to_100x")
    else:
        score = add_component(components, "margin_subscription", -3, "lt_10x")
    return score, "|".join(components)


def has_structured_t1(row: sqlite3.Row) -> bool:
    """Return True when at least one of the four T1 demand fields is non-NULL."""
    return any(
        row[key] is not None
        for key in [
            "public_oversubscription_times",
            "international_oversubscription_times",
            "valid_applications",
            "successful_applications",
        ]
    )


def t0_bucket(score: int) -> str:
    """Map a T0 score to its calibration bucket label."""
    if score < 1:
        return "t0_lt_1"
    if score <= 4:
        return "t0_1_to_4"
    if score <= 7:
        return "t0_5_to_7"
    return "t0_gte_8"


def total_bucket(score: int) -> str:
    """Map a total (T0 + T1) score to its calibration bucket label."""
    if score < 0:
        return "total_lt_0"
    if score <= 9:
        return "total_0_to_9"
    if score <= 17:
        return "total_10_to_17"
    if score <= 25:
        return "total_18_to_25"
    return "total_gte_26"


def t0_plus_t0_5_bucket(score: int | None) -> str | None:
    """Map a T0 + T0.5 score to its bucket label; None stays None."""
    if score is None:
        return None
    if score < 5:
        return "t0_5_lt_5"
    if score <= 7:
        return "t0_5_5_to_7"
    if score <= 11:
        return "t0_5_8_to_11"
    return "t0_5_gte_12"


def external_oversub_bucket(value: Any) -> str | None:
    """Map an external oversubscription multiple to its bucket label; None stays None."""
    oversub = as_float(value)
    if oversub is None:
        return None
    if oversub >= 5000:
        return "external_os_gte_5000x"
    if oversub >= 1000:
        return "external_os_1000x_to_5000x"
    if oversub >= 100:
        return "external_os_100x_to_1000x"
    if oversub >= 10:
        return "external_os_10x_to_100x"
    return "external_os_lt_10x"


def decision_band(row: dict[str, Any]) -> str:
    """Return the decision label for one built record.

    Records without structured T1 data are banded on ``t0_score`` alone;
    all others are banded on ``total_score``.
    """
    if not row["has_structured_t1"]:
        score = row["t0_score"]
        if score < 1:
            return "weak_or_avoid"
        if score <= 4:
            return "neutral"
        if score <= 7:
            return "positive_watch"
        return "strong_watch"
    score = row["total_score"]
    if score < 0:
        return "avoid"
    if score <= 9:
        return "avoid_or_wait"
    if score <= 17:
        return "watch_or_small"
    if score <= 25:
        return "selective_subscribe"
    return "high_conviction_subscribe"


def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Fetch one joined feature row per ``ipo_master`` entry.

    Sets ``conn.row_factory`` to ``sqlite3.Row`` as a side effect. Rows are
    ordered by listing date, then ticker.
    """
    conn.row_factory = sqlite3.Row
    # NOTE(review): latest_market_heat / external_history keep every row that
    # ties on the latest timestamp, and offering_terms / ipo_demand are joined
    # on ticker only, so a ticker could appear more than once if those tables
    # are not unique per ticker -- confirm against the schema.
    return conn.execute(
        """
        WITH listing_reports AS (
            SELECT
                ticker,
                MAX(funds_raised_hkd) AS funds_raised_hkd,
                MAX(subscription_ratio_times) AS report_subscription_ratio_times,
                MAX(market_cap_hkd) AS report_market_cap_hkd,
                MAX(industry_label) AS report_industry_label,
                MAX(listing_method) AS listing_method,
                MAX(sponsors) AS sponsors
            FROM new_listing_report_entries
            GROUP BY ticker
        ),
        performance AS (
            SELECT
                ticker,
                MAX(CASE WHEN stage = 'D1' THEN return_pct END) AS d1_return_pct,
                MAX(CASE WHEN stage = 'D5' THEN return_pct END) AS d5_return_pct,
                MAX(CASE WHEN stage = 'D20' THEN return_pct END) AS d20_return_pct,
                MAX(CASE WHEN stage = 'D60' THEN return_pct END) AS d60_return_pct,
                MAX(CASE WHEN stage = 'D1' THEN turnover_hkd_m END) AS d1_turnover_hkd_m
            FROM price_performance
            GROUP BY ticker
        ),
        latest_market_heat AS (
            SELECT h.*
            FROM ipo_market_heat h
            JOIN (
                SELECT ticker, MAX(observed_at) AS observed_at
                FROM ipo_market_heat
                GROUP BY ticker
            ) latest
              ON latest.ticker = h.ticker
             AND latest.observed_at = h.observed_at
        ),
        external_history AS (
            SELECT e.*
            FROM external_ipo_history e
            JOIN (
                SELECT ticker, MAX(listing_date) AS listing_date
                FROM external_ipo_history
                WHERE provider = 'ipohk'
                GROUP BY ticker
            ) latest
              ON latest.ticker = e.ticker
             AND latest.listing_date = e.listing_date
            WHERE e.provider = 'ipohk'
        )
        SELECT
            m.ticker,
            m.company_name_en,
            m.company_name_zh,
            m.board,
            m.status,
            m.listing_date,
            m.application_start_date,
            m.application_end_date,
            m.allotment_results_expected_date,
            m.industry_label AS master_industry_label,
            ot.offer_price_hkd,
            ot.board_lot,
            ot.min_subscription_amount_hkd,
            ot.global_offer_shares,
            ot.hk_offer_shares_initial,
            ot.international_offer_shares_initial,
            ot.public_offer_pct_initial,
            ot.over_allotment_offer_shares,
            ot.gross_proceeds_hkd_m,
            ot.net_proceeds_hkd_m,
            ot.market_cap_hkd_m,
            lr.funds_raised_hkd,
            lr.report_subscription_ratio_times,
            lr.report_market_cap_hkd,
            lr.report_industry_label,
            lr.listing_method,
            lr.sponsors,
            d.valid_applications,
            d.successful_applications,
            d.public_oversubscription_times,
            d.international_placees,
            d.international_oversubscription_times,
            d.final_hk_offer_shares,
            d.final_international_offer_shares,
            p.d1_return_pct,
            p.d5_return_pct,
            p.d20_return_pct,
            p.d60_return_pct,
            p.d1_turnover_hkd_m,
            h.observed_at AS t0_5_observed_at,
            h.stage AS market_heat_stage,
            h.provider AS t0_5_provider,
            h.margin_subscription_multiple AS t0_5_margin_subscription_multiple,
            h.source_id AS t0_5_source_id,
            eh.one_hand_win_rate_pct AS external_one_hand_win_rate_pct,
            eh.public_oversubscription_times AS external_public_oversubscription_times,
            eh.grey_market_return_pct AS external_grey_market_return_pct,
            eh.first_day_return_pct AS external_first_day_return_pct,
            eh.local_path AS external_history_source_path,
            (
                SELECT local_path
                FROM source_refs s
                WHERE s.ticker = m.ticker AND s.source_type = 'prospectus'
                ORDER BY s.source_date DESC, s.source_id DESC
                LIMIT 1
            ) AS prospectus_source_path,
            (
                SELECT local_path
                FROM source_refs s
                WHERE s.ticker = m.ticker AND s.source_type = 'allotment_results'
                ORDER BY s.source_date DESC, s.source_id DESC
                LIMIT 1
            ) AS allotment_source_path
        FROM ipo_master m
        LEFT JOIN offering_terms ot ON ot.ticker = m.ticker
        LEFT JOIN listing_reports lr ON lr.ticker = m.ticker
        LEFT JOIN ipo_demand d ON d.ticker = m.ticker
        LEFT JOIN performance p ON p.ticker = m.ticker
        LEFT JOIN latest_market_heat h ON h.ticker = m.ticker
        LEFT JOIN external_history eh ON eh.ticker = m.ticker
        ORDER BY m.listing_date, m.ticker
        """
    ).fetchall()


def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]:
    """Turn fetched rows into scored feature records.

    Each record carries the raw fields, the T0 / T0.5 / T1 scores with their
    breakdowns, bucket labels, D1 outcome flags and the decision band.
    ``calibrated_d1_positive_rate`` is not set here; see ``add_calibrated_rates``.
    """
    records: list[dict[str, Any]] = []
    for row in rows:
        t0_score_value, t0_breakdown = score_t0(row)
        t1_score_value, t1_breakdown = score_t1(row)
        t0_5_score_value, t0_5_breakdown = score_t0_5_market_heat(row)
        structured_t1 = has_structured_t1(row)
        # T1 points only count once at least one demand field is present.
        total_score = t0_score_value + (t1_score_value if structured_t1 else 0)
        t0_plus_t0_5_score = (
            t0_score_value + t0_5_score_value if t0_5_score_value is not None else None
        )
        size = offer_size_hkd_m(row)
        d1_return = as_float(row["d1_return_pct"])
        record: dict[str, Any] = {
            "model_version": MODEL_VERSION,
            "analysis_as_of": as_of,
            "ticker": row["ticker"],
            "company_name_en": row["company_name_en"],
            "company_name_zh": row["company_name_zh"],
            "board": row["board"],
            "status": row["status"],
            "listing_date": row["listing_date"],
            "application_start_date": row["application_start_date"],
            "application_end_date": row["application_end_date"],
            "allotment_results_expected_date": row["allotment_results_expected_date"],
            "listing_method": row["listing_method"],
            "industry_label": row["master_industry_label"] or row["report_industry_label"],
            "sponsors": row["sponsors"],
            "offer_price_hkd": row["offer_price_hkd"],
            "offer_size_hkd_m": size,
            "gross_proceeds_hkd_m": row["gross_proceeds_hkd_m"],
            "net_proceeds_hkd_m": row["net_proceeds_hkd_m"],
            # Fall back to the listing-report market cap (HKD) scaled to millions.
            "market_cap_hkd_m": row["market_cap_hkd_m"]
            or (row["report_market_cap_hkd"] / 1_000_000 if row["report_market_cap_hkd"] else None),
            "board_lot": row["board_lot"],
            "min_subscription_amount_hkd": row["min_subscription_amount_hkd"],
            "public_offer_pct_initial": row["public_offer_pct_initial"],
            "over_allotment_offer_shares": row["over_allotment_offer_shares"],
            "public_oversubscription_times": row["public_oversubscription_times"],
            "international_oversubscription_times": row["international_oversubscription_times"],
            "t0_5_observed_at": row["t0_5_observed_at"],
            "market_heat_stage": row["market_heat_stage"],
            "t0_5_provider": row["t0_5_provider"],
            "t0_5_margin_subscription_multiple": row["t0_5_margin_subscription_multiple"],
            "t0_5_source_id": row["t0_5_source_id"],
            "t0_5_add_score": t0_5_score_value,
            "t0_plus_t0_5_score": t0_plus_t0_5_score,
            "t0_plus_t0_5_score_bucket": t0_plus_t0_5_bucket(t0_plus_t0_5_score),
            "t0_5_score_breakdown": t0_5_breakdown,
            "valid_applications": row["valid_applications"],
            "successful_applications": row["successful_applications"],
            "application_success_rate": success_rate(row),
            "international_placees": row["international_placees"],
            "hk_offer_reallocation_multiple": reallocation_multiple(row),
            "has_structured_t1": structured_t1,
            "t0_score": t0_score_value,
            "t1_add_score": t1_score_value if structured_t1 else None,
            "total_score": total_score,
            "t0_score_bucket": t0_bucket(t0_score_value),
            "total_score_bucket": total_bucket(total_score) if structured_t1 else None,
            "t0_score_breakdown": t0_breakdown,
            "t1_score_breakdown": t1_breakdown if structured_t1 else "",
            "d1_return_pct": row["d1_return_pct"],
            "d5_return_pct": row["d5_return_pct"],
            "d20_return_pct": row["d20_return_pct"],
            "d60_return_pct": row["d60_return_pct"],
            "d1_turnover_hkd_m": row["d1_turnover_hkd_m"],
            # NOTE(review): both flags are False (written as "0") when the D1
            # label is missing, so "no label" and "not positive" are
            # indistinguishable in these two columns -- check d1_return_pct.
            "d1_positive": d1_return is not None and d1_return > 0,
            "d1_strong_10pct": d1_return is not None and d1_return >= 10,
            "external_one_hand_win_rate_pct": row["external_one_hand_win_rate_pct"],
            "external_public_oversubscription_times": row["external_public_oversubscription_times"],
            "external_public_oversubscription_bucket": external_oversub_bucket(
                row["external_public_oversubscription_times"]
            ),
            "external_grey_market_return_pct": row["external_grey_market_return_pct"],
            "external_first_day_return_pct": row["external_first_day_return_pct"],
            "external_history_source_path": row["external_history_source_path"],
            "prospectus_source_path": row["prospectus_source_path"],
            "allotment_source_path": row["allotment_source_path"],
        }
        record["decision_band"] = decision_band(record)
        records.append(record)
    return records


def metric_for(bucket: str, values: list[float]) -> Metric:
    """Summarise the D1 returns of one bucket; an empty list gives all-None stats."""
    if not values:
        return Metric(bucket, 0, None, None, None, None)
    return Metric(
        bucket=bucket,
        sample_size=len(values),
        d1_positive_rate=sum(value > 0 for value in values) / len(values),
        d1_strong_rate=sum(value >= 10 for value in values) / len(values),
        average_d1_return_pct=mean(values),
        median_d1_return_pct=median(values),
    )


def calibration(records: list[dict[str, Any]], bucket_key: str, require_t1: bool = False) -> list[Metric]:
    """Group D1 returns by ``record[bucket_key]`` and summarise each group.

    Records without a D1 return or with a None bucket are skipped; with
    ``require_t1`` records lacking structured T1 data are skipped too.

    Returns:
        One ``Metric`` per non-empty bucket, sorted by bucket label. The list is
        empty when no record qualifies.
    """
    buckets: dict[str, list[float]] = {}
    for record in records:
        value = record["d1_return_pct"]
        if value is None:
            continue
        if require_t1 and not record["has_structured_t1"]:
            continue
        bucket = record[bucket_key]
        if bucket is None:
            continue
        buckets.setdefault(str(bucket), []).append(float(value))
    return [metric_for(bucket, buckets[bucket]) for bucket in sorted(buckets)]


def calibrated_rates(metrics: list[Metric]) -> dict[str, float | None]:
    """Return a ``bucket -> d1_positive_rate`` lookup for ``metrics``."""
    return {metric.bucket: metric.d1_positive_rate for metric in metrics}


def add_calibrated_rates(records: list[dict[str, Any]], t0_metrics: list[Metric], total_metrics: list[Metric]) -> None:
    """Set ``calibrated_d1_positive_rate`` on every record, in place.

    Records with structured T1 data use the total-score bucket rate, others the
    T0 bucket rate. A bucket with no metric yields None.
    """
    t0_rates = calibrated_rates(t0_metrics)
    total_rates = calibrated_rates(total_metrics)
    for record in records:
        if record["has_structured_t1"]:
            record["calibrated_d1_positive_rate"] = total_rates.get(record["total_score_bucket"])
        else:
            record["calibrated_d1_positive_rate"] = t0_rates.get(record["t0_score_bucket"])


def format_cell(value: Any) -> Any:
    """Render one CSV cell: None -> "", bool -> "1"/"0", float -> 6 significant digits."""
    if value is None:
        return ""
    # bool must be tested before any numeric handling: it is an int subclass.
    if isinstance(value, bool):
        return "1" if value else "0"
    if isinstance(value, float):
        return f"{value:.6g}"
    return value


def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None:
    """Write ``records`` to ``output_path`` as UTF-8 CSV with a fixed column order.

    Parent directories are created as needed; record keys missing from a row are
    written as empty cells.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    columns = [
        "model_version",
        "analysis_as_of",
        "ticker",
        "company_name_en",
        "company_name_zh",
        "board",
        "status",
        "listing_date",
        "application_start_date",
        "application_end_date",
        "allotment_results_expected_date",
        "listing_method",
        "industry_label",
        "sponsors",
        "offer_price_hkd",
        "offer_size_hkd_m",
        "gross_proceeds_hkd_m",
        "net_proceeds_hkd_m",
        "market_cap_hkd_m",
        "board_lot",
        "min_subscription_amount_hkd",
        "public_offer_pct_initial",
        "over_allotment_offer_shares",
        "public_oversubscription_times",
        "international_oversubscription_times",
        "t0_5_observed_at",
        "market_heat_stage",
        "t0_5_provider",
        "t0_5_margin_subscription_multiple",
        "t0_5_source_id",
        "t0_5_add_score",
        "t0_plus_t0_5_score",
        "t0_plus_t0_5_score_bucket",
        "valid_applications",
        "successful_applications",
        "application_success_rate",
        "international_placees",
        "hk_offer_reallocation_multiple",
        "has_structured_t1",
        "t0_score",
        "t1_add_score",
        "total_score",
        "t0_score_bucket",
        "total_score_bucket",
        "decision_band",
        "calibrated_d1_positive_rate",
        "d1_return_pct",
        "d5_return_pct",
        "d20_return_pct",
        "d60_return_pct",
        "d1_turnover_hkd_m",
        "d1_positive",
        "d1_strong_10pct",
        "external_one_hand_win_rate_pct",
        "external_public_oversubscription_times",
        "external_public_oversubscription_bucket",
        "external_grey_market_return_pct",
        "external_first_day_return_pct",
        "external_history_source_path",
        "prospectus_source_path",
        "allotment_source_path",
        "t0_score_breakdown",
        "t0_5_score_breakdown",
        "t1_score_breakdown",
    ]
    with output_path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, lineterminator="\n")
        writer.writeheader()
        for record in records:
            writer.writerow({column: format_cell(record.get(column)) for column in columns})


def fmt_pct(value: float | None) -> str:
    """Format a 0-1 fraction as a percentage with one decimal; None -> ""."""
    if value is None:
        return ""
    return f"{value * 100:.1f}%"


def fmt_num(value: float | None) -> str:
    """Format a number with one decimal; None -> ""."""
    if value is None:
        return ""
    return f"{value:.1f}"


def metrics_table(metrics: list[Metric]) -> str:
    """Render ``metrics`` as a Markdown table (header only when the list is empty)."""
    lines = [
        "| Bucket | N | D1 positive | D1 >= 10% | Avg D1 return | Median D1 return |",
        "| --- | ---: | ---: | ---: | ---: | ---: |",
    ]
    for metric in metrics:
        lines.append(
            "| "
            + " | ".join(
                [
                    metric.bucket,
                    str(metric.sample_size),
                    fmt_pct(metric.d1_positive_rate),
                    fmt_pct(metric.d1_strong_rate),
                    fmt_num(metric.average_d1_return_pct),
                    fmt_num(metric.median_d1_return_pct),
                ]
            )
            + " |"
        )
    return "\n".join(lines)


def count_present(records: list[dict[str, Any]], key: str) -> int:
    """Count records whose ``key`` value is neither None nor the empty string."""
    return sum(record.get(key) not in {None, ""} for record in records)


def write_report(
    records: list[dict[str, Any]],
    t0_metrics: list[Metric],
    total_metrics: list[Metric],
    report_path: Path,
    dataset_path: Path,
    as_of: str,
) -> None:
    """Write the Markdown calibration report to ``report_path``.

    Args:
        records: records produced by ``build_records``.
        t0_metrics: calibration metrics keyed by T0 score bucket.
        total_metrics: calibration metrics keyed by total score bucket; may be
            empty, in which case the "Current Read" section says so instead of
            naming a strongest bucket.
        report_path: output file; parent directories are created as needed.
        dataset_path: dataset location quoted in the report header.
        as_of: analysis timestamp quoted in the report header.
    """
    report_path.parent.mkdir(parents=True, exist_ok=True)
    total = len(records)
    d1_records = [record for record in records if record["d1_return_pct"] is not None]
    structured_t1 = [record for record in records if record["has_structured_t1"]]
    structured_market_heat = [record for record in records if record["t0_5_margin_subscription_multiple"] is not None]
    # Snapshots without an explicit stage are counted as T0.5 margin heat.
    structured_t0_5 = [
        record
        for record in structured_market_heat
        if record["market_heat_stage"] in {None, "", "T0_5_market_heat"}
    ]
    structured_t0_95 = [
        record for record in structured_market_heat if record["market_heat_stage"] == "T0_95_final_heat"
    ]
    t0_5_with_d1 = [record for record in structured_t0_5 if record["d1_return_pct"] is not None]
    t0_95_with_d1 = [record for record in structured_t0_95 if record["d1_return_pct"] is not None]
    external_history_rows = [record for record in records if record["external_history_source_path"]]
    external_oversub_rows = [record for record in records if record["external_public_oversubscription_times"] is not None]
    external_oversub_with_d1 = [
        record
        for record in records
        if record["external_public_oversubscription_times"] is not None and record["d1_return_pct"] is not None
    ]
    external_oversub_metrics = calibration(records, "external_public_oversubscription_bucket")
    pending_t1_tickers = ", ".join(sorted(record["ticker"] for record in records if not record["has_structured_t1"]))
    t1_public_os_missing = sum(record["public_oversubscription_times"] is None for record in structured_t1)
    t1_international_os_missing = sum(record["international_oversubscription_times"] is None for record in structured_t1)
    t1_valid_missing = sum(record["valid_applications"] is None for record in structured_t1)
    t1_successful_missing = sum(record["successful_applications"] is None for record in structured_t1)

    # max() raises ValueError on an empty sequence, which used to abort the whole
    # report when no row had both structured T1 data and a D1 label.
    if total_metrics:
        # `or -1` ranks a missing (None) rate below every real rate.
        best_bucket = max(total_metrics, key=lambda metric: metric.d1_positive_rate or -1)
        current_read = (
            f"After the T1 demand text backfill, the strongest v0 T1 bucket is `{best_bucket.bucket}` with {best_bucket.sample_size} historical D1 observations and a {fmt_pct(best_bucket.d1_positive_rate)} D1 positive rate. The model is most useful after allotment results are available; T0 is a watchlist filter rather than a final subscription call."
        )
    else:
        current_read = (
            "No T1 score bucket has D1 observations yet, so no strongest v0 T1 bucket can be reported. The model is most useful after allotment results are available; T0 is a watchlist filter rather than a final subscription call."
        )

    lines = [
        "# HK IPO Analysis Model v0",
        "",
        f"- Model version: `{MODEL_VERSION}`",
        f"- Analysis as of: `{as_of}`",
        f"- Rule file: `{RULE_PATH.as_posix()}`",
        f"- Dataset: `{dataset_path.as_posix()}`",
        "",
        "## What This Model Does",
        "",
        "This is the first analyst model built from the downloaded archive. It creates a repeatable feature table, scores each IPO using stage-safe rules, and calibrates the score buckets against archived D1 sell outcomes. It is intentionally transparent: the output includes every score component and the archived source paths used for each ticker.",
        "",
        "The model is built for a short IPO allocation trade: sell in T2 grey market when reliable executable data exists, or sell on D1 otherwise. It does not use grey-market data in v0 because T2 currently has no approved reproducible source. It also does not use post-listing returns as inputs; D1 is the primary sell label, while D5/D20/D60 are review labels only.",
        "",
        "## Data Inventory",
        "",
        f"- IPO rows scored: {total}",
        f"- Rows with D1 labels: {len(d1_records)}",
        f"- Rows with structured T1 demand fields: {len(structured_t1)}",
        f"- Rows with prospectus source path: {count_present(records, 'prospectus_source_path')}",
        f"- Rows with allotment source path: {count_present(records, 'allotment_source_path')}",
        f"- Rows with offer size: {count_present(records, 'offer_size_hkd_m')}",
        f"- Rows with public oversubscription: {count_present(records, 'public_oversubscription_times')}",
        f"- Rows with international oversubscription: {count_present(records, 'international_oversubscription_times')}",
        f"- Rows with market heat snapshots: {len(structured_market_heat)}",
        f"- Rows with T0.5 margin heat snapshots: {len(structured_t0_5)}",
        f"- Rows with T0.95 late-order heat snapshots: {len(structured_t0_95)}",
        f"- Rows with T0.5 margin heat and D1 labels: {len(t0_5_with_d1)}",
        f"- Rows with T0.95 late-order heat and D1 labels: {len(t0_95_with_d1)}",
        f"- Rows matched to external ipohk history: {len(external_history_rows)}",
        f"- Rows with external final oversubscription: {len(external_oversub_rows)}",
        f"- Rows with external final oversubscription and D1 labels: {len(external_oversub_with_d1)}",
        f"- Rows pending T1 structure: {total - len(structured_t1)}"
        + (f" ({pending_t1_tickers})" if pending_t1_tickers else ""),
        f"- T1 field-level blanks: public oversubscription {t1_public_os_missing}, international oversubscription {t1_international_os_missing}, valid applications {t1_valid_missing}, successful applications {t1_successful_missing}",
        "",
        "## T0 Calibration",
        "",
        "T0 uses only prospectus-stage structure: offer size, initial public offer percentage, minimum subscription amount, offer price band, and over-allotment availability.",
        "",
        metrics_table(t0_metrics),
        "",
        "## T1 Calibration",
        "",
        "T1 adds allotment-stage demand: public subscription, international placing demand, valid application count, application success rate, and HK public offer reallocation.",
        "",
        metrics_table(total_metrics),
        "",
        "## T0.5 Market Heat",
        "",
        "T0.5 uses archived subscription-period margin heat snapshots. T0.95 is the near-deadline subset that is still actionable before the user's order cutoff. These are non-official live signals and are kept separate from T1 allotment demand. The current archive is not yet a historical training set: it has too few rows and no D1 labels for calibration.",
        "",
        f"- Total market heat rows: {len(structured_market_heat)}",
        f"- T0.5 margin rows: {len(structured_t0_5)}",
        f"- T0.5 rows with D1 labels: {len(t0_5_with_d1)}",
        f"- T0.95 late-order heat rows: {len(structured_t0_95)}",
        f"- T0.95 rows with D1 labels: {len(t0_95_with_d1)}",
        "",
        "## External Final Heat Proxy",
        "",
        "The ipohk history archive adds final public oversubscription, one-lot win rate, grey-market return, and first-day return where available. These fields are useful for coverage checks and post-hoc calibration, but they are not T0.5 inputs because they are final or near-final history.",
        "",
        f"- External history rows matched into this dataset: {len(external_history_rows)}",
        f"- Matched rows with final oversubscription: {len(external_oversub_rows)}",
        f"- Matched rows with final oversubscription and D1 labels: {len(external_oversub_with_d1)}",
        "",
        metrics_table(external_oversub_metrics),
        "",
        "## Current Read",
        "",
        current_read,
        "",
        "The high-conviction bucket remains clearly differentiated, but the middle and low score buckets are still not monotonic. This refresh keeps the v0 score formula unchanged and updates empirical calibration only; future rule changes should come from reviewed prediction cards rather than overfitting this historical sample.",
        "",
        "## Usage",
        "",
        "1. Run `scripts/build_analysis_dataset.py` after archivist updates the database.",
        "2. Use `t0_score` for prospectus-stage watchlisting.",
        "3. Use `total_score`, `decision_band`, and `calibrated_d1_positive_rate` for T1-stage subscription cards.",
        "4. Frame live decisions around a T2 or D1 sell, not long-term holding.",
        "5. Treat D5/D20/D60 columns as review labels only, never as prediction inputs or holding targets.",
        "",
        "## Known Gaps",
        "",
        "- T1 is structurally complete for listed rows; residual field-level NULLs remain when the archived source does not explicitly state a demand field.",
        "- Industry and issuer fundamentals are not sufficiently structured for model input.",
        "- T2 grey-market signal is blocked pending an approved source.",
        "- Extreme D1 returns should be audited before they drive rule changes.",
    ]
    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def main() -> int:
    """Build the dataset and report from the configured database; return exit code 0."""
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    db_path = Path(args.db)
    dataset_path = Path(args.dataset)
    report_path = Path(args.report)

    # sqlite3's own context manager only commits/rolls back; closing() is what
    # actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        rows = fetch_rows(conn)

    records = build_records(rows, as_of)
    t0_metrics = calibration(records, "t0_score_bucket")
    total_metrics = calibration(records, "total_score_bucket", require_t1=True)
    add_calibrated_rates(records, t0_metrics, total_metrics)
    write_dataset(records, dataset_path)
    write_report(records, t0_metrics, total_metrics, report_path, dataset_path, as_of)

    print("analysis dataset built")
    print(f"model_version: {MODEL_VERSION}")
    print(f"rows: {len(records)}")
    print(f"dataset: {dataset_path.as_posix()}")
    print(f"report: {report_path.as_posix()}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())