Files
hk-ipo/scripts/archive_price_performance.py
T
geometrybase 078f56998b Backfill IPO price performance history
Request:
- Adjust archivist after the audit findings and update historical data.

Changes:
- Teach the archivist skill to close audit-discovered gaps in priority order.
- Add scripts/archive_price_performance.py for due D1/D5/D20/D60 price-performance backfills.
- Document the price-performance backfill command in README.
- Archive raw Yahoo Finance chart responses under repo-relative data/raw/{ticker}/ paths.
- Populate price_performance with D1/D5/D20/D60 checkpoints and refresh source_refs, sync_runs, sync_tasks, and ticker_sync_state snapshots.

Execution:
- Ran .venv/bin/python scripts/archive_price_performance.py --as-of 2026-06-15T10:00:00Z.
- Selected 291 due price-performance tickers.
- Archived 273 price-history sources and wrote 1063 price-performance rows.
- Re-ran .venv/bin/python scripts/archive_hkex_documents.py --as-of 2026-06-15T10:05:00Z for the remaining open T0/T1 tasks; no additional completed T0/T1 stages resulted.

Verification:
- Compiled the new price-performance script.
- Ran git diff --check.
- Checked SQLite integrity and foreign keys.
- Confirmed database row counts match CSV snapshots.
- Verified all 979 source_refs use valid repo-relative paths, have files, have hashes, and SHA256 hashes match.
- Confirmed no generated Python caches or SQLite transient files remain.

Next useful context:
- price_performance now has 1063 rows: D1 273, D5 272, D20 267, D60 251.
- Remaining due price-performance gaps are 18 tickers where Yahoo history was unavailable or the request failed.
- T0/T1 gaps remain at T0 93 and T1 77; T2 grey-market remains unresolved pending a reproducible source strategy.
2026-06-15 09:16:08 +00:00

430 lines
15 KiB
Python

#!/usr/bin/env python3
"""Archive due post-listing price performance checkpoints from Yahoo Finance chart data."""
from __future__ import annotations
import argparse
import csv
import hashlib
import json
import sqlite3
import subprocess
import sys
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Default repo-relative location of the SQLite database (overridable with --db).
DB_PATH = Path("data/hk_ipo.sqlite")
# Default repo-relative location of the schema script (overridable with --schema).
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
# Directory that receives the per-table CSV snapshots written by export_snapshot().
SNAPSHOT_DIR = Path("data/snapshots")
# Base endpoint for Yahoo Finance chart requests; chart_url() appends the symbol and query string.
BASE_CHART_URL = "https://query1.finance.yahoo.com/v8/finance/chart"
# sync_tasks stage codes that this script treats as price-performance checkpoints.
PRICE_STAGES = ("D1", "D5", "D20", "D60")
@dataclass(frozen=True)
class PriceTask:
    """Open price-performance work for one ticker, grouped across its due stages.

    The instance is frozen, but ``stages`` is a plain dict that select_tasks()
    fills in place after construction.
    """

    # Ticker as stored in sync_tasks / ipo_master.
    ticker: str
    # Listing date string from ipo_master.listing_date (parsed with parse_date()).
    listing_date: str
    # Offer price from offering_terms; None when the LEFT JOIN yields no value.
    offer_price_hkd: float | None
    # Maps a stage code (one of PRICE_STAGES) to that stage's due-date string.
    stages: dict[str, str]
@dataclass(frozen=True)
class Quote:
    """One daily bar extracted from a Yahoo Finance chart response."""

    # ISO date of the bar, derived from its timestamp shifted by the response's gmtoffset.
    trade_date: str
    # Open/high/low may be None when the response has no value at that index.
    open_price_hkd: float | None
    high_price_hkd: float | None
    low_price_hkd: float | None
    # parse_quotes() skips bars without a close, so it never stores None here.
    close_price_hkd: float | None
    # None when the response volume is missing or zero.
    volume: int | None
