Add external IPO history to heat model

Request:
- Add historical data around T0.5 margin heat and rebuild the model.

Changes:
- Add external_ipo_history to store third-party historical IPO records separately from true T0.5 market-heat snapshots.
- Add scripts/archive_ipohk_history.py to archive ipohk structured listed IPO history.
- Archive 807 ipohk rows, including final oversubscription, one-lot win rate, grey-market return, and first-day return where available.
- Extend the v0 analysis dataset with true T0.5 market-heat columns and separate external final-heat columns.
- Rebuild reports/2026-06-15_analysis_model_v0.md with T0.5 coverage and external final-heat calibration.
- Add a Chinese report explaining why historical final oversubscription cannot be treated as T0.5 margin snapshots.
- Update analyst and archivist skills to keep T0.5 and external final history separate.

Verification:
- .venv/bin/python -m py_compile scripts/build_analysis_dataset.py scripts/archive_ipohk_history.py scripts/archive_t0_5_market_heat.py
- .venv/bin/python scripts/build_analysis_dataset.py --as-of 2026-06-15T19:20:00Z
- Python sqlite3 PRAGMA integrity_check returned ok and foreign_key_check returned zero rows.
- Confirmed 807 external_ipo_history rows, 792 rows with external final oversubscription, 5 true T0.5 market-heat rows, and 297 analysis dataset rows.
- git diff --cached --check

Next useful context:
- True T0.5 historical backtesting still requires ongoing frozen margin-heat snapshots during each IPO subscription window.
This commit is contained in:
2026-06-15 16:06:56 +00:00
parent 222f55c140
commit 943eab27cb
12 changed files with 1589 additions and 299 deletions
+211
View File
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""Archive structured historical HK IPO data from ipohk.com.cn."""
from __future__ import annotations
import argparse
import csv
import hashlib
import json
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
# Default SQLite database path (the --db help text describes it as repo-relative).
DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
# Default schema script; main() feeds its text to executescript() before writing rows.
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that export_snapshot() writes one "<table>.csv" file into.
SNAPSHOT_DIR = Path("data/snapshots")
# Default ipohk listed-data endpoint; the year and search parameters are sent empty.
# NOTE(review): this is plain HTTP - confirm whether the provider also serves HTTPS.
IPOHK_URL = "http://www.ipohk.com.cn/ipo/data.php?action=listed&year=&search="
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the archiver.

    Returns:
        A namespace with ``db``, ``schema``, ``as_of`` and ``url`` attributes.
        ``as_of`` is ``None`` when the flag is omitted.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Flag name paired with the keyword arguments handed to add_argument().
    option_specs = (
        ("--db", {"default": str(DEFAULT_DB_PATH), "help": "Repo-relative SQLite database path."}),
        ("--schema", {"default": str(DEFAULT_SCHEMA_PATH), "help": "Repo-relative schema path."}),
        ("--as-of", {"help": "Archive timestamp. Defaults to current UTC time."}),
        ("--url", {"default": IPOHK_URL, "help": "ipohk listed-data endpoint."}),
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def parse_as_of(value: str | None) -> str:
if value:
return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def compact_timestamp(value: str) -> str:
    """Return ``value`` with dashes and colons removed, for use in IDs and file names.

    A ``+00:00`` UTC offset is rewritten to ``Z``. That rewrite has to happen
    before the colons are stripped; otherwise the offset has already become
    ``+0000`` and the replacement can never match.

    Args:
        value: ISO-8601 style timestamp string.

    Returns:
        The compacted timestamp, e.g. ``20260615T192000Z``.
    """
    return value.replace("+00:00", "Z").replace("-", "").replace(":", "")
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a browser-like ``User-Agent`` and an ipohk ``Referer``
    header, and is opened with a 60-second timeout.

    Args:
        url: Address to fetch.

    Returns:
        The complete response body as bytes.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "http://www.ipohk.com.cn/ipo/",
    }
    outgoing = Request(url, headers=request_headers)
    with urlopen(outgoing, timeout=60) as reply:
        body = reply.read()
    return body
def sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest of ``payload`` as a lowercase hex string."""
    digest = hashlib.sha256()
    digest.update(payload)
    return digest.hexdigest()
