078f56998b
Request:
- Adjust the archivist based on the audit findings and update the historical data.
Changes:
- Teach the archivist skill to close audit-discovered gaps in priority order.
- Add scripts/archive_price_performance.py for due D1/D5/D20/D60 price-performance backfills.
- Document the price-performance backfill command in README.
- Archive raw Yahoo Finance chart responses under repo-relative data/raw/{ticker}/ paths.
- Populate price_performance with D1/D5/D20/D60 checkpoints and refresh source_refs, sync_runs, sync_tasks, and ticker_sync_state snapshots.
Execution:
- Ran .venv/bin/python scripts/archive_price_performance.py --as-of 2026-06-15T10:00:00Z.
- Selected 291 due price-performance tickers.
- Archived 273 price-history sources and wrote 1063 price-performance rows.
- Re-ran .venv/bin/python scripts/archive_hkex_documents.py --as-of 2026-06-15T10:05:00Z for the remaining open T0/T1 tasks; it completed no additional T0/T1 stages.
Verification:
- Compiled the new price-performance script.
- Ran git diff --check.
- Checked SQLite integrity and foreign keys.
- Confirmed database row counts match CSV snapshots.
- Verified all 979 source_refs use valid repo-relative paths, have files, have hashes, and SHA256 hashes match.
- Confirmed no generated Python caches or SQLite transient files remain.
Next useful context:
- price_performance now has 1063 rows: D1 273, D5 272, D20 267, D60 251.
- The remaining due price-performance gaps are 18 tickers for which Yahoo history was unavailable or the request failed.
- T0/T1 gaps remain at T0 93 and T1 77; T2 grey-market remains unresolved pending a reproducible source strategy.
430 lines
15 KiB
Python
430 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Archive due post-listing price performance checkpoints from Yahoo Finance chart data."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import hashlib
|
|
import json
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import urlencode
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
# Repo-relative defaults for the CLI (--db / --schema) and the CSV snapshot directory.
DB_PATH = Path("data") / "hk_ipo.sqlite"
SCHEMA_PATH = Path("schema") / "hk_ipo.schema.sql"
SNAPSHOT_DIR = Path("data") / "snapshots"

# Yahoo Finance v8 chart endpoint; chart_url() appends "/<symbol>?<query>".
BASE_CHART_URL = "https://query1.finance.yahoo.com/v8/finance/chart"

# Post-listing checkpoint stages this script handles (matched against sync_tasks.stage).
PRICE_STAGES = ("D1", "D5", "D20", "D60")
|
|
|
|
|
|
@dataclass(eq=True, frozen=True)
class PriceTask:
    """One ticker's open, due price-performance work selected from ``sync_tasks``.

    The instance itself is frozen, but ``stages`` is a plain dict, which
    select_tasks populates after construction.
    """

    # Ticker code as stored in ipo_master / sync_tasks (presumably zero-padded, e.g. "00700").
    ticker: str
    # Listing date as an ISO date string.
    listing_date: str
    # Offer price in HKD; None when offering_terms supplies no value.
    offer_price_hkd: float | None
    # Stage name (e.g. "D1") -> ISO due date for each open, due stage.
    stages: dict[str, str]
|
|
|
|
|
|
@dataclass(eq=True, frozen=True)
class Quote:
    """A single daily bar parsed from a Yahoo Finance chart response."""

    # Trading date (ISO string) derived from the bar timestamp plus the response gmtoffset.
    trade_date: str
    # Open/high/low may be None when the response has no value for the bar;
    # parse_quotes skips bars without a close, so close is populated in practice.
    open_price_hkd: float | None
    high_price_hkd: float | None
    low_price_hkd: float | None
    close_price_hkd: float | None
    # Share volume; parse_quotes stores None when it is missing or zero.
    volume: int | None
|
|
|
|
|
|
@dataclass(eq=True, frozen=True)
class ArchivedHistory:
    """A raw price-history response written to disk, plus the quotes parsed from it."""

    # Identifier used as source_refs.source_id and referenced by price_performance rows.
    source_id: str
    ticker: str
    # Repo-relative POSIX path of the archived raw JSON file.
    local_path: str
    # Exact URL the payload was fetched from.
    url: str
    # Hex SHA-256 of the archived bytes.
    file_sha256: str
    # ISO date of the end of the requested history window.
    source_date: str
    quotes: list[Quote]
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse ``sys.argv``.

    Returns:
        A namespace with ``db``, ``schema``, ``as_of``, ``tickers`` and
        ``skip_sync_state`` attributes.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "--db",
        default=str(DB_PATH),
        help="Repo-relative SQLite database path.",
    )
    cli.add_argument(
        "--schema",
        default=str(SCHEMA_PATH),
        help="Repo-relative schema path.",
    )
    cli.add_argument(
        "--as-of",
        help="Archive timestamp. Defaults to current UTC time.",
    )
    cli.add_argument(
        "--tickers",
        help="Comma-separated tickers to process instead of selecting all open due price tasks.",
    )
    cli.add_argument(
        "--skip-sync-state",
        action="store_true",
        help="Do not refresh sync state after updating facts.",
    )
    namespace = cli.parse_args()
    return namespace
|
|
|
|
|
|
def parse_as_of(value: str | None) -> datetime:
    """Return the archive timestamp as a timezone-aware UTC datetime.

    Args:
        value: ISO-8601 timestamp from ``--as-of`` (a trailing ``Z`` is
            accepted), or ``None``/empty to use the current time.

    Returns:
        An aware datetime in UTC. Naive inputs (including date-only strings)
        are interpreted as UTC, and offset-bearing inputs are converted to UTC.
        Downstream code renders ``+00:00`` as ``Z`` and takes ``.date()``, so a
        naive or non-UTC value would otherwise produce inconsistent run ids and
        timestamps. The current-time default is truncated to whole seconds.

    Raises:
        ValueError: if ``value`` is not a valid ISO-8601 string.
    """
    if not value:
        return datetime.now(timezone.utc).replace(microsecond=0)
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # No offset given: treat the wall-clock value as UTC, matching the default.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def parse_date(value: str) -> date:
    """Convert an ISO-8601 date string such as ``"2026-06-15"`` into a ``date``."""
    parsed = date.fromisoformat(value)
    return parsed
|
|
|
|
|
|
def epoch(day: date) -> int:
    """Return the Unix timestamp, in whole seconds, of midnight UTC at the start of ``day``."""
    # Whole days since 1970-01-01 times seconds per day; there is no DST in UTC.
    days_since_epoch = day.toordinal() - date(1970, 1, 1).toordinal()
    return days_since_epoch * 86_400
|
|
|
|
|
|
def yahoo_symbol(ticker: str) -> str:
    """Convert an HKEX ticker code into its Yahoo Finance symbol.

    Leading zeros are dropped and the code is re-padded to at least four
    digits, e.g. ``"00700"`` -> ``"0700.HK"`` and ``"02590"`` -> ``"2590.HK"``.
    Five-digit codes are kept whole. Without the padding, low codes became
    ``"700.HK"``, which does not follow Yahoo's four-digit HK symbol form
    (``0700.HK``, ``0005.HK``). NOTE(review): confirm if Yahoo changes that
    convention.

    Raises:
        ValueError: if ``ticker`` is not an integer string.
    """
    return f"{int(ticker):04d}.HK"
|
|
|
|
|
|
def chart_url(ticker: str, start_date: date, end_date: date) -> str:
    """Build the Yahoo chart URL for daily bars from ``start_date`` through ``end_date``.

    ``period2`` is set to midnight UTC of the day after ``end_date``, so the
    end date itself falls inside the requested window.
    """
    query = {
        "period1": epoch(start_date),
        "period2": epoch(end_date + timedelta(days=1)),
        "interval": "1d",
        "events": "history",
        "includeAdjustedClose": "true",
    }
    encoded = urlencode(query)
    symbol = yahoo_symbol(ticker)
    return BASE_CHART_URL + "/" + symbol + "?" + encoded
|
|
|
|
|
|
def fetch_bytes(url: str) -> bytes:
    """Perform a GET request for ``url`` and return the raw response body.

    A browser-like User-Agent header is sent, and the request times out after
    60 seconds. Errors from ``urlopen`` (HTTPError, URLError, timeouts)
    propagate to the caller.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    with urlopen(Request(url, headers=headers), timeout=60) as resp:
        body = resp.read()
    return body
|
|
|
|
|
|
def sha256_bytes(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
|
|
def select_tasks(conn: sqlite3.Connection, as_of_date: date, tickers: str | None) -> list[PriceTask]:
    """Load open, due price-performance tasks from ``sync_tasks``, grouped by ticker.

    A task row qualifies when all of the following hold:
    - its status is ``'open'``;
    - its stage is one of PRICE_STAGES;
    - its ``due_date`` is set and on or before ``as_of_date``;
    - the ticker has a ``listing_date`` in ``ipo_master``.

    The offer price comes from a LEFT JOIN on ``offering_terms`` and is None
    when no row matches.

    Args:
        conn: Connection whose ``row_factory`` supports lookup by column name
            (e.g. ``sqlite3.Row``).
        as_of_date: Inclusive cutoff for ``due_date``. The comparison is done
            in SQL against the ISO string form.
        tickers: Optional comma-separated filter. Each entry is stripped and
            zero-padded to five characters; blank entries are ignored.

    Returns:
        One PriceTask per ticker, in order of first appearance (the rows are
        ordered by listing_date, ticker, stage), with ``stages`` mapping
        stage -> due_date.
    """
    stage_slots = ", ".join("?" for _ in PRICE_STAGES)
    bind: list[object] = list(PRICE_STAGES)
    bind.append(as_of_date.isoformat())

    ticker_clause = ""
    if tickers:
        wanted = [code.strip().zfill(5) for code in tickers.split(",") if code.strip()]
        # Only placeholders are interpolated; the ticker values themselves are bound.
        ticker_clause = " AND t.ticker IN (" + ", ".join("?" for _ in wanted) + ")"
        bind.extend(wanted)

    query = f"""
        SELECT t.ticker, m.listing_date, o.offer_price_hkd, t.stage, t.due_date
        FROM sync_tasks t
        JOIN ipo_master m ON m.ticker = t.ticker
        LEFT JOIN offering_terms o ON o.ticker = t.ticker
        WHERE t.task_status = 'open'
          AND t.stage IN ({stage_slots})
          AND t.due_date IS NOT NULL
          AND t.due_date <= ?
          AND m.listing_date IS NOT NULL
          {ticker_clause}
        ORDER BY m.listing_date, t.ticker, t.stage
    """

    by_ticker: dict[str, PriceTask] = {}
    for record in conn.execute(query, tuple(bind)).fetchall():
        code = record["ticker"]
        if code not in by_ticker:
            # Listing date and offer price come from the first row seen for the ticker.
            # NOTE(review): if offering_terms can hold several rows per ticker, the
            # first one returned wins -- confirm that is intended.
            by_ticker[code] = PriceTask(code, record["listing_date"], record["offer_price_hkd"], {})
        by_ticker[code].stages[record["stage"]] = record["due_date"]
    return list(by_ticker.values())
|
|
|
|
|
|
def parse_quotes(payload: bytes) -> list[Quote]:
    """Parse a Yahoo Finance v8 chart JSON payload into daily quotes.

    Bars without a close price are skipped. Each bar's trade date is the UTC
    date of ``timestamp + meta.gmtoffset``. A missing or zero volume is
    stored as None.

    Args:
        payload: Raw response body.

    Returns:
        Quotes in response order. The list is empty when the payload carries
        no chart result or no timestamps.

    Raises:
        json.JSONDecodeError: if ``payload`` is not valid JSON.
    """
    data = json.loads(payload)
    # Use `or {}` / `or []` rather than .get(key, default). The default only
    # covers missing keys; a member present as JSON null would otherwise reach
    # `.get` on None and raise AttributeError.
    chart = data.get("chart") or {}
    result = (chart.get("result") or [None])[0]
    if not result:
        return []
    timestamps = result.get("timestamp") or []
    indicators = result.get("indicators") or {}
    quote_block = (indicators.get("quote") or [{}])[0] or {}
    meta = result.get("meta") or {}
    gmtoffset = int(meta.get("gmtoffset") or 0)

    quotes: list[Quote] = []
    for index, timestamp in enumerate(timestamps):
        close_price = value_at(quote_block, "close", index)
        if close_price is None:
            continue
        # Shift by the response's gmtoffset before taking the date, so the date
        # reflects the offset-adjusted clock rather than raw UTC.
        trade_date = datetime.fromtimestamp(timestamp + gmtoffset, timezone.utc).date().isoformat()
        quotes.append(
            Quote(
                trade_date=trade_date,
                open_price_hkd=value_at(quote_block, "open", index),
                high_price_hkd=value_at(quote_block, "high", index),
                low_price_hkd=value_at(quote_block, "low", index),
                close_price_hkd=close_price,
                volume=int(value_at(quote_block, "volume", index) or 0) or None,
            )
        )
    return quotes
|
|
|
|
|
|
def value_at(block: dict[str, list[float | int | None]], key: str, index: int) -> float | int | None:
    """Return ``block[key][index]``, or None when that value is absent.

    A missing key, a null series, an index past the end of the series and a
    null entry all yield None.
    """
    series = block.get(key) or []
    return series[index] if index < len(series) else None
|
|
|
|
|
|
def archive_history(task: PriceTask, as_of_date: date) -> ArchivedHistory | None:
    """Fetch, archive and parse Yahoo daily history covering a task's checkpoints.

    The requested window starts three days before the listing date and ends
    21 days after the latest due stage, capped at ``as_of_date``. When the
    response yields at least one quote, the raw bytes are written to
    ``data/raw/<ticker>/price_history_yahoo_<start>_<end>.json``, overwriting
    any existing file at that path.

    Returns:
        The archived source metadata and parsed quotes, or None when the
        response contains no usable quotes (nothing is written in that case).

    Raises:
        RuntimeError: wrapping an HTTPError, URLError or TimeoutError raised
            while fetching.
        ValueError: if ``task.stages`` is empty or a date string is malformed.
    """
    latest_due = max(parse_date(due) for due in task.stages.values())
    window_start = parse_date(task.listing_date) - timedelta(days=3)
    window_end = min(as_of_date, latest_due + timedelta(days=21))
    source_url = chart_url(task.ticker, window_start, window_end)

    try:
        raw = fetch_bytes(source_url)
    except (HTTPError, URLError, TimeoutError) as err:
        raise RuntimeError(f"fetch failed: {err}") from err

    parsed = parse_quotes(raw)
    if not parsed:
        return None

    start_iso = window_start.isoformat()
    end_iso = window_end.isoformat()
    archive_path = Path("data/raw") / task.ticker / f"price_history_yahoo_{start_iso}_{end_iso}.json"
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    archive_path.write_bytes(raw)

    return ArchivedHistory(
        source_id=f"{task.ticker}_price_history_yahoo_{start_iso.replace('-', '_')}_{end_iso.replace('-', '_')}",
        ticker=task.ticker,
        local_path=archive_path.as_posix(),
        url=source_url,
        file_sha256=sha256_bytes(raw),
        source_date=end_iso,
        quotes=parsed,
    )
|
|
|
|
|
|
def checkpoint_quote(quotes: list[Quote], due_date: str) -> Quote | None:
    """Return the earliest quote whose trade date is on or after ``due_date``.

    The original relied on ``quotes`` being sorted ascending and returned the
    first match in list order. This version selects the earliest qualifying
    trade date explicitly, so an unordered input cannot yield a later bar.
    For sorted input the result is identical. On a tie (duplicate dates), the
    quote appearing first in ``quotes`` wins.

    Args:
        quotes: Parsed daily quotes; ``trade_date`` is an ISO date string.
        due_date: ISO date of the checkpoint.

    Returns:
        The selected quote, or None if no quote falls on or after ``due_date``.
    """
    due = parse_date(due_date)
    best: Quote | None = None
    best_day: date | None = None
    for quote in quotes:
        day = parse_date(quote.trade_date)
        if day < due:
            continue
        # Strict `<` keeps the first-seen quote among equal dates.
        if best_day is None or day < best_day:
            best, best_day = quote, day
    return best
|
|
|
|
|
|
def source_row(history: ArchivedHistory, as_of: str) -> dict[str, object]:
    """Map an archived price history to a ``source_refs`` row dict.

    The keys match the named parameters used by upsert_source_refs.
    ``archived_at`` is set to ``as_of``.
    """
    row: dict[str, object] = dict(
        source_id=history.source_id,
        ticker=history.ticker,
        source_type="market_price_history",
        title=f"Yahoo Finance daily price history for {history.ticker}.HK",
        path_base="repo_root",
        local_path=history.local_path,
        url=history.url,
        file_sha256=history.file_sha256,
        source_date=history.source_date,
        archived_at=as_of,
        notes="Raw chart response used to derive D1/D5/D20/D60 price performance checkpoints.",
    )
    return row
|
|
|
|
|
|
def performance_rows(task: PriceTask, history: ArchivedHistory, as_of: str) -> list[dict[str, object]]:
    """Build ``price_performance`` row dicts for each of the task's stages that has a quote.

    For every stage, the checkpoint is the first available trading day on or
    after the stage's due date. Stages with no such quote, or whose quote has
    no close, are skipped.

    ``return_pct`` is ``(close / offer - 1) * 100``; it is None when the offer
    price is missing or zero. ``turnover_hkd_m`` is approximated as
    ``volume * close / 1e6``; it is None when volume is missing.
    """
    out: list[dict[str, object]] = []
    for stage_name, stage_due in task.stages.items():
        checkpoint = checkpoint_quote(history.quotes, stage_due)
        if checkpoint is None or checkpoint.close_price_hkd is None:
            continue
        close = checkpoint.close_price_hkd
        offer = task.offer_price_hkd
        pct = (close / offer - 1) * 100 if offer else None
        turnover = None if checkpoint.volume is None else checkpoint.volume * close / 1_000_000
        out.append(
            {
                "performance_id": f"{task.ticker}_{stage_name}",
                "ticker": task.ticker,
                "stage": stage_name,
                "source_id": history.source_id,
                "as_of_date": checkpoint.trade_date,
                "open_price_hkd": checkpoint.open_price_hkd,
                "high_price_hkd": checkpoint.high_price_hkd,
                "low_price_hkd": checkpoint.low_price_hkd,
                "close_price_hkd": close,
                "return_pct": pct,
                "turnover_hkd_m": turnover,
                "data_as_of": as_of,
                "notes": f"{stage_name} uses due date {stage_due}; selected first available trading day on or after due date.",
            }
        )
    return out
|
|
|
|
|
|
def upsert_source_refs(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert or update ``source_refs`` rows keyed by ``source_id``.

    Each row dict must supply every column in ``insert_cols`` as a named
    parameter. On a ``source_id`` conflict, every column except ``source_id``
    and ``ticker`` is overwritten, so an existing row keeps its ticker. No
    commit happens here; the caller owns the transaction.
    """
    insert_cols = (
        "source_id", "ticker", "source_type", "title", "path_base", "local_path", "url",
        "file_sha256", "source_date", "archived_at", "notes",
    )
    update_cols = insert_cols[2:]
    # Column names are fixed literals above; values are always bound parameters.
    sql = (
        f"INSERT INTO source_refs ({', '.join(insert_cols)}) "
        f"VALUES ({', '.join(':' + col for col in insert_cols)}) "
        "ON CONFLICT(source_id) DO UPDATE SET "
        + ", ".join(f"{col} = excluded.{col}" for col in update_cols)
    )
    conn.executemany(sql, rows)
|
|
|
|
|
|
def upsert_performance(conn: sqlite3.Connection, rows: list[dict[str, object]]) -> None:
    """Insert or update ``price_performance`` rows keyed by ``(ticker, stage)``.

    Each row dict must supply every column in ``insert_cols`` as a named
    parameter. On a ``(ticker, stage)`` conflict, every column except
    ``performance_id``, ``ticker`` and ``stage`` is overwritten. No commit
    happens here; the caller owns the transaction.
    """
    insert_cols = (
        "performance_id", "ticker", "stage", "source_id", "as_of_date", "open_price_hkd",
        "high_price_hkd", "low_price_hkd", "close_price_hkd", "return_pct",
        "turnover_hkd_m", "data_as_of", "notes",
    )
    update_cols = insert_cols[3:]
    # Column names are fixed literals above; values are always bound parameters.
    sql = (
        f"INSERT INTO price_performance ({', '.join(insert_cols)}) "
        f"VALUES ({', '.join(':' + col for col in insert_cols)}) "
        "ON CONFLICT(ticker, stage) DO UPDATE SET "
        + ", ".join(f"{col} = excluded.{col}" for col in update_cols)
    )
    conn.executemany(sql, rows)
|
|
|
|
|
|
def upsert_sync_run(conn: sqlite3.Connection, run_id: str, as_of: str, status: str, notes: str) -> None:
    """Insert or update the ``sync_runs`` row for ``run_id``.

    ``mode`` is always ``price_performance_archive``. ``as_of``,
    ``started_at`` and ``finished_at`` are all set to ``as_of``. On conflict,
    every non-key column is overwritten. No commit happens here.
    """
    params = {
        "run_id": run_id,
        "mode": "price_performance_archive",
        "as_of": as_of,
        "status": status,
        "notes": notes,
    }
    conn.execute(
        """
        INSERT INTO sync_runs (sync_run_id, mode, as_of, started_at, finished_at, status, notes)
        VALUES (:run_id, :mode, :as_of, :as_of, :as_of, :status, :notes)
        ON CONFLICT(sync_run_id) DO UPDATE SET
            mode = excluded.mode,
            as_of = excluded.as_of,
            started_at = excluded.started_at,
            finished_at = excluded.finished_at,
            status = excluded.status,
            notes = excluded.notes
        """,
        params,
    )
|
|
|
|
|
|
def export_snapshot(conn: sqlite3.Connection, table: str, order_by: str = "1") -> None:
    """Dump ``SELECT * FROM <table> ORDER BY <order_by>`` to ``SNAPSHOT_DIR/<table>.csv``.

    The CSV gets a header row of column names and uses ``\\n`` line endings in
    UTF-8, replacing any existing file. ``table`` and ``order_by`` are
    interpolated into the SQL text, so pass only trusted identifiers and
    expressions; the callers in this script use literals.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    result = conn.execute(f"SELECT * FROM {table} ORDER BY {order_by}")
    header = [col[0] for col in result.description]
    target = SNAPSHOT_DIR / f"{table}.csv"
    with target.open("w", newline="", encoding="utf-8") as fh:
        out = csv.writer(fh, lineterminator="\n")
        out.writerow(header)
        out.writerows(result.fetchall())
|
|
|
|
|
|
def refresh_sync_state(db_path: str, schema_path: str, as_of: str) -> None:
    """Run ``scripts/update_sync_state.py`` with the current interpreter.

    The script is called in ``price_performance_archive`` mode with a
    summary limit of 20. Its path is relative, so this presumably expects to
    be run from the repo root.

    Raises:
        subprocess.CalledProcessError: if the child process exits non-zero.
    """
    argv = [sys.executable, "scripts/update_sync_state.py"]
    argv += ["--db", db_path, "--schema", schema_path, "--as-of", as_of]
    argv += ["--mode", "price_performance_archive", "--summary-limit", "20"]
    subprocess.run(argv, check=True)
|
|
|
|
|
|
def main() -> int:
    """Archive due D1/D5/D20/D60 price checkpoints and refresh the snapshots.

    Steps:
    1. Apply the schema script and record a ``running`` sync run.
    2. Select open, due price tasks.
    3. For each ticker, archive Yahoo history and upsert ``source_refs`` and
       ``price_performance`` rows.
    4. Mark the sync run ``complete`` and export CSV snapshots.
    5. Unless ``--skip-sync-state`` is given, run the sync-state refresh
       script.

    All database work happens in one transaction, which is committed on
    success and rolled back if an exception escapes. The connection is closed
    before the sync-state subprocess starts.

    Returns:
        Always 0. Per-ticker problems are reported on stdout and counted in
        the sync run notes rather than reflected in the exit status.
    """
    args = parse_args()
    as_of_dt = parse_as_of(args.as_of)
    as_of = as_of_dt.isoformat().replace("+00:00", "Z")
    as_of_date = as_of_dt.date()
    # Compact id form, e.g. 2026-06-15T10:00:00Z -> price_performance_archive_20260615T100000Z.
    run_id = "price_performance_archive_" + as_of.replace(":", "").replace("-", "").replace("+", "")

    archived_sources = 0
    inserted_rows = 0
    no_history: list[str] = []
    no_checkpoint: list[str] = []
    failed_tickers: list[tuple[str, str]] = []

    conn = sqlite3.connect(args.db)
    try:
        # `with conn` commits on success and rolls back on error, but it does
        # NOT close the connection; the `finally` below does that.
        with conn:
            conn.row_factory = sqlite3.Row
            conn.executescript(Path(args.schema).read_text(encoding="utf-8"))
            upsert_sync_run(conn, run_id, as_of, "running", "Archiving due D1/D5/D20/D60 price performance.")
            tasks = select_tasks(conn, as_of_date, args.tickers)
            print(f"tickers selected: {len(tasks)}")
            for index, task in enumerate(tasks, start=1):
                print(f"[{index}/{len(tasks)}] {task.ticker}", flush=True)
                try:
                    history = archive_history(task, as_of_date)
                    if history is None:
                        no_history.append(task.ticker)
                        continue
                    rows = performance_rows(task, history, as_of)
                    if not rows:
                        # NOTE(review): archive_history has already written the raw JSON
                        # under data/raw/, but no source_refs row is recorded for it on
                        # this path. Confirm whether the file should be registered or removed.
                        no_checkpoint.append(task.ticker)
                        continue
                    upsert_source_refs(conn, [source_row(history, as_of)])
                    upsert_performance(conn, rows)
                    archived_sources += 1
                    inserted_rows += len(rows)
                except Exception as exc:
                    # Deliberate per-ticker best effort: record the failure and keep going.
                    failed_tickers.append((task.ticker, str(exc)))
            upsert_sync_run(
                conn,
                run_id,
                as_of,
                "complete",
                (
                    f"Archived {archived_sources} price-history sources and {inserted_rows} "
                    f"price-performance rows; {len(failed_tickers)} tickers failed."
                ),
            )
            export_snapshot(conn, "source_refs", "source_id")
            export_snapshot(conn, "price_performance", "ticker, stage")
            export_snapshot(conn, "sync_runs", "sync_run_id")
    finally:
        conn.close()

    if not args.skip_sync_state:
        refresh_sync_state(args.db, args.schema, as_of)

    print(f"price sources archived: {archived_sources}")
    print(f"price performance rows written: {inserted_rows}")
    if no_history:
        print("no market history: " + ", ".join(no_history))
    if no_checkpoint:
        print("no checkpoint quote: " + ", ".join(no_checkpoint))
    if failed_tickers:
        print("failed tickers:")
        for ticker, error in failed_tickers:
            print(f"- {ticker}: {error}")
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|