|

Skalierung des Outbox Patterns mit PostgreSQL - Teil 2: Worker bändigen mit SKIP LOCKED

Willkommen zurück zu unserer Serie über die Skalierung des Transactional Outbox Patterns mit PostgreSQL. In Teil 1 haben wir das Dual-Write-Problem untersucht, das Outbox Pattern eingeführt und sind am Ende bei einem fiesen Nebenläufigkeits-Bug gelandet, als wir unsere Hintergrund-Worker skaliert haben.

Heute werden wir diesen Bug mithilfe der Zeilensperr-Funktionen von PostgreSQL, insbesondere SKIP LOCKED, beheben und besprechen, wann man Advisory Locks einsetzen sollte.

Das Nebenläufigkeitsproblem

Zur Erinnerung: Wir haben mehrere Worker-Instanzen, die jede Sekunde eine Abfrage wie diese ausführen:

SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50;

Da dies ein normales SELECT ist, werden die Zeilen nicht gesperrt. Wenn drei Worker diese Abfrage in derselben Millisekunde ausführen, erhalten sie alle exakt dieselben 50 Zeilen, verarbeiten sie und veröffentlichen doppelte Nachrichten an den Broker.

Row-Level Locking: FOR UPDATE

Der erste Schritt zur Behebung besteht darin, PostgreSQL mitzuteilen: “Hey, ich lese diese Zeilen und plane, sie bald zu aktualisieren, also lass niemanden sonst sie anfassen.” Das tun wir, indem wir FOR UPDATE hinzufügen.

const events = await db.query(
  "SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50 FOR UPDATE"
)

Dies verhindert die doppelte Verarbeitung! Wenn Worker A 50 Zeilen greift, wartet Worker B, bis Worker A seine Transaktion beendet hat, bevor er dieselben Zeilen lesen kann.

Aber warte… Worker B wird warten. Das ist furchtbar für den Durchsatz. Wenn Worker A 2 Sekunden braucht, um diese 50 Nachrichten zu veröffentlichen, sitzt Worker B nur da, tut nichts und hält eine Datenbankverbindung als Geisel. Wenn Worker A fertig ist, wacht Worker B auf, stellt fest, dass diese Zeilen nicht mehr PENDING sind, und muss erneut suchen.

Die Magie von SKIP LOCKED

Wir wollen nicht, dass Worker warten. Wir wollen, dass sie sich den nächsten verfügbaren Stapel Arbeit schnappen und loslegen. Hier kommt SKIP LOCKED ins Spiel.

const events = await db.query(
  "SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50 FOR UPDATE SKIP LOCKED"
)

Mit SKIP LOCKED prüft die Datenbank, ob eine Zeile bereits von einer anderen Transaktion gesperrt ist. Wenn ja, wird sie einfach ignoriert und zur nächsten Zeile übergegangen, bis 50 ungesperrte Zeilen gefunden wurden.

  • Worker A führt die Abfrage aus, sperrt die Zeilen 1-50 und beginnt mit der Verarbeitung.
  • Worker B führt die Abfrage eine Millisekunde später aus. Er sieht, dass die Zeilen 1-50 gesperrt sind, überspringt sie, sperrt die Zeilen 51-100 und beginnt mit der Verarbeitung.
  • Worker C führt die Abfrage aus, überspringt 1-100, sperrt 101-150.

Kein Warten. Maximaler Durchsatz. Pure Nebenläufigkeits-Glückseligkeit.

Implementierung mit SKIP LOCKED

So sieht unser robuster, nebenläufiger Poller aus:

async function processOutboxQueue() {
  const client = await pool.connect()
  
  try {
    // 1. Transaktion starten, um die Locks zu halten
    await client.query('BEGIN')
    
    // 2. Einen Stapel ungesperrter Zeilen greifen
    const result = await client.query(`
      SELECT * FROM outbox_events 
      WHERE status = 'PENDING' 
      ORDER BY created_at ASC 
      LIMIT 50 
      FOR UPDATE SKIP LOCKED
    `)
    
    if (result.rows.length === 0) {
      await client.query('COMMIT')
      return // Warteschlange ist leer, nichts zu tun
    }
    
    // 3. Den Stapel verarbeiten
    for (const event of result.rows) {
      await messageBroker.publish(event.event_type, event.payload)
      
      // 4. In derselben Transaktion als veröffentlicht markieren
      await client.query(
        "UPDATE outbox_events SET status = 'PUBLISHED' WHERE id = $1",
        [event.id]
      )
    }
    
    // 5. Commit gibt alle Locks frei
    await client.query('COMMIT')
    
  } catch (error) {
    // Wenn etwas fehlschlägt, gibt Rollback die Locks frei, damit ein anderer Worker es versuchen kann
    await client.query('ROLLBACK')
    console.error('Fehler beim Verarbeiten des Outbox-Stapels', error)
  } finally {
    client.release()
  }
}

// Regelmäßig ausführen
setInterval(processOutboxQueue, 1000)

Erweiterte Koordination: Advisory Locks

SKIP LOCKED ist fantastisch für eine universelle Warteschlange. Es garantiert jedoch keine strikte Reihenfolge.

Stell dir vor, ein Benutzer aktualisiert sein Profil zweimal in rascher Folge: ProfileUpdated (Name: John) und ProfileUpdated (Name: Johnny). Wenn Worker A das erste Ereignis greift und Worker B das zweite, könnte Worker B mit dem Veröffentlichen vor Worker A fertig sein. Der Message Broker empfängt den älteren Zustand zuletzt, was zu veralteten Daten im nachgelagerten System führt.

Wenn du eine strikte Reihenfolge pro Aggregat benötigst (z. B. müssen alle Ereignisse für Benutzer 123 nacheinander verarbeitet werden), reicht SKIP LOCKED nicht aus. Du benötigst PostgreSQL Advisory Locks.

Advisory Locks sind anwendungsdefinierte Sperren. Du kannst auf eine bestimmte ID sperren anstatt auf eine Zeile.

// Beispiel: Hashing einer UUID zu einem 32-Bit-Integer für die Lock-ID
const lockId = hashStringToInt(event.aggregate_id)

// Versuche, einen exklusiven Lock für dieses spezifische Aggregat zu bekommen
const lockResult = await client.query(
  'SELECT pg_try_advisory_xact_lock($1) as locked', 
  [lockId]
)

if (!lockResult.rows[0].locked) {
  // Ein anderer Worker verarbeitet derzeit Ereignisse für dieses Aggregat.
  // Wir müssen dieses Ereignis überspringen, um die Reihenfolge beizubehalten.
  return
}

Durch die Verwendung von Advisory Locks basierend auf der aggregate_id stellst du sicher, dass selbst bei 100 Worker-Instanzen immer nur eine einzige Instanz Ereignisse für einen bestimmten Benutzer zu einem bestimmten Zeitpunkt verarbeitet, was eine perfekte Reihenfolge garantiert.

Zusammenfassung von Teil 2

Wir haben unseren naiven Poller mithilfe von SKIP LOCKED in einen industrietauglichen, nebenläufigen Event-Prozessor verwandelt. Die Datenbank jede Sekunde abzufragen, erzeugt jedoch immer noch unnötige Last, wenn das System im Leerlauf ist.

In Teil 3 werden wir untersuchen, wie man aggressives Polling durch Echtzeit-Push-Benachrichtigungen mit LISTEN/NOTIFY von PostgreSQL ersetzt, und wir werden Strategien diskutieren, um unsere Outbox-Tabelle schlank und sauber zu halten.

Bleib dran und genieße jeden Schritt deiner Coding-Reise.