@dataclass(frozen=True)
class ArchivedHistory:
    """A raw chart response saved to disk together with the quotes parsed from it."""

    # Identifier built from the ticker and the requested date window.
    source_id: str
    ticker: str
    # POSIX-style path of the saved raw response (under data/raw/<ticker>/).
    local_path: str
    # Exact request URL the payload was fetched from.
    url: str
    # Hex SHA-256 digest of the raw payload bytes.
    file_sha256: str
    # ISO date of the last day of the requested window.
    source_date: str
    # Daily bars parsed from the payload, in response order.
    quotes: list[Quote]
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse ``sys.argv``.

    Returns:
        Namespace exposing ``db``, ``schema``, ``as_of``, ``tickers`` and
        ``skip_sync_state``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # The two path options share a shape: a string default taken from a module constant.
    for flag, default_path, help_text in (
        ("--db", DB_PATH, "Repo-relative SQLite database path."),
        ("--schema", SCHEMA_PATH, "Repo-relative schema path."),
    ):
        cli.add_argument(flag, default=str(default_path), help=help_text)
    cli.add_argument(
        "--as-of",
        help="Archive timestamp. Defaults to current UTC time.",
    )
    cli.add_argument(
        "--tickers",
        help="Comma-separated tickers to process instead of selecting all open due price tasks.",
    )
    cli.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh sync state after updating facts.",
    )
    return cli.parse_args()
def parse_as_of(value: str | None) -> datetime:
    """Resolve the ``--as-of`` argument to a timezone-aware UTC datetime.

    Args:
        value: ISO-8601 timestamp (a trailing ``Z`` is accepted), or ``None`` /
            empty string to use the current time truncated to whole seconds.

    Returns:
        An aware datetime in UTC. A timestamp without an offset is taken to be
        UTC; a timestamp with another offset is converted to UTC, so the
        derived archive date and the ``Z``-suffixed string always agree.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 timestamp.
    """
    if not value:
        return datetime.now(timezone.utc).replace(microsecond=0)
    # fromisoformat() on older Pythons rejects the "Z" suffix, so spell it as an offset.
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Naive input: the tool's documented default is UTC, so label it as such.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def parse_date(value: str) -> date:
    """Convert an ISO-format date string such as ``2026-06-15`` into a ``date``.

    Raises:
        ValueError: if ``value`` is not a valid ISO-format date.
    """
    parsed = date.fromisoformat(value)
    return parsed
def epoch(day: date) -> int:
    """Return the Unix timestamp, in whole seconds, of 00:00 UTC on ``day``."""
    year, month, day_of_month = day.timetuple()[:3]
    utc_midnight = datetime(year, month, day_of_month, tzinfo=timezone.utc)
    return int(utc_midnight.timestamp())
def yahoo_symbol(ticker: str) -> str:
    """Translate a stored HKEX ticker into its Yahoo Finance symbol.

    Yahoo Finance lists Hong Kong securities under a code zero-padded to at
    least four digits plus the ``.HK`` suffix (``0700.HK``, ``9988.HK``,
    ``80700.HK``). The stored ticker is padded to five digits, so the number is
    re-rendered with four-digit padding; stripping every leading zero would
    produce symbols such as ``700.HK`` that Yahoo does not resolve.

    Args:
        ticker: Numeric ticker string, with or without leading zeros.

    Returns:
        The Yahoo symbol, e.g. ``"0700.HK"`` for ``"00700"``.

    Raises:
        ValueError: if ``ticker`` is not numeric.
    """
    return f"{int(ticker):04d}.HK"
def chart_url(ticker: str, start_date: date, end_date: date) -> str:
    """Build the Yahoo Finance chart URL requesting daily bars for a date window.

    Args:
        ticker: Stored ticker; converted with yahoo_symbol().
        start_date: First day of the window (sent as ``period1``).
        end_date: Last day of the window; ``period2`` is set to the day after it.

    Returns:
        The full request URL including the query string.
    """
    # period2 is pushed one day past end_date — presumably so end_date's own bar is included.
    window_close = end_date + timedelta(days=1)
    query = urlencode(
        [
            ("period1", epoch(start_date)),
            ("period2", epoch(window_close)),
            ("interval", "1d"),
            ("events", "history"),
            ("includeAdjustedClose", "true"),
        ]
    )
    symbol = yahoo_symbol(ticker)
    return f"{BASE_CHART_URL}/{symbol}?{query}"