def save_raw(payload: bytes, as_of: str) -> tuple[str, str]:
    """Store the raw response under ``data/raw/external_history``.

    The file is named ``ipohk_listed_<compact timestamp>.json``. It is written
    only when it does not exist yet or its current bytes differ from
    ``payload``.

    Args:
        payload: Raw response bytes to archive.
        as_of: Archive timestamp used to build the file name.

    Returns:
        A ``(posix_path, sha256_hex)`` pair for the archived payload.
    """
    target_dir = Path("data/raw/external_history")
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / f"ipohk_listed_{compact_timestamp(as_of)}.json"
    already_archived = target.exists() and target.read_bytes() == payload
    if not already_archived:
        target.write_bytes(payload)
    return target.as_posix(), sha256_bytes(payload)
def as_float(value: object) -> float | None:
if value is None:
return None
cleaned = str(value).strip().replace(",", "").replace("%", "")
if cleaned in {"", "-"}:
return None
try:
return float(cleaned)
except ValueError:
return None
def as_ticker(value: object) -> str:
    """Return ``value`` as a trimmed string left-padded with zeros to five characters.

    Falsy inputs (``None``, ``""``, ``0``) are treated as an empty string and
    therefore come back as ``"00000"``.
    """
    text = str(value) if value else ""
    trimmed = text.strip()
    return trimmed.zfill(5)
def source_date(as_of: str) -> str:
    """Return the calendar date (``YYYY-MM-DD``) of an ISO-8601 timestamp.

    A trailing ``Z`` is accepted. The date is read as written in the
    timestamp; no timezone conversion is applied.
    """
    iso_text = as_of.replace("Z", "+00:00")
    moment = datetime.fromisoformat(iso_text)
    return moment.date().isoformat()
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    The header row and the data rows are taken from the same cursor, so the
    table is queried once instead of issuing an extra ``LIMIT 0`` statement
    only to discover the column names.

    Args:
        conn: Open SQLite connection to read from.
        table: Table name. It is interpolated into the SQL text and the output
            file name, so it must come from trusted code, never from user input.
        order_by: ``ORDER BY`` expression, interpolated the same way.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        # Fixed "\n" terminator keeps the export identical across platforms.
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(rows)
def write_sync_run(conn: sqlite3.Connection, as_of: str, row_count: int) -> None:
    """Record this archive run in the ``sync_runs`` table.

    The run id is ``external_history_ipohk_<compact timestamp>``. When a row
    with that id already exists, only ``finished_at``, ``status`` and ``notes``
    are refreshed.

    Args:
        conn: Open SQLite connection to write to.
        as_of: Archive timestamp; stored as ``as_of``, ``started_at`` and
            ``finished_at``.
        row_count: Number of history rows archived, quoted in the notes text.
    """
    run_key = f"external_history_ipohk_{compact_timestamp(as_of)}"
    summary = f"Archived {row_count} ipohk listed-history rows."
    # The SQL text is kept flush-left so the literal stays unchanged.
    statement = """
INSERT INTO sync_runs (sync_run_id, mode, as_of, started_at, finished_at, status, notes)
VALUES (?, 'external_history_update', ?, ?, ?, 'complete', ?)
ON CONFLICT(sync_run_id) DO UPDATE SET
finished_at = excluded.finished_at,
status = excluded.status,
notes = excluded.notes
"""
    bindings = (run_key, as_of, as_of, as_of, summary)
    conn.execute(statement, bindings)
