#!/usr/bin/env python3
"""Archive due post-listing price performance checkpoints from Yahoo Finance chart data."""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import sqlite3
import subprocess
import sys
from contextlib import closing
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

DB_PATH = Path("data/hk_ipo.sqlite")
SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")
BASE_CHART_URL = "https://query1.finance.yahoo.com/v8/finance/chart"
PRICE_STAGES = ("D1", "D5", "D20", "D60")


@dataclass(frozen=True)
class PriceTask:
    """Open, due price-performance tasks for one ticker.

    ``stages`` maps a stage name (one of PRICE_STAGES) to its ISO due date.
    The dataclass is frozen, but the dict itself is mutable and is filled in
    by ``select_tasks`` after construction.
    """

    ticker: str
    listing_date: str
    offer_price_hkd: float | None
    stages: dict[str, str]


@dataclass(frozen=True)
class Quote:
    """One daily bar parsed from a Yahoo chart response.

    ``trade_date`` is the ISO date in the exchange's local time (shifted by the
    response's ``gmtoffset``). Zero volume is stored as None.
    """

    trade_date: str
    open_price_hkd: float | None
    high_price_hkd: float | None
    low_price_hkd: float | None
    close_price_hkd: float | None
    volume: int | None


@dataclass(frozen=True)
class ArchivedHistory:
    """A raw chart response saved under data/raw, plus the quotes parsed from it."""

    source_id: str
    ticker: str
    local_path: str
    url: str
    file_sha256: str
    source_date: str
    quotes: list[Quote]