def fetch_bytes(url: str) -> bytes:
    """Download ``url`` and return the raw response body.

    The request carries a browser-like ``User-Agent`` header and uses a
    60-second timeout. Errors raised by ``urlopen`` propagate to the caller.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=headers), timeout=60) as reply:
        body = reply.read()
    return body
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def select_tasks(conn: sqlite3.Connection, as_of_date: date, tickers: str | None) -> list[PriceTask]:
    """Load open, due price-performance tasks and group them per ticker.

    Args:
        conn: Database connection. Rows are read by column name, so the
            connection needs a name-indexable row factory such as ``sqlite3.Row``.
        as_of_date: Only tasks whose due date is on or before this day are selected.
        tickers: Optional comma-separated ticker list; each entry is stripped
            and left-padded with zeros to five characters before filtering.

    Returns:
        One PriceTask per ticker, in query order (listing date, then ticker),
        each carrying a ``stage -> due_date`` mapping of its due stages.
    """
    stage_marks = ", ".join("?" for _ in PRICE_STAGES)
    bindings: list[object] = list(PRICE_STAGES)
    bindings.append(as_of_date.isoformat())

    # Optional ticker restriction; its bindings follow the stage and date bindings.
    ticker_clause = ""
    if tickers:
        wanted = [part.strip().zfill(5) for part in tickers.split(",") if part.strip()]
        ticker_marks = ", ".join("?" for _ in wanted)
        ticker_clause = f" AND t.ticker IN ({ticker_marks})"
        bindings.extend(wanted)

    query = f"""
        SELECT t.ticker, m.listing_date, o.offer_price_hkd, t.stage, t.due_date
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        LEFT JOIN offering_terms o ON o.ticker = t.ticker
        WHERE t.task_status = 'open'
          AND t.stage IN ({stage_marks})
          AND t.due_date IS NOT NULL
          AND t.due_date <= ?
          AND m.listing_date IS NOT NULL
          {ticker_clause}
        ORDER BY m.listing_date, t.ticker, t.stage
        """

    by_ticker: dict[str, PriceTask] = {}
    for record in conn.execute(query, tuple(bindings)).fetchall():
        ticker = record["ticker"]
        if ticker not in by_ticker:
            # The first row for a ticker supplies its listing date and offer price.
            by_ticker[ticker] = PriceTask(ticker, record["listing_date"], record["offer_price_hkd"], {})
        by_ticker[ticker].stages[record["stage"]] = record["due_date"]
    return list(by_ticker.values())
def parse_quotes(payload: bytes) -> list[Quote]:
    """Extract daily quotes from a raw Yahoo Finance chart response.

    Args:
        payload: JSON response body.

    Returns:
        One Quote per timestamp that has a close price, in response order.
        Empty when the response has no chart result.

    Raises:
        json.JSONDecodeError: if ``payload`` is not valid JSON.
    """
    document = json.loads(payload)
    results = document.get("chart", {}).get("result") or [None]
    chart = results[0]
    if not chart:
        return []

    stamps = chart.get("timestamp") or []
    series = (chart.get("indicators", {}).get("quote") or [{}])[0]
    offset_seconds = int(chart.get("meta", {}).get("gmtoffset") or 0)

    parsed: list[Quote] = []
    for position, stamp in enumerate(stamps):
        closing = value_at(series, "close", position)
        if closing is None:
            # Bars without a close cannot serve as a checkpoint; drop them.
            continue
        # Shift by the response's gmtoffset before taking the date so the bar
        # is dated in the offset's local calendar rather than in UTC.
        local_day = datetime.fromtimestamp(stamp + offset_seconds, timezone.utc).date()
        parsed.append(
            Quote(
                trade_date=local_day.isoformat(),
                open_price_hkd=value_at(series, "open", position),
                high_price_hkd=value_at(series, "high", position),
                low_price_hkd=value_at(series, "low", position),
                close_price_hkd=closing,
                # Missing and zero volume both collapse to None.
                volume=int(value_at(series, "volume", position) or 0) or None,
            )
        )
    return parsed
def value_at(block: dict[str, list[float | int | None]], key: str, index: int) -> float | int | None:
    """Return ``block[key][index]``, or ``None`` when it is absent.

    ``None`` is returned when ``key`` is missing or maps to an empty/None
    list, when ``index`` is past the end of the list, or when the stored
    value itself is ``None``.
    """
    series = block.get(key) or []
    return series[index] if index < len(series) else None
def archive_history(task: PriceTask, as_of_date: date) -> ArchivedHistory | None:
    """Fetch a ticker's daily price history, save the raw response and parse it.

    The requested window starts three days before the listing date and ends
    21 days after the latest due stage, capped at ``as_of_date``.

    Args:
        task: Ticker and its due stages.
        as_of_date: Upper bound for the requested window.

    Returns:
        The archived history, or ``None`` when the response has no usable
        quotes (in which case nothing is written to disk).

    Raises:
        RuntimeError: if the HTTP request fails or times out.
    """
    stage_due_dates = [parse_date(raw) for raw in task.stages.values()]
    window_start = parse_date(task.listing_date) - timedelta(days=3)
    window_end = min(as_of_date, max(stage_due_dates) + timedelta(days=21))
    url = chart_url(task.ticker, window_start, window_end)

    try:
        payload = fetch_bytes(url)
    except (HTTPError, URLError, TimeoutError) as exc:
        raise RuntimeError(f"fetch failed: {exc}") from exc

    quotes = parse_quotes(payload)
    if not quotes:
        return None

    start_iso, end_iso = window_start.isoformat(), window_end.isoformat()
    # Keep the payload bytes exactly as received so the stored hash matches the file.
    target = Path("data/raw") / task.ticker / f"price_history_yahoo_{start_iso}_{end_iso}.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(payload)

    # The source id uses underscores where the file name uses dashes.
    source_id = "_".join(
        (
            task.ticker,
            "price_history_yahoo",
            start_iso.replace("-", "_"),
            end_iso.replace("-", "_"),
        )
    )
    return ArchivedHistory(
        source_id=source_id,
        ticker=task.ticker,
        local_path=target.as_posix(),
        url=url,
        file_sha256=sha256_bytes(payload),
        source_date=end_iso,
        quotes=quotes,
    )
def checkpoint_quote(quotes: list[Quote], due_date: str) -> Quote | None:
    """Return the first quote in list order dated on or after ``due_date``.

    The scan stops at the first match, so the result is the earliest
    qualifying day only if ``quotes`` is in ascending date order.
    Returns ``None`` when no quote qualifies.
    """
    threshold = parse_date(due_date)
    candidates = (quote for quote in quotes if parse_date(quote.trade_date) >= threshold)
    return next(candidates, None)
def source_row(history: ArchivedHistory, as_of: str) -> dict[str, object]:
    """Build the ``source_refs`` row describing an archived price history.

    Args:
        history: The archived response.
        as_of: Timestamp string stored as ``archived_at``.

    Returns:
        A mapping keyed by the named parameters used in upsert_source_refs().
    """
    yahoo_title = f"Yahoo Finance daily price history for {history.ticker}.HK"
    return dict(
        source_id=history.source_id,
        ticker=history.ticker,
        source_type="market_price_history",
        title=yahoo_title,
        path_base="repo_root",
        local_path=history.local_path,
        url=history.url,
        file_sha256=history.file_sha256,
        source_date=history.source_date,
        archived_at=as_of,
        notes="Raw chart response used to derive D1/D5/D20/D60 price performance checkpoints.",
    )
def performance_rows(task: PriceTask, history: ArchivedHistory, as_of: str) -> list[dict[str, object]]:
    """Derive one ``price_performance`` row per due stage that has a checkpoint quote.

    Args:
        task: Ticker, offer price and due stages.
        history: Archived quotes to pick checkpoints from.
        as_of: Timestamp string stored as ``data_as_of``.

    Returns:
        Rows keyed by the named parameters used in upsert_performance().
        Stages without a usable quote are omitted.
    """
    built: list[dict[str, object]] = []
    for stage, due_date in task.stages.items():
        quote = checkpoint_quote(history.quotes, due_date)
        if quote and quote.close_price_hkd is not None:
            close = quote.close_price_hkd
            # Return versus offer price, in percent; left empty when the offer price is missing or zero.
            return_pct = (close / task.offer_price_hkd - 1) * 100 if task.offer_price_hkd else None
            # Turnover is estimated as volume x close, expressed in millions.
            turnover_hkd_m = quote.volume * close / 1_000_000 if quote.volume is not None else None
            built.append(
                dict(
                    performance_id=f"{task.ticker}_{stage}",
                    ticker=task.ticker,
                    stage=stage,
                    source_id=history.source_id,
                    as_of_date=quote.trade_date,
                    open_price_hkd=quote.open_price_hkd,
                    high_price_hkd=quote.high_price_hkd,
                    low_price_hkd=quote.low_price_hkd,
                    close_price_hkd=close,
                    return_pct=return_pct,
                    turnover_hkd_m=turnover_hkd_m,
                    data_as_of=as_of,
                    notes=f"{stage} uses due date {due_date}; selected first available trading day on or after due date.",
                )
            )
    return built
def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``source_refs`` rows, overwriting existing rows with the same ``source_id``.

    On conflict every column except ``source_id`` and ``ticker`` is replaced
    by the incoming value. The caller is responsible for committing.
    """
    statement = """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        )
        VALUES (
            :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url,
            :file_sha256, :source_date, :archived_at, :notes
        )
        ON CONFLICT(source_id) DO UPDATE SET
            source_type = excluded.source_type,
            title = excluded.title,
            path_base = excluded.path_base,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """
    conn.executemany(statement, rows)
