
Scaling the Outbox Pattern with PostgreSQL - Part 2: Taming Workers with SKIP LOCKED

Welcome back to our series on scaling the Transactional Outbox Pattern with PostgreSQL. In Part 1, we explored the dual-write problem, introduced the Outbox Pattern, and ended up with a nasty concurrency bug when scaling our background workers.

Today, we’re going to fix that bug using PostgreSQL’s row-level locking capabilities, specifically SKIP LOCKED, and discuss when to bring in Advisory Locks.

The Concurrency Problem

Recap: We have multiple worker instances running a query like this every second:

SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50;

Because this is a plain SELECT, it takes no row locks. If three workers run this query before any of them has marked those rows as published, they all get the exact same 50 rows, process them, and publish duplicate messages to the broker.

Row-Level Locking: FOR UPDATE

The first step to fixing this is telling PostgreSQL, “Hey, I’m reading these rows, and I plan to update them soon, so don’t let anyone else touch them.” We do this by adding FOR UPDATE.

const events = await db.query(
  "SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50 FOR UPDATE"
)

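This prevents duplicate processing, provided the query runs inside an explicit transaction: row locks are held until that transaction commits or rolls back, while a standalone statement in autocommit mode releases them as soon as it finishes. If Worker A locks 50 rows, a FOR UPDATE query from Worker B that reaches those same rows blocks until Worker A’s transaction ends.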
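One way to make sure the SELECT ... FOR UPDATE and the follow-up UPDATEs share one connection and one transaction is a small helper. This is a sketch assuming a node-postgres Pool (or any object with the same connect(), query() and release() shape); withTransaction is our own name, not part of the pg API. It matters because a pool-level helper such as pool.query may run each call on a different pooled connection, in its own autocommit transaction.

```javascript
// Hypothetical helper (not part of the pg library): runs `work` on a single
// checked-out client between BEGIN and COMMIT, so any row locks taken with
// FOR UPDATE inside `work` are held until the transaction ends.
async function withTransaction(pool, work) {
  const client = await pool.connect()
  try {
    await client.query('BEGIN')
    const result = await work(client)
    await client.query('COMMIT') // releases every row lock taken in `work`
    return result
  } catch (error) {
    await client.query('ROLLBACK') // also releases the locks
    throw error
  } finally {
    client.release() // hand the connection back to the pool
  }
}
```

Inside `work`, run the SELECT ... FOR UPDATE and the status UPDATEs on the `client` argument; once withTransaction returns or throws, the locks are gone.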

But wait… Worker B will wait. That’s terrible for throughput. If Worker A takes 2 seconds to publish those 50 messages, Worker B is just sitting there doing nothing, holding a database connection hostage. When Worker A commits, Worker B wakes up, rechecks those rows, finds they are no longer PENDING, and has to keep scanning for other work.

The Magic of SKIP LOCKED

We don’t want workers to wait. We want them to grab the next available batch of work and go. Enter SKIP LOCKED.

const events = await db.query(
  "SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at ASC LIMIT 50 FOR UPDATE SKIP LOCKED"
)

With SKIP LOCKED, the database checks whether each candidate row is already locked by another transaction. If it is, it simply skips that row and moves on, until it has 50 unlocked rows or runs out of matching rows.

  • Worker A executes the query, locks rows 1-50, and starts processing.
  • Worker B executes the query a millisecond later. It sees rows 1-50 are locked, skips them, locks rows 51-100, and starts processing.
  • Worker C executes the query, skips 1-100, locks 101-150.

No waiting. Maximum throughput. Pure concurrent bliss.
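To make the skipping behavior concrete, here is a toy in-memory model of how a `FOR UPDATE SKIP LOCKED ... LIMIT` query claims rows, replaying the Worker A/B/C walkthrough. It is a simulation written for illustration, not how PostgreSQL implements locking: rows are plain objects, and a Map from row id to worker name stands in for the row locks.

```javascript
// Toy model of:
//   SELECT ... WHERE status = 'PENDING' ORDER BY created_at
//   LIMIT n FOR UPDATE SKIP LOCKED
// `locks` maps row id -> name of the worker currently holding the row lock.
function claimBatch(rows, locks, worker, limit) {
  const claimed = []
  const ordered = [...rows].sort((a, b) => a.created_at - b.created_at)
  for (const row of ordered) {
    if (claimed.length === limit) break
    if (row.status !== 'PENDING') continue // filtered out by WHERE
    if (locks.has(row.id)) continue        // SKIP LOCKED: don't wait, move on
    locks.set(row.id, worker)              // FOR UPDATE: take the row lock
    claimed.push(row.id)
  }
  return claimed
}

// 150 pending events, created in id order.
const rows = Array.from({ length: 150 }, (_, i) => ({
  id: i + 1,
  status: 'PENDING',
  created_at: i,
}))
const locks = new Map()

const a = claimBatch(rows, locks, 'A', 50)
const b = claimBatch(rows, locks, 'B', 50)
const c = claimBatch(rows, locks, 'C', 50)
console.log(a[0], a[a.length - 1]) // 1 50
console.log(b[0], b[b.length - 1]) // 51 100
console.log(c[0], c[c.length - 1]) // 101 150
```

A fourth worker calling claimBatch at this point gets an empty array, which is exactly the case the poller’s early return handles.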

Implementation with SKIP LOCKED

Here is what our robust, concurrent poller looks like:

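const { Pool } = require('pg')

// Connection settings come from the standard PG* environment variables.
// messageBroker (used below) is your broker client, assumed to be defined elsewhere.
const pool = new Pool()

async function processOutboxQueue() {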
  const client = await pool.connect()
  
  try {
    // 1. Start transaction to hold the locks
    await client.query('BEGIN')
    
    // 2. Grab a batch of unlocked rows
    const result = await client.query(`
      SELECT * FROM outbox_events 
      WHERE status = 'PENDING' 
      ORDER BY created_at ASC 
      LIMIT 50 
      FOR UPDATE SKIP LOCKED
    `)
    
    if (result.rows.length === 0) {
      await client.query('COMMIT')
      return // Queue is empty, nothing to do
    }
    
    // 3. Process the batch
    for (const event of result.rows) {
      await messageBroker.publish(event.event_type, event.payload)
      
      // 4. Mark as published in the same transaction
      await client.query(
        "UPDATE outbox_events SET status = 'PUBLISHED' WHERE id = $1",
        [event.id]
      )
    }
    
    // 5. Commit releases all locks
    await client.query('COMMIT')
    
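  } catch (error) {
    // If anything fails, ROLLBACK releases the locks so another worker can retry.
    // Events already published from this batch will be published again on retry,
    // so delivery is at-least-once and consumers must be idempotent.
    await client.query('ROLLBACK')
    console.error('Failed to process outbox batch', error)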
  } finally {
    client.release()
  }
}

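// Run it periodically. The .catch() keeps a failed pool.connect()
// from becoming an unhandled promise rejection.
setInterval(() => {
  processOutboxQueue().catch((error) => console.error('Outbox poll failed', error))
}, 1000)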

Advanced Coordination: Advisory Locks

SKIP LOCKED is fantastic for a general-purpose queue. However, it doesn’t guarantee strict ordering.

Imagine a user updates their profile twice in rapid succession: ProfileUpdated (Name: John) and ProfileUpdated (Name: Johnny). If Worker A grabs the first event and Worker B grabs the second, Worker B might finish publishing before Worker A. The message broker receives the older state last, resulting in stale data downstream.

If you need strict ordering per aggregate (e.g., all events for User 123 must be processed sequentially), SKIP LOCKED isn’t enough. You need PostgreSQL Advisory Locks.

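Advisory locks are application-defined locks: PostgreSQL enforces them, but your application decides what each one means. Instead of locking a row, you lock an arbitrary integer key (a single bigint, or a pair of 32-bit integers), such as a hash of an aggregate ID.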

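// Inside the for loop of processOutboxQueue(), before publishing each event.
// hashStringToInt is a helper you supply that hashes a UUID to a 32-bit integer.
const lockId = hashStringToInt(event.aggregate_id)

// Try to get an exclusive lock for this specific aggregate. The _xact_ variant
// is held until the surrounding transaction commits or rolls back.
const lockResult = await client.query(
  'SELECT pg_try_advisory_xact_lock($1) AS locked',
  [lockId]
)

if (!lockResult.rows[0].locked) {
  // Another worker is currently processing events for this aggregate.
  // Skip this event (it stays PENDING and is retried on a later poll).
  // Use continue, not return, so the rest of the batch still gets committed.
  continue
}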
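The snippet above leaves hashStringToInt undefined. One possible implementation is a 32-bit FNV-1a hash; that choice is our assumption, and any stable hash works. The result is a signed 32-bit integer, which fits the bigint key of pg_try_advisory_xact_lock. Two aggregates can collide on the same key, but that only makes them briefly wait on each other; it never lets two workers publish for the same aggregate at once.

```javascript
// Hypothetical helper: 32-bit FNV-1a hash of a string (e.g. a UUID), returned
// as a signed 32-bit integer. It hashes UTF-16 code units, which is identical
// to byte-wise FNV-1a for ASCII input such as UUIDs.
function hashStringToInt(str) {
  let hash = 0x811c9dc5 // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) // FNV prime, multiplied mod 2^32
  }
  return hash | 0 // force a signed 32-bit result
}

const lockId = hashStringToInt('3f2504e0-4f89-11d3-9a0c-0305e82c3301')
console.log(Number.isInteger(lockId)) // true
```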

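By using advisory locks keyed on aggregate_id, you ensure that even with 100 worker instances, only one instance is publishing events for a specific user at any given time. That mutual exclusion is necessary but not quite sufficient for strict ordering: an older event for the same user can still be sitting PENDING in another worker’s claimed batch, so after winning the lock, a worker should also confirm that no older PENDING event exists for that aggregate before publishing.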
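The race an advisory lock alone misses looks like this: Worker A has claimed the row for ProfileUpdated (John) but is still busy with earlier events in its batch, so it has not taken the advisory lock yet. Worker B, holding ProfileUpdated (Johnny), takes the lock first and publishes the newer event. Below is a sketch of a per-event guard, assuming outbox_events has aggregate_id and created_at columns and that created_at strictly orders events within one aggregate; shouldPublish is a hypothetical name, and lockId is whatever integer your hash produces.

```javascript
// Hypothetical guard, called inside the batch transaction for each event.
// Returns true only if this worker may publish `event` now without
// overtaking an older event for the same aggregate.
async function shouldPublish(client, event, lockId) {
  // 1. Serialize workers per aggregate (held until COMMIT/ROLLBACK).
  const lock = await client.query(
    'SELECT pg_try_advisory_xact_lock($1) AS locked',
    [lockId]
  )
  if (!lock.rows[0].locked) return false // another worker owns this aggregate

  // 2. Even with the lock, an older event may still be PENDING because
  //    another worker has claimed its row but not reached it yet.
  const older = await client.query(
    `SELECT 1 FROM outbox_events
     WHERE aggregate_id = $1 AND status = 'PENDING' AND created_at < $2
     LIMIT 1`,
    [event.aggregate_id, event.created_at]
  )
  return older.rows.length === 0
}
```

Because a transaction-scoped advisory lock cannot be released early, a worker that returns false in step 2 keeps the lock until it commits, so two workers may occasionally both skip an aggregate in the same round. The events stay PENDING and are picked up on the next poll, in order.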

Wrapping up Part 2

We’ve turned our naive poller into an industrial-strength, concurrent event processor using SKIP LOCKED. However, polling the database every second still creates unnecessary load when the system is idle.

In Part 3, we will explore how to replace aggressive polling with real-time push notifications using PostgreSQL’s LISTEN/NOTIFY, and we’ll discuss strategies for keeping our outbox table lean and clean.

Keep pushing forward and savor every step of your coding journey.