Protobuf mit Rabbitmq in NodeJS integrieren - Teil II

Häufig besteht die Aufgabe darin, ein System zu entwickeln, das die Kommunikation zwischen Diensten mittels asynchroner Methoden ermöglicht. Es gibt verschiedene Wege, dies zu erreichen, aber für unsere Zwecke werden wir ein Warteschlangensystem verwenden.

Im zweiten Teil unserer Reise, ein Kommunikationssystem zwischen Microservices mit Node.js, Protobuf und RabbitMQ aufzubauen, konzentrieren wir uns auf die Erstellung eines RabbitMQ-Clients, der auf unsere Bedürfnisse zugeschnitten ist. Wir verwenden die Bibliothek amqplib als Grundlage für diese Implementierung.

Beginnen wir damit, eine Datei namens rabbitmq-client.ts zu erstellen. Was benötigen wir, um einen funktionalen Client einzurichten?

  • Authentifizierungsdaten, also Benutzername und Passwort.
  • Den Host, auf dem RabbitMQ läuft.
  • Die Portnummer, auf dem der Host erreichbar ist.
  • Den Virtual Host (vhost), den wir für unsere Verbindung nutzen werden.

Mit diesen Anforderungen im Hinterkopf können wir das folgende Interface für unsere Verbindung definieren.

// src/rabbitmq-client.ts
export interface RabbitMQClientOptions {
  user: string;
  password: string;
  host: string;
  port: number;
  vhost?: string;
}

Lassen Sie uns versuchen, eine Klasse zu erstellen, die unseren Kunden behandelt.

// src/rabbitmq-client.ts
...

export class RabbitMQClient {
  private connection: Connection | null;
  private channel: Channel | null;
  private host: string;
  private port: number;
  private user: string;
  private password: string;
  private vhost: string;

  constructor(opts: RabbitMQClientOptions) {
    const { host, port, user, password, vhost } = opts;
    this.host = host;
    this.port = port;
    this.user = user ?? null;
    this.password = password ?? null;
    this.vhost = vhost ?? '';
  }

  async connect() {
    try {
      if (!this.connection) {
        const connectionURI = createConnectionUri()
        console.log(`No RabbitMQ connection found`);
        this.connection = await connect(connectionURI);
      }

      console.log(`RabbitMQ Connection is ready`);
      this.channel 
        = await this.connection.createChannel();
    } catch (error) {
      console.error(
        'RabbitMQ connection error', 
        { error: error }
      );
      throw error;
    }
  }

  async getChannel(): Promise<Channel> {
    if (!this.channel) {
      await this.connect();
    }

    return this.channel;
  }

  async disconnect() {
    if (this.channel) {
      await this.channel.close();
      this.channel = null;
    }

    if (this.connection) {
      await this.connection.close();
      this.connection = null;
    }
  }

  private createConnectionUri() {
    // `amqp://${this.user}:${this.password}
    // @${this.host}:${this.port}
    // /${this.vhost}`
    return 
     `amqp://${this.user}:${this.password}
     @${this.host}:${this.port}
     /${this.vhost}`
  }
}

In unserer Klassenimplementierung haben wir drei Funktionen geschrieben, die für die Handhabung unserer Client-Verbindungen wichtig sind.

  • In connect prüfen wir, ob unsere Verbindung existiert oder nicht, wenn nicht, benutzen wir die amqp Verbindung um uns zu verbinden. Außerdem erstellen wir einen Kanal, der unsere Quelle für die Veröffentlichung oder den Konsum unserer Nachricht sein wird
  • In getChannel prüfen wir, ob ein Kanal vorhanden ist, andernfalls erstellen wir einen, indem wir die Funktion connect aufrufen
  • In disconnect schließen wir sowohl Kanal als auch Verbindung

Nun wollen wir uns darauf konzentrieren, die Funktionalität zum Veröffentlichen von Änderungen in RabbitMQ zu erstellen. Was benötigen wir dafür?

  • Eine Verbindung und einen Kanal, über den wir veröffentlichen.
  • Die Nachricht, die wir veröffentlichen möchten.
  • Ein Exchange, in dem die Nachricht veröffentlicht werden soll.
  • Optional: Einen Routing-Key, um unsere Nachricht gezielt weiterzuleiten
// src/rabbitmq-client.ts
...
export class RabbitMQClient {
  ...

  async publish(
    message: any,
    exchange: string,
    routingKey: string | null = null
  ) {
    try {
      if (!this.channel) {
        await this.connect();
      }

      const ampMessage = Buffer.from(message);
      this.channel.publish(
        exchange, 
        routingKey || '', 
        ampMessage, 
        {
          contentType: 'application/json',
          deliveryMode: 2,
          type: schema.name,
        }
      );
    } catch (error) {
      console.error(
        'Error occurred while publishing a message', 
        {
          exchange: exchange,
          routingKey: routingKey,
        }
      );

      throw error;
    }
  } 
}

Wir werden den Link mit unserem „Message“-Kontext der Proto-Definitionen hinzufügen, außerdem müssen wir den Typ mit der richtigen Umwandlung in Buffer definieren.

