Protobuf mit Rabbitmq in NodeJS integrieren - Teil III

Häufig besteht die Aufgabe darin, ein System zu entwickeln, das die Kommunikation zwischen Diensten mittels asynchroner Methoden ermöglicht. Es gibt verschiedene Wege, dies zu erreichen, aber für unsere Zwecke werden wir ein Warteschlangensystem verwenden.

Im dritten Teil unserer Serie zum Aufbau eines Kommunikationssystems zwischen Mikroservices mit Node.js, Protobuf und RabbitMQ konzentrieren wir uns darauf, alle Komponenten zu einem zusammenhängenden Projekt zu integrieren.

Lassen Sie uns damit beginnen, zwei Dateien zu erstellen: producer.ts und consumer.ts.

// src/producer.ts
import * as dotenv from 'dotenv';
import { 
  RabbitMQClient,
  RabbitMQClientOptions 
} from './rabbitmq-client';

dotenv.config();

const opts: RabbitMQClientOptions = {
  host: process.env.RABBITMQ_HOST || 'localhost',
  port: process.env.RABBITMQ_PORT 
    ? parseInt(process.env.RABBITMQ_PORT) 
    : 5672,
  user: process.env.RABBITMQ_USER || 'user',
  password: process.env.RABBITMQ_PASSWORD || 'password',
};

export const producer = (): RabbitMQClient => 
  new RabbitMQClient(opts);

const main = async () => {
  const client = producer();
  await client.connect();
}

main().catch(console.error);
// src/consumer.ts
import * as dotenv from 'dotenv';
import { 
  RabbitMQClient,
  RabbitMQClientOptions 
} from './rabbitmq-client';

dotenv.config();

const opts: RabbitMQClientOptions = {
  host: process.env.RABBITMQ_HOST || 'localhost',
  port: process.env.RABBITMQ_PORT 
    ? parseInt(process.env.RABBITMQ_PORT) 
    : 5672,
  user: process.env.RABBITMQ_USER || 'user',
  password: process.env.RABBITMQ_PASSWORD || 'password',
};

export const consumer = (): RabbitMQClient => 
  new RabbitMQClient(opts);

const main = async () => {
  const client = consumer();
  await client.connect();
}

main().catch(console.error);

Um eine einfache Anwendung auf unserem Rechner laufen zu lassen, verwenden wir am besten ein Docker-Compose-Setup. Bevor wir das tun, werden wir nodedemon für unseren Entwicklungsprozess verwenden. Installieren wir also die fehlenden Tools.

npm i -D nodemon

Zusätzlich fügen wir zu unseren Skripten in package.json zwei weitere Skripte hinzu, die für die Ausführung von Consumer und Producer verantwortlich sind.

  "scripts": {
    ...
    "start:producer": 
      "nodemon -r dotenv/config src/producer.ts",
    "start:consumer": 
      "nodemon -r dotenv/config src/consumer.ts"
  },

Jetzt können wir unsere Arbeit in der Datei docker-compose.yml fortsetzen.

services:
  rabbitmq:
    container_name: rabbitmq
    image: rabbitmq:3.12.9-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password
    volumes:
    - rabbitmq:/var/lib/rabbitmq/mnesia
    - ./.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie

volumes:
  rabbitmq:

Wir sollten in unserer Konfiguration einen Container für rabbitmq definieren, unsere Wahl ist rabbitmq:3.12.9-management. In dieser Konfiguration ist es erforderlich, bestimmte Werte wie RABBITMQ_DEFAULT_USER und RABBITMQ_DEFAULT_PASS zu definieren, die für den Betrieb von RabbitMQ erforderlich sind.

Fügen wir unserem Producer die Fähigkeit hinzu, tatsächlich eine Nachricht zu produzieren, die dann in der Warteschlange veröffentlicht wird.

// src/producer.ts
import { 
  DeviceInformationMessageSchema 
} from '../gen';
...
const main = async () => {
  ...
  onst info = {
    time: {
      seconds: BigInt(1630000000),
      nanos: 0,
    },
    mac: '00:00:00:00:00:00',
    name: 'device-name',
  };

  const message = create(
    DeviceInformationMessageSchema, 
    info
  );
  await client.publish(
    DeviceInformationMessageSchema, 
    message, 
    'device', 
    ''
  );
}

Ebenso sollten wir den anderen Teil des Konsums der Nachricht implementieren, da dies auch Teil der ganzen Idee der Implementierung eines Pub/Sub-Patterns mit RabbitMQ ist

// src/consumer.ts
...

const handler = (
  message: Message, 
  originalMessage?: GetMessage
) => {
  console.log('Received message');
};

const main = async () => {
  ...
 await client.consumeOne(
    'device',
    {
      DeviceInformationMessage: handler,
    },
    true,
  );
}

Der einzige Schritt, der noch fehlt, bevor wir mit der Nutzung und dem Testen unserer Demo beginnen, ist die Konfiguration von RabbitMQ. Gehen Sie also auf die Benutzeroberfläche von RabbitMQ und erstellen Sie einen Exchange mit dem Namen device, eine Queue mit dem Namen device und binden Sie sie mit einem leeren Routing-Schlüssel zusammen. Wenn Sie es verpasst haben, sollten Sie die RabbitMQ-Instanz starten, indem Sie ausführen:

docker compose up

Wir erweitern unsere Definition um zwei weitere Services, einen zum Konsumieren und einen zum Produzieren unter Verwendung eines Builds mit Docker für die lokale Entwicklung. Beide Container müssen Zugang zu RabbitMQ haben, also übergeben wir Umgebungsvariablen, die für die Verbindung verwendet werden können. Außerdem definieren wir, dass eine Abhängigkeit zwischen diesen Services und RabbitMQ besteht

services:
  ...
  producer:
    container_name: producer
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - .:/app
    depends_on:
      - rabbitmq
    environment:
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: user
      RABBITMQ_PASS: password
    entrypoint: ["npm", "run", "start:producer"]

  consumer:
    container_name: consumer
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - .:/app
    depends_on:
      - rabbitmq
    environment:
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: user
      RABBITMQ_PASS: password
    entrypoint: ["npm", "run", "start:consumer"]

Wie man sieht, gibt es einen Build-Kontext eines Docker mit einem Dockerfile, unten findet man, was darin definiert wurde

FROM node:20 as build

WORKDIR /app

COPY package*.json ./
RUN npm install
COPY . .

EXPOSE 3000

Was noch zu tun ist, ist den Befehl auszuführen, um auch die beiden neuen Dienste zum Laufen zu bringen.

docker compose up

Wie Sie vielleicht bemerkt haben, fehlen noch einige Funktionen, wie zum Beispiel langlaufende consume-Prozesse (da wir nur consumeOne implementiert haben) und die Möglichkeit, mehrere Nachrichten zu publishen. Diese Funktionen können auf den bestehenden Methoden wie consumeOne und publish aufgebaut werden, aber ich lasse diese Erweiterung denjenigen, die neugierig sind, das Projekt weiterzuentwickeln.

Das ist das Ende unserer Reise zur Integration von Protobuf mit RabbitMQ in Node.js anhand eines einfachen Projekts. Es war eine erstaunliche Reise mit vielen Konzepten und Integrationen, die uns die wahre Kraft der Kombination dieser Tools gezeigt hat. Auch wenn unsere Implementierung auf Node.js ausgerichtet war, können die Prinzipien leicht auf andere Ökosysteme übertragen werden.

Schreiten Sie weiter voran und genießen Sie jeden Schritt Ihrer Programmierreise.