psycopg3 AsyncConnectionPool + pgvector

Standard-Pattern fuer async-FastAPI-Services mit pgvector. Erstmals dokumentiert aus dem Icking-Rebuild — vorher gab es nur die sync-Variante in der alten a-icking-Pipeline. Wenn dieses Pattern in zukuenftigen Services landet (z.B. Becker, HeyJulia), hier nachziehen.

Das Problem

register_vector_async (aus pgvector.psycopg) macht intern ein SELECT typname FROM pg_type um den Vector-Type beim Connection-Adapter zu registrieren. In einer impliziten Transaktion (Default psycopg3-Verhalten) haengt dieses SELECT — Pool-Init dreht ewig, kein Timeout, kein Error. Symptom: pool.wait() returnt nie, ALB-Health bleibt rot.

Loesung: autocommit=True setzen BEVOR register_vector_async laeuft.

Das Pattern

# app/db.py
from pgvector.psycopg import register_vector_async
from psycopg_pool import AsyncConnectionPool
 
async def _configure(conn) -> None:
    """Pool-configure-Callback — laeuft pro neuer Connection."""
    # Reihenfolge ist hart:
    # 1) autocommit=True   → pgvector-internes pg_type-SELECT braucht das
    # 2) register_vector_async → registriert Python-list ↔ vector adapter
    # 3) SET-Statements pro Session
    await conn.set_autocommit(True)
    await register_vector_async(conn)
    async with conn.cursor() as cur:
        await cur.execute("SET hnsw.ef_search = 100")     # Recall-Win vs Default 40
        await cur.execute("SET search_path TO app, public")
 
 
def make_pool(settings) -> AsyncConnectionPool:
    return AsyncConnectionPool(
        conninfo=settings.pg_dsn,
        min_size=settings.pg_pool_min,
        max_size=settings.pg_pool_max,
        open=False,                # explizit, Default ist deprecated
        configure=_configure,
        max_lifetime=3600,
        max_idle=600,
        timeout=30,
        name="leistungskatalog-pool",
    )
 
 
async def open_and_wait(pool, wait_timeout: float = 30.0) -> None:
    """Lifespan: Pool oeffnen + auf min_size healthy Connections warten.
 
    `wait(timeout=...)` ist Pflicht — sonst haengt der Container forever wenn RDS
    nicht erreichbar ist. Lieber Fail-Fast + Container-Restart.
    """
    await pool.open()
    await pool.wait(timeout=wait_timeout)

FastAPI-Lifespan-Integration

# app/main.py — Auszug
async def _warmup_dependencies(app, state):
    pool = None
    try:
        pool = make_pool(settings)
        await open_and_wait(pool)
        # Erst NACH erfolgreichem wait() an state haengen — sonst sieht Teardown
        # einen halb-offenen Pool wenn Lifespan mitten in open() gecancelt wird.
        state.db_pool = pool
        state.db_pool_ready = True
    except Exception as exc:
        state.last_error = f"db_pool_init_failed: {exc}"
        if pool is not None:
            with contextlib.suppress(Exception):
                await pool.close()

Was zu vermeiden ist

Anti-PatternWarum schlecht
register_vector_async(conn) vor set_autocommit(True)haengt forever auf pg_type-SELECT
pool.wait() ohne timeout=bei RDS-Outage haengt Lifespan unendlich, kein Fail-Fast
state.db_pool = pool VOR await open_and_wait(pool)Teardown sieht halb-offenen Pool bei Cancel-Race
close_pool ohne try/exceptpropagiert exception → andere Teardown-Schritte laufen nicht
Vector-Werte als f-string '[0.1, 0.2]'::vector in SQLkillt Prepared-Statement-Cache, killt HNSW-Index-Hit
pool max_size > RDS max_connections / (workers × tasks)Connection-Exhaustion bei Skalierung

Search-Layer-Annahme

embedding_column wird mit psycopg.sql.Identifier parametrisiert — KEIN f-string. Vector-Parameter als list[float] an cur.execute(..., [vec]) uebergeben. pgvector-Adapter macht den Cast automatisch (deshalb register_vector_async Pflicht).

from psycopg import sql
 
base_query = sql.SQL("""
    SELECT id, name, 1 - ({emb_col} <=> %s::vector) AS score
    FROM {table_ref}
    WHERE {emb_col} IS NOT NULL
    ORDER BY {emb_col} <=> %s::vector
    LIMIT %s
""").format(
    emb_col=sql.Identifier(embedding_column),
    table_ref=sql.Identifier(schema, table),
)
await cur.execute(base_query, [query_vector, query_vector, top_k])

Tests

test_db_configure.py in a-icking — Mock-Connection, asserted die Aufruf-Reihenfolge (autocommit=True zuerst, dann register_vector, dann SET-Statements). Bei jedem Service der dieses Pattern uebernimmt: gleiche Tests einbauen.

Quellen

  • Audit-Run zur alten a-icking-Pipeline: performance-audit — Sektion 3 dokumentiert HNSW-ef_search-Wahl
  • Implementierung: ~/source/a-icking/inference-service/app/db.py
  • pgvector-python docs: register_vector_async source comment