def upsert_performance(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert ``price_performance`` rows, overwriting rows with the same ``(ticker, stage)``.

    On conflict every column except ``performance_id``, ``ticker`` and
    ``stage`` is replaced by the incoming value. The caller is responsible
    for committing.
    """
    statement = """
        INSERT INTO price_performance (
            performance_id, ticker, stage, source_id, as_of_date, open_price_hkd,
            high_price_hkd, low_price_hkd, close_price_hkd, return_pct,
            turnover_hkd_m, data_as_of, notes
        )
        VALUES (
            :performance_id, :ticker, :stage, :source_id, :as_of_date, :open_price_hkd,
            :high_price_hkd, :low_price_hkd, :close_price_hkd, :return_pct,
            :turnover_hkd_m, :data_as_of, :notes
        )
        ON CONFLICT(ticker, stage) DO UPDATE SET
            source_id = excluded.source_id,
            as_of_date = excluded.as_of_date,
            open_price_hkd = excluded.open_price_hkd,
            high_price_hkd = excluded.high_price_hkd,
            low_price_hkd = excluded.low_price_hkd,
            close_price_hkd = excluded.close_price_hkd,
            return_pct = excluded.return_pct,
            turnover_hkd_m = excluded.turnover_hkd_m,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """
    conn.executemany(statement, rows)
def upsert_sync_run(conn: sqlite3.Connection, run_id: str, as_of: str, status: str, notes: str) -> None:
    """Create or overwrite the ``sync_runs`` row for this archive run.

    ``as_of`` is stored in the ``as_of``, ``started_at`` and ``finished_at``
    columns alike, and the mode is always ``price_performance_archive``.
    The caller is responsible for committing.
    """
    statement = """
        INSERT INTO sync_runs (sync_run_id, mode, as_of, started_at, finished_at, status, notes)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(sync_run_id) DO UPDATE SET
            mode = excluded.mode,
            as_of = excluded.as_of,
            started_at = excluded.started_at,
            finished_at = excluded.finished_at,
            status = excluded.status,
            notes = excluded.notes
        """
    # The three timestamp columns all receive the same value.
    timestamps = (as_of,) * 3
    values = (run_id, "price_performance_archive", *timestamps, status, notes)
    conn.execute(statement, values)
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write every row of ``table`` to ``SNAPSHOT_DIR/<table>.csv``.

    Args:
        conn: Database connection to read from.
        table: Table name. Interpolated directly into the SQL text, so it
            must be a trusted identifier.
        order_by: ORDER BY expression, also interpolated directly; defaults
            to the first column.

    The file is UTF-8 with ``\\n`` line endings and starts with a header row
    of column names. Any existing snapshot is overwritten.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [column[0] for column in result.description]
    destination = SNAPSHOT_DIR / f"{table}.csv"
    with destination.open("w", newline="", encoding="utf-8") as handle:
        out = csv.writer(handle, lineterminator="\n")
        out.writerows([header, *result.fetchall()])
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    Args:
        db_path: Passed through as ``--db``.
        schema_path: Passed through as ``--schema``.
        as_of: Passed through as ``--as-of``.

    Raises:
        subprocess.CalledProcessError: if the script exits with a non-zero status.
    """
    options = {
        "--db": db_path,
        "--schema": schema_path,
        "--as-of": as_of,
        "--mode": "price_performance_archive",
        "--summary-limit": "20",
    }
    command = [sys.executable, "scripts/update_sync_state.py"]
    for flag, value in options.items():
        command += [flag, value]
    subprocess.run(command, check=True)
def main() -> int:
    """Archive due price-performance checkpoints and refresh the derived snapshots.

    Steps:
        1. Apply the schema script and record the run as ``running``.
        2. For each selected ticker, fetch and archive its price history,
           derive checkpoint rows, and upsert the source reference and rows.
        3. Record the run as ``complete``, export CSV snapshots, and commit.
        4. Unless ``--skip-sync-state`` is given, run the sync-state refresher.
        5. Print a summary, including tickers with no history, no checkpoint
           quote, or a failure.

    Returns:
        Always ``0``; per-ticker failures are reported but do not change the
        exit status.
    """
    args = parse_args()
    as_of_dt = parse_as_of(args.as_of)
    as_of = as_of_dt.isoformat().replace("+00:00", "Z")
    as_of_date = as_of_dt.date()
    # Drop timestamp separators so the value can be embedded in the run identifier.
    run_id = "price_performance_archive_" + as_of.replace(":", "").replace("-", "").replace("+", "")
    archived_sources = 0
    inserted_rows = 0
    no_history: list[str] = []
    no_checkpoint: list[str] = []
    failed_tickers: list[tuple[str, str]] = []

    # The connection's context manager only commits or rolls back; it does not
    # close the handle. Close it explicitly so the database is released before
    # the sync-state subprocess opens it.
    conn = sqlite3.connect(args.db)
    try:
        with conn:
            conn.row_factory = sqlite3.Row
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            upsert_sync_run(conn, run_id, as_of, "running", "Archiving due D1/D5/D20/D60 price performance.")
            tasks = select_tasks(conn, as_of_date, args.tickers)
            print(f"tickers selected: {len(tasks)}")
            for index, task in enumerate(tasks, start=1):
                print(f"[{index}/{len(tasks)}] {task.ticker}", flush=True)
                try:
                    history = archive_history(task, as_of_date)
                    if history is None:
                        no_history.append(task.ticker)
                        continue
                    rows = performance_rows(task, history, as_of)
                    if not rows:
                        no_checkpoint.append(task.ticker)
                        continue
                    upsert_source_refs(conn, [source_row(history, as_of)])
                    upsert_performance(conn, rows)
                    archived_sources += 1
                    inserted_rows += len(rows)
                except Exception as exc:
                    # Deliberate best-effort boundary: one bad ticker must not
                    # abort the batch. The error is kept and reported below.
                    failed_tickers.append((task.ticker, str(exc)))
            upsert_sync_run(
                conn,
                run_id,
                as_of,
                "complete",
                (
                    f"Archived {archived_sources} price-history sources and {inserted_rows} "
                    f"price-performance rows; {len(failed_tickers)} tickers failed."
                ),
            )
            export_snapshot(conn, "source_refs", "source_id")
            export_snapshot(conn, "price_performance", "ticker, stage")
            export_snapshot(conn, "sync_runs", "sync_run_id")
    finally:
        conn.close()

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print(f"price sources archived: {archived_sources}")
    print(f"price performance rows written: {inserted_rows}")
    if no_history:
        print("no market history: " + ", ".join(no_history))
    if no_checkpoint:
        print("no checkpoint quote: " + ", ".join(no_checkpoint))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0
# Run the archiver only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())