Stage-Idempotenz: UPSERT mit setWhere + pg-boss singletonKey
Pipeline-Stages (intake, extract, feasibility, calculate) duerfen pro (tenant, anfrage) maximal einmal erfolgreich durchlaufen. Erkenntnis aus dem bas-twin ce-review 2026-05-12: SELECT-then-INSERT war TOCTOU-anfaellig — bei parallelen Worker-Instanzen finden beide keine Row, beide INSERTen, einer scheitert am Unique-Constraint und der Job geht in den Retry.
Loesung (Defense-in-Depth, zwei Schichten)
Schicht 1 — atomare UPSERT auf pipeline_runs
const upsertResult = await db
.insert(pipelineRuns)
.values({
tenantId, id: newUlid(), anfrageId,
stage: 'intake',
status: 'running',
startedAt: new Date(),
inputHash,
attempts: 1,
})
.onConflictDoUpdate({
target: [pipelineRuns.tenantId, pipelineRuns.anfrageId, pipelineRuns.stage],
set: {
status: 'running',
startedAt: new Date(),
attempts: sql`${pipelineRuns.attempts} + 1`,
},
setWhere: sql`${pipelineRuns.status} <> 'success'`,
})
.returning({ id: pipelineRuns.id });
if (upsertResult.length === 0) {
log.info('already done — skipping (idempotent)');
return;
}
const runId = upsertResult[0]!.id;Trick: setWhere: sql\${pipelineRuns.status} <> ‘success’`blockt das UPDATE wenn der existierende Run schonsuccessist. Diereturning`-Liste ist dann leer — Stage skippt sich.
Schicht 2 — pg-boss singletonKey beim Enqueue
function singletonKey(stage: string, payload: { tenantId: string; anfrageId: string }): string {
return `${stage}:${payload.tenantId}:${payload.anfrageId}`;
}
export async function enqueueExtract(payload: ExtractPayload): Promise<string | null> {
return getQueue().send(JOB_NAMES.extract, payload, {
singletonKey: singletonKey('extract', payload),
});
}Dedupliziert auf Queue-Ebene, bevor der Job ueberhaupt gepickt wird. Defense-in-Depth zum DB-UPSERT.
Schema-Voraussetzungen
export const pipelineRuns = pgTable('pipeline_runs', {
// ...
}, (t) => [
uniqueIndex('pipeline_runs_tenant_anfrage_stage_uq').on(
t.tenantId, t.anfrageId, t.stage,
),
// ...
]);Der UQ-Index ist Pflicht — onConflictDoUpdate braucht ein Target. Ohne UQ-Index ist die Race-Condition wieder da.
Retry-Pattern fuer abhaengige Inserts (z.B. anfrage_extraktionen)
Wenn der Stage in einer Transaction Rows einfuegt, die selbst einen UQ-Constraint haben (z.B. (tenant, anfrage, position_index)), wuerde der Retry an dem UQ haengen. Loesung: DELETE-before-INSERT in derselben Transaction.
await db.transaction(async (tx) => {
await tx.delete(anfrageExtraktionen).where(
and(
eq(anfrageExtraktionen.tenantId, payload.tenantId),
eq(anfrageExtraktionen.anfrageId, payload.anfrageId),
),
);
// Dann die neuen Rows.
for (const pos of positions) {
await tx.insert(anfrageExtraktionen).values(positionToRow(...));
}
});Bei Fehler im LLM-Pfad vor dem DELETE: Transaction wurde gar nicht gestartet, alte Rows sind unangetastet.
Spezialfall: onConflictDoNothing + audit_log Konsistenz
Wenn ein Insert via onConflictDoNothing skipt (z.B. dateien bei gleichem sha256+s3Key), liefert die .returning() eine leere Liste. Dann muss man die existierende Row-ID per SELECT holen, damit der nachgelagerte audit-Eintrag konsistent ist:
const inserted = await db
.insert(dateien)
.values({...})
.onConflictDoNothing({ target: [...] })
.returning({ id: dateien.id });
let dateiId: string;
if (inserted[0]) {
dateiId = inserted[0].id;
} else {
const existing = await db.select({ id: dateien.id }).from(dateien).where(...).limit(1);
if (!existing[0]) throw new Error(`lookup failed for ${key}`);
dateiId = existing[0].id;
}
await audit(db, { entityId: dateiId, ... });Quellen
- bas-twin Implementation: intake.ts, extract.ts, queue.ts
- ce-review-Findings: P1-19 (TOCTOU via UPSERT), P1-30 (Audit-ULID Lookup-on-Conflict)
- Plan: 2026-05-12-001-sprint-1-foundation-review-fixes.md §Phase 1B
Wann nicht so machen
- Wenn du kein UQ-Constraint hast und auch keinen einfuehren willst — dann ist Idempotenz nur via Application-Lock moeglich (z.B. Redis-Lock). Schlechter aber manchmal noetig.
- Wenn die Stage hohe Schreibvolumen hat und der
DELETE-Schritt im Retry-Path expensive wird. Dann pro-RowonConflictDoUpdatemit setWhere statt DELETE-before-INSERT.
Related
- bas-twin Projekt-Index
- Code-Stil AI-optimiert — verwandt, gleicher Sprint