|

Redis zu deinem Circuit Breaker in NestJS hinzufügen

Letzten Monat haben wir einen einfachen In-Memory Circuit Breaker gebaut. Funktioniert super… bis du auf mehrere Instanzen skalierst. Dann hat jede Instanz ihren eigenen Circuit-Zustand. Nicht ideal.

Lass uns das mit Redis beheben.

Das Problem mit In-Memory

Du hast 3 App-Instanzen. Externe API fängt an zu versagen.

  • Instanz 1: Öffnet Circuit nach 5 Fehlern
  • Instanz 2: Öffnet Circuit nach 5 Fehlern
  • Instanz 3: Öffnet Circuit nach 5 Fehlern

Das sind 15 fehlgeschlagene Anfragen, bevor alle Circuits offen sind. Außerdem, wenn der Circuit von Instanz 1 auf half-open geht und erfolgreich ist, wissen Instanzen 2 und 3 nichts davon.

Wir brauchen einen gemeinsamen Zustand. Enter Redis.

Redis hinzufügen

Zuerst Dependencies installieren:

npm install ioredis
npm install -D @types/ioredis

Erstelle ein Redis-Modul:

import { Module, Global } from '@nestjs/common'
import { ConfigService } from '@nestjs/config'
import Redis from 'ioredis'

export const REDIS_CLIENT = 'REDIS_CLIENT'

@Global()
@Module({
  providers: [
    {
      provide: REDIS_CLIENT,
      useFactory: (configService: ConfigService) => {
        return new Redis({
          host: configService.get('REDIS_HOST', 'localhost'),
          port: configService.get('REDIS_PORT', 6379),
          password: configService.get('REDIS_PASSWORD'),
          retryStrategy: (times) => {
            const delay = Math.min(times * 50, 2000)
            return delay
          }
        })
      },
      inject: [ConfigService]
    }
  ],
  exports: [REDIS_CLIENT]
})
export class RedisModule {}

Redis-basierter Circuit Breaker

Jetzt aktualisiere den Circuit Breaker, um Redis zu verwenden:

import { Injectable, Inject } from '@nestjs/common'
import Redis from 'ioredis'
import { REDIS_CLIENT } from './redis.module'

enum CircuitState {
  CLOSED = 'CLOSED',
  OPEN = 'OPEN',
  HALF_OPEN = 'HALF_OPEN'
}

interface CircuitConfig {
  failureThreshold: number
  successThreshold: number
  timeout: number
}

@Injectable()
export class CircuitBreakerService {
  private readonly defaultConfig: CircuitConfig = {
    failureThreshold: 5,
    successThreshold: 2,
    timeout: 60000
  }

  constructor(
    @Inject(REDIS_CLIENT) private readonly redis: Redis
  ) {}

  async execute<T>(
    key: string,
    fn: () => Promise<T>,
    config?: Partial<CircuitConfig>
  ): Promise<T> {
    const cfg = { ...this.defaultConfig, ...config }
    const state = await this.getState(key)

    if (state === CircuitState.OPEN) {
      const nextAttempt = await this.redis.get(`circuit:${key}:next_attempt`)
      
      if (nextAttempt && Date.now() < parseInt(nextAttempt)) {
        throw new Error(`Circuit breaker is OPEN for ${key}`)
      }

      // Timeout vorbei, versuche half-open
      await this.setState(key, CircuitState.HALF_OPEN)
      await this.redis.set(`circuit:${key}:successes`, 0)
    }

    try {
      const result = await fn()
      await this.onSuccess(key, cfg)
      return result
    } catch (error) {
      await this.onFailure(key, cfg)
      throw error
    }
  }

  private async getState(key: string): Promise<CircuitState> {
    const state = await this.redis.get(`circuit:${key}:state`)
    return (state as CircuitState) || CircuitState.CLOSED
  }

  private async setState(key: string, state: CircuitState) {
    await this.redis.set(`circuit:${key}:state`, state)
  }

  private async onSuccess(key: string, config: CircuitConfig) {
    const state = await this.getState(key)

    if (state === CircuitState.HALF_OPEN) {
      const successes = await this.redis.incr(`circuit:${key}:successes`)

      if (successes >= config.successThreshold) {
        // Erholt! Schließe den Circuit
        await this.setState(key, CircuitState.CLOSED)
        await this.redis.del(
          `circuit:${key}:failures`,
          `circuit:${key}:successes`,
          `circuit:${key}:next_attempt`
        )
      }
    } else {
      // Setze Fehlerzähler bei Erfolg zurück
      await this.redis.set(`circuit:${key}:failures`, 0)
    }
  }

  private async onFailure(key: string, config: CircuitConfig) {
    const failures = await this.redis.incr(`circuit:${key}:failures`)

    if (failures >= config.failureThreshold) {
      // Zu viele Fehler, öffne den Circuit
      await this.setState(key, CircuitState.OPEN)
      const nextAttempt = Date.now() + config.timeout
      await this.redis.set(`circuit:${key}:next_attempt`, nextAttempt)
      await this.redis.set(`circuit:${key}:successes`, 0)
    }
  }

  async reset(key: string) {
    await this.redis.del(
      `circuit:${key}:state`,
      `circuit:${key}:failures`,
      `circuit:${key}:successes`,
      `circuit:${key}:next_attempt`
    )
  }

