797bbde201
Request: - Update the selected analyst reports so stock/company names include Chinese names and use Chinese names first. Changes: - Updated the selected T0 reports for 01392, 06067, 06106, and 06132 to show Chinese company names in the title and summary, with English names in parentheses. - Added company_name_zh to the analyst dataset so report generation has access to Chinese names. - Updated the report generator to prefer Chinese company names and fall back to English names only when Chinese names are unavailable. - Filled Chinese company names for the selected tickers in ipo_master and refreshed snapshots. Verification: - Compiled build_analysis_dataset.py and generate_ipo_report.py. - Ran generator dry-runs for 06132 and 01392 to confirm Chinese-first output. - Ran SQLite integrity_check and foreign_key_check. - Ran git diff --check. Next useful context: - Future generated analyst reports now use company_name_zh first when available.
698 lines
28 KiB
Python
698 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
"""Build the analyst v0 feature dataset and calibration report."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import sqlite3
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from statistics import mean, median
|
|
from typing import Any
|
|
|
|
|
|
# Identifier stamped on every dataset row and printed in the report header.
MODEL_VERSION = "ipo_score_v0"
# Rule file path; in the code shown here it is only echoed into the report header.
RULE_PATH = Path("rules/ipo_score_v0.yaml")
# Defaults for the --db, --dataset and --report command-line options.
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
DEFAULT_DATASET_PATH = Path("data/snapshots/analysis_model_v0_dataset.csv")
# NOTE(review): the default report filename embeds a fixed date — confirm this is intended.
DEFAULT_REPORT_PATH = Path("reports/2026-06-15_analysis_model_v0.md")
|
|
|
|
|
|
@dataclass(frozen=True)
class Metric:
    """Immutable D1-return summary for one score bucket.

    Field order matters: ``metric_for`` constructs empty metrics positionally.
    """

    # Label of the score bucket the returns were grouped under.
    bucket: str
    # Number of D1 return observations in the bucket.
    sample_size: int
    # Share of observations with a D1 return > 0; None for an empty bucket.
    d1_positive_rate: float | None
    # Share of observations with a D1 return >= 10; None for an empty bucket.
    d1_strong_rate: float | None
    # Arithmetic mean of the D1 returns; None for an empty bucket.
    average_d1_return_pct: float | None
    # Median of the D1 returns; None for an empty bucket.
    median_d1_return_pct: float | None
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the dataset build.

    Returns:
        Namespace with ``db``, ``dataset`` and ``report`` (strings defaulting
        to the module-level default paths) and ``as_of`` (string or ``None``).
    """
    cli = argparse.ArgumentParser(description=__doc__)
    path_options = (
        ("--db", DEFAULT_DB_PATH, "Repo-relative SQLite database path."),
        ("--dataset", DEFAULT_DATASET_PATH, "Output CSV dataset path."),
        ("--report", DEFAULT_REPORT_PATH, "Output Markdown report path."),
    )
    for flag, default_path, description in path_options:
        cli.add_argument(flag, default=str(default_path), help=description)
    cli.add_argument("--as-of", help="Analysis timestamp. Defaults to current UTC time.")
    return cli.parse_args()
|
|
|
|
|
|
def parse_as_of(value: str | None) -> str:
    """Return the analysis timestamp as an ISO-8601 string with a zone designator.

    Args:
        value: Optional ISO-8601 timestamp; a trailing ``Z`` is accepted.
            ``None`` or an empty string means "now".

    Returns:
        The timestamp in ISO format, with a ``+00:00`` offset rendered as ``Z``.
        Without ``value`` this is the current UTC time truncated to whole
        seconds. A value carrying a non-UTC offset keeps that offset.

    Raises:
        ValueError: If ``value`` is not a valid ISO-8601 timestamp.
    """
    if not value:
        return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # A naive input used to be emitted with no zone designator at all,
        # unlike the Z-suffixed default. Treat it as UTC so every
        # analysis_as_of value is unambiguous.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def as_float(value: Any) -> float | None:
    """Convert a nullable value to ``float``; ``None`` passes through unchanged."""
    return None if value is None else float(value)
|
|
|
|
|
|
def as_int(value: Any) -> int | None:
    """Convert a nullable value to ``int``; ``None`` passes through unchanged."""
    return None if value is None else int(value)
