Stage-Idempotenz: UPSERT mit setWhere + pg-boss singletonKey

Pipeline-Stages (intake, extract, feasibility, calculate) duerfen pro (tenant, anfrage) maximal einmal erfolgreich durchlaufen. Erkenntnis aus dem bas-twin ce-review 2026-05-12: SELECT-then-INSERT war TOCTOU-anfaellig — bei parallelen Worker-Instanzen finden beide keine Row, beide INSERTen, einer scheitert am Unique-Constraint und der Job geht in den Retry.

Loesung (Defense-in-Depth, zwei Schichten)

Schicht 1 — atomare UPSERT auf pipeline_runs

const upsertResult = await db
  .insert(pipelineRuns)
  .values({
    tenantId, id: newUlid(), anfrageId,
    stage: 'intake',
    status: 'running',
    startedAt: new Date(),
    inputHash,
    attempts: 1,
  })
  .onConflictDoUpdate({
    target: [pipelineRuns.tenantId, pipelineRuns.anfrageId, pipelineRuns.stage],
    set: {
      status: 'running',
      startedAt: new Date(),
      attempts: sql`${pipelineRuns.attempts} + 1`,
    },
    setWhere: sql`${pipelineRuns.status} <> 'success'`,
  })
  .returning({ id: pipelineRuns.id });
 
if (upsertResult.length === 0) {
  log.info('already done — skipping (idempotent)');
  return;
}
const runId = upsertResult[0]!.id;

Trick: setWhere: sql\${pipelineRuns.status} <> ‘success’`blockt das UPDATE wenn der existierende Run schonsuccessist. Diereturning`-Liste ist dann leer — Stage skippt sich.

Schicht 2 — pg-boss singletonKey beim Enqueue

function singletonKey(stage: string, payload: { tenantId: string; anfrageId: string }): string {
  return `${stage}:${payload.tenantId}:${payload.anfrageId}`;
}
 
export async function enqueueExtract(payload: ExtractPayload): Promise<string | null> {
  return getQueue().send(JOB_NAMES.extract, payload, {
    singletonKey: singletonKey('extract', payload),
  });
}

Dedupliziert auf Queue-Ebene, bevor der Job ueberhaupt gepickt wird. Defense-in-Depth zum DB-UPSERT.

Schema-Voraussetzungen

export const pipelineRuns = pgTable('pipeline_runs', {
  // ...
}, (t) => [
  uniqueIndex('pipeline_runs_tenant_anfrage_stage_uq').on(
    t.tenantId, t.anfrageId, t.stage,
  ),
  // ...
]);

Der UQ-Index ist Pflicht — onConflictDoUpdate braucht ein Target. Ohne UQ-Index ist die Race-Condition wieder da.

Retry-Pattern fuer abhaengige Inserts (z.B. anfrage_extraktionen)

Wenn der Stage in einer Transaction Rows einfuegt, die selbst einen UQ-Constraint haben (z.B. (tenant, anfrage, position_index)), wuerde der Retry an dem UQ haengen. Loesung: DELETE-before-INSERT in derselben Transaction.

await db.transaction(async (tx) => {
  await tx.delete(anfrageExtraktionen).where(
    and(
      eq(anfrageExtraktionen.tenantId, payload.tenantId),
      eq(anfrageExtraktionen.anfrageId, payload.anfrageId),
    ),
  );
  // Dann die neuen Rows.
  for (const pos of positions) {
    await tx.insert(anfrageExtraktionen).values(positionToRow(...));
  }
});

Bei Fehler im LLM-Pfad vor dem DELETE: Transaction wurde gar nicht gestartet, alte Rows sind unangetastet.

Spezialfall: onConflictDoNothing + audit_log Konsistenz

Wenn ein Insert via onConflictDoNothing skipt (z.B. dateien bei gleichem sha256+s3Key), liefert die .returning() eine leere Liste. Dann muss man die existierende Row-ID per SELECT holen, damit der nachgelagerte audit-Eintrag konsistent ist:

const inserted = await db
  .insert(dateien)
  .values({...})
  .onConflictDoNothing({ target: [...] })
  .returning({ id: dateien.id });
 
let dateiId: string;
if (inserted[0]) {
  dateiId = inserted[0].id;
} else {
  const existing = await db.select({ id: dateien.id }).from(dateien).where(...).limit(1);
  if (!existing[0]) throw new Error(`lookup failed for ${key}`);
  dateiId = existing[0].id;
}
 
await audit(db, { entityId: dateiId, ... });

Quellen

Wann nicht so machen

  • Wenn du kein UQ-Constraint hast und auch keinen einfuehren willst — dann ist Idempotenz nur via Application-Lock moeglich (z.B. Redis-Lock). Schlechter aber manchmal noetig.
  • Wenn die Stage hohe Schreibvolumen hat und der DELETE-Schritt im Retry-Path expensive wird. Dann pro-Row onConflictDoUpdate mit setWhere statt DELETE-before-INSERT.