// src/rabbitmq-client.ts
...
import type { 
  DescMessage, 
  Message
} from '@bufbuild/protobuf';
import { toBinary } from '@bufbuild/protobuf';
...
export class RabbitMQClient {
  ...

  async publish(
    schema: DescMessage,
    message: Message,
    exchange: string,
    routingKey: string | null = null,
  ) {
    try {
      if (!this.channel) {
        await this.connect();
      }

      const ampMessage = Buffer.from(
        toBinary(schema, message)
      );
      this.channel.publish(
        exchange, 
        routingKey || '', 
        ampMessage, 
        {
          contentType: 'application/json',
          deliveryMode: 2,
          type: schema.name,
        }
      );
    } catch (error) {
      console.error(
        'Error occurred while publishing a message', 
        {
          exchange: exchange,
          routingKey: routingKey,
        }
      );

      throw error;
    }
  } 
}

Der fehlende Teil unseres Clients ist eine Konsumentenfunktionalität. Lassen Sie uns auch das in Betracht ziehen, um eine Handler-Schnittstelle zu bauen.

// src/rabbitmq-client.ts
import { ..., GetMessage } from 'amqplib';

export interface HandlerInterface {
  (message: Message, originalMessage?: GetMessage): void;
}
...
export class RabbitMQClient {
  ...
 async consumeOne(
    queue: string,
    handlers: { [key: string]: HandlerInterface },
    autoAck: boolean = true,
  ) {
    if (!this.channel) {
      await this.connect();
    }

    await this.channel.prefetch(1);
    const message = await this.channel.get(queue);
    if (message) {
      console.debug('Processing message');
      try {
        const messageType = message.properties.type;
        const messageBody = message.content;
        if (
          typeof handlers[messageType] == 'undefined'
        ) {
          throw new Error(`No handler ${messageType}`);
        }

        const events = await import('../gen');
        const messageObject 
          = fromBinary(events[`${messageType}Schema`], messageBody);

        const handler = handlers[messageType];
        handler(messageObject, message);

        if (autoAck) {
          console.debug('Acknowledging the message');
          this.channel.ack(message);
        }
      } catch (error) {
        console.error(
          'Error occurred while processing message', 
          { error: error }
        );
        this.channel.nack(message, false, false);
      }
    }
  }
}

Gehen wir unsere Implementierung durch:

  • const message = await this.channel.get(queue); diese Codezeile ist dafür verantwortlich, die Nachricht aus unserer Warteschlange über den Kanal zu holen, die wir verarbeiten müssen
  • const messageType = message.properties.type; diese Codezeile ist dafür verantwortlich, das Schema zu lesen, aus dem die Nachricht konvertiert wurde und das für die Konvertierung verwendet werden soll
  • if (typeof handlers[messageType] == 'undefined') {} diese Codezeile ist dafür verantwortlich, zu prüfen, ob die Nachricht einen Verbraucher hat, um Informationen zu verarbeiten
  • const events = await import('../gen'); diese Code-Zeile ist dafür verantwortlich, alle benötigten Informationen über unsere Nachrichten zur Laufzeit zu erhalten
  • const messageObject = fromBinary(events[messageType], messageBody); diese Codezeile ist für die Umwandlung der Nachricht in eine Instanz der Nachricht verantwortlich, die wir von unserer veröffentlichten binären Nachricht konsumieren können
  • const handler = handlers[messageType];` diese Codezeile ist dafür verantwortlich, den Verbraucher oder Handler, der die Nachricht verarbeiten soll, aus unserer Liste der Verbraucher auszuwählen
  • handler(messageObject, message);` diese Codezeile ist für die Ausführung der Handler-Funktionalität verantwortlich
  • this.channel.ack(message);` diese Codezeile ist für die automatische Bestätigung (autoAck) verantwortlich, wenn wir dies in unseren Funktionsaufrufen festgelegt haben, andernfalls muss der Verbraucher das Verhalten selbst definieren.

Wie kann die oben beschriebene Funktionalität nicht funktionieren?

import { create } from '@bufbuild/protobuf';
import { GPSMessageSchema } from '../gen';
import { 
  RabbitMQClient, 
  RabbitMQClientOptions 
} from './rabbitmq-client';

const opts: RabbitMQClientOptions = {
  user: 'guest',
  password: 'guest',
  host: 'localhost',
  port: 5672,
  vhost: '/',
};

const rabbitMqClient = new RabbitMQClient(opts);

rabbitMqClient.connect().then(() => {
  console.log('Connected to RabbitMQ');

  const type = GPSMessageSchema;
  const message = create(GPSMessageSchema, {
    latitude: '123.123',
    longitude: '123.123',
  });

  rabbitMqClient.publish(
    GPSMessageSchema, 
    message, 
    'test-exchange', 
    'test-key'
  );
});

Im dritten Teil werden wir uns mit der Konfiguration all dieser Komponenten beschäftigen, damit sie in einem einfachen Projekt nahtlos zusammenarbeiten.

Schreiten Sie weiter voran und genießen Sie jeden Schritt Ihrer Programmierreise.