|
|
|
|
|
|
def offer_size_hkd_m(row: sqlite3.Row) -> float | None:
    """Return the offer size in HKD millions from the first available source.

    Preference order: ``gross_proceeds_hkd_m`` as stored, then
    ``funds_raised_hkd`` divided by one million, then
    ``offer_price_hkd * global_offer_shares`` divided by one million.
    Returns ``None`` when none of these can be computed.
    """
    reported_gross = as_float(row["gross_proceeds_hkd_m"])
    if reported_gross is not None:
        return reported_gross

    raised_hkd = as_float(row["funds_raised_hkd"])
    if raised_hkd is not None:
        return raised_hkd / 1_000_000

    price = as_float(row["offer_price_hkd"])
    shares = as_int(row["global_offer_shares"])
    if price is None or shares is None:
        return None
    return price * shares / 1_000_000
|
|
|
|
|
|
def success_rate(row: sqlite3.Row) -> float | None:
    """Return successful applications divided by valid applications.

    Returns ``None`` when the valid-application count is missing or zero, or
    when the successful-application count is missing.
    """
    applied = as_int(row["valid_applications"])
    allotted = as_int(row["successful_applications"])
    if applied and allotted is not None:
        return allotted / applied
    return None
|
|
|
|
|
|
def reallocation_multiple(row: sqlite3.Row) -> float | None:
    """Return final HK offer shares as a multiple of the initial HK offer shares.

    Returns ``None`` when either share count is missing or zero.
    """
    final_shares = as_int(row["final_hk_offer_shares"])
    initial_shares = as_int(row["hk_offer_shares_initial"])
    if final_shares and initial_shares:
        return final_shares / initial_shares
    return None
|
|
|
|
|
|
def add_component(components: list[str], name: str, points: int, reason: str) -> int:
    """Record one score component and hand its points back to the caller.

    Appends ``"name:points:reason"`` to ``components`` in place and returns
    ``points`` so the call can be used directly in a running sum.
    """
    entry = "{}:{}:{}".format(name, points, reason)
    components.append(entry)
    return points
|
|
|
|
|
|
def score_t0(row: sqlite3.Row) -> tuple[int, str]:
    """Score the prospectus-stage (T0) structure of one IPO row.

    Components, in order: offer size, initial public offer percentage,
    minimum subscription amount, offer price and over-allotment presence.

    Returns:
        ``(score, breakdown)`` where ``breakdown`` is the ``|``-joined list of
        ``name:points:reason`` entries produced by ``add_component``.
    """
    parts: list[str] = []
    awarded: list[int] = []

    def grade(name: str, verdict: tuple[int, str]) -> None:
        awarded.append(add_component(parts, name, verdict[0], verdict[1]))

    def first_tier(value, tiers, fallback, inclusive=False):
        # Tiers are ascending upper bounds; the first bound the value falls under wins.
        for bound, points, reason in tiers:
            if (value <= bound) if inclusive else (value < bound):
                return points, reason
        return fallback

    size = offer_size_hkd_m(row)
    if size is None:
        grade("offer_size", (0, "missing"))
    else:
        size_tiers = (
            (300, -2, "lt_300m"),
            (800, 1, "300m_to_800m"),
            (2000, 4, "800m_to_2000m"),
            (5000, 3, "2000m_to_5000m"),
        )
        grade("offer_size", first_tier(size, size_tiers, (2, "gte_5000m")))

    public_pct = as_float(row["public_offer_pct_initial"])
    if public_pct is None:
        grade("public_pct", (0, "missing"))
    else:
        pct_tiers = ((0.05, 3, "lte_5pct"), (0.10, 1, "5pct_to_10pct"))
        grade("public_pct", first_tier(public_pct, pct_tiers, (-1, "gt_10pct"), inclusive=True))

    min_sub = as_float(row["min_subscription_amount_hkd"])
    if min_sub is None:
        # Unlike the other components, a missing minimum subscription costs a point.
        grade("min_subscription", (-1, "missing"))
    else:
        sub_tiers = ((3500, -1, "lt_3500"), (10000, 2, "3500_to_10000"))
        grade("min_subscription", first_tier(min_sub, sub_tiers, (1, "gte_10000")))

    offer_price = as_float(row["offer_price_hkd"])
    if offer_price is None:
        grade("offer_price", (0, "missing"))
    else:
        price_tiers = (
            (1, -2, "lt_1"),
            (5, 0, "1_to_5"),
            (30, 1, "5_to_30"),
            (100, 2, "30_to_100"),
        )
        grade("offer_price", first_tier(offer_price, price_tiers, (1, "gte_100")))

    has_over_allotment = bool(as_int(row["over_allotment_offer_shares"]))
    grade("over_allotment", (1, "present") if has_over_allotment else (0, "missing_or_zero"))

    return sum(awarded), "|".join(parts)
