#!/usr/bin/env python3
"""Update per-ticker archive sync state and pending sync tasks."""

from __future__ import annotations

import argparse
import csv
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from pathlib import Path

# NOTE: the module docstring above is passed to argparse as the CLI
# description, so it is deliberately kept to a single line.

DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
DEFAULT_SCHEMA_PATH = Path("schema/hk_ipo.schema.sql")
SNAPSHOT_DIR = Path("data/snapshots")


@dataclass(frozen=True)
class Ticker:
    """Columns of one ``ipo_master`` row that drive the stage due dates."""

    ticker: str
    listing_date: str | None
    application_start_date: str | None
    allotment_results_expected_date: str | None


@dataclass(frozen=True)
class StageState:
    """Derived archive state of a single (ticker, stage) pair."""

    ticker: str
    stage: str
    # One of "complete", "pending_due", "pending_not_due" or "blocked".
    status: str
    # Stored as an integer flag; every state built in this module uses 1.
    required: int
    due_date: str | None
    completed_at: str | None
    last_source_id: str | None
    data_gap_id: str | None
    notes: str


# Canonical stage order; also used to sort the ticker_sync_state snapshot.
STAGE_ORDER = [
    "T0_prospectus",
    "T1_allotment",
    "T2_grey_market",
    "D1",
    "D5",
    "D20",
    "D60",
]

# Task type written to sync_tasks for each stage that is not complete.
TASK_TYPES = {
    "T0_prospectus": "archive_prospectus_and_terms",
    "T1_allotment": "archive_allotment_results",
    "T2_grey_market": "archive_grey_market_result",
    "D1": "archive_price_performance",
    "D5": "archive_price_performance",
    "D20": "archive_price_performance",
    "D60": "archive_price_performance",
}

# source_refs.source_type values that also count as evidence for a price stage.
_STAGE_SOURCE_TYPES: dict[str, tuple[str, ...]] = {
    "T2_grey_market": ("grey_market", "dark_market", "grey_market_performance"),
}

# Stages whose due date is a calendar-day offset from the listing date.
_CALENDAR_DAY_STAGES = frozenset({"D5", "D20", "D60"})


def parse_as_of(value: str | None) -> datetime:
    """Parse the ``--as-of`` ISO timestamp, defaulting to the current UTC second.

    A ``Z`` suffix is rewritten to ``+00:00`` so the value is accepted by
    ``datetime.fromisoformat`` on interpreters that do not understand ``Z``.
    """
    if value:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    return datetime.now(timezone.utc).replace(microsecond=0)


def parse_date(value: str | None) -> date | None:
    """Parse an ISO date string; ``None`` or an empty string yields ``None``."""
    if not value:
        return None
    return date.fromisoformat(value)


def due_status(due_date: str | None, as_of_date: date) -> str:
    """Classify an unfinished stage as ``pending_due`` or ``pending_not_due``.

    A missing due date is treated as already due.
    """
    due = parse_date(due_date)
    if due is None or due <= as_of_date:
        return "pending_due"
    return "pending_not_due"


def _pending_status(gap: sqlite3.Row | None, due_date: str | None, as_of_date: date) -> str:
    """Return ``blocked`` for a gap with no expected resolution date, else the due status."""
    if gap is not None and gap["expected_resolution_date"] is None:
        return "blocked"
    return due_status(due_date, as_of_date)


def query_one(conn: sqlite3.Connection, sql: str, params: tuple[object, ...]) -> sqlite3.Row | None:
    """Execute ``sql`` with ``params`` and return the first row, or ``None``."""
    return conn.execute(sql, params).fetchone()


def load_tickers(conn: sqlite3.Connection) -> list[Ticker]:
    """Load every ``ipo_master`` row as a ``Ticker``, ordered by ticker.

    Rows are converted with ``dict(row)``, so ``conn.row_factory`` must be
    ``sqlite3.Row`` (``main`` sets this up).
    """
    rows = conn.execute(
        """
        SELECT ticker, listing_date, application_start_date,
               allotment_results_expected_date
        FROM ipo_master
        ORDER BY ticker
        """
    ).fetchall()
    return [Ticker(**dict(row)) for row in rows]


def data_gap_for(conn: sqlite3.Connection, ticker: str, stage: str) -> sqlite3.Row | None:
    """Return the most recently created ``data_gaps`` row for a ticker stage."""
    return query_one(
        conn,
        """
        SELECT gap_id, reason, expected_resolution_date
        FROM data_gaps
        WHERE ticker = ? AND stage = ?
        ORDER BY created_at DESC, gap_id DESC
        LIMIT 1
        """,
        (ticker, stage),
    )


