Skalierung des Outbox Patterns mit PostgreSQL - Teil 1: Dem Dual-Write-Dilemma entkommen
Willkommen zum ersten Teil einer dreiteiligen Serie über den Aufbau und die Skalierung des Transactional Outbox Patterns mit PostgreSQL. In dieser Serie werden wir mit den grundlegenden Konzepten beginnen und uns schrittweise den Herausforderungen von Nebenläufigkeit, Skalierung und Echtzeit-Ereignisverarbeitung stellen.
Wenn du Microservices oder ereignisgesteuerte Systeme baust, bist du wahrscheinlich schon auf dieses Problem gestoßen: Wie speicherst du den Status in deiner Datenbank und veröffentlichst gleichzeitig ein Ereignis an einen Message Broker, ohne Daten zu verlieren? Lass uns eintauchen.
Das Dual-Write-Problem
Stell dir einen einfachen Benutzerregistrierungsablauf vor. Wenn sich ein Benutzer anmeldet, musst du zwei Dinge tun:
- Den neuen Benutzer in deiner PostgreSQL-Datenbank speichern.
- Ein
UserCreated-Ereignis an einen Message Broker (wie RabbitMQ oder Kafka) veröffentlichen, damit andere Dienste eine Willkommens-E-Mail senden oder Ressourcen bereitstellen können.
// Der naive Ansatz - GEFÄHRLICH!
async function registerUser(userData: UserData) {
// 1. In der Datenbank speichern
const user = await db.query(
'INSERT INTO users (email, name) VALUES ($1, $2) RETURNING *',
[userData.email, userData.name]
)
// 2. Ereignis an Message Broker veröffentlichen
await messageBroker.publish('UserCreated', user)
return user
}
Das nennt man einen “Dual-Write”. Und es ist eine Falle.
Was passiert, wenn Schritt 1 erfolgreich ist, Schritt 2 aber fehlschlägt, weil der Message Broker vorübergehend nicht erreichbar ist? Du hast jetzt einen Benutzer in deiner Datenbank, aber kein anderer Dienst weiß davon. Keine Willkommens-E-Mail, keine bereitgestellten Ressourcen. Inkonsistenter Zustand.
Wenn du die Reihenfolge tauschst, riskierst du, das Ereignis zu veröffentlichen, aber den Benutzer nicht in der Datenbank zu speichern. Gleiches Problem, andere Ausprägung.
Die Lösung: Das Transactional Outbox Pattern
Das Outbox Pattern löst dieses Problem, indem es eine einzige Datenbanktransaktion verwendet, um zu garantieren, dass sowohl die Domänendaten als auch das Ereignis gespeichert werden. Anstatt direkt an den Message Broker zu veröffentlichen, speicherst du das Ereignis in einer “Outbox”-Tabelle innerhalb exakt derselben Transaktion.
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(50) DEFAULT 'PENDING',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Jetzt sieht deine Registrierungslogik so aus:
async function registerUser(userData: UserData) {
const client = await pool.connect()
try {
await client.query('BEGIN')
// 1. In der Datenbank speichern
const userResult = await client.query(
'INSERT INTO users (email, name) VALUES ($1, $2) RETURNING *',
[userData.email, userData.name]
)
const user = userResult.rows[0]
// 2. Ereignis in der Outbox-Tabelle speichern IN DERSELBEN TRANSAKTION
await client.query(
`INSERT INTO outbox_events
(aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)`,
['User', user.id, 'UserCreated', JSON.stringify(user)]
)
await client.query('COMMIT')
return user
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
Wenn die Datenbanktransaktion committet wird, ist zu 100 % garantiert, dass das Ereignis sicher in der Tabelle outbox_events liegt und darauf wartet, verarbeitet zu werden.
Der naive Poller
Großartig, das Ereignis ist gespeichert. Aber wie kommt es zum Message Broker?
Du baust einen separaten Hintergrund-Worker (das “Relay” oder den “Poller”), der ständig die Tabelle outbox_events nach ausstehenden Ereignissen abfragt, sie veröffentlicht und als erledigt markiert.
async function startNaivePoller() {
setInterval(async () => {
const events = await db.query(
"SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50"
)
for (const event of events.rows) {
try {
// An Broker veröffentlichen
await messageBroker.publish(event.event_type, event.payload)
// Als veröffentlicht markieren
await db.query(
"UPDATE outbox_events SET status = 'PUBLISHED' WHERE id = $1",
[event.id]
)
} catch (err) {
console.error(`Fehler beim Verarbeiten von Ereignis ${event.id}`, err)
}
}
}, 1000) // Jede Sekunde abfragen
}
Der Flaschenhals
Dieses einfache Setup funktioniert perfekt für eine einfache Anwendung, die auf einer einzelnen Instanz läuft. Aber was passiert, wenn deine Anwendung wächst und du horizontal skalieren musst?
Du fährst drei Instanzen deines Dienstes hoch, und plötzlich laufen drei Poller gleichzeitig.
- Lock Contention: Alle drei Poller feuern die Abfrage
SELECT * FROM outbox_eventsexakt zur gleichen Zeit ab. - Doppelte Verarbeitung: Weil sie die Zeilen, die sie auswählen, nicht sperren, greifen alle drei Instanzen exakt dieselben 50 ausstehenden Ereignisse ab.
- Das Ergebnis: Dasselbe
UserCreated-Ereignis wird dreimal an deinen Message Broker veröffentlicht. Deine Benutzer erhalten drei Willkommens-E-Mails.
Um dies zu lösen, müssen wir die Zeilen, die wir lesen, sperren, damit andere Instanzen wissen, dass sie gerade bearbeitet werden. Aber wenn wir einfach ein normales SELECT ... FOR UPDATE verwenden, blockieren sich unsere Worker gegenseitig und töten unseren Durchsatz.
In Teil 2 werden wir untersuchen, wie man diesen Nebenläufigkeits-Albtraum mit den leistungsstarken Zeilensperr-Funktionen von PostgreSQL, insbesondere SKIP LOCKED, löst und wann man die schweren Geschütze mit Advisory Locks auffahren sollte.
Bleib dran und genieße jeden Schritt deiner Coding-Reise.