|
|
|
|
|
|
def score_t1(row: sqlite3.Row) -> tuple[int, str]:
    """Score the allotment-stage (T1) demand fields of one IPO row.

    Components, in order: public oversubscription, international
    oversubscription, valid application count, application success rate and
    HK offer reallocation multiple. A missing input scores 0 for its component.

    Returns:
        ``(score, breakdown)`` where ``breakdown`` is the ``|``-joined list of
        ``name:points:reason`` entries produced by ``add_component``.
    """
    parts: list[str] = []
    awarded: list[int] = []
    missing = (0, "missing")

    def grade(name: str, verdict: tuple[int, str]) -> None:
        awarded.append(add_component(parts, name, verdict[0], verdict[1]))

    def floor_tier(value, tiers, fallback):
        # Tiers are descending lower bounds; the first bound the value reaches wins.
        for floor, points, reason in tiers:
            if value >= floor:
                return points, reason
        return fallback

    public_os = as_float(row["public_oversubscription_times"])
    if public_os is None:
        grade("public_os", missing)
    else:
        public_tiers = (
            (5000, 15, "gte_5000x"),
            (1000, 13, "1000x_to_5000x"),
            (100, 6, "100x_to_1000x"),
            (10, -2, "10x_to_100x"),
        )
        grade("public_os", floor_tier(public_os, public_tiers, (-4, "lt_10x")))

    international_os = as_float(row["international_oversubscription_times"])
    if international_os is None:
        grade("international_os", missing)
    else:
        international_tiers = (
            (30, 8, "gte_30x"),
            (10, 6, "10x_to_30x"),
            (3, 1, "3x_to_10x"),
            (1, -1, "1x_to_3x"),
        )
        grade("international_os", floor_tier(international_os, international_tiers, (-2, "lt_1x")))

    valid = as_int(row["valid_applications"])
    if valid is None:
        grade("valid_applications", missing)
    else:
        valid_tiers = (
            (200000, 5, "gte_200k"),
            (100000, 3, "100k_to_200k"),
            (50000, 1, "50k_to_100k"),
        )
        verdict = floor_tier(valid, valid_tiers, None)
        if verdict is None:
            # Below 50k: only very small books are penalised.
            verdict = (-2, "lt_10k") if valid < 10000 else (0, "10k_to_50k")
        grade("valid_applications", verdict)

    rate = success_rate(row)
    if rate is None:
        grade("success_rate", missing)
    elif rate <= 0.10:
        grade("success_rate", (4, "lte_10pct"))
    elif rate <= 0.30:
        grade("success_rate", (2, "10pct_to_30pct"))
    else:
        grade("success_rate", (-2, "gt_80pct") if rate > 0.80 else (0, "30pct_to_80pct"))

    realloc = reallocation_multiple(row)
    if realloc is None:
        grade("hk_reallocation", missing)
    else:
        realloc_tiers = ((3, 4, "gte_3x"), (2, 2, "2x_to_3x"))
        grade("hk_reallocation", floor_tier(realloc, realloc_tiers, (0, "lt_2x")))

    return sum(awarded), "|".join(parts)
|
|
|
|
|
|
def has_structured_t1(row: sqlite3.Row) -> bool:
    """Return True when at least one T1 demand field on the row is not NULL.

    The fields checked, in order, are the public and international
    oversubscription multiples and the valid and successful application counts.
    """
    demand_fields = (
        "public_oversubscription_times",
        "international_oversubscription_times",
        "valid_applications",
        "successful_applications",
    )
    for field in demand_fields:
        if row[field] is not None:
            return True
    return False
|
|
|
|
|
|
def t0_bucket(score: int) -> str:
    """Map a T0 score to its calibration bucket label."""
    if score < 1:
        return "t0_lt_1"
    for ceiling, label in ((4, "t0_1_to_4"), (7, "t0_5_to_7")):
        if score <= ceiling:
            return label
    return "t0_gte_8"