def prospectus_state(conn: sqlite3.Connection, ticker: Ticker, as_of_date: date) -> StageState:
    """Derive the ``T0_prospectus`` state.

    The stage is complete only when both a ``prospectus`` source reference
    and an ``offering_terms`` row exist for the ticker.
    """
    source = query_one(
        conn,
        """
        SELECT source_id, source_date
        FROM source_refs
        WHERE ticker = ? AND source_type = 'prospectus'
        ORDER BY source_date DESC, source_id DESC
        LIMIT 1
        """,
        (ticker.ticker,),
    )
    terms = query_one(conn, "SELECT ticker FROM offering_terms WHERE ticker = ?", (ticker.ticker,))
    if source is not None and terms is not None:
        return StageState(
            ticker=ticker.ticker,
            stage="T0_prospectus",
            status="complete",
            required=1,
            due_date=ticker.application_start_date,
            completed_at=source["source_date"],
            last_source_id=source["source_id"],
            data_gap_id=None,
            notes="Prospectus source and offering terms are archived.",
        )

    gap = data_gap_for(conn, ticker.ticker, "T0_prospectus")
    return StageState(
        ticker=ticker.ticker,
        stage="T0_prospectus",
        status=_pending_status(gap, ticker.application_start_date, as_of_date),
        required=1,
        due_date=ticker.application_start_date,
        completed_at=None,
        # A prospectus source may exist even though the terms are missing.
        last_source_id=source["source_id"] if source is not None else None,
        data_gap_id=gap["gap_id"] if gap is not None else None,
        notes="Missing prospectus source or offering terms.",
    )


def allotment_state(conn: sqlite3.Connection, ticker: Ticker, as_of_date: date) -> StageState:
    """Derive the ``T1_allotment`` state.

    Either an ``ipo_demand`` row or an ``allotment_results`` source reference
    marks the stage complete; the demand row wins when both exist.
    """
    demand = query_one(
        conn,
        """
        SELECT source_id, stage_date
        FROM ipo_demand
        WHERE ticker = ?
        ORDER BY stage_date DESC, demand_id DESC
        LIMIT 1
        """,
        (ticker.ticker,),
    )
    source = query_one(
        conn,
        """
        SELECT source_id, source_date
        FROM source_refs
        WHERE ticker = ? AND source_type = 'allotment_results'
        ORDER BY source_date DESC, source_id DESC
        LIMIT 1
        """,
        (ticker.ticker,),
    )

    evidence: tuple[str | None, str | None] | None = None
    if demand is not None:
        evidence = (demand["stage_date"], demand["source_id"])
    elif source is not None:
        evidence = (source["source_date"], source["source_id"])

    if evidence is not None:
        completed_at, source_id = evidence
        return StageState(
            ticker=ticker.ticker,
            stage="T1_allotment",
            status="complete",
            required=1,
            due_date=ticker.allotment_results_expected_date,
            completed_at=completed_at,
            last_source_id=source_id,
            data_gap_id=None,
            notes="Allotment result facts are archived.",
        )

    gap = data_gap_for(conn, ticker.ticker, "T1_allotment")
    return StageState(
        ticker=ticker.ticker,
        stage="T1_allotment",
        status=_pending_status(gap, ticker.allotment_results_expected_date, as_of_date),
        required=1,
        due_date=ticker.allotment_results_expected_date,
        completed_at=None,
        last_source_id=None,
        data_gap_id=gap["gap_id"] if gap is not None else None,
        notes="Allotment result facts are not archived yet.",
    )


