|

Cron-Jobs mit PostgreSQL-Transaction-Locks skalieren

Hast du schon mal deine App auf mehrere Instanzen skaliert und plötzlich laufen deine Cron-Jobs mehrfach? Ja, das ist ein lustiger Bug zum Debuggen in Produktion. Lass mich dir zeigen, wie ich das mit PostgreSQL-Transaction-Locks gelöst habe.

Das Problem

Du hast einen Cron-Job, der jede Stunde läuft. Funktioniert super auf einem Server. Dann skalierst du auf 3 Instanzen für High Availability. Jetzt läuft der Job 3 Mal pro Stunde. Ups.

Vielleicht sendet er E-Mails (Benutzer bekommen 3 Kopien), verarbeitet Zahlungen (3-fache Abbuchung) oder generiert Reports (Ressourcenverschwendung). Nicht gut.

Die naive Lösung (die nicht funktioniert)

Erster Versuch: “Ich prüfe einfach, ob der Job schon läuft!”

const isRunning = await db.query(
  'SELECT * FROM job_status WHERE job_name = $1 AND status = $2',
  ['send-emails', 'running']
)

if (isRunning.rows.length > 0) {
  console.log('Job already running, skipping')
  return
}

await db.query(
  'INSERT INTO job_status (job_name, status) VALUES ($1, $2)',
  ['send-emails', 'running']
)

// Die eigentliche Arbeit
await sendEmails()

Sieht gut aus, oder? Falsch. Race Condition. Beide Instanzen prüfen gleichzeitig, beide sehen keinen laufenden Job, beide starten. Du hast das Problem gerade verschlimmert, weil du jetzt denkst, es ist behoben.

PostgreSQL Advisory Locks

PostgreSQL hat diese Sache namens Advisory Locks. Das sind Application-Level-Locks, mit denen du sagen kannst “nur ein Prozess kann diese Sache gleichzeitig machen.”

Hier ist die Magie:

import { Pool } from 'pg'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

async function runJobWithLock(jobName: string, jobFn: () => Promise<void>) {
  const client = await pool.connect()
  
  try {
    // Versuche einen Advisory Lock zu bekommen
    // pg_try_advisory_lock gibt true zurück wenn wir den Lock bekommen haben
    const lockId = hashJobName(jobName) // Job-Name in Zahl umwandeln
    const result = await client.query(
      'SELECT pg_try_advisory_lock($1) as locked',
      [lockId]
    )
    
    if (!result.rows[0].locked) {
      console.log(`Job ${jobName} läuft bereits auf anderer Instanz, überspringe`)
      return
    }
    
    console.log(`Lock für ${jobName} bekommen, führe Job aus`)
    await jobFn()
    
  } finally {
    // Lock immer freigeben
    await client.query('SELECT pg_advisory_unlock($1)', [lockId])
    client.release()
  }
}

function hashJobName(name: string): number {
  // Einfache Hash-Funktion um String in Zahl umzuwandeln
  let hash = 0
  for (let i = 0; i < name.length; i++) {
    hash = ((hash << 5) - hash) + name.charCodeAt(i)
    hash = hash & hash // In 32bit Integer umwandeln
  }
  return Math.abs(hash)
}

Jetzt verwenden:

// In deinem Cron-Job
cron.schedule('0 * * * *', async () => {
  await runJobWithLock('send-emails', async () => {
    await sendEmails()
  })
})

Wie es funktioniert

  1. Instanz A versucht den Lock zu bekommen - erfolgreich, startet den Job
  2. Instanz B versucht den Lock zu bekommen - fehlgeschlagen, überspringt den Job
  3. Instanz C versucht den Lock zu bekommen - fehlgeschlagen, überspringt den Job
  4. Instanz A beendet, gibt den Lock frei
  5. Nächste Stunde kann jede Instanz den Lock bekommen

Keine Race Conditions. Keine doppelten Jobs. Sauber.

Transaction-Level-Locks für mehr Kontrolle

Manchmal brauchst du feinere Kontrolle. Vielleicht willst du spezifische Zeilen sperren während du sie verarbeitest.