|
|
|
|
|
|
def total_bucket(score: int) -> str:
    """Map a combined T0 + T1 score to its calibration bucket label."""
    if score < 0:
        return "total_lt_0"
    ladder = (
        (9, "total_0_to_9"),
        (17, "total_10_to_17"),
        (25, "total_18_to_25"),
    )
    for ceiling, label in ladder:
        if score <= ceiling:
            return label
    return "total_gte_26"
|
|
|
|
|
|
def decision_band(row: dict[str, Any]) -> str:
    """Translate a scored record into a decision band label.

    Records with structured T1 data are banded on ``total_score``; records
    without it are banded on ``t0_score`` using watchlist-style labels.
    """
    if row["has_structured_t1"]:
        score = row["total_score"]
        if score < 0:
            return "avoid"
        ladder = (
            (9, "avoid_or_wait"),
            (17, "watch_or_small"),
            (25, "selective_subscribe"),
        )
        top_band = "high_conviction_subscribe"
    else:
        score = row["t0_score"]
        if score < 1:
            return "weak_or_avoid"
        ladder = ((4, "neutral"), (7, "positive_watch"))
        top_band = "strong_watch"

    for ceiling, band in ladder:
        if score <= ceiling:
            return band
    return top_band
|
|
|
|
|
|
def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
    """Load one joined feature row per ``ipo_master`` entry.

    Side effect: sets ``conn.row_factory`` to ``sqlite3.Row`` so columns can
    be read by name.

    The query left-joins offering terms, per-ticker aggregates of the new
    listing report entries, demand data and per-stage price performance, and
    attaches the most recent prospectus and allotment-results source paths.
    Rows are ordered by listing date, then ticker.
    """
    query = """
        WITH listing_reports AS (
            SELECT
                ticker,
                MAX(funds_raised_hkd) AS funds_raised_hkd,
                MAX(subscription_ratio_times) AS report_subscription_ratio_times,
                MAX(market_cap_hkd) AS report_market_cap_hkd,
                MAX(industry_label) AS report_industry_label,
                MAX(listing_method) AS listing_method,
                MAX(sponsors) AS sponsors
            FROM new_listing_report_entries
            GROUP BY ticker
        ),
        performance AS (
            SELECT
                ticker,
                MAX(CASE WHEN stage = 'D1' THEN return_pct END) AS d1_return_pct,
                MAX(CASE WHEN stage = 'D5' THEN return_pct END) AS d5_return_pct,
                MAX(CASE WHEN stage = 'D20' THEN return_pct END) AS d20_return_pct,
                MAX(CASE WHEN stage = 'D60' THEN return_pct END) AS d60_return_pct,
                MAX(CASE WHEN stage = 'D1' THEN turnover_hkd_m END) AS d1_turnover_hkd_m
            FROM price_performance
            GROUP BY ticker
        )
        SELECT
            m.ticker,
            m.company_name_en,
            m.company_name_zh,
            m.board,
            m.status,
            m.listing_date,
            m.application_start_date,
            m.application_end_date,
            m.allotment_results_expected_date,
            m.industry_label AS master_industry_label,
            ot.offer_price_hkd,
            ot.board_lot,
            ot.min_subscription_amount_hkd,
            ot.global_offer_shares,
            ot.hk_offer_shares_initial,
            ot.international_offer_shares_initial,
            ot.public_offer_pct_initial,
            ot.over_allotment_offer_shares,
            ot.gross_proceeds_hkd_m,
            ot.net_proceeds_hkd_m,
            ot.market_cap_hkd_m,
            lr.funds_raised_hkd,
            lr.report_subscription_ratio_times,
            lr.report_market_cap_hkd,
            lr.report_industry_label,
            lr.listing_method,
            lr.sponsors,
            d.valid_applications,
            d.successful_applications,
            d.public_oversubscription_times,
            d.international_placees,
            d.international_oversubscription_times,
            d.final_hk_offer_shares,
            d.final_international_offer_shares,
            p.d1_return_pct,
            p.d5_return_pct,
            p.d20_return_pct,
            p.d60_return_pct,
            p.d1_turnover_hkd_m,
            (
                SELECT local_path
                FROM source_refs s
                WHERE s.ticker = m.ticker AND s.source_type = 'prospectus'
                ORDER BY s.source_date DESC, s.source_id DESC
                LIMIT 1
            ) AS prospectus_source_path,
            (
                SELECT local_path
                FROM source_refs s
                WHERE s.ticker = m.ticker AND s.source_type = 'allotment_results'
                ORDER BY s.source_date DESC, s.source_id DESC
                LIMIT 1
            ) AS allotment_source_path
        FROM ipo_master m
        LEFT JOIN offering_terms ot ON ot.ticker = m.ticker
        LEFT JOIN listing_reports lr ON lr.ticker = m.ticker
        LEFT JOIN ipo_demand d ON d.ticker = m.ticker
        LEFT JOIN performance p ON p.ticker = m.ticker
        ORDER BY m.listing_date, m.ticker
    """
    conn.row_factory = sqlite3.Row
    cursor = conn.execute(query)
    return cursor.fetchall()