def parse_args() -> argparse.Namespace:
    """Parse command-line options."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--schema", default=str(SCHEMA_PATH), help="Repo-relative schema path.")
    parser.add_argument("--as-of", help="Archive timestamp. Defaults to current UTC time.")
    parser.add_argument(
        "--tickers",
        help="Comma-separated tickers to process instead of selecting all open due price tasks.",
    )
    parser.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh sync state after updating facts.",
    )
    return parser.parse_args()


def parse_as_of(value: str | None) -> datetime:
    """Return the run timestamp.

    *value* is parsed as ISO 8601, and a trailing ``Z`` is accepted. A timestamp
    without an offset is taken to be UTC, so every stored ``as_of`` carries an
    explicit zone. An explicit offset is kept as given. When *value* is empty
    or None, the current UTC time truncated to whole seconds is returned.
    """
    if not value:
        return datetime.now(timezone.utc).replace(microsecond=0)
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def parse_date(value: str) -> date:
    """Parse an ISO ``YYYY-MM-DD`` string."""
    return date.fromisoformat(value)


def epoch(day: date) -> int:
    """Return the Unix timestamp of midnight UTC at the start of *day*."""
    return int(datetime(day.year, day.month, day.day, tzinfo=timezone.utc).timestamp())


def yahoo_symbol(ticker: str) -> str:
    """Return the Yahoo Finance symbol for an HKEX stock code.

    Yahoo lists Hong Kong stocks with at least four digits (``0700.HK``,
    ``9988.HK``). Leading zeros of the exchange code are therefore normalised
    to a 4-digit minimum; five-digit codes pass through unchanged.
    """
    return f"{int(ticker):04d}.HK"


def chart_url(ticker: str, start_date: date, end_date: date) -> str:
    """Build a daily-interval chart URL covering *start_date* .. *end_date*.

    ``period2`` is pushed to midnight UTC of the day after *end_date*, so that
    day's bar falls inside the requested window.
    """
    params = urlencode(
        {
            "period1": epoch(start_date),
            "period2": epoch(end_date + timedelta(days=1)),
            "interval": "1d",
            "events": "history",
            "includeAdjustedClose": "true",
        }
    )
    return f"{BASE_CHART_URL}/{yahoo_symbol(ticker)}?{params}"


def fetch_bytes(url: str) -> bytes:
    """GET *url* with a browser-like User-Agent and a 60 s timeout; return the body."""
    request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urlopen(request, timeout=60) as response:
        return response.read()


def sha256_bytes(data: bytes) -> str:
    """Return the hex SHA-256 digest of *data*."""
    return hashlib.sha256(data).hexdigest()


def select_tasks(conn: sqlite3.Connection, as_of_date: date, tickers: str | None) -> list[PriceTask]:
    """Load open D1/D5/D20/D60 tasks due on or before *as_of_date*, grouped per ticker.

    - Only tickers with a listing date in ``ipo_master`` are returned.
    - *tickers* optionally restricts the selection to a comma-separated list.
      Each entry is zero-padded to five characters, presumably matching how
      codes are stored (verify against the schema).
    - ``offer_price_hkd`` is None when ``offering_terms`` has no row.
    - Results are ordered by listing date, then ticker.

    Requires ``conn.row_factory`` to allow access by column name
    (e.g. ``sqlite3.Row``).
    """
    stage_placeholders = ", ".join("?" for _ in PRICE_STAGES)
    params: list[object] = [*PRICE_STAGES, as_of_date.isoformat()]
    ticker_filter = ""
    if tickers:
        selected = [ticker.strip().zfill(5) for ticker in tickers.split(",") if ticker.strip()]
        placeholders = ", ".join("?" for _ in selected)
        ticker_filter = f" AND t.ticker IN ({placeholders})"
        params.extend(selected)
    rows = conn.execute(
        f"""
        SELECT t.ticker, m.listing_date, o.offer_price_hkd, t.stage, t.due_date
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        LEFT JOIN offering_terms o ON o.ticker = t.ticker
        WHERE t.task_status = 'open'
          AND t.stage IN ({stage_placeholders})
          AND t.due_date IS NOT NULL
          AND t.due_date <= ?
          AND m.listing_date IS NOT NULL
          {ticker_filter}
        ORDER BY m.listing_date, t.ticker, t.stage
        """,
        tuple(params),
    ).fetchall()
    grouped: dict[str, PriceTask] = {}
    for row in rows:
        task = grouped.setdefault(
            row["ticker"],
            PriceTask(row["ticker"], row["listing_date"], row["offer_price_hkd"], {}),
        )
        task.stages[row["stage"]] = row["due_date"]
    return list(grouped.values())


def parse_quotes(payload: bytes) -> list[Quote]:
    """Parse a Yahoo chart JSON payload into daily quotes.

    Bars without a close price are skipped. Returns [] when the payload has no
    ``chart.result`` entry. Raises ``json.JSONDecodeError`` on non-JSON input.
    """
    data = json.loads(payload)
    result = (data.get("chart", {}).get("result") or [None])[0]
    if not result:
        return []
    timestamps = result.get("timestamp") or []
    quote_block = (result.get("indicators", {}).get("quote") or [{}])[0]
    # Shift UTC timestamps into exchange-local time before taking the date.
    gmtoffset = int(result.get("meta", {}).get("gmtoffset") or 0)
    quotes: list[Quote] = []
    for index, timestamp in enumerate(timestamps):
        close_price = value_at(quote_block, "close", index)
        if close_price is None:
            continue
        trade_date = datetime.fromtimestamp(timestamp + gmtoffset, timezone.utc).date().isoformat()
        quotes.append(
            Quote(
                trade_date=trade_date,
                open_price_hkd=value_at(quote_block, "open", index),
                high_price_hkd=value_at(quote_block, "high", index),
                low_price_hkd=value_at(quote_block, "low", index),
                close_price_hkd=close_price,
                # Missing and zero volume are both recorded as None.
                volume=int(value_at(quote_block, "volume", index) or 0) or None,
            )
        )
    return quotes


def value_at(block: dict[str, list[float | int | None]], key: str, index: int) -> float | int | None:
    """Return ``block[key][index]``.

    Returns None when the key is missing or empty, or when *index* is past the
    end of the list.
    """
    values = block.get(key) or []
    return values[index] if index < len(values) else None


def archive_history(task: PriceTask, as_of_date: date) -> ArchivedHistory | None:
    """Download and save the chart data covering all of *task*'s due stages.

    The requested window runs from three days before listing to the earlier of
    *as_of_date* and 21 days after the latest due date. The raw payload is
    written under ``data/raw/<ticker>/`` only when it contains at least one quote.

    Returns None when the response has no quotes. Raises RuntimeError when the
    download fails with HTTPError, URLError or TimeoutError.
    """
    due_dates = [parse_date(value) for value in task.stages.values()]
    start_date = parse_date(task.listing_date) - timedelta(days=3)
    end_date = min(as_of_date, max(due_dates) + timedelta(days=21))
    url = chart_url(task.ticker, start_date, end_date)
    try:
        payload = fetch_bytes(url)
    except (HTTPError, URLError, TimeoutError) as exc:
        raise RuntimeError(f"fetch failed: {exc}") from exc
    quotes = parse_quotes(payload)
    if not quotes:
        return None
    local_path = (
        Path("data/raw")
        / task.ticker
        / f"price_history_yahoo_{start_date.isoformat()}_{end_date.isoformat()}.json"
    )
    local_path.parent.mkdir(parents=True, exist_ok=True)
    local_path.write_bytes(payload)
    source_id = (
        f"{task.ticker}_price_history_yahoo_"
        f"{start_date.isoformat().replace('-', '_')}_{end_date.isoformat().replace('-', '_')}"
    )
    return ArchivedHistory(
        source_id=source_id,
        ticker=task.ticker,
        local_path=local_path.as_posix(),
        url=url,
        file_sha256=sha256_bytes(payload),
        source_date=end_date.isoformat(),
        quotes=quotes,
    )


def checkpoint_quote(quotes: list[Quote], due_date: str) -> Quote | None:
    """Return the quote for the earliest trading day on or after *due_date*.

    Returns None when there is no such quote. The input order of *quotes* does
    not matter; among quotes sharing the same date, the first one wins.
    """
    due = parse_date(due_date)
    return min(
        (quote for quote in quotes if parse_date(quote.trade_date) >= due),
        key=lambda quote: quote.trade_date,  # ISO dates sort chronologically as strings
        default=None,
    )


def source_row(history: ArchivedHistory, as_of: str) -> dict[str, object]:
    """Build the ``source_refs`` row describing an archived chart payload."""
    return {
        "source_id": history.source_id,
        "ticker": history.ticker,
        "source_type": "market_price_history",
        "title": f"Yahoo Finance daily price history for {history.ticker}.HK",
        "path_base": "repo_root",
        "local_path": history.local_path,
        "url": history.url,
        "file_sha256": history.file_sha256,
        "source_date": history.source_date,
        "archived_at": as_of,
        "notes": "Raw chart response used to derive D1/D5/D20/D60 price performance checkpoints.",
    }


def performance_rows(task: PriceTask, history: ArchivedHistory, as_of: str) -> list[dict[str, object]]:
    """Build one ``price_performance`` row per stage that has a checkpoint quote.

    - Stages without a quote on or after their due date are omitted.
    - ``return_pct`` is the close relative to the offer price, and is None when
      the offer price is missing or zero.
    - ``turnover_hkd_m`` is approximated as volume × close, in millions.
    """
    rows: list[dict[str, object]] = []
    for stage, due_date in task.stages.items():
        quote = checkpoint_quote(history.quotes, due_date)
        if not quote or quote.close_price_hkd is None:
            continue
        return_pct = None
        if task.offer_price_hkd:
            return_pct = (quote.close_price_hkd / task.offer_price_hkd - 1) * 100
        turnover_hkd_m = None
        if quote.volume is not None:
            turnover_hkd_m = quote.volume * quote.close_price_hkd / 1_000_000
        rows.append(
            {
                "performance_id": f"{task.ticker}_{stage}",
                "ticker": task.ticker,
                "stage": stage,
                "source_id": history.source_id,
                "as_of_date": quote.trade_date,
                "open_price_hkd": quote.open_price_hkd,
                "high_price_hkd": quote.high_price_hkd,
                "low_price_hkd": quote.low_price_hkd,
                "close_price_hkd": quote.close_price_hkd,
                "return_pct": return_pct,
                "turnover_hkd_m": turnover_hkd_m,
                "data_as_of": as_of,
                "notes": f"{stage} uses due date {due_date}; selected first available trading day on or after due date.",
            }
        )
    return rows


def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert *rows* into ``source_refs``, updating existing rows by ``source_id``.

    ``ticker`` is not updated on conflict.
    """
    conn.executemany(
        """
        INSERT INTO source_refs (
            source_id, ticker, source_type, title, path_base, local_path, url,
            file_sha256, source_date, archived_at, notes
        ) VALUES (
            :source_id, :ticker, :source_type, :title, :path_base, :local_path, :url,
            :file_sha256, :source_date, :archived_at, :notes
        )
        ON CONFLICT(source_id) DO UPDATE SET
            source_type = excluded.source_type,
            title = excluded.title,
            path_base = excluded.path_base,
            local_path = excluded.local_path,
            url = excluded.url,
            file_sha256 = excluded.file_sha256,
            source_date = excluded.source_date,
            archived_at = excluded.archived_at,
            notes = excluded.notes
        """,
        rows,
    )


