Archive recent HKEX IPO targets
Request: Use the project archivist workflow to update IPO target coverage for the most recent three-year window.

Changes:
- Add scripts/update_recent_ipo_list.py to discover HKEXnews annual new listing reports, archive XLSX sources, parse subscription-relevant IPO rows, and update SQLite plus snapshots.
- Add new_listing_report_entries to preserve annual report row-level evidence.
- Archive 2023-2026 Main Board new listing reports and 2024-2026 GEM new listing reports.
- Seed 290 report-backed IPO targets for 2023-06-15 through 2026-06-15, skipping 10 non-IPO rows without numeric offer prices.
- Refresh ipo_master, missing offering_terms fields, source_refs, ticker_sync_state, and sync_tasks.
- Add openpyxl as the XLSX parser dependency and document the archivist refresh flow.
- Limit sync summary output while keeping the full queue in SQLite and CSV snapshots.

Verification:
- Ran update_recent_ipo_list.py for 2023-06-15 to 2026-06-15 with as-of 2026-06-15T07:30:00Z.
- Parsed project Python scripts with ast.parse.
- Checked SQLite integrity and DB-to-snapshot row counts.
- Verified source_refs paths are repo-relative, files exist, and SHA-256 hashes match.
- Ran git diff --check and git diff --cached --check.
- Checked for Python cache and SQLite transient files.

Next useful context:
- ipo_master now has 293 tickers; new_listing_report_entries has 290 report-backed targets.
- Current sync queue has 2005 open tasks and 42 waiting_until_due tasks for deeper per-ticker archival stages.
This commit is contained in:
@@ -304,6 +304,7 @@ def main() -> None:
|
||||
"offering_terms",
|
||||
"ipo_demand",
|
||||
"price_performance",
|
||||
"new_listing_report_entries",
|
||||
"source_refs",
|
||||
"data_gaps",
|
||||
"sync_runs",
|
||||
|
||||
@@ -0,0 +1,692 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Archive recent HKEX IPO targets from HKEXnews new listing reports."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import hashlib
|
||||
import re
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, datetime, timezone
|
||||
from pathlib import Path
|
||||
from urllib.parse import urljoin
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
from openpyxl import load_workbook
|
||||
|
||||
|
||||
# Default page scanned for annual new listing report links (see --archive-page).
ARCHIVE_PAGE_URL = "https://www2.hkexnews.hk/New-Listings/New-Listing-Information/Main-Board?sc_lang=en"

# Default repo-relative locations of the database, its schema, the CSV
# snapshots and the archived report workbooks.
DB_PATH = Path("data", "hk_ipo.sqlite")
SCHEMA_PATH = Path("schema", "hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data", "snapshots")
RAW_REPORT_DIR = Path("data", "raw", "hkex_new_listing_reports")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ReportLink:
    """Immutable descriptor of one annual new listing report workbook."""

    # Report year, taken from the first ``20xx`` match in the link href.
    year: int
    # Short board identifier: "gem" or "main"; also used as archive subfolder.
    board_key: str
    # Display name of the board: "GEM" or "Main Board".
    board_name: str
    # Absolute download URL of the XLSX report.
    url: str
    # POSIX-style path of the archived copy of the report.
    local_path: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ReportEntry:
|
||||
ticker: str
|
||||
report_year: int
|
||||
board: str
|
||||
source_id: str
|
||||
company_name_en: str
|
||||
prospectus_date: str | None
|
||||
listing_date: str
|
||||
offer_price_hkd: float | None
|
||||
funds_raised_hkd: float | None
|
||||
subscription_ratio_times: float | None
|
||||
market_cap_hkd: float | None
|
||||
outstanding_shares_at_listing: int | None
|
||||
listing_method: str | None
|
||||
industry_label: str | None
|
||||
place_of_incorporation: str | None
|
||||
sponsors: str | None
|
||||
reporting_accountants: str | None
|
||||
valuers: str | None
|
||||
notes: str | None
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the recent IPO list updater.

    Returns:
        The namespace produced by ``argparse`` for the process arguments.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    option = cli.add_argument

    # Locations of the database, schema and the page listing the reports.
    option("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.")
    option("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.")
    option("--archive-page", default=ARCHIVE_PAGE_URL, help="HKEXnews new listing information page.")

    # Listing date window and archive timestamp.
    option("--end-date", default=date.today().isoformat(), help="Inclusive end date for recent IPO listings.")
    option("--years", type=int, default=3, help="Lookback years ending at --end-date.")
    option("--start-date", help="Inclusive start date. Defaults to --end-date minus --years.")
    option("--as-of", help="Archive timestamp. Defaults to current UTC time.")

    # Behaviour switches.
    option("--include-non-ipo", action="store_true", help="Include report rows without an IPO offer price.")
    option("--skip-sync-state", action="store_true", help="Do not refresh ticker sync state after updating facts.")

    return cli.parse_args()