|
|
|
|
|
|
def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]:
    """Turn joined database rows into scored dataset records.

    Each record carries the identifying and raw feature columns, derived
    features, T0/T1 scores with their breakdowns, score buckets, the D1-D60
    return labels, source paths and finally the decision band. T1 scores and
    the total-score bucket are only populated when the row has structured T1
    data; otherwise ``total_score`` equals the T0 score.

    Args:
        rows: Rows as returned by ``fetch_rows``.
        as_of: Analysis timestamp stamped on every record.
    """
    records: list[dict[str, Any]] = []
    for row in rows:

        def copy(*columns: str) -> dict[str, Any]:
            # Pass the named columns through unchanged, keeping their order.
            return {column: row[column] for column in columns}

        t0_points, t0_detail = score_t0(row)
        t1_points, t1_detail = score_t1(row)
        structured = has_structured_t1(row)
        combined = t0_points + t1_points if structured else t0_points
        size = offer_size_hkd_m(row)

        report_cap = row["report_market_cap_hkd"]
        # The report figure is in HKD, so scale it to millions when used as the fallback.
        market_cap = row["market_cap_hkd_m"] or (report_cap / 1_000_000 if report_cap else None)
        d1_return = as_float(row["d1_return_pct"])

        record: dict[str, Any] = {
            "model_version": MODEL_VERSION,
            "analysis_as_of": as_of,
            **copy(
                "ticker",
                "company_name_en",
                "company_name_zh",
                "board",
                "status",
                "listing_date",
                "application_start_date",
                "application_end_date",
                "allotment_results_expected_date",
                "listing_method",
            ),
            "industry_label": row["master_industry_label"] or row["report_industry_label"],
            **copy("sponsors", "offer_price_hkd"),
            "offer_size_hkd_m": size,
            **copy("gross_proceeds_hkd_m", "net_proceeds_hkd_m"),
            "market_cap_hkd_m": market_cap,
            **copy(
                "board_lot",
                "min_subscription_amount_hkd",
                "public_offer_pct_initial",
                "over_allotment_offer_shares",
                "public_oversubscription_times",
                "international_oversubscription_times",
                "valid_applications",
                "successful_applications",
            ),
            "application_success_rate": success_rate(row),
            "international_placees": row["international_placees"],
            "hk_offer_reallocation_multiple": reallocation_multiple(row),
            "has_structured_t1": structured,
            "t0_score": t0_points,
            "t1_add_score": t1_points if structured else None,
            "total_score": combined,
            "t0_score_bucket": t0_bucket(t0_points),
            "total_score_bucket": total_bucket(combined) if structured else None,
            "t0_score_breakdown": t0_detail,
            "t1_score_breakdown": t1_detail if structured else "",
            **copy(
                "d1_return_pct",
                "d5_return_pct",
                "d20_return_pct",
                "d60_return_pct",
                "d1_turnover_hkd_m",
            ),
            "d1_positive": d1_return is not None and d1_return > 0,
            "d1_strong_10pct": d1_return is not None and d1_return >= 10,
            **copy("prospectus_source_path", "allotment_source_path"),
        }
        record["decision_band"] = decision_band(record)
        records.append(record)
    return records
|
|
|
|
|
|
def metric_for(bucket: str, values: list[float]) -> Metric:
    """Summarise the D1 returns of one bucket.

    An empty ``values`` list yields a zero-sample metric with every rate and
    average set to ``None``. Otherwise the positive rate counts returns > 0
    and the strong rate counts returns >= 10.
    """
    if not values:
        return Metric(bucket, 0, None, None, None, None)

    count = len(values)
    positives = sum(1 for observed in values if observed > 0)
    strong = sum(1 for observed in values if observed >= 10)
    return Metric(
        bucket=bucket,
        sample_size=count,
        d1_positive_rate=positives / count,
        d1_strong_rate=strong / count,
        average_d1_return_pct=mean(values),
        median_d1_return_pct=median(values),
    )