def price_state(
    conn: sqlite3.Connection,
    ticker: Ticker,
    stage: str,
    due_date: str | None,
    as_of_date: date,
) -> StageState:
    """Derive the state of a price/performance stage (``T2_grey_market``, ``D1`` ... ``D60``).

    A ``price_performance`` row for the stage marks it complete. For
    ``T2_grey_market`` a grey/dark market source reference is accepted as
    well; the performance row wins when both exist.
    """
    # Explicit ordering keeps the chosen row deterministic when several rows
    # exist for the same ticker and stage: the latest one is reported.
    perf = query_one(
        conn,
        """
        SELECT source_id, as_of_date
        FROM price_performance
        WHERE ticker = ? AND stage = ?
        ORDER BY as_of_date DESC, source_id DESC
        LIMIT 1
        """,
        (ticker.ticker, stage),
    )

    source = None
    source_types = _STAGE_SOURCE_TYPES.get(stage, ())
    if source_types:
        # Only the "?" placeholders are interpolated; values stay parameterized.
        placeholders = ", ".join("?" for _ in source_types)
        source = query_one(
            conn,
            f"""
            SELECT source_id, source_date
            FROM source_refs
            WHERE ticker = ? AND source_type IN ({placeholders})
            ORDER BY source_date DESC, source_id DESC
            LIMIT 1
            """,
            (ticker.ticker, *source_types),
        )

    evidence: tuple[str | None, str | None] | None = None
    if perf is not None:
        evidence = (perf["as_of_date"], perf["source_id"])
    elif source is not None:
        evidence = (source["source_date"], source["source_id"])

    if evidence is not None:
        completed_at, source_id = evidence
        return StageState(
            ticker=ticker.ticker,
            stage=stage,
            status="complete",
            required=1,
            due_date=due_date,
            completed_at=completed_at,
            last_source_id=source_id,
            data_gap_id=None,
            notes="Performance source is archived.",
        )

    gap = data_gap_for(conn, ticker.ticker, stage)
    note = "Price/performance source is not archived yet."
    if stage in _CALENDAR_DAY_STAGES:
        note += " Due date uses calendar days until trading-calendar support is added."
    return StageState(
        ticker=ticker.ticker,
        stage=stage,
        status=_pending_status(gap, due_date, as_of_date),
        required=1,
        due_date=due_date,
        completed_at=None,
        last_source_id=None,
        data_gap_id=gap["gap_id"] if gap is not None else None,
        notes=note,
    )


def listing_offset_due(listing_date: str | None, days_after_listing: int) -> str | None:
    """Return the ISO date ``days_after_listing`` calendar days after listing.

    Returns ``None`` when the listing date is missing.
    """
    listed = parse_date(listing_date)
    if listed is None:
        return None
    return (listed + timedelta(days=days_after_listing)).isoformat()


def build_states(conn: sqlite3.Connection, ticker: Ticker, as_of_date: date) -> list[StageState]:
    """Build the state of every stage for one ticker, in ``STAGE_ORDER`` order."""
    listing = ticker.listing_date
    return [
        prospectus_state(conn, ticker, as_of_date),
        allotment_state(conn, ticker, as_of_date),
        price_state(conn, ticker, "T2_grey_market", listing, as_of_date),
        # Dn is due n - 1 calendar days after the listing date.
        price_state(conn, ticker, "D1", listing_offset_due(listing, 0), as_of_date),
        price_state(conn, ticker, "D5", listing_offset_due(listing, 4), as_of_date),
        price_state(conn, ticker, "D20", listing_offset_due(listing, 19), as_of_date),
        price_state(conn, ticker, "D60", listing_offset_due(listing, 59), as_of_date),
    ]


def upsert_sync_run(
    conn: sqlite3.Connection,
    run_id: str,
    mode: str,
    as_of: str,
    status: str,
    notes: str,
) -> None:
    """Insert or overwrite the ``sync_runs`` row identified by ``run_id``.

    ``as_of`` is written to ``as_of``, ``started_at`` and ``finished_at``
    alike, so re-running with the same ``--as-of`` produces identical rows.
    """
    conn.execute(
        """
        INSERT INTO sync_runs (
            sync_run_id, mode, as_of, started_at, finished_at, status, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(sync_run_id) DO UPDATE SET
            mode = excluded.mode,
            as_of = excluded.as_of,
            started_at = excluded.started_at,
            finished_at = excluded.finished_at,
            status = excluded.status,
            notes = excluded.notes
        """,
        (run_id, mode, as_of, as_of, as_of, status, notes),
    )


def replace_state(
    conn: sqlite3.Connection,
    states: list[StageState],
    run_id: str,
    updated_at: str,
) -> None:
    """Upsert every state into ``ticker_sync_state`` keyed by (ticker, stage)."""
    conn.executemany(
        """
        INSERT INTO ticker_sync_state (
            ticker, stage, status, required, due_date, completed_at,
            last_source_id, data_gap_id, last_sync_run_id, updated_at, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(ticker, stage) DO UPDATE SET
            status = excluded.status,
            required = excluded.required,
            due_date = excluded.due_date,
            completed_at = excluded.completed_at,
            last_source_id = excluded.last_source_id,
            data_gap_id = excluded.data_gap_id,
            last_sync_run_id = excluded.last_sync_run_id,
            updated_at = excluded.updated_at,
            notes = excluded.notes
        """,
        [
            (
                state.ticker,
                state.stage,
                state.status,
                state.required,
                state.due_date,
                state.completed_at,
                state.last_source_id,
                state.data_gap_id,
                run_id,
                updated_at,
                state.notes,
            )
            for state in states
        ],
    )