async function processOrders() {
  const client = await pool.connect()
  
  try {
    await client.query('BEGIN')
    
    // Zeilen für Update sperren - andere Transaktionen warten
    const result = await client.query(`
      SELECT * FROM orders 
      WHERE status = 'pending' 
      AND processed_at IS NULL
      FOR UPDATE SKIP LOCKED
      LIMIT 100
    `)
    
    for (const order of result.rows) {
      await processOrder(order)
      
      await client.query(
        'UPDATE orders SET status = $1, processed_at = NOW() WHERE id = $2',
        ['processed', order.id]
      )
    }
    
    await client.query('COMMIT')
    
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

Der FOR UPDATE SKIP LOCKED Teil ist der Schlüssel:

  • FOR UPDATE: Sperrt die Zeilen
  • SKIP LOCKED: Wenn eine andere Transaktion sie bereits gesperrt hat, überspringe sie statt zu warten

Das bedeutet, mehrere Instanzen können verschiedene Bestellungen gleichzeitig verarbeiten. Keine Konflikte, keine Duplikate.

Row-Level-Locking-Muster

Hier ist ein Muster, das ich für Queue-Verarbeitung verwende:

async function processQueue() {
  const client = await pool.connect()
  
  try {
    await client.query('BEGIN')
    
    // Nächstes Item holen, das nicht gesperrt ist
    const result = await client.query(`
      UPDATE queue_items
      SET 
        status = 'processing',
        locked_at = NOW(),
        locked_by = $1
      WHERE id = (
        SELECT id FROM queue_items
        WHERE status = 'pending'
        ORDER BY created_at ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
      )
      RETURNING *
    `)
    
    if (result.rows.length === 0) {
      await client.query('COMMIT')
      return // Nichts zu verarbeiten
    }
    
    const item = result.rows[0]
    
    // Die Arbeit machen
    await processItem(item)
    
    // Als erledigt markieren
    await client.query(
      'UPDATE queue_items SET status = $1, completed_at = NOW() WHERE id = $2',
      ['completed', item.id]
    )
    
    await client.query('COMMIT')
    
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

Umgang mit hängenden Locks

Was wenn ein Prozess abstürzt während er einen Lock hält? Advisory Locks werden automatisch freigegeben wenn die Verbindung schließt. Aber Row-Locks in Transaktionen? Du brauchst ein Timeout.

// Statement-Timeout setzen
await client.query('SET statement_timeout = 30000') // 30 Sekunden

// Oder Lock-Timeout setzen
await client.query('SET lock_timeout = 5000') // 5 Sekunden

Für hängende Queue-Items, füge einen Cleanup-Job hinzu:

// Läuft alle 5 Minuten
cron.schedule('*/5 * * * *', async () => {
  await db.query(`
    UPDATE queue_items
    SET status = 'pending', locked_at = NULL, locked_by = NULL
    WHERE status = 'processing'
    AND locked_at < NOW() - INTERVAL '10 minutes'
  `)
})

Was ich gelernt habe

Advisory Locks für Job-Level-Locking verwenden: Ein Job läuft gleichzeitig über alle Instanzen.

Row-Level-Locks für Queue-Verarbeitung verwenden: Mehrere Instanzen können verschiedene Items gleichzeitig verarbeiten.

Immer SKIP LOCKED verwenden: Lass Instanzen nicht aufeinander warten.

Timeouts setzen: Verhindere, dass hängende Locks alles blockieren.

Hängende Items aufräumen: Hab einen separaten Job, der Items zurücksetzt, die hängen geblieben sind.

Mit mehreren Instanzen testen: Starte 3 Instanzen lokal und stelle sicher, dass Jobs nicht dupliziert werden.

Lock-Contention überwachen: Wenn du viele übersprungene Locks siehst, musst du vielleicht deinen Ansatz überdenken.

Fazit

Cron-Jobs zu skalieren ist knifflig. PostgreSQL gibt dir die Werkzeuge, um es richtig zu machen - Advisory Locks für Job-Level-Koordination, Row-Level-Locks für Queue-Verarbeitung.

Keine doppelten Jobs mehr. Keine Race Conditions mehr. Nur saubere, skalierbare Cron-Jobs.

Keep pushing forward and savor every step of your coding journey.