|
|
|
|
|
|
def calibration(records: list[dict[str, Any]], bucket_key: str, require_t1: bool = False) -> list[Metric]:
    """Group D1 returns by a bucket column and summarise each group.

    Records without a D1 return, without a bucket value, or (when
    ``require_t1`` is set) without structured T1 data are skipped.

    Returns:
        One ``Metric`` per bucket, ordered by bucket label.
    """
    grouped: dict[str, list[float]] = {}
    for record in records:
        d1_return = record["d1_return_pct"]
        if d1_return is None or (require_t1 and not record["has_structured_t1"]):
            continue
        label = record[bucket_key]
        if label is not None:
            grouped.setdefault(str(label), []).append(float(d1_return))
    return [metric_for(label, grouped[label]) for label in sorted(grouped)]
|
|
|
|
|
|
def calibrated_rates(metrics: list[Metric]) -> dict[str, float | None]:
    """Index each metric's D1 positive rate by its bucket label."""
    rates: dict[str, float | None] = {}
    for metric in metrics:
        rates[metric.bucket] = metric.d1_positive_rate
    return rates
|
|
|
|
|
|
def add_calibrated_rates(records: list[dict[str, Any]], t0_metrics: list[Metric], total_metrics: list[Metric]) -> None:
    """Attach ``calibrated_d1_positive_rate`` to every record in place.

    Records with structured T1 data use the rate of their total-score bucket;
    all others use the rate of their T0 bucket. A bucket with no calibration
    metric yields ``None``.
    """
    rates_by_t0 = calibrated_rates(t0_metrics)
    rates_by_total = calibrated_rates(total_metrics)
    for record in records:
        if record["has_structured_t1"]:
            lookup, bucket_field = rates_by_total, "total_score_bucket"
        else:
            lookup, bucket_field = rates_by_t0, "t0_score_bucket"
        record["calibrated_d1_positive_rate"] = lookup.get(record[bucket_field])
|
|
|
|
|
|
def format_cell(value: Any) -> Any:
    """Render one value for the CSV dataset.

    ``None`` becomes an empty string, booleans become ``"1"``/``"0"``, floats
    are formatted with six significant digits, and anything else is returned
    unchanged.
    """
    if value is None:
        return ""
    # bool must be handled explicitly so True/False are not written as words.
    if isinstance(value, bool):
        return str(int(value))
    if isinstance(value, float):
        return format(value, ".6g")
    return value
|
|
|
|
|
|
def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None:
    """Write the scored records to a UTF-8 CSV file with a fixed column order.

    Parent directories are created as needed. Columns missing from a record
    are written as empty cells; every value goes through ``format_cell``.
    """
    columns = [
        "model_version",
        "analysis_as_of",
        "ticker",
        "company_name_en",
        "company_name_zh",
        "board",
        "status",
        "listing_date",
        "application_start_date",
        "application_end_date",
        "allotment_results_expected_date",
        "listing_method",
        "industry_label",
        "sponsors",
        "offer_price_hkd",
        "offer_size_hkd_m",
        "gross_proceeds_hkd_m",
        "net_proceeds_hkd_m",
        "market_cap_hkd_m",
        "board_lot",
        "min_subscription_amount_hkd",
        "public_offer_pct_initial",
        "over_allotment_offer_shares",
        "public_oversubscription_times",
        "international_oversubscription_times",
        "valid_applications",
        "successful_applications",
        "application_success_rate",
        "international_placees",
        "hk_offer_reallocation_multiple",
        "has_structured_t1",
        "t0_score",
        "t1_add_score",
        "total_score",
        "t0_score_bucket",
        "total_score_bucket",
        "decision_band",
        "calibrated_d1_positive_rate",
        "d1_return_pct",
        "d5_return_pct",
        "d20_return_pct",
        "d60_return_pct",
        "d1_turnover_hkd_m",
        "d1_positive",
        "d1_strong_10pct",
        "prospectus_source_path",
        "allotment_source_path",
        "t0_score_breakdown",
        "t1_score_breakdown",
    ]

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, lineterminator="\n")
        writer.writeheader()
        writer.writerows(
            {column: format_cell(record.get(column)) for column in columns}
            for record in records
        )
|
|
|
|
|
|
def fmt_pct(value: float | None) -> str:
    """Format a fraction as a percentage with one decimal; ``None`` gives ``""``."""
    return "" if value is None else "{:.1f}%".format(value * 100)