def upsert_history(
    conn: sqlite3.Connection,
    rows: list[dict[str, object]],
    local_path: str,
    file_sha256: str,
    url: str,
    as_of: str,
    notes: str,
) -> int:
    """Insert or update one ``external_ipo_history`` row per ipohk record.

    Records whose stock code is empty or all zeros, or whose listing date is
    blank, are skipped. Existing rows are matched on
    ``(ticker, provider, listing_date)`` and have every other listed column
    overwritten; ``history_id`` is only set on insert.

    Args:
        conn: Open SQLite connection to write to.
        rows: Decoded ipohk records.
        local_path: Path of the archived raw payload.
        file_sha256: SHA-256 hex digest of the raw payload.
        url: Endpoint the payload was fetched from.
        as_of: Archive timestamp; stored as ``archived_at`` and used to derive
            ``source_id`` and ``source_date``.
        notes: Free-text note stored on every row.

    Returns:
        The number of records written.
    """
    # Numeric source fields, in the order their columns appear in the INSERT.
    numeric_fields = (
        "price_range_low",
        "price_range_high",
        "issue_price",
        "one_lot_capital",
        "one_hand_win_rate",
        "oversub_ratio",
        "total_fundraise",
        "market_cap_at_listing",
        "grey_market_chg",
        "first_day_chg",
    )
    # The SQL text is kept flush-left so the literal stays unchanged.
    statement = """
INSERT INTO external_ipo_history (
history_id, ticker, provider, source_id, stock_name, listing_date,
price_range_low_hkd, price_range_high_hkd, issue_price_hkd,
one_lot_capital_hkd, one_hand_win_rate_pct,
public_oversubscription_times, total_fundraise_hkd_b,
market_cap_at_listing_hkd_b, grey_market_return_pct,
first_day_return_pct, sponsor, source_date, archived_at,
local_path, url, file_sha256, notes
)
VALUES (?, ?, 'ipohk', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(ticker, provider, listing_date) DO UPDATE SET
source_id = excluded.source_id,
stock_name = excluded.stock_name,
price_range_low_hkd = excluded.price_range_low_hkd,
price_range_high_hkd = excluded.price_range_high_hkd,
issue_price_hkd = excluded.issue_price_hkd,
one_lot_capital_hkd = excluded.one_lot_capital_hkd,
one_hand_win_rate_pct = excluded.one_hand_win_rate_pct,
public_oversubscription_times = excluded.public_oversubscription_times,
total_fundraise_hkd_b = excluded.total_fundraise_hkd_b,
market_cap_at_listing_hkd_b = excluded.market_cap_at_listing_hkd_b,
grey_market_return_pct = excluded.grey_market_return_pct,
first_day_return_pct = excluded.first_day_return_pct,
sponsor = excluded.sponsor,
source_date = excluded.source_date,
archived_at = excluded.archived_at,
local_path = excluded.local_path,
url = excluded.url,
file_sha256 = excluded.file_sha256,
notes = excluded.notes
"""
    saved = 0
    for record in rows:
        ticker = as_ticker(record.get("stock_code"))
        listing_date = str(record.get("listing_date") or "").strip()
        # A ticker made only of zeros means the stock code was missing.
        has_identity = bool(ticker.strip("0")) and bool(listing_date)
        if not has_identity:
            continue
        source_id = f"external_ipohk_listed_{compact_timestamp(as_of)}"
        history_id = f"ipohk_{ticker}_{listing_date}"
        numeric_values = [as_float(record.get(field)) for field in numeric_fields]
        bindings = (
            history_id,
            ticker,
            source_id,
            record.get("name"),
            listing_date,
            *numeric_values,
            record.get("sponsor"),
            source_date(as_of),
            as_of,
            local_path,
            url,
            file_sha256,
            notes,
        )
        conn.execute(statement, bindings)
        saved += 1
    return saved
def main() -> int:
    """Fetch the ipohk listed-IPO history, archive it, and refresh the CSV snapshots.

    Steps: download the payload, save the raw bytes, decode the JSON, apply the
    schema script, upsert the history rows, record the sync run, then export
    the ``external_ipo_history`` and ``sync_runs`` tables to CSV.

    Returns:
        ``0`` on success; failures propagate as exceptions.
    """
    args = parse_args()
    as_of = parse_as_of(args.as_of)
    payload = fetch_bytes(args.url)
    # The raw bytes are saved before decoding, so the response is on disk even
    # if JSON parsing fails below.
    local_path, file_sha256 = save_raw(payload, as_of)
    parsed = json.loads(payload.decode("utf-8"))
    rows = parsed.get("data") or []
    notes = (
        "External structured IPO history. Includes final oversubscription, one-lot win rate, "
        "grey-market return, and first-day return where available; not a T0.5 margin snapshot."
    )
    # sqlite3's connection context manager only commits or rolls back; it does
    # not close the connection, so close it explicitly.
    conn = sqlite3.connect(args.db)
    try:
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            written = upsert_history(conn, rows, local_path, file_sha256, args.url, as_of, notes)
            write_sync_run(conn, as_of, written)
        # Export only after the transaction has committed so the CSV snapshots
        # can never get ahead of the database.
        export_snapshot(conn, "external_ipo_history", "listing_date DESC, ticker")
        export_snapshot(conn, "sync_runs", "sync_run_id")
    finally:
        conn.close()
    print("ipohk history archived")
    print(f"as_of: {as_of}")
    print(f"raw_snapshot: {local_path}")
    print(f"rows: {written}")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