def rebuild_tasks(
    conn: sqlite3.Connection,
    states: list[StageState],
    run_id: str,
    updated_at: str,
) -> None:
    """Replace the whole ``sync_tasks`` table with one task per unfinished stage.

    Complete or non-required states produce no task, and states with a status
    outside the mapping below are skipped.
    """
    task_status_by_state = {
        "pending_due": "open",
        "pending_not_due": "waiting_until_due",
        "blocked": "blocked",
    }

    conn.execute("DELETE FROM sync_tasks")
    rows = []
    for state in states:
        if state.status == "complete" or state.required == 0:
            continue
        task_status = task_status_by_state.get(state.status)
        if task_status is None:
            continue
        rows.append(
            (
                f"{state.ticker}_{state.stage}",
                state.ticker,
                state.stage,
                TASK_TYPES[state.stage],
                task_status,
                state.due_date,
                state.data_gap_id,
                run_id,
                updated_at,
                state.notes,
            )
        )
    conn.executemany(
        """
        INSERT INTO sync_tasks (
            task_id, ticker, stage, task_type, task_status, due_date,
            data_gap_id, last_sync_run_id, updated_at, notes
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        rows,
    )


def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Write ``table`` to ``SNAPSHOT_DIR/<table>.csv`` with a header row.

    ``table`` and ``order_by`` are interpolated into the SQL text, so they
    must be trusted identifiers/expressions, never user input.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    cursor = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    columns = [description[0] for description in cursor.description]
    with (SNAPSHOT_DIR / f"{table}.csv").open("w", newline="", encoding="utf-8") as handle:
        # Fixed "\n" terminator keeps the snapshot identical across platforms.
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(columns)
        writer.writerows(cursor)


def print_summary(states: list[StageState]) -> None:
    """Print per-status counts followed by the due or blocked items."""
    counts: dict[str, int] = {}
    for state in states:
        counts[state.status] = counts.get(state.status, 0) + 1

    print("sync state updated")
    for status in sorted(counts):
        print(f"{status}: {counts[status]}")

    open_items = [state for state in states if state.status in {"pending_due", "blocked"}]
    if open_items:
        print("actionable items:")
        for state in open_items:
            print(f"- {state.ticker} {state.stage}: {state.status} due={state.due_date or ''}")


def _default_run_id(as_of: str) -> str:
    """Derive a run id from the as-of timestamp by dropping ``:``, ``-`` and ``+``."""
    return "sync_state_" + as_of.translate(str.maketrans("", "", ":-+"))


def _stage_order_clause() -> str:
    """Build an ORDER BY expression sorting by ticker, then by ``STAGE_ORDER`` rank."""
    whens = " ".join(f"WHEN '{stage}' THEN {rank}" for rank, stage in enumerate(STAGE_ORDER))
    # Unknown stages sort after every known one.
    return f"ticker, CASE stage {whens} ELSE 99 END"


def main() -> int:
    """Refresh the derived sync state, rebuild tasks and export CSV snapshots.

    Returns the process exit code (always 0; failures propagate as exceptions).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument("--schema", default=str(DEFAULT_SCHEMA_PATH), help="Repo-relative schema path.")
    parser.add_argument("--as-of", help="ISO timestamp for deterministic sync-state snapshots.")
    parser.add_argument(
        "--run-id",
        help="Stable sync run id. Defaults to sync_state_<compact as-of timestamp>.",
    )
    parser.add_argument("--mode", default="state_refresh", help="Sync run mode label.")
    args = parser.parse_args()

    as_of_dt = parse_as_of(args.as_of)
    as_of = as_of_dt.isoformat().replace("+00:00", "Z")
    as_of_date = as_of_dt.date()
    run_id = args.run_id or _default_run_id(as_of)

    db_path = Path(args.db)
    schema_path = Path(args.schema)

    # `with conn:` only commits or rolls back the transaction; closing()
    # is what actually releases the connection when the block exits.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        with conn:
            conn.executescript(schema_path.read_text(encoding="utf-8"))
            upsert_sync_run(conn, run_id, args.mode, as_of, "running", "Refreshing derived ticker sync state.")
            states = [
                state
                for ticker in load_tickers(conn)
                for state in build_states(conn, ticker, as_of_date)
            ]
            replace_state(conn, states, run_id, as_of)
            rebuild_tasks(conn, states, run_id, as_of)
            upsert_sync_run(conn, run_id, args.mode, as_of, "complete", "Derived ticker sync state refreshed.")
            export_snapshot(conn, "sync_runs", "sync_run_id")
            export_snapshot(conn, "ticker_sync_state", _stage_order_clause())
            export_snapshot(conn, "sync_tasks", "task_status, due_date, ticker, stage")

    print_summary(states)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())