Cron-Jobs mit PostgreSQL-Transaction-Locks skalieren
Hast du schon mal deine App auf mehrere Instanzen skaliert und plötzlich laufen deine Cron-Jobs mehrfach? Ja, das ist ein lustiger Bug zum Debuggen in Produktion. Lass mich dir zeigen, wie ich das mit PostgreSQL-Transaction-Locks gelöst habe.
Das Problem
Du hast einen Cron-Job, der jede Stunde läuft. Funktioniert super auf einem Server. Dann skalierst du auf 3 Instanzen für High Availability. Jetzt läuft der Job 3 Mal pro Stunde. Ups.
Vielleicht sendet er E-Mails (Benutzer bekommen 3 Kopien), verarbeitet Zahlungen (3-fache Abbuchung) oder generiert Reports (Ressourcenverschwendung). Nicht gut.
Die naive Lösung (die nicht funktioniert)
Erster Versuch: “Ich prüfe einfach, ob der Job schon läuft!”
const isRunning = await db.query(
'SELECT * FROM job_status WHERE job_name = $1 AND status = $2',
['send-emails', 'running']
)
if (isRunning.rows.length > 0) {
console.log('Job already running, skipping')
return
}
await db.query(
'INSERT INTO job_status (job_name, status) VALUES ($1, $2)',
['send-emails', 'running']
)
// Die eigentliche Arbeit
await sendEmails()
Sieht gut aus, oder? Falsch. Race Condition. Beide Instanzen prüfen gleichzeitig, beide sehen keinen laufenden Job, beide starten. Du hast das Problem gerade verschlimmert, weil du jetzt denkst, es ist behoben.
PostgreSQL Advisory Locks
PostgreSQL hat diese Sache namens Advisory Locks. Das sind Application-Level-Locks, mit denen du sagen kannst “nur ein Prozess kann diese Sache gleichzeitig machen.”
Hier ist die Magie:
import { Pool } from 'pg'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
async function runJobWithLock(jobName: string, jobFn: () => Promise<void>) {
const client = await pool.connect()
try {
// Versuche einen Advisory Lock zu bekommen
// pg_try_advisory_lock gibt true zurück wenn wir den Lock bekommen haben
const lockId = hashJobName(jobName) // Job-Name in Zahl umwandeln
const result = await client.query(
'SELECT pg_try_advisory_lock($1) as locked',
[lockId]
)
if (!result.rows[0].locked) {
console.log(`Job ${jobName} läuft bereits auf anderer Instanz, überspringe`)
return
}
console.log(`Lock für ${jobName} bekommen, führe Job aus`)
await jobFn()
} finally {
// Lock immer freigeben
await client.query('SELECT pg_advisory_unlock($1)', [lockId])
client.release()
}
}
function hashJobName(name: string): number {
// Einfache Hash-Funktion um String in Zahl umzuwandeln
let hash = 0
for (let i = 0; i < name.length; i++) {
hash = ((hash << 5) - hash) + name.charCodeAt(i)
hash = hash & hash // In 32bit Integer umwandeln
}
return Math.abs(hash)
}
Jetzt verwenden:
// In deinem Cron-Job
cron.schedule('0 * * * *', async () => {
await runJobWithLock('send-emails', async () => {
await sendEmails()
})
})
Wie es funktioniert
- Instanz A versucht den Lock zu bekommen - erfolgreich, startet den Job
- Instanz B versucht den Lock zu bekommen - fehlgeschlagen, überspringt den Job
- Instanz C versucht den Lock zu bekommen - fehlgeschlagen, überspringt den Job
- Instanz A beendet, gibt den Lock frei
- Nächste Stunde kann jede Instanz den Lock bekommen
Keine Race Conditions. Keine doppelten Jobs. Sauber.
Transaction-Level-Locks für mehr Kontrolle
Manchmal brauchst du feinere Kontrolle. Vielleicht willst du spezifische Zeilen sperren während du sie verarbeitest.
async function processOrders() {
const client = await pool.connect()
try {
await client.query('BEGIN')
// Zeilen für Update sperren - andere Transaktionen warten
const result = await client.query(`
SELECT * FROM orders
WHERE status = 'pending'
AND processed_at IS NULL
FOR UPDATE SKIP LOCKED
LIMIT 100
`)
for (const order of result.rows) {
await processOrder(order)
await client.query(
'UPDATE orders SET status = $1, processed_at = NOW() WHERE id = $2',
['processed', order.id]
)
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
Der FOR UPDATE SKIP LOCKED Teil ist der Schlüssel:
FOR UPDATE: Sperrt die ZeilenSKIP LOCKED: Wenn eine andere Transaktion sie bereits gesperrt hat, überspringe sie statt zu warten
Das bedeutet, mehrere Instanzen können verschiedene Bestellungen gleichzeitig verarbeiten. Keine Konflikte, keine Duplikate.
Row-Level-Locking-Muster
Hier ist ein Muster, das ich für Queue-Verarbeitung verwende:
async function processQueue() {
const client = await pool.connect()
try {
await client.query('BEGIN')
// Nächstes Item holen, das nicht gesperrt ist
const result = await client.query(`
UPDATE queue_items
SET
status = 'processing',
locked_at = NOW(),
locked_by = $1
WHERE id = (
SELECT id FROM queue_items
WHERE status = 'pending'
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
`)
if (result.rows.length === 0) {
await client.query('COMMIT')
return // Nichts zu verarbeiten
}
const item = result.rows[0]
// Die Arbeit machen
await processItem(item)
// Als erledigt markieren
await client.query(
'UPDATE queue_items SET status = $1, completed_at = NOW() WHERE id = $2',
['completed', item.id]
)
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
Umgang mit hängenden Locks
Was wenn ein Prozess abstürzt während er einen Lock hält? Advisory Locks werden automatisch freigegeben wenn die Verbindung schließt. Aber Row-Locks in Transaktionen? Du brauchst ein Timeout.
// Statement-Timeout setzen
await client.query('SET statement_timeout = 30000') // 30 Sekunden
// Oder Lock-Timeout setzen
await client.query('SET lock_timeout = 5000') // 5 Sekunden
Für hängende Queue-Items, füge einen Cleanup-Job hinzu:
// Läuft alle 5 Minuten
cron.schedule('*/5 * * * *', async () => {
await db.query(`
UPDATE queue_items
SET status = 'pending', locked_at = NULL, locked_by = NULL
WHERE status = 'processing'
AND locked_at < NOW() - INTERVAL '10 minutes'
`)
})
Was ich gelernt habe
Advisory Locks für Job-Level-Locking verwenden: Ein Job läuft gleichzeitig über alle Instanzen.
Row-Level-Locks für Queue-Verarbeitung verwenden: Mehrere Instanzen können verschiedene Items gleichzeitig verarbeiten.
Immer SKIP LOCKED verwenden: Lass Instanzen nicht aufeinander warten.
Timeouts setzen: Verhindere, dass hängende Locks alles blockieren.
Hängende Items aufräumen: Hab einen separaten Job, der Items zurücksetzt, die hängen geblieben sind.
Mit mehreren Instanzen testen: Starte 3 Instanzen lokal und stelle sicher, dass Jobs nicht dupliziert werden.
Lock-Contention überwachen: Wenn du viele übersprungene Locks siehst, musst du vielleicht deinen Ansatz überdenken.
Fazit
Cron-Jobs zu skalieren ist knifflig. PostgreSQL gibt dir die Werkzeuge, um es richtig zu machen - Advisory Locks für Job-Level-Koordination, Row-Level-Locks für Queue-Verarbeitung.
Keine doppelten Jobs mehr. Keine Race Conditions mehr. Nur saubere, skalierbare Cron-Jobs.
Keep pushing forward and savor every step of your coding journey.