|
|
|
|
|
|
def fmt_num(value: float | None) -> str:
    """Format a number with one decimal place; ``None`` gives ``""``."""
    return "" if value is None else "{:.1f}".format(value)
|
|
|
|
|
|
def metrics_table(metrics: list[Metric]) -> str:
    """Render calibration metrics as a Markdown table, one row per bucket.

    With no metrics the result is just the header and alignment rows.
    """
    table = [
        "| Bucket | N | D1 positive | D1 >= 10% | Avg D1 return | Median D1 return |",
        "| --- | ---: | ---: | ---: | ---: | ---: |",
    ]
    for metric in metrics:
        cells = (
            metric.bucket,
            str(metric.sample_size),
            fmt_pct(metric.d1_positive_rate),
            fmt_pct(metric.d1_strong_rate),
            fmt_num(metric.average_d1_return_pct),
            fmt_num(metric.median_d1_return_pct),
        )
        table.append("| " + " | ".join(cells) + " |")
    return "\n".join(table)
|
|
|
|
|
|
def count_present(records: list[dict[str, Any]], key: str) -> int:
    """Count the records whose ``key`` value is neither ``None`` nor ``""``.

    A record that lacks the key entirely counts as blank.
    """
    blanks = {None, ""}
    present = 0
    for record in records:
        if record.get(key) not in blanks:
            present += 1
    return present