+140
View File
@@ -223,6 +223,24 @@ def score_t1(row: sqlite3.Row) -> tuple[int, str]:
return score, "|".join(components)
def score_t0_5_market_heat(row: sqlite3.Row) -> tuple[int | None, str]:
    """Score the T0.5 margin subscription multiple of ``row``.

    Tiers (first match wins): ``>= 5000`` scores 8, ``>= 1000`` scores 6,
    ``>= 100`` scores 3, ``>= 10`` scores 0, and anything lower scores -3.

    Args:
        row: Record exposing ``t0_5_margin_subscription_multiple``.

    Returns:
        ``(None, "")`` when the multiple is missing or unparseable; otherwise
        the value returned by ``add_component`` together with the recorded
        components joined by ``|``.
    """
    multiple = as_float(row["t0_5_margin_subscription_multiple"])
    if multiple is None:
        return None, ""
    # (inclusive lower bound, points, label), checked from the hottest tier down.
    tiers = (
        (5000, 8, "gte_5000x"),
        (1000, 6, "1000x_to_5000x"),
        (100, 3, "100x_to_1000x"),
        (10, 0, "10x_to_100x"),
    )
    points, label = -3, "lt_10x"
    for floor, tier_points, tier_label in tiers:
        if multiple >= floor:
            points, label = tier_points, tier_label
            break
    parts: list[str] = []
    score = add_component(parts, "margin_subscription", points, label)
    return score, "|".join(parts)
def has_structured_t1(row: sqlite3.Row) -> bool:
return any(
row[key] is not None
@@ -257,6 +275,33 @@ def total_bucket(score: int) -> str:
return "total_gte_26"
def t0_plus_t0_5_bucket(score: int | None) -> str | None:
if score is None:
return None
if score < 5:
return "t0_5_lt_5"
if score <= 7:
return "t0_5_5_to_7"
if score <= 11:
return "t0_5_8_to_11"
return "t0_5_gte_12"
def external_oversub_bucket(value: Any) -> str | None:
    """Map an external final oversubscription multiple to its bucket label.

    Args:
        value: Raw oversubscription value; it is parsed with ``as_float``.

    Returns:
        ``None`` when the value cannot be parsed; otherwise the label of the
        first tier whose lower bound the value reaches, or
        ``"external_os_lt_10x"`` when it is below every bound.
    """
    multiple = as_float(value)
    if multiple is None:
        return None
    # (inclusive lower bound, label), checked from the largest bound down.
    tiers = (
        (5000, "external_os_gte_5000x"),
        (1000, "external_os_1000x_to_5000x"),
        (100, "external_os_100x_to_1000x"),
        (10, "external_os_10x_to_100x"),
    )
    for floor, label in tiers:
        if multiple >= floor:
            return label
    return "external_os_lt_10x"
def decision_band(row: dict[str, Any]) -> str:
if not row["has_structured_t1"]:
score = row["t0_score"]
@@ -306,6 +351,28 @@ def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
MAX(CASE WHEN stage = 'D1' THEN turnover_hkd_m END) AS d1_turnover_hkd_m
FROM price_performance
GROUP BY ticker
),
latest_market_heat AS (
SELECT h.*
FROM ipo_market_heat h
JOIN (
SELECT ticker, MAX(observed_at) AS observed_at
FROM ipo_market_heat
GROUP BY ticker
) latest
ON latest.ticker = h.ticker AND latest.observed_at = h.observed_at
),
external_history AS (
SELECT e.*
FROM external_ipo_history e
JOIN (
SELECT ticker, MAX(listing_date) AS listing_date
FROM external_ipo_history
WHERE provider = 'ipohk'
GROUP BY ticker
) latest
ON latest.ticker = e.ticker AND latest.listing_date = e.listing_date
WHERE e.provider = 'ipohk'
)
SELECT
m.ticker,
@@ -347,6 +414,15 @@ def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
p.d20_return_pct,
p.d60_return_pct,
p.d1_turnover_hkd_m,
h.observed_at AS t0_5_observed_at,
h.provider AS t0_5_provider,
h.margin_subscription_multiple AS t0_5_margin_subscription_multiple,
h.source_id AS t0_5_source_id,
eh.one_hand_win_rate_pct AS external_one_hand_win_rate_pct,
eh.public_oversubscription_times AS external_public_oversubscription_times,
eh.grey_market_return_pct AS external_grey_market_return_pct,
eh.first_day_return_pct AS external_first_day_return_pct,
eh.local_path AS external_history_source_path,
(
SELECT local_path
FROM source_refs s
@@ -366,6 +442,8 @@ def fetch_rows(conn: sqlite3.Connection) -> list[sqlite3.Row]:
LEFT JOIN listing_reports lr ON lr.ticker = m.ticker
LEFT JOIN ipo_demand d ON d.ticker = m.ticker
LEFT JOIN performance p ON p.ticker = m.ticker
LEFT JOIN latest_market_heat h ON h.ticker = m.ticker
LEFT JOIN external_history eh ON eh.ticker = m.ticker
ORDER BY m.listing_date, m.ticker
"""
).fetchall()
@@ -376,8 +454,10 @@ def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]:
for row in rows:
t0_score_value, t0_breakdown = score_t0(row)
t1_score_value, t1_breakdown = score_t1(row)
t0_5_score_value, t0_5_breakdown = score_t0_5_market_heat(row)
structured_t1 = has_structured_t1(row)
total_score = t0_score_value + (t1_score_value if structured_t1 else 0)
t0_plus_t0_5_score = t0_score_value + t0_5_score_value if t0_5_score_value is not None else None
size = offer_size_hkd_m(row)
record: dict[str, Any] = {
"model_version": MODEL_VERSION,
@@ -406,6 +486,14 @@ def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]:
"over_allotment_offer_shares": row["over_allotment_offer_shares"],
"public_oversubscription_times": row["public_oversubscription_times"],
"international_oversubscription_times": row["international_oversubscription_times"],
"t0_5_observed_at": row["t0_5_observed_at"],
"t0_5_provider": row["t0_5_provider"],
"t0_5_margin_subscription_multiple": row["t0_5_margin_subscription_multiple"],
"t0_5_source_id": row["t0_5_source_id"],
"t0_5_add_score": t0_5_score_value,
"t0_plus_t0_5_score": t0_plus_t0_5_score,
"t0_plus_t0_5_score_bucket": t0_plus_t0_5_bucket(t0_plus_t0_5_score),
"t0_5_score_breakdown": t0_5_breakdown,
"valid_applications": row["valid_applications"],
"successful_applications": row["successful_applications"],
"application_success_rate": success_rate(row),
@@ -426,6 +514,12 @@ def build_records(rows: list[sqlite3.Row], as_of: str) -> list[dict[str, Any]]:
"d1_turnover_hkd_m": row["d1_turnover_hkd_m"],
"d1_positive": as_float(row["d1_return_pct"]) is not None and as_float(row["d1_return_pct"]) > 0,
"d1_strong_10pct": as_float(row["d1_return_pct"]) is not None and as_float(row["d1_return_pct"]) >= 10,
"external_one_hand_win_rate_pct": row["external_one_hand_win_rate_pct"],
"external_public_oversubscription_times": row["external_public_oversubscription_times"],
"external_public_oversubscription_bucket": external_oversub_bucket(row["external_public_oversubscription_times"]),
"external_grey_market_return_pct": row["external_grey_market_return_pct"],
"external_first_day_return_pct": row["external_first_day_return_pct"],
"external_history_source_path": row["external_history_source_path"],
"prospectus_source_path": row["prospectus_source_path"],
"allotment_source_path": row["allotment_source_path"],
}
@@ -514,6 +608,13 @@ def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None:
"over_allotment_offer_shares",
"public_oversubscription_times",
"international_oversubscription_times",
"t0_5_observed_at",
"t0_5_provider",
"t0_5_margin_subscription_multiple",
"t0_5_source_id",
"t0_5_add_score",
"t0_plus_t0_5_score",
"t0_plus_t0_5_score_bucket",
"valid_applications",
"successful_applications",
"application_success_rate",
@@ -534,9 +635,16 @@ def write_dataset(records: list[dict[str, Any]], output_path: Path) -> None:
"d1_turnover_hkd_m",
"d1_positive",
"d1_strong_10pct",
"external_one_hand_win_rate_pct",
"external_public_oversubscription_times",
"external_public_oversubscription_bucket",
"external_grey_market_return_pct",
"external_first_day_return_pct",
"external_history_source_path",
"prospectus_source_path",
"allotment_source_path",
"t0_score_breakdown",
"t0_5_score_breakdown",
"t1_score_breakdown",
]
with output_path.open("w", newline="", encoding="utf-8") as handle:
@@ -597,6 +705,16 @@ def write_report(
total = len(records)
d1_records = [record for record in records if record["d1_return_pct"] is not None]
structured_t1 = [record for record in records if record["has_structured_t1"]]
structured_t0_5 = [record for record in records if record["t0_5_margin_subscription_multiple"] is not None]
t0_5_with_d1 = [record for record in structured_t0_5 if record["d1_return_pct"] is not None]
external_history_rows = [record for record in records if record["external_history_source_path"]]
external_oversub_rows = [record for record in records if record["external_public_oversubscription_times"] is not None]
external_oversub_with_d1 = [
record
for record in records
if record["external_public_oversubscription_times"] is not None and record["d1_return_pct"] is not None
]
external_oversub_metrics = calibration(records, "external_public_oversubscription_bucket")
pending_t1_tickers = ", ".join(sorted(record["ticker"] for record in records if not record["has_structured_t1"]))
t1_public_os_missing = sum(record["public_oversubscription_times"] is None for record in structured_t1)
t1_international_os_missing = sum(record["international_oversubscription_times"] is None for record in structured_t1)
@@ -628,6 +746,11 @@ def write_report(
f"- Rows with offer size: {count_present(records, 'offer_size_hkd_m')}",
f"- Rows with public oversubscription: {count_present(records, 'public_oversubscription_times')}",
f"- Rows with international oversubscription: {count_present(records, 'international_oversubscription_times')}",
f"- Rows with T0.5 margin heat snapshots: {len(structured_t0_5)}",
f"- Rows with T0.5 margin heat and D1 labels: {len(t0_5_with_d1)}",
f"- Rows matched to external ipohk history: {len(external_history_rows)}",
f"- Rows with external final oversubscription: {len(external_oversub_rows)}",
f"- Rows with external final oversubscription and D1 labels: {len(external_oversub_with_d1)}",
f"- Rows pending T1 structure: {total - len(structured_t1)}"
+ (f" ({pending_t1_tickers})" if pending_t1_tickers else ""),
f"- T1 field-level blanks: public oversubscription {t1_public_os_missing}, international oversubscription {t1_international_os_missing}, valid applications {t1_valid_missing}, successful applications {t1_successful_missing}",
@@ -644,6 +767,23 @@ def write_report(
"",
metrics_table(total_metrics),
"",
"## T0.5 Market Heat",
"",
"T0.5 uses archived subscription-period margin heat snapshots. These are non-official live signals and are kept separate from T1 allotment demand. The current archive is not yet a historical training set: it has too few rows and no D1 labels for calibration.",
"",
f"- T0.5 margin rows: {len(structured_t0_5)}",
f"- T0.5 rows with D1 labels: {len(t0_5_with_d1)}",
"",
"## External Final Heat Proxy",
"",
"The ipohk history archive adds final public oversubscription, one-lot win rate, grey-market return, and first-day return where available. These fields are useful for coverage checks and post-hoc calibration, but they are not T0.5 inputs because they are final or near-final history.",
"",
f"- External history rows matched into this dataset: {len(external_history_rows)}",
f"- Matched rows with final oversubscription: {len(external_oversub_rows)}",
f"- Matched rows with final oversubscription and D1 labels: {len(external_oversub_with_d1)}",
"",
metrics_table(external_oversub_metrics),
"",
"## Current Read",
"",
f"After the T1 demand text backfill, the strongest v0 T1 bucket is `{best_bucket.bucket}` with {best_bucket.sample_size} historical D1 observations and a {fmt_pct(best_bucket.d1_positive_rate)} D1 positive rate. The model is most useful after allotment results are available; T0 is a watchlist filter rather than a final subscription call.",