def upsert_performance(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert *rows* into ``price_performance``, updating on ``(ticker, stage)`` conflicts.

    ``performance_id`` is kept from the existing row on conflict.
    """
    conn.executemany(
        """
        INSERT INTO price_performance (
            performance_id, ticker, stage, source_id, as_of_date, open_price_hkd,
            high_price_hkd, low_price_hkd, close_price_hkd, return_pct,
            turnover_hkd_m, data_as_of, notes
        ) VALUES (
            :performance_id, :ticker, :stage, :source_id, :as_of_date, :open_price_hkd,
            :high_price_hkd, :low_price_hkd, :close_price_hkd, :return_pct,
            :turnover_hkd_m, :data_as_of, :notes
        )
        ON CONFLICT(ticker, stage) DO UPDATE SET
            source_id = excluded.source_id,
            as_of_date = excluded.as_of_date,
            open_price_hkd = excluded.open_price_hkd,
            high_price_hkd = excluded.high_price_hkd,
            low_price_hkd = excluded.low_price_hkd,
            close_price_hkd = excluded.close_price_hkd,
            return_pct = excluded.return_pct,
            turnover_hkd_m = excluded.turnover_hkd_m,
            data_as_of = excluded.data_as_of,
            notes = excluded.notes
        """,
        rows,
    )


def upsert_sync_run(conn: sqlite3.Connection, run_id: str, as_of: str, status: str, notes: str) -> None:
    """Record or update this run in ``sync_runs``.

    The start and finish times are both set to *as_of*.
    """
    conn.execute(
        """
        INSERT INTO sync_runs (sync_run_id, mode, as_of, started_at, finished_at, status, notes)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(sync_run_id) DO UPDATE SET
            mode = excluded.mode,
            as_of = excluded.as_of,
            started_at = excluded.started_at,
            finished_at = excluded.finished_at,
            status = excluded.status,
            notes = excluded.notes
        """,
        (run_id, "price_performance_archive", as_of, as_of, as_of, status, notes),
    )


def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write all rows of *table* to ``SNAPSHOT_DIR/<table>.csv`` with a header row.

    *table* and *order_by* are interpolated into the SQL text, so they must be
    trusted identifiers, never user input.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    columns = [description[0] for description in cursor.description]
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(cursor.fetchall())


def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` for this mode.

    Raises ``subprocess.CalledProcessError`` if the script fails.
    """
    command = [
        sys.executable,
        "scripts/update_sync_state.py",
        "--db",
        db_path,
        "--schema",
        schema_path,
        "--as-of",
        as_of,
        "--mode",
        "price_performance_archive",
        "--summary-limit",
        "20",
    ]
    subprocess.run(command, check=True)


def main() -> int:
    """Archive price history for each due ticker, upsert checkpoints, then refresh sync state.

    Per-ticker failures are collected and reported instead of aborting the run;
    the exit status is 0 regardless. All database writes happen in one
    transaction, which is committed before the sync-state subprocess starts.
    """
    args = parse_args()
    as_of_dt = parse_as_of(args.as_of)
    as_of = as_of_dt.isoformat().replace("+00:00", "Z")
    as_of_date = as_of_dt.date()
    run_id = "price_performance_archive_" + as_of.replace(":", "").replace("-", "").replace("+", "")

    archived_sources = 0
    inserted_rows = 0
    no_history: list[str] = []
    no_checkpoint: list[str] = []
    failed_tickers: list[tuple[str, str]] = []

    # The connection's own context manager only commits or rolls back;
    # closing() also releases the database handle.
    with closing(sqlite3.connect(args.db)) as conn:
        conn.row_factory = sqlite3.Row
        # Commit on success; roll back if an exception escapes the task loop.
        with conn:
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            upsert_sync_run(conn, run_id, as_of, "running", "Archiving due D1/D5/D20/D60 price performance.")
            tasks = select_tasks(conn, as_of_date, args.tickers)
            print(f"tickers selected: {len(tasks)}")
            for index, task in enumerate(tasks, start=1):
                print(f"[{index}/{len(tasks)}] {task.ticker}", flush=True)
                try:
                    history = archive_history(task, as_of_date)
                    if history is None:
                        no_history.append(task.ticker)
                        continue
                    rows = performance_rows(task, history, as_of)
                    if not rows:
                        no_checkpoint.append(task.ticker)
                        continue
                    upsert_source_refs(conn, [source_row(history, as_of)])
                    upsert_performance(conn, rows)
                    archived_sources += 1
                    inserted_rows += len(rows)
                except Exception as exc:  # best-effort per ticker; reported below
                    failed_tickers.append((task.ticker, str(exc)))
            upsert_sync_run(
                conn,
                run_id,
                as_of,
                "complete",
                (
                    f"Archived {archived_sources} price-history sources and {inserted_rows} "
                    f"price-performance rows; {len(failed_tickers)} tickers failed."
                ),
            )
        export_snapshot(conn, "source_refs", "source_id")
        export_snapshot(conn, "price_performance", "ticker, stage")
        export_snapshot(conn, "sync_runs", "sync_run_id")

    # Start the child only after our transaction has committed and the
    # connection is closed. It opens the same database: before the commit it
    # would not see these rows, and it would contend with our pending write lock.
    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print(f"price sources archived: {archived_sources}")
    print(f"price performance rows written: {inserted_rows}")
    if no_history:
        print("no market history: " + ", ".join(no_history))
    if no_checkpoint:
        print("no checkpoint quote: " + ", ".join(no_checkpoint))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())