|
||||
|
||||
|
||||
def utc_now() -> datetime:
    """Return the current timezone-aware UTC time truncated to whole seconds."""
    current = datetime.now(tz=timezone.utc)
    return current.replace(microsecond=0)
|
||||
|
||||
|
||||
def parse_as_of(value: str | None) -> str:
|
||||
if value:
|
||||
return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat().replace("+00:00", "Z")
|
||||
return utc_now().isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def years_before(day: date, years: int) -> date:
    """Return the date ``years`` calendar years before ``day``.

    When the same month/day does not exist in the target year (29 February),
    the 28th of that month is used instead.
    """
    target_year = day.year - years
    try:
        shifted = day.replace(year=target_year)
    except ValueError:
        shifted = day.replace(year=target_year, day=28)
    return shifted
|
||||
|
||||
|
||||
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a browser-like User-Agent header and uses a
    60-second timeout.
    """
    browser_headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=browser_headers), timeout=60) as reply:
        payload = reply.read()
    return payload
|
||||
|
||||
|
||||
def clean_text(value: object) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
cleaned = " ".join(value.replace("\r", "\n").replace("\n", " ").split())
|
||||
return cleaned or None
|
||||
return str(value)
|
||||
|
||||
|
||||
def numeric(value: object) -> float | None:
|
||||
if isinstance(value, bool) or value is None:
|
||||
return None
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
if isinstance(value, str):
|
||||
cleaned = value.replace(",", "").strip()
|
||||
if not cleaned or cleaned.upper().startswith("N/A"):
|
||||
return None
|
||||
try:
|
||||
return float(cleaned)
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def whole_number(value: object) -> int | None:
    """Convert a cell value to the nearest integer via ``numeric``.

    Returns ``None`` when ``numeric`` cannot interpret the value.
    """
    parsed = numeric(value)
    return None if parsed is None else int(round(parsed))
|
||||
|
||||
|
||||
def excel_date(value: object) -> str | None:
|
||||
if isinstance(value, datetime):
|
||||
return value.date().isoformat()
|
||||
if isinstance(value, date):
|
||||
return value.isoformat()
|
||||
return None
|
||||
|
||||
|
||||
def normalize_ticker(value: object) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, (int, float)) and not isinstance(value, bool):
|
||||
return f"{int(value):05d}"
|
||||
text = str(value).strip()
|
||||
if not text:
|
||||
return None
|
||||
digits = re.sub(r"\D", "", text)
|
||||
if not digits:
|
||||
return None
|
||||
return digits.zfill(5)
|
||||
|
||||
|
||||
def sha256_bytes(data: bytes) -> str:
    """Return the hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB chunks so it is never held in memory whole.
    """
    hasher = hashlib.sha256()
    block_size = 1024 * 1024
    with path.open("rb") as stream:
        while chunk := stream.read(block_size):
            hasher.update(chunk)
    return hasher.hexdigest()
|
||||
|
||||
|
||||
def discover_report_links(archive_page_url: str, start_date: date, end_date: date) -> list[ReportLink]:
    """Find the annual report workbooks covering the requested date range.

    The archive page is downloaded and scanned for ``.xlsx`` links under
    ``New-Listing-Report/Main/`` or ``New-Listing-Report/GEM/``. A link is
    kept when the first ``20xx`` number in its href lies between the start
    and end years (inclusive).

    Returns:
        The matching links sorted by year, board key and URL.
    """
    page = fetch_bytes(archive_page_url).decode("utf-8", "replace")
    link_pattern = r'href="([^"]*New-Listing-Report/(?:Main|GEM)/[^"]+?\.xlsx)"'
    wanted_years = range(start_date.year, end_date.year + 1)

    found: list[ReportLink] = []
    # The set drops hrefs that appear more than once on the page.
    for href in sorted(set(re.findall(link_pattern, page))):
        year_hit = re.search(r"(20\d{2})", href)
        if year_hit is None:
            continue
        report_year = int(year_hit.group(1))
        if report_year not in wanted_years:
            continue
        is_gem = "/GEM/" in href
        board_key = "gem" if is_gem else "main"
        archive_target = RAW_REPORT_DIR / board_key / href.rsplit("/", 1)[-1]
        found.append(
            ReportLink(
                year=report_year,
                board_key=board_key,
                board_name="GEM" if is_gem else "Main Board",
                url=urljoin(archive_page_url, href),
                local_path=archive_target.as_posix(),
            )
        )

    found.sort(key=lambda item: (item.year, item.board_key, item.url))
    return found
|
||||
|
||||
|
||||
def archive_report_file(link: ReportLink) -> str:
    """Download a report workbook into the archive and return its SHA-256.

    The parent directory is created when missing. The local file is only
    rewritten when it is absent or its bytes differ from the download.
    """
    payload = fetch_bytes(link.url)
    destination = Path(link.local_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    already_archived = destination.exists() and destination.read_bytes() == payload
    if not already_archived:
        destination.write_bytes(payload)
        return sha256_bytes(payload)
    return sha256_file(destination)
|
||||
|
||||
|
||||
def report_source_date(path: Path, board_key: str, report_year: int) -> str:
    """Return the "up to" date stated in a report title as an ISO date.

    GEM workbooks are read from cell A5 (``up to YYYY/MM/DD``); all other
    workbooks from cell A1 (``up to D Month YYYY``). Only the first sheet is
    inspected.

    Args:
        path: Location of the archived XLSX workbook.
        board_key: ``"gem"`` selects the GEM layout; anything else the Main
            Board layout.
        report_year: Year used for the fallback date.

    Returns:
        The parsed date, or 31 December of ``report_year`` when the title has
        no recognisable date.
    """
    if board_key == "gem":
        cell_ref = "A5"
        pattern = r"up to (\d{4}/\d{2}/\d{2})"
        formats = ("%Y/%m/%d",)
    else:
        cell_ref = "A1"
        pattern = r"up to (\d{1,2} [A-Za-z]+ \d{4})"
        # Accept abbreviated month names as well as full ones.
        formats = ("%d %B %Y", "%d %b %Y")

    workbook = load_workbook(path, data_only=True, read_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        title = clean_text(worksheet[cell_ref].value) or ""
    finally:
        # Read-only workbooks keep the file open until closed explicitly.
        workbook.close()

    match = re.search(pattern, title)
    if match:
        for fmt in formats:
            try:
                return datetime.strptime(match.group(1), fmt).date().isoformat()
            except ValueError:
                # Text matched the pattern but is not a real date in this
                # format; try the next one, then fall back to year end.
                continue
    return date(report_year, 12, 31).isoformat()
|
||||
|
||||
|
||||
def combined_listing_method(primary: object, following_row: tuple[object, ...] | None) -> str | None:
    """Join the listing method of a row with that of its following row.

    The following row contributes the value at index 17 when it has one.
    Blank values and duplicates are dropped, the remaining values are joined
    with ``"; "`` in first-seen order, and ``None`` is returned when nothing
    is left.
    """
    candidates = [clean_text(primary)]
    if following_row is not None and len(following_row) > 17:
        candidates.append(clean_text(following_row[17]))
    # dict.fromkeys de-duplicates while preserving insertion order.
    distinct = dict.fromkeys(text for text in candidates if text)
    return "; ".join(distinct) or None
|
||||
|
||||
|
||||
def is_main_ditto_row(row: tuple[object, ...]) -> bool:
    """Tell whether the second cell of ``row`` is a ditto mark (``"``)."""
    if len(row) <= 1:
        return False
    return row[1] == '"'
|
||||
|
||||
|
||||
def parse_main_report(link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool) -> tuple[list[ReportEntry], int]:
    """Parse one archived Main Board report workbook into report entries.

    Rows are read from the first sheet starting at row 3. A row is treated as
    a listing when its first cell is an integer and it has a ticker
    (column 1) and a listing date (column 4). Other columns used: 2 company
    name, 3 prospectus date, 5 sponsors, 6 reporting accountants, 7 valuers,
    8 funds raised, 9 offer price. When the next row carries a ditto mark in
    the ticker column, its funds raised are added to the listing's.

    Args:
        link: Report descriptor; ``local_path`` must point at the workbook.
        start_date: Inclusive lower bound on the listing date.
        end_date: Inclusive upper bound on the listing date.
        include_non_ipo: Keep rows without a numeric offer price.

    Returns:
        The parsed entries and the number of in-range rows skipped for
        lacking a numeric offer price.
    """

    def cell(cells: tuple[object, ...], position: int) -> object:
        # Short or empty rows are treated as having blank trailing cells.
        return cells[position] if len(cells) > position else None

    workbook = load_workbook(link.local_path, data_only=True, read_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(min_row=3, values_only=True))
    finally:
        # Read-only workbooks keep the file open until closed explicitly.
        workbook.close()

    entries: list[ReportEntry] = []
    skipped = 0
    for index, row in enumerate(rows):
        if not isinstance(cell(row, 0), int):
            continue
        ticker = normalize_ticker(cell(row, 1))
        listing_date = excel_date(cell(row, 4))
        if ticker is None or listing_date is None:
            continue
        listing_day = date.fromisoformat(listing_date)
        if listing_day < start_date or listing_day > end_date:
            continue
        offer_price = numeric(cell(row, 9))
        if offer_price is None and not include_non_ipo:
            skipped += 1
            continue

        following_row = None
        if index + 1 < len(rows) and is_main_ditto_row(rows[index + 1]):
            following_row = rows[index + 1]

        funds_raised = numeric(cell(row, 8))
        if following_row is not None:
            following_funds = numeric(cell(following_row, 8))
            if following_funds is not None:
                funds_raised = (funds_raised or 0) + following_funds

        entries.append(
            ReportEntry(
                ticker=ticker,
                report_year=link.year,
                board=link.board_name,
                source_id=f"{ticker}_new_listing_report_main_{link.year}",
                company_name_en=clean_text(cell(row, 2)) or "",
                prospectus_date=excel_date(cell(row, 3)),
                listing_date=listing_date,
                offer_price_hkd=offer_price,
                funds_raised_hkd=funds_raised,
                subscription_ratio_times=None,
                market_cap_hkd=None,
                outstanding_shares_at_listing=None,
                # Without a numeric price the cell text is kept as the method.
                listing_method="IPO" if offer_price is not None else clean_text(cell(row, 9)),
                industry_label=None,
                place_of_incorporation=None,
                sponsors=clean_text(cell(row, 5)),
                reporting_accountants=clean_text(cell(row, 6)),
                valuers=clean_text(cell(row, 7)),
                notes="Funds raised sums annual report rows (a) and (b) when both are present.",
            )
        )
    return entries, skipped
|
||||
|
||||
|
||||
def parse_gem_report(link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool) -> tuple[list[ReportEntry], int]:
    """Parse one archived GEM report workbook into report entries.

    Rows are read from the first sheet starting at row 12. A row is treated
    as a listing when it has a listing date (column 0) and a ticker
    (column 1). Other columns used: 2 company name, 5 offer price,
    7 subscription ratio, 9 funds raised, 10 outstanding shares, 12 market
    capitalisation, 14 industry, 16 place of incorporation, 17 listing
    method, 19 sponsors, 21 reporting accountants.

    The listing method of the next row is merged in only when that row is
    not itself a listing, so an adjacent listing's method is never mixed in.

    Args:
        link: Report descriptor; ``local_path`` must point at the workbook.
        start_date: Inclusive lower bound on the listing date.
        end_date: Inclusive upper bound on the listing date.
        include_non_ipo: Keep rows without a numeric offer price.

    Returns:
        The parsed entries and the number of in-range rows skipped for
        lacking a numeric offer price.
    """

    def cell(cells: tuple[object, ...], position: int) -> object:
        # Short or empty rows are treated as having blank trailing cells.
        return cells[position] if len(cells) > position else None

    def is_listing_row(cells: tuple[object, ...]) -> bool:
        # Same criteria the main loop uses to accept a row as a listing.
        return excel_date(cell(cells, 0)) is not None and normalize_ticker(cell(cells, 1)) is not None

    workbook = load_workbook(link.local_path, data_only=True, read_only=True)
    try:
        worksheet = workbook[workbook.sheetnames[0]]
        rows = list(worksheet.iter_rows(min_row=12, values_only=True))
    finally:
        # Read-only workbooks keep the file open until closed explicitly.
        workbook.close()

    entries: list[ReportEntry] = []
    skipped = 0
    for index, row in enumerate(rows):
        listing_date = excel_date(cell(row, 0))
        if listing_date is None:
            continue
        ticker = normalize_ticker(cell(row, 1))
        if ticker is None:
            continue
        listing_day = date.fromisoformat(listing_date)
        if listing_day < start_date or listing_day > end_date:
            continue
        offer_price = numeric(cell(row, 5))
        if offer_price is None and not include_non_ipo:
            skipped += 1
            continue

        following_row = None
        if index + 1 < len(rows) and not is_listing_row(rows[index + 1]):
            following_row = rows[index + 1]

        entries.append(
            ReportEntry(
                ticker=ticker,
                report_year=link.year,
                board=link.board_name,
                source_id=f"{ticker}_new_listing_report_gem_{link.year}",
                company_name_en=clean_text(cell(row, 2)) or "",
                prospectus_date=None,
                listing_date=listing_date,
                offer_price_hkd=offer_price,
                funds_raised_hkd=numeric(cell(row, 9)),
                subscription_ratio_times=numeric(cell(row, 7)),
                market_cap_hkd=numeric(cell(row, 12)),
                outstanding_shares_at_listing=whole_number(cell(row, 10)),
                listing_method=combined_listing_method(cell(row, 17), following_row),
                industry_label=clean_text(cell(row, 14)),
                place_of_incorporation=clean_text(cell(row, 16)),
                sponsors=clean_text(cell(row, 19)),
                reporting_accountants=clean_text(cell(row, 21)),
                valuers=None,
                notes="GEM annual report does not provide prospectus date.",
            )
        )
    return entries, skipped
|
||||
|
||||
|
||||
def parse_report(link: ReportLink, start_date: date, end_date: date, include_non_ipo: bool) -> tuple[list[ReportEntry], int]:
    """Parse a report with the parser matching its board.

    Links whose ``board_key`` is ``"gem"`` use the GEM parser; every other
    link uses the Main Board parser.
    """
    parser = parse_gem_report if link.board_key == "gem" else parse_main_report
    return parser(link, start_date, end_date, include_non_ipo)
|
||||
|
||||
|
||||
def source_ref_rows(links: list[ReportLink], entries_by_source: dict[str, list[ReportEntry]], as_of: str) -> list[dict[str, object]]:
    """Build one ``source_refs`` row per report entry.

    For every link the archived workbook's source date and SHA-256 are
    computed once and keyed by ``(board_key, year)``. Each entry then gets a
    row pointing at the workbook of its group's first entry.

    Args:
        links: Archived reports; their files must exist locally.
        entries_by_source: Entries grouped by ``source_id``; empty groups
            are ignored.
        as_of: Timestamp stored as ``archived_at``.
    """
    report_meta = {}
    for link in links:
        archived = Path(link.local_path)
        report_meta[(link.board_key, link.year)] = (
            link,
            report_source_date(archived, link.board_key, link.year),
            sha256_file(archived),
        )

    rows: list[dict[str, object]] = []
    for entries in entries_by_source.values():
        if not entries:
            continue
        first = entries[0]
        meta_key = ("gem" if first.board == "GEM" else "main", first.report_year)
        link, source_date, file_hash = report_meta[meta_key]
        rows.extend(
            {
                "source_id": entry.source_id,
                "ticker": entry.ticker,
                "source_type": "new_listing_report",
                "title": f"HKEXnews {entry.board} New Listing Report {entry.report_year}",
                "path_base": "repo_root",
                "local_path": link.local_path,
                "url": link.url,
                "file_sha256": file_hash,
                "source_date": source_date,
                "archived_at": as_of,
                "notes": "Annual HKEXnews new listing report used to seed recent IPO target coverage.",
            }
            for entry in entries
        )
    return rows
|
||||
|
||||
|
||||
def master_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
|
||||
rows = []
|
||||
for entry in entries:
|
||||
rows.append(
|
||||
{
|
||||
"ticker": entry.ticker,
|
||||
"company_name_en": entry.company_name_en,
|
||||
"company_name_zh": None,
|
||||
"stock_short_name": None,
|
||||
"exchange": "HKEX",
|
||||
"board": entry.board,
|
||||
"status": "listed",
|
||||
"listing_date": entry.listing_date,
|
||||
"application_start_date": None,
|
||||
"application_end_date": None,
|
||||
"allotment_results_expected_date": None,
|
||||
"industry_label": entry.industry_label,
|
||||
"data_as_of": as_of,
|
||||
"notes": f"Seeded from HKEXnews {entry.board} New Listing Report {entry.report_year}.",
|
||||
}
|
||||
)
|
||||
return rows
|
||||
|
||||
|
||||
def offering_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
|
||||
rows = []
|
||||
for entry in entries:
|
||||
rows.append(
|
||||
{
|
||||
"ticker": entry.ticker,
|
||||
"source_id": entry.source_id,
|
||||
"prospectus_date": entry.prospectus_date,
|
||||
"offer_price_hkd": entry.offer_price_hkd,
|
||||
"board_lot": None,
|
||||
"min_subscription_amount_hkd": None,
|
||||
"global_offer_shares": None,
|
||||
"hk_offer_shares_initial": None,
|
||||
"international_offer_shares_initial": None,
|
||||
"public_offer_pct_initial": None,
|
||||
"over_allotment_offer_shares": None,
|
||||
"offer_size_adjustment_offer_shares": None,
|
||||
"market_cap_hkd_m": entry.market_cap_hkd / 1_000_000 if entry.market_cap_hkd else None,
|
||||
"gross_proceeds_hkd_m": entry.funds_raised_hkd / 1_000_000 if entry.funds_raised_hkd else None,
|
||||
"net_proceeds_hkd_m": None,
|
||||
"issued_shares_upon_listing": entry.outstanding_shares_at_listing,
|
||||
"data_as_of": as_of,
|
||||
}
|
||||
)
|
||||
return rows
|
||||
|
||||
|
||||
def report_entry_rows(entries: list[ReportEntry], as_of: str) -> list[dict[str, object]]:
|
||||
rows = []
|
||||
for entry in entries:
|
||||
rows.append(
|
||||
{
|
||||
"report_entry_id": f"{entry.ticker}_{entry.board.lower().replace(' ', '_')}_{entry.report_year}",
|
||||
"ticker": entry.ticker,
|
||||
"report_year": entry.report_year,
|
||||
"board": entry.board,
|
||||
"source_id": entry.source_id,
|
||||
"company_name_en": entry.company_name_en,
|
||||
"prospectus_date": entry.prospectus_date,
|
||||
"listing_date": entry.listing_date,
|
||||
"offer_price_hkd": entry.offer_price_hkd,
|
||||
"funds_raised_hkd": entry.funds_raised_hkd,
|
||||
"subscription_ratio_times": entry.subscription_ratio_times,
|
||||
"market_cap_hkd": entry.market_cap_hkd,
|
||||
"outstanding_shares_at_listing": entry.outstanding_shares_at_listing,
|
||||
"listing_method": entry.listing_method,
|
||||
"industry_label": entry.industry_label,
|
||||
"place_of_incorporation": entry.place_of_incorporation,
|
||||
"sponsors": entry.sponsors,
|
||||
"reporting_accountants": entry.reporting_accountants,
|
||||
"valuers": entry.valuers,
|
||||
"data_as_of": as_of,
|
||||
"notes": entry.notes,
|
||||
}
|
||||
)
|
||||
return rows
|
||||
|
||||
|
||||
def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``source_refs`` rows, refreshing existing ones by ``source_id``.

    On conflict every column except ``source_id`` and ``ticker`` is
    overwritten with the incoming value. No commit is issued here.
    """
    statement = """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        )
        VALUES (
            :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url,
            :file_sha256, :source_date, :archived_at, :notes
        )
        ON CONFLICT(source_id) DO UPDATE SET
            source_type = excluded.source_type,
            title = excluded.title,
            path_base = excluded.path_base,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """
    conn.executemany(statement, rows)
|
||||
|
||||
|
||||
def upsert_master(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``ipo_master`` rows, merging into existing tickers.

    On conflict the exchange, board and ``data_as_of`` are overwritten and
    the status becomes ``listed`` when the incoming status is ``listed``.
    The English name is replaced only when the stored one is an empty
    string; listing date, industry label and notes are filled only when the
    stored value is NULL. No commit is issued here.
    """
    statement = """
        INSERT INTO ipo_master (
            ticker, company_name_en, company_name_zh, stock_short_name, exchange, board,
            status, listing_date, application_start_date, application_end_date,
            allotment_results_expected_date, industry_label, data_as_of, notes
        )
        VALUES (
            :ticker, :company_name_en, :company_name_zh, :stock_short_name, :exchange, :board,
            :status, :listing_date, :application_start_date, :application_end_date,
            :allotment_results_expected_date, :industry_label, :data_as_of, :notes
        )
        ON CONFLICT(ticker) DO UPDATE SET
            company_name_en = CASE
                WHEN ipo_master.company_name_en = '' THEN excluded.company_name_en
                ELSE ipo_master.company_name_en
            END,
            exchange = excluded.exchange,
            board = excluded.board,
            status = CASE
                WHEN excluded.status = 'listed' THEN 'listed'
                ELSE ipo_master.status
            END,
            listing_date = COALESCE(ipo_master.listing_date, excluded.listing_date),
            industry_label = COALESCE(ipo_master.industry_label, excluded.industry_label),
            data_as_of = excluded.data_as_of,
            notes = COALESCE(ipo_master.notes, excluded.notes)
        """
    conn.executemany(statement, rows)