|
|
|
|
|
|
def write_report(
    records: list[dict[str, Any]],
    t0_metrics: list[Metric],
    total_metrics: list[Metric],
    report_path: Path,
    dataset_path: Path,
    as_of: str,
) -> None:
    """Write the Markdown calibration report for the v0 model.

    Args:
        records: Scored dataset records as produced by ``build_records``.
        t0_metrics: Calibration metrics per T0 score bucket.
        total_metrics: Calibration metrics per total score bucket. May be
            empty when no row has both structured T1 data and a D1 label.
        report_path: Destination file; parent directories are created.
        dataset_path: Dataset location, echoed into the report header.
        as_of: Analysis timestamp, echoed into the report header.
    """
    report_path.parent.mkdir(parents=True, exist_ok=True)
    total = len(records)
    d1_records = [record for record in records if record["d1_return_pct"] is not None]
    structured_t1 = [record for record in records if record["has_structured_t1"]]
    pending_t1_tickers = ", ".join(sorted(record["ticker"] for record in records if not record["has_structured_t1"]))
    t1_public_os_missing = sum(record["public_oversubscription_times"] is None for record in structured_t1)
    t1_international_os_missing = sum(record["international_oversubscription_times"] is None for record in structured_t1)
    t1_valid_missing = sum(record["valid_applications"] is None for record in structured_t1)
    t1_successful_missing = sum(record["successful_applications"] is None for record in structured_t1)

    # max() raises ValueError on an empty sequence, which used to crash the
    # report on a database with no calibrated T1 rows. default=None keeps the
    # report writable. The key ranks a missing rate below any real rate,
    # including a genuine 0.0.
    best_bucket = max(
        total_metrics,
        key=lambda metric: -1 if metric.d1_positive_rate is None else metric.d1_positive_rate,
        default=None,
    )
    if best_bucket is None:
        current_read = (
            "No T1 calibration bucket has historical D1 observations yet, so no strongest v0 T1 bucket can be reported. "
            "The model is most useful after allotment results are available; T0 is a watchlist filter rather than a final subscription call."
        )
    else:
        current_read = f"After the T1 demand text backfill, the strongest v0 T1 bucket is `{best_bucket.bucket}` with {best_bucket.sample_size} historical D1 observations and a {fmt_pct(best_bucket.d1_positive_rate)} D1 positive rate. The model is most useful after allotment results are available; T0 is a watchlist filter rather than a final subscription call."

    lines = [
        "# HK IPO Analysis Model v0",
        "",
        f"- Model version: `{MODEL_VERSION}`",
        f"- Analysis as of: `{as_of}`",
        f"- Rule file: `{RULE_PATH.as_posix()}`",
        f"- Dataset: `{dataset_path.as_posix()}`",
        "",
        "## What This Model Does",
        "",
        "This is the first analyst model built from the downloaded archive. It creates a repeatable feature table, scores each IPO using stage-safe rules, and calibrates the score buckets against archived D1 sell outcomes. It is intentionally transparent: the output includes every score component and the archived source paths used for each ticker.",
        "",
        "The model is built for a short IPO allocation trade: sell in T2 grey market when reliable executable data exists, or sell on D1 otherwise. It does not use grey-market data in v0 because T2 currently has no approved reproducible source. It also does not use post-listing returns as inputs; D1 is the primary sell label, while D5/D20/D60 are review labels only.",
        "",
        "## Data Inventory",
        "",
        f"- IPO rows scored: {total}",
        f"- Rows with D1 labels: {len(d1_records)}",
        f"- Rows with structured T1 demand fields: {len(structured_t1)}",
        f"- Rows with prospectus source path: {count_present(records, 'prospectus_source_path')}",
        f"- Rows with allotment source path: {count_present(records, 'allotment_source_path')}",
        f"- Rows with offer size: {count_present(records, 'offer_size_hkd_m')}",
        f"- Rows with public oversubscription: {count_present(records, 'public_oversubscription_times')}",
        f"- Rows with international oversubscription: {count_present(records, 'international_oversubscription_times')}",
        f"- Rows pending T1 structure: {total - len(structured_t1)}"
        + (f" ({pending_t1_tickers})" if pending_t1_tickers else ""),
        f"- T1 field-level blanks: public oversubscription {t1_public_os_missing}, international oversubscription {t1_international_os_missing}, valid applications {t1_valid_missing}, successful applications {t1_successful_missing}",
        "",
        "## T0 Calibration",
        "",
        "T0 uses only prospectus-stage structure: offer size, initial public offer percentage, minimum subscription amount, offer price band, and over-allotment availability.",
        "",
        metrics_table(t0_metrics),
        "",
        "## T1 Calibration",
        "",
        "T1 adds allotment-stage demand: public subscription, international placing demand, valid application count, application success rate, and HK public offer reallocation.",
        "",
        metrics_table(total_metrics),
        "",
        "## Current Read",
        "",
        current_read,
        "",
        "The high-conviction bucket remains clearly differentiated, but the middle and low score buckets are still not monotonic. This refresh keeps the v0 score formula unchanged and updates empirical calibration only; future rule changes should come from reviewed prediction cards rather than overfitting this historical sample.",
        "",
        "## Usage",
        "",
        "1. Run `scripts/build_analysis_dataset.py` after archivist updates the database.",
        "2. Use `t0_score` for prospectus-stage watchlisting.",
        "3. Use `total_score`, `decision_band`, and `calibrated_d1_positive_rate` for T1-stage subscription cards.",
        "4. Frame live decisions around a T2 or D1 sell, not long-term holding.",
        "5. Treat D5/D20/D60 columns as review labels only, never as prediction inputs or holding targets.",
        "",
        "## Known Gaps",
        "",
        "- T1 is structurally complete for listed rows; residual field-level NULLs remain when the archived source does not explicitly state a demand field.",
        "- Industry and issuer fundamentals are not sufficiently structured for model input.",
        "- T2 grey-market signal is blocked pending an approved source.",
        "- Extreme D1 returns should be audited before they drive rule changes.",
    ]
    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
    """Build the dataset CSV and calibration report from the SQLite database.

    Reads the command-line options, loads and scores every IPO row,
    calibrates the T0 and total-score buckets against D1 returns, writes both
    outputs and prints a short summary.

    Returns:
        Process exit code (always 0; failures propagate as exceptions).
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    db_path = Path(args.db)
    dataset_path = Path(args.dataset)
    report_path = Path(args.report)

    # Using a sqlite3 connection as a context manager only commits or rolls
    # back the transaction; it does not close the connection. Close it
    # explicitly so the database handle is released even when the query
    # fails. The fetched rows are already materialised.
    conn = sqlite3.connect(db_path)
    try:
        rows = fetch_rows(conn)
    finally:
        conn.close()

    records = build_records(rows, as_of)
    t0_metrics = calibration(records, "t0_score_bucket")
    total_metrics = calibration(records, "total_score_bucket", require_t1=True)
    add_calibrated_rates(records, t0_metrics, total_metrics)
    write_dataset(records, dataset_path)
    write_report(records, t0_metrics, total_metrics, report_path, dataset_path, as_of)

    print("analysis dataset built")
    print(f"model_version: {MODEL_VERSION}")
    print(f"rows: {len(records)}")
    print(f"dataset: {dataset_path.as_posix()}")
    print(f"report: {report_path.as_posix()}")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|