  async getStats(key: string) {
    const [state, failures, successes, nextAttempt] = await Promise.all([
      this.redis.get(`circuit:${key}:state`),
      this.redis.get(`circuit:${key}:failures`),
      this.redis.get(`circuit:${key}:successes`),
      this.redis.get(`circuit:${key}:next_attempt`)
    ])

    return {
      state: state || CircuitState.CLOSED,
      failures: parseInt(failures || '0'),
      successes: parseInt(successes || '0'),
      nextAttempt: nextAttempt ? parseInt(nextAttempt) : null
    }
  }
}

Redis-Fehler behandeln

Hier ist die Sache: Was, wenn Redis ausfällt? Dein Circuit Breaker sollte deine App nicht kaputt machen.

Füge einen Fallback hinzu:

@Injectable()
export class CircuitBreakerService {
  private fallbackCircuits = new Map<string, any>()
  private redisAvailable = true

  constructor(
    @Inject(REDIS_CLIENT) private readonly redis: Redis
  ) {
    // Überwache Redis-Verbindung
    this.redis.on('error', () => {
      this.redisAvailable = false
    })

    this.redis.on('connect', () => {
      this.redisAvailable = true
    })
  }

  async execute<T>(
    key: string,
    fn: () => Promise<T>,
    config?: Partial<CircuitConfig>
  ): Promise<T> {
    // Wenn Redis down ist, nutze In-Memory-Fallback
    if (!this.redisAvailable) {
      return this.executeWithFallback(key, fn, config)
    }

    try {
      return await this.executeWithRedis(key, fn, config)
    } catch (error) {
      // Redis-Operation fehlgeschlagen, nutze Fallback
      if (error.message?.includes('Redis')) {
        return this.executeWithFallback(key, fn, config)
      }
      throw error
    }
  }

  private async executeWithRedis<T>(
    key: string,
    fn: () => Promise<T>,
    config?: Partial<CircuitConfig>
  ): Promise<T> {
    // Redis-Implementierung von oben
    // ...
  }

  private async executeWithFallback<T>(
    key: string,
    fn: () => Promise<T>,
    config?: Partial<CircuitConfig>
  ): Promise<T> {
    // Nutze In-Memory-Map als Fallback
    if (!this.fallbackCircuits.has(key)) {
      this.fallbackCircuits.set(key, {
        state: CircuitState.CLOSED,
        failures: 0,
        successes: 0,
        nextAttempt: 0
      })
    }

    const circuit = this.fallbackCircuits.get(key)
    const cfg = { ...this.defaultConfig, ...config }

    // Gleiche Logik wie In-Memory-Version
    if (circuit.state === CircuitState.OPEN) {
      if (Date.now() < circuit.nextAttempt) {
        throw new Error(`Circuit breaker is OPEN for ${key}`)
      }
      circuit.state = CircuitState.HALF_OPEN
      circuit.successes = 0
    }

    try {
      const result = await fn()
      this.onSuccessFallback(key, cfg)
      return result
    } catch (error) {
      this.onFailureFallback(key, cfg)
      throw error
    }
  }

  private onSuccessFallback(key: string, config: CircuitConfig) {
    const circuit = this.fallbackCircuits.get(key)

    if (circuit.state === CircuitState.HALF_OPEN) {
      circuit.successes++
      if (circuit.successes >= config.successThreshold) {
        circuit.state = CircuitState.CLOSED
        circuit.failures = 0
        circuit.successes = 0
      }
    } else {
      circuit.failures = 0
    }
  }

  private onFailureFallback(key: string, config: CircuitConfig) {
    const circuit = this.fallbackCircuits.get(key)
    circuit.failures++

    if (circuit.failures >= config.failureThreshold) {
      circuit.state = CircuitState.OPEN
      circuit.nextAttempt = Date.now() + config.timeout
      circuit.successes = 0
    }
  }
}

Circuit-Zustände überwachen

Füge einen Endpoint hinzu, um Circuit-Health zu prüfen:

@Controller('health')
export class HealthController {
  constructor(
    private readonly circuitBreaker: CircuitBreakerService
  ) {}

  @Get('circuits')
  async getCircuits() {
    const circuits = ['payment-api', 'email-service', 'analytics-api']
    
    const stats = await Promise.all(
      circuits.map(async (key) => ({
        key,
        ...(await this.circuitBreaker.getStats(key))
      }))
    )

    return stats
  }

  @Post('circuits/:key/reset')
  async resetCircuit(@Param('key') key: string) {
    await this.circuitBreaker.reset(key)
    return { message: `Circuit ${key} reset` }
  }
}

Was ich gelernt habe

Redis macht es verteilt: Alle Instanzen teilen den gleichen Circuit-Zustand.

Habe immer einen Fallback: Wenn Redis ausfällt, falle zurück auf In-Memory. Lass deinen Circuit Breaker nicht deine App kaputt machen.

Überwache Redis-Health: Tracke den Verbindungsstatus und wechsle automatisch zum Fallback.

Nutze Redis-Pipelining: Wenn du mehrere Werte holst, nutze Promise.all, um Redis-Aufrufe zu bündeln.

Setze TTLs auf Keys: Füge Ablaufzeiten zu Circuit-Keys hinzu, damit sie nicht ewig leben.

Teste Redis-Fehler: Simuliere Redis-Ausfälle in deinen Tests.

Fazit

Redis-basierte Circuit Breaker funktionieren über mehrere Instanzen hinweg. Aber wir können es besser machen - im nächsten Post machen wir es hybrid: nutze In-Memory bis Redis verbindet, dann synchronisiere den Zustand.

Keep pushing forward and savor every step of your coding journey.