|
||||
|
||||
|
||||
def upsert_offering_terms(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``offering_terms`` rows, merging into existing tickers.

    On conflict the behaviour depends on the stored ``source_id``:

    * when it matches ``%_new_listing_report_%`` the report-derived columns,
      ``source_id`` and ``data_as_of`` are overwritten with incoming values;
    * otherwise ``source_id`` and ``data_as_of`` are kept and the
      report-derived columns are only filled where the stored value is NULL.

    Columns not named in the update clause are left untouched. No commit is
    issued here.
    """
    statement = """
        INSERT INTO offering_terms (
            ticker, source_id, prospectus_date, offer_price_hkd, board_lot,
            min_subscription_amount_hkd, global_offer_shares, hk_offer_shares_initial,
            international_offer_shares_initial, public_offer_pct_initial,
            over_allotment_offer_shares, offer_size_adjustment_offer_shares,
            market_cap_hkd_m, gross_proceeds_hkd_m, net_proceeds_hkd_m,
            issued_shares_upon_listing, data_as_of
        )
        VALUES (
            :ticker, :source_id, :prospectus_date, :offer_price_hkd, :board_lot,
            :min_subscription_amount_hkd, :global_offer_shares, :hk_offer_shares_initial,
            :international_offer_shares_initial, :public_offer_pct_initial,
            :over_allotment_offer_shares, :offer_size_adjustment_offer_shares,
            :market_cap_hkd_m, :gross_proceeds_hkd_m, :net_proceeds_hkd_m,
            :issued_shares_upon_listing, :data_as_of
        )
        ON CONFLICT(ticker) DO UPDATE SET
            source_id = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.source_id
                ELSE offering_terms.source_id
            END,
            prospectus_date = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.prospectus_date
                ELSE COALESCE(offering_terms.prospectus_date, excluded.prospectus_date)
            END,
            offer_price_hkd = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.offer_price_hkd
                ELSE COALESCE(offering_terms.offer_price_hkd, excluded.offer_price_hkd)
            END,
            market_cap_hkd_m = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.market_cap_hkd_m
                ELSE COALESCE(offering_terms.market_cap_hkd_m, excluded.market_cap_hkd_m)
            END,
            gross_proceeds_hkd_m = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.gross_proceeds_hkd_m
                ELSE COALESCE(offering_terms.gross_proceeds_hkd_m, excluded.gross_proceeds_hkd_m)
            END,
            issued_shares_upon_listing = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.issued_shares_upon_listing
                ELSE COALESCE(
                    offering_terms.issued_shares_upon_listing,
                    excluded.issued_shares_upon_listing
                )
            END,
            data_as_of = CASE
                WHEN offering_terms.source_id LIKE '%_new_listing_report_%' THEN excluded.data_as_of
                ELSE offering_terms.data_as_of
            END
        """
    conn.executemany(statement, rows)
|
||||
|
||||
|
||||
def upsert_report_entries(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``new_listing_report_entries`` rows, refreshing existing ids.

    On conflict every column except ``report_entry_id``, ``ticker``,
    ``report_year`` and ``board`` is overwritten with the incoming value.
    No commit is issued here.
    """
    statement = """
        INSERT INTO new_listing_report_entries (
            report_entry_id, ticker, report_year, board, source_id, company_name_en,
            prospectus_date, listing_date, offer_price_hkd, funds_raised_hkd,
            subscription_ratio_times, market_cap_hkd, outstanding_shares_at_listing,
            listing_method, industry_label, place_of_incorporation, sponsors,
            reporting_accountants, valuers, data_as_of, notes
        )
        VALUES (
            :report_entry_id, :ticker, :report_year, :board, :source_id, :company_name_en,
            :prospectus_date, :listing_date, :offer_price_hkd, :funds_raised_hkd,
            :subscription_ratio_times, :market_cap_hkd, :outstanding_shares_at_listing,
            :listing_method, :industry_label, :place_of_incorporation, :sponsors,
            :reporting_accountants, :valuers, :data_as_of, :notes
        )
        ON CONFLICT(report_entry_id) DO UPDATE SET
            source_id = excluded.source_id,
            company_name_en = excluded.company_name_en,
            prospectus_date = excluded.prospectus_date,
            listing_date = excluded.listing_date,
            offer_price_hkd = excluded.offer_price_hkd,
            funds_raised_hkd = excluded.funds_raised_hkd,
            subscription_ratio_times = excluded.subscription_ratio_times,
            market_cap_hkd = excluded.market_cap_hkd,
            outstanding_shares_at_listing = excluded.outstanding_shares_at_listing,
            listing_method = excluded.listing_method,
            industry_label = excluded.industry_label,
            place_of_incorporation = excluded.place_of_incorporation,
            sponsors = excluded.sponsors,
            reporting_accountants = excluded.reporting_accountants,
            valuers = excluded.valuers,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """
    conn.executemany(statement, rows)
|
||||
|
||||
|
||||
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write the full contents of ``table`` to ``<SNAPSHOT_DIR>/<table>.csv``.

    The snapshot directory is created when missing. The file starts with a
    header row of column names and uses ``\\n`` line endings.

    Args:
        conn: Open database connection.
        table: Table name, interpolated into the query as-is.
        order_by: ORDER BY expression, interpolated into the query as-is.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    # NOTE(review): table and order_by are spliced into the SQL text, so they
    # must only ever come from trusted, hard-coded callers.
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as out:
        sink = csv.writer(out, lineterminator="\n")
        sink.writerow(header)
        for record in result:
            sink.writerow(record)
|
||||
|
||||
|
||||
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    The script path is relative, so it is resolved against the current
    working directory. The run uses mode ``recent_ipo_list_refresh`` and a
    summary limit of 25.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero
            status.
    """
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "recent_ipo_list_refresh",
        "--summary-limit": "25",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, setting in options.items():
        command += [flag, setting]
    subprocess.run(command, check=True)
|
||||
|
||||
|
||||
def main() -> int:
    """Archive recent reports, update the database and export snapshots.

    Steps: resolve the date window, discover and archive the report
    workbooks, parse their rows, upsert the results into SQLite, export CSV
    snapshots, optionally refresh the sync state, and print a short summary.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If the date window is reversed or no report links are
            found for it.
    """
    # Imported locally so this function does not depend on a change to the
    # module import block.
    from contextlib import closing

    args = parse_args()
    end_date = date.fromisoformat(args.end_date)
    start_date = date.fromisoformat(args.start_date) if args.start_date else years_before(end_date, args.years)
    if start_date > end_date:
        # Otherwise the empty year range surfaces as "no links found".
        raise SystemExit("--start-date must not be after --end-date.")
    as_of = parse_as_of(args.as_of)

    links = discover_report_links(args.archive_page, start_date, end_date)
    if not links:
        raise SystemExit("No HKEXnews new listing report links were found for the requested date range.")

    all_entries: list[ReportEntry] = []
    skipped_rows = 0
    for link in links:
        archive_report_file(link)
        entries, skipped = parse_report(link, start_date, end_date, args.include_non_ipo)
        all_entries.extend(entries)
        skipped_rows += skipped

    entries_by_source: dict[str, list[ReportEntry]] = {}
    for entry in all_entries:
        entries_by_source.setdefault(entry.source_id, []).append(entry)

    # The connection's own context manager only commits or rolls back;
    # closing() additionally releases the handle before the sync-state
    # subprocess opens the same database.
    with closing(sqlite3.connect(args.db)) as conn, conn:
        conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
        upsert_master(conn, master_rows(all_entries, as_of))
        upsert_source_refs(conn, source_ref_rows(links, entries_by_source, as_of))
        upsert_report_entries(conn, report_entry_rows(all_entries, as_of))
        upsert_offering_terms(conn, offering_rows(all_entries, as_of))
        for table in [
            "ipo_master",
            "offering_terms",
            "new_listing_report_entries",
            "source_refs",
            "data_gaps",
        ]:
            export_snapshot(conn, table)

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print("recent IPO list updated")
    print(f"date range: {start_date.isoformat()} to {end_date.isoformat()}")
    print(f"reports archived: {len(links)}")
    print(f"IPO targets parsed: {len(all_entries)}")
    if skipped_rows:
        print(f"non-IPO report rows skipped: {skipped_rows}")
    return 0
|
||||
|
||||
|
||||
# Run the updater when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||
@@ -382,7 +382,7 @@ def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -
|
||||
writer.writerows(cursor.fetchall())
|
||||
|
||||
|
||||
def print_summary(states: list[StageState]) -> None:
|
||||
def print_summary(states: list[StageState], actionable_limit: int) -> None:
|
||||
counts: dict[str, int] = {}
|
||||
for state in states:
|
||||
counts[state.status] = counts.get(state.status, 0) + 1
|
||||
@@ -391,9 +391,12 @@ def print_summary(states: list[StageState]) -> None:
|
||||
print(f"{status}: {counts[status]}")
|
||||
open_items = [state for state in states if state.status in {"pending_due", "blocked"}]
|
||||
if open_items:
|
||||
print("actionable items:")
|
||||
for state in open_items:
|
||||
visible_items = open_items if actionable_limit < 0 else open_items[:actionable_limit]
|
||||
print(f"actionable items: {len(open_items)}")
|
||||
for state in visible_items:
|
||||
print(f"- {state.ticker} {state.stage}: {state.status} due={state.due_date or ''}")
|
||||
if actionable_limit >= 0 and len(open_items) > actionable_limit:
|
||||
print(f"... {len(open_items) - actionable_limit} more actionable items hidden; see data/snapshots/sync_tasks.csv")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
@@ -403,6 +406,7 @@ def main() -> int:
|
||||
parser.add_argument("--as-of", help="ISO timestamp for deterministic sync-state snapshots.")
|
||||
parser.add_argument("--run-id", help="Stable sync run id. Defaults to sync_state_<timestamp>.")
|
||||
parser.add_argument("--mode", default="state_refresh", help="Sync run mode label.")
|
||||
parser.add_argument("--summary-limit", type=int, default=50, help="Maximum actionable items to print; use -1 for all.")
|
||||
args = parser.parse_args()
|
||||
|
||||
as_of_dt = parse_as_of(args.as_of)
|
||||
@@ -439,7 +443,7 @@ def main() -> int:
|
||||
""",
|
||||
)
|
||||
export_snapshot(conn, "sync_tasks", "task_status, due_date, ticker, stage")
|
||||
print_summary(states)
|
||||
print_summary(states, args.summary_limit)
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user