#!/usr/bin/env python3
"""Extract text from archived IPO PDFs into repo-relative derived text files."""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import logging
import sqlite3
import sys
from contextlib import closing
from dataclasses import dataclass
from pathlib import Path

DEFAULT_DB_PATH = Path("data/hk_ipo.sqlite")
DEFAULT_OUTPUT_ROOT = Path("data/extracted_text")
DEFAULT_MANIFEST = Path("data/snapshots/extracted_text_manifest.csv")

# Only surface pypdf messages at ERROR level or above.
logging.getLogger("pypdf").setLevel(logging.ERROR)


@dataclass(frozen=True)
class SourceDocument:
    """One row of the ``source_refs`` table, as selected by ``load_sources``."""

    source_id: str
    ticker: str
    source_type: str
    local_path: str
    file_sha256: str | None


def repo_root() -> Path:
    """Return the directory that repo-relative paths are resolved against (the CWD)."""
    return Path.cwd()


def _checked_relative_path(raw_path: str, label: str) -> Path:
    """Return ``raw_path`` as a ``Path`` after checking it is repo-relative POSIX style.

    Args:
        raw_path: The path string exactly as supplied by the caller.
        label: Subject used at the start of the error message.

    Raises:
        ValueError: If the path is absolute, starts with ``./`` or contains a
            backslash.
    """
    path = Path(raw_path)
    if path.is_absolute() or raw_path.startswith("./") or "\\" in raw_path:
        raise ValueError(f"{label} must be repo-relative POSIX style: {raw_path}")
    return path


def require_repo_relative(relative_path: str) -> Path:
    """Resolve a repo-relative path against ``repo_root()`` and require that it exists.

    Raises:
        ValueError: If ``relative_path`` is not repo-relative POSIX style.
        FileNotFoundError: If the resolved path does not exist.
    """
    full_path = repo_root() / _checked_relative_path(relative_path, "Path")
    if not full_path.exists():
        raise FileNotFoundError(relative_path)
    return full_path


def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of ``path``, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def load_sources(db_path: Path, requested_sources: list[str]) -> list[SourceDocument]:
    """Load source documents from the ``source_refs`` table.

    Args:
        db_path: SQLite database file to read.
        requested_sources: ``source_id`` values to select. When empty, every
            row whose ``local_path`` matches ``%.pdf`` is selected instead.

    Returns:
        The matching rows ordered by ``ticker`` then ``source_id``.
    """
    # ``sqlite3.Connection`` used as a context manager only commits or rolls
    # back; it does not close the connection. ``closing`` releases the handle.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        if requested_sources:
            placeholders = ", ".join("?" for _ in requested_sources)
            rows = conn.execute(
                f"""
                SELECT source_id, ticker, source_type, local_path, file_sha256
                FROM source_refs
                WHERE source_id IN ({placeholders})
                ORDER BY ticker, source_id
                """,
                requested_sources,
            ).fetchall()
        else:
            rows = conn.execute(
                """
                SELECT source_id, ticker, source_type, local_path, file_sha256
                FROM source_refs
                WHERE local_path LIKE '%.pdf'
                ORDER BY ticker, source_id
                """
            ).fetchall()
    return [SourceDocument(**dict(row)) for row in rows]


def import_pypdf():
    """Import and return ``pypdf.PdfReader``, exiting with a hint if pypdf is missing."""
    try:
        from pypdf import PdfReader
    except ModuleNotFoundError as exc:
        raise SystemExit(
            "Missing dependency: pypdf. Install with `python3 -m pip install -r requirements.txt`."
        ) from exc
    return PdfReader


def utf8_safe_text(value: str) -> str:
    """Round-trip ``value`` through UTF-8, replacing anything that cannot be encoded."""
    return value.encode("utf-8", "replace").decode("utf-8")


def extract_text(pdf_path: Path) -> tuple[str, int, int]:
    """Extract the text of every page of ``pdf_path``.

    Returns:
        A tuple ``(text, page_count, pages_with_text)`` where ``text`` is the
        concatenation of per-page sections, each introduced by a
        ``--- page N ---`` marker, and always ends with a single newline.
    """
    PdfReader = import_pypdf()
    reader = PdfReader(str(pdf_path))
    chunks: list[str] = []
    pages_with_text = 0
    for index, page in enumerate(reader.pages, start=1):
        text = page.extract_text() or ""
        if text.strip():
            pages_with_text += 1
        # NOTE(review): the original indentation was lost; this assumes every
        # page, including empty ones, gets a marker so numbering stays visible.
        cleaned_text = utf8_safe_text(
            "\n".join(line.rstrip() for line in text.strip().splitlines())
        )
        chunks.append(f"\n\n--- page {index} ---\n{cleaned_text}\n")
    return "".join(chunks).strip() + "\n", len(reader.pages), pages_with_text


def text_output_path(output_root: Path, source: SourceDocument) -> Path:
    """Return ``<output_root>/<ticker>/<pdf stem>.txt`` for ``source``."""
    pdf_stem = Path(source.local_path).stem
    return output_root / source.ticker / f"{pdf_stem}.txt"


def write_manifest(rows: list[dict[str, object]], manifest_path: Path) -> None:
    """Write ``rows`` to ``manifest_path`` as CSV, creating parent directories."""
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    fieldnames = [
        "source_id",
        "ticker",
        "source_type",
        "pdf_local_path",
        "pdf_sha256",
        "text_local_path",
        "text_sha256",
        "page_count",
        "pages_with_text",
        "char_count",
        "status",
        "notes",
    ]
    with manifest_path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(rows)


def load_manifest(manifest_path: Path) -> dict[str, dict[str, object]]:
    """Read an existing manifest keyed by ``source_id``.

    Returns an empty dict when the manifest file does not exist. All values
    are strings, exactly as ``csv.DictReader`` yields them.
    """
    if not manifest_path.exists():
        return {}
    with manifest_path.open(newline="", encoding="utf-8") as handle:
        return {row["source_id"]: row for row in csv.DictReader(handle)}


def can_reuse_existing_manifest_row(
    existing_row: dict[str, object] | None,
    output_path: Path,
    actual_pdf_hash: str,
) -> bool:
    """Tell whether a previous extraction is still valid and may be skipped.

    The row is reusable only when it records the same PDF hash and the same
    text path, the text file still exists, and the file's current SHA-256
    equals the recorded (non-empty) ``text_sha256``.
    """
    if not existing_row:
        return False
    if existing_row.get("pdf_sha256") != actual_pdf_hash:
        return False
    if existing_row.get("text_local_path") != output_path.as_posix():
        return False
    if not output_path.exists():
        return False
    text_hash = existing_row.get("text_sha256")
    return bool(text_hash) and sha256_file(output_path) == text_hash


def _extract_manifest_row(
    source: SourceDocument,
    pdf_path: Path,
    actual_pdf_hash: str,
    output_path: Path,
) -> dict[str, object]:
    """Extract ``pdf_path`` into ``output_path`` and build its manifest row."""
    try:
        text, page_count, pages_with_text = extract_text(pdf_path)
        output_path.write_text(text, encoding="utf-8")
        text_hash = sha256_file(output_path)
        char_count = len(text)
        status = "ok" if pages_with_text else "no_text_extracted"
        notes = ""
    except Exception as exc:
        # Deliberately broad: one unreadable PDF must not abort the batch. The
        # failure is recorded in the manifest and an empty text file is left
        # behind. A missing pypdf raises SystemExit, which is not caught here.
        output_path.write_text("", encoding="utf-8")
        text_hash = sha256_file(output_path)
        page_count = 0
        pages_with_text = 0
        char_count = 0
        status = "error"
        notes = f"{type(exc).__name__}: {exc}"
    return {
        "source_id": source.source_id,
        "ticker": source.ticker,
        "source_type": source.source_type,
        "pdf_local_path": source.local_path,
        "pdf_sha256": actual_pdf_hash,
        "text_local_path": output_path.as_posix(),
        "text_sha256": text_hash,
        "page_count": page_count,
        "pages_with_text": pages_with_text,
        "char_count": char_count,
        "status": status,
        "notes": notes,
    }


def main() -> int:
    """Run the extraction CLI and return the process exit code.

    Raises:
        ValueError: If a path argument is not repo-relative POSIX style, or a
            PDF's hash differs from the ``file_sha256`` recorded in the database.
        FileNotFoundError: If the database or a referenced PDF does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Repo-relative SQLite database path.")
    parser.add_argument(
        "--output-root",
        default=str(DEFAULT_OUTPUT_ROOT),
        help="Repo-relative output directory for extracted text.",
    )
    parser.add_argument(
        "--manifest",
        default=str(DEFAULT_MANIFEST),
        help="Repo-relative CSV manifest path.",
    )
    parser.add_argument(
        "--source-id",
        action="append",
        default=[],
        help="Specific source_id to extract. May be passed multiple times. Defaults to all PDF source_refs.",
    )
    parser.add_argument("--force", action="store_true", help="Re-extract even when the manifest row is unchanged.")
    parser.add_argument("--json", action="store_true", help="Print a JSON summary.")
    args = parser.parse_args()

    db_path = require_repo_relative(args.db)
    # Output locations may not exist yet, so they are validated but not resolved.
    output_root = _checked_relative_path(args.output_root, "Output root")
    manifest_path = _checked_relative_path(args.manifest, "Manifest path")

    existing_rows = load_manifest(manifest_path)
    # A targeted run keeps the other manifest rows; a full run rebuilds the
    # manifest from the sources currently in the database.
    rows_by_source_id = dict(existing_rows) if args.source_id else {}
    processed_rows: list[dict[str, object]] = []
    skipped = 0

    for source in load_sources(db_path, args.source_id):
        pdf_path = require_repo_relative(source.local_path)
        actual_pdf_hash = sha256_file(pdf_path)
        if source.file_sha256 and source.file_sha256 != actual_pdf_hash:
            raise ValueError(f"PDF hash mismatch for {source.source_id}")

        output_path = text_output_path(output_root, source)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        existing_row = existing_rows.get(source.source_id)

        if not args.force and can_reuse_existing_manifest_row(existing_row, output_path, actual_pdf_hash):
            # Reused rows carry the string values read back from the CSV.
            row = dict(existing_row or {})
            skipped += 1
        else:
            row = _extract_manifest_row(source, pdf_path, actual_pdf_hash, output_path)

        rows_by_source_id[source.source_id] = row
        processed_rows.append(row)

    manifest_rows = sorted(
        rows_by_source_id.values(),
        key=lambda row: (str(row["ticker"]), str(row["source_id"])),
    )
    write_manifest(manifest_rows, manifest_path)

    if args.json:
        print(json.dumps(processed_rows, ensure_ascii=False, indent=2))
    else:
        print(f"processed {len(processed_rows)} PDF source(s); skipped unchanged: {skipped}")
        print(f"manifest rows: {len(manifest_rows)}; manifest: {manifest_path.as_posix()}")
        for row in processed_rows:
            print(
                f"{row['source_id']}: {row['status']} "
                f"pages={row['pages_with_text']}/{row['page_count']} "
                f"chars={row['char_count']}"
            )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())