psycopg3 AsyncConnectionPool + pgvector
Standard-Pattern fuer async-FastAPI-Services mit pgvector. Erstmals dokumentiert aus dem Icking-Rebuild — vorher gab es nur die sync-Variante in der alten a-icking-Pipeline. Wenn dieses Pattern in zukuenftigen Services landet (z.B. Becker, HeyJulia), hier nachziehen.
Das Problem
register_vector_async (aus pgvector.psycopg) macht intern ein SELECT typname FROM pg_type um den Vector-Type beim Connection-Adapter zu registrieren. In einer impliziten Transaktion (Default psycopg3-Verhalten) haengt dieses SELECT — Pool-Init dreht ewig, kein Timeout, kein Error. Symptom: pool.wait() returnt nie, ALB-Health bleibt rot.
Loesung: autocommit=True setzen BEVOR register_vector_async laeuft.
Das Pattern
# app/db.py
from pgvector.psycopg import register_vector_async
from psycopg_pool import AsyncConnectionPool
async def _configure(conn) -> None:
"""Pool-configure-Callback — laeuft pro neuer Connection."""
# Reihenfolge ist hart:
# 1) autocommit=True → pgvector-internes pg_type-SELECT braucht das
# 2) register_vector_async → registriert Python-list ↔ vector adapter
# 3) SET-Statements pro Session
await conn.set_autocommit(True)
await register_vector_async(conn)
async with conn.cursor() as cur:
await cur.execute("SET hnsw.ef_search = 100") # Recall-Win vs Default 40
await cur.execute("SET search_path TO app, public")
def make_pool(settings) -> AsyncConnectionPool:
return AsyncConnectionPool(
conninfo=settings.pg_dsn,
min_size=settings.pg_pool_min,
max_size=settings.pg_pool_max,
open=False, # explizit, Default ist deprecated
configure=_configure,
max_lifetime=3600,
max_idle=600,
timeout=30,
name="leistungskatalog-pool",
)
async def open_and_wait(pool, wait_timeout: float = 30.0) -> None:
"""Lifespan: Pool oeffnen + auf min_size healthy Connections warten.
`wait(timeout=...)` ist Pflicht — sonst haengt der Container forever wenn RDS
nicht erreichbar ist. Lieber Fail-Fast + Container-Restart.
"""
await pool.open()
await pool.wait(timeout=wait_timeout)FastAPI-Lifespan-Integration
# app/main.py — Auszug
async def _warmup_dependencies(app, state):
pool = None
try:
pool = make_pool(settings)
await open_and_wait(pool)
# Erst NACH erfolgreichem wait() an state haengen — sonst sieht Teardown
# einen halb-offenen Pool wenn Lifespan mitten in open() gecancelt wird.
state.db_pool = pool
state.db_pool_ready = True
except Exception as exc:
state.last_error = f"db_pool_init_failed: {exc}"
if pool is not None:
with contextlib.suppress(Exception):
await pool.close()Was zu vermeiden ist
| Anti-Pattern | Warum schlecht |
|---|---|
register_vector_async(conn) vor set_autocommit(True) | haengt forever auf pg_type-SELECT |
pool.wait() ohne timeout= | bei RDS-Outage haengt Lifespan unendlich, kein Fail-Fast |
state.db_pool = pool VOR await open_and_wait(pool) | Teardown sieht halb-offenen Pool bei Cancel-Race |
close_pool ohne try/except | propagiert exception → andere Teardown-Schritte laufen nicht |
Vector-Werte als f-string '[0.1, 0.2]'::vector in SQL | killt Prepared-Statement-Cache, killt HNSW-Index-Hit |
| pool max_size > RDS max_connections / (workers × tasks) | Connection-Exhaustion bei Skalierung |
Search-Layer-Annahme
embedding_column wird mit psycopg.sql.Identifier parametrisiert — KEIN f-string. Vector-Parameter als list[float] an cur.execute(..., [vec]) uebergeben. pgvector-Adapter macht den Cast automatisch (deshalb register_vector_async Pflicht).
from psycopg import sql
base_query = sql.SQL("""
SELECT id, name, 1 - ({emb_col} <=> %s::vector) AS score
FROM {table_ref}
WHERE {emb_col} IS NOT NULL
ORDER BY {emb_col} <=> %s::vector
LIMIT %s
""").format(
emb_col=sql.Identifier(embedding_column),
table_ref=sql.Identifier(schema, table),
)
await cur.execute(base_query, [query_vector, query_vector, top_k])Tests
test_db_configure.py in a-icking — Mock-Connection, asserted die Aufruf-Reihenfolge (autocommit=True zuerst, dann register_vector, dann SET-Statements). Bei jedem Service der dieses Pattern uebernimmt: gleiche Tests einbauen.
Quellen
- Audit-Run zur alten a-icking-Pipeline: performance-audit — Sektion 3 dokumentiert HNSW-
ef_search-Wahl - Implementierung:
~/source/a-icking/inference-service/app/db.py - pgvector-python docs:
register_vector_asyncsource comment