Integrate Protobuf with RabbitMQ in Node.js - Part II

Often, the task at hand involves building a system that enables communication between services using asynchronous methods. There are several ways to accomplish this, but for our purposes, we’ll be using a queue system.

In the second part of our journey to build a communication system between microservices using Node.js, Protobuf, and RabbitMQ, we’ll focus on creating a RabbitMQ client tailored to our needs. We’ll use the amqplib library as the foundation for this implementation.

Let’s start by creating a file called rabbitmq-client.ts. What do we need to establish a functional client?

  • Authentication credentials, including a username and password.
  • The host where RabbitMQ is running.
  • The port number on which the host is listening.
  • The virtual host (vhost) that we will use for our connection.

With these requirements in mind, we can define the following interface for our connection.

// src/rabbitmq-client.ts
export interface RabbitMQClientOptions {
  user: string;
  password: string;
  host: string;
  port: number;
  vhost?: string;
}
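
In practice these values usually come from configuration rather than hard-coded literals. The sketch below shows one possible way to fill the interface from environment variables. The helper name optionsFromEnv, the variable names (RABBITMQ_HOST and so on), and the fallback values are assumptions made for illustration; they are not part of the client built in this article. The interface is repeated so the snippet stands alone.

```typescript
interface RabbitMQClientOptions {
  user: string;
  password: string;
  host: string;
  port: number;
  vhost?: string;
}

// Hypothetical helper: builds the options from an env-like object.
// Variable names and defaults are illustrative assumptions.
export function optionsFromEnv(
  env: Record<string, string | undefined>,
): RabbitMQClientOptions {
  const port = Number(env.RABBITMQ_PORT ?? '5672');
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid RABBITMQ_PORT: ${env.RABBITMQ_PORT}`);
  }

  return {
    user: env.RABBITMQ_USER ?? 'guest',
    password: env.RABBITMQ_PASSWORD ?? 'guest',
    host: env.RABBITMQ_HOST ?? 'localhost',
    port,
    // Left undefined when not set, so the client can apply its own default.
    vhost: env.RABBITMQ_VHOST,
  };
}
```

In a Node.js service this would typically be called as optionsFromEnv(process.env). Taking the environment as a parameter keeps the function easy to test.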

Let's build a class to handle our client.

// src/rabbitmq-client.ts
import { connect, Channel, Connection } from 'amqplib';
...

export class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;
  private host: string;
  private port: number;
  private user: string;
  private password: string;
  private vhost: string;

  constructor(opts: RabbitMQClientOptions) {
    const { host, port, user, password, vhost } = opts;
    this.host = host;
    this.port = port;
    this.user = user;
    this.password = password;
    this.vhost = vhost ?? '';
  }

  async connect() {
    try {
      if (!this.connection) {
        console.log(`No RabbitMQ connection found, connecting`);
        const connectionURI = this.createConnectionUri();
        this.connection = await connect(connectionURI);
      }

      console.log(`RabbitMQ Connection is ready`);
      this.channel = await this.connection.createChannel();
    } catch (error) {
      console.error(
        'RabbitMQ connection error', 
        { error: error }
      );
      throw error;
    }
  }

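  async getChannel(): Promise<Channel> {
    if (!this.channel) {
      await this.connect();
    }

    // connect() throws on failure, so this only guards the type.
    if (!this.channel) {
      throw new Error('RabbitMQ channel is not available');
    }

    return this.channel;
  }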

  async disconnect() {
    if (this.channel) {
      await this.channel.close();
      this.channel = null;
    }

    if (this.connection) {
      await this.connection.close();
      this.connection = null;
    }
  }

  private createConnectionUri(): string {
    // Keep the URI on one line: a line break right after `return`
    // makes the function return undefined, and line breaks inside
    // the template literal would end up in the URI.
    return `amqp://${this.user}:${this.password}@${this.host}:${this.port}/${this.vhost}`;
  }
}

Our class implements three methods that manage the client connection:

  • In connect we check whether a connection already exists; if it does not, we open one with the connect function from amqplib. We also create a channel, which we will use to publish and consume messages.
  • In getChannel we check whether a channel exists; if it does not, we create one by calling connect.
  • In disconnect we close both the channel and the connection.
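
One detail of the connection step is worth a short sketch. Credentials or a vhost that contain reserved characters such as @, : or / have to be percent-encoded before they are placed in an AMQP URI. The standalone function below, buildAmqpUri (a hypothetical name, as is the AmqpUriParts type), shows one way to do that under this assumption; it is not the method used by the class above.

```typescript
// Hypothetical input type mirroring the client options.
interface AmqpUriParts {
  user: string;
  password: string;
  host: string;
  port: number;
  vhost?: string;
}

// Builds an AMQP URI with percent-encoded credentials and vhost.
export function buildAmqpUri(parts: AmqpUriParts): string {
  const user = encodeURIComponent(parts.user);
  const password = encodeURIComponent(parts.password);
  // An empty vhost segment leaves the choice of vhost to the client library.
  const vhost = encodeURIComponent(parts.vhost ?? '');
  return `amqp://${user}:${password}@${parts.host}:${parts.port}/${vhost}`;
}

const exampleUri = buildAmqpUri({
  user: 'guest',
  password: 'p@ss',
  host: 'localhost',
  port: 5672,
  vhost: '/',
});
// exampleUri === 'amqp://guest:p%40ss@localhost:5672/%2F'
```

amqplib's connect also accepts an options object in place of a URI string, which sidesteps the encoding question altogether.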

Now, let’s focus on building the functionality to publish changes to RabbitMQ. What do we need to accomplish this?

  • A connection and a channel through which we will publish.
  • The message that we want to publish.
  • An exchange where the message should be published.
  • Optionally, a routing key to direct our message to the appropriate destination.

// src/rabbitmq-client.ts
...
export class RabbitMQClient {
  ...

  async publish(
    message: any,
    exchange: string,
    routingKey: string | null = null
  ) {
    try {
      // getChannel connects on demand and never returns null.
      const channel = await this.getChannel();

      const amqpMessage = Buffer.from(message);
      channel.publish(
        exchange,
        routingKey || '',
        amqpMessage,
        {
          contentType: 'application/json',
          deliveryMode: 2,
        }
      );
    } catch (error) {
      console.error(
        'Error occurred while publishing a message', 
        {
          exchange: exchange,
          routingKey: routingKey,
        }
      );

      throw error;
    }
  } 
}

Next, we link publish to our Protobuf message definitions: it now receives the message schema together with a typed message, and it uses toBinary to serialize the message before wrapping the result in a Buffer.

// src/rabbitmq-client.ts
...
import type { 
  DescMessage, 
  Message
} from '@bufbuild/protobuf';
import { toBinary } from '@bufbuild/protobuf';
...
export class RabbitMQClient {
  ...

  async publish(
    schema: DescMessage,
    message: Message,
    exchange: string,
    routingKey: string | null = null,
  ) {
    try {
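      // getChannel connects on demand and never returns null.
      const channel = await this.getChannel();

      // Serialize the Protobuf message to bytes before publishing.
      const amqpMessage = Buffer.from(
        toBinary(schema, message)
      );
      channel.publish(
        exchange,
        routingKey || '',
        amqpMessage,
        {
          // The payload is Protobuf binary, not JSON.
          contentType: 'application/x-protobuf',
          deliveryMode: 2,
          // The consumer uses this name to look up the schema.
          type: schema.name,
        }
      );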
    } catch (error) {
      console.error(
        'Error occurred while publishing a message', 
        {
          exchange: exchange,
          routingKey: routingKey,
        }
      );

      throw error;
    }
  } 
}
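
The publish method above ignores the return value of channel.publish. In amqplib that call returns false when the channel's write buffer is full, and the channel emits a 'drain' event once it is safe to continue. The sketch below shows one way to respect that signal. The function name publishWithBackpressure and the DrainableChannel interface are assumptions introduced here for illustration; the interface describes only the two members the sketch needs, and amqplib's Channel is assumed to satisfy it.

```typescript
// Minimal structural view of a channel (hypothetical interface).
interface DrainableChannel {
  publish(
    exchange: string,
    routingKey: string,
    content: Uint8Array,
    options?: object,
  ): boolean;
  once(event: 'drain', listener: () => void): unknown;
}

// Publishes one message and, if the channel reports a full write
// buffer, waits for 'drain' before resolving.
export async function publishWithBackpressure(
  channel: DrainableChannel,
  exchange: string,
  routingKey: string,
  content: Uint8Array,
  options: object = {},
): Promise<void> {
  const canContinue = channel.publish(exchange, routingKey, content, options);
  if (!canContinue) {
    await new Promise<void>((resolve) => {
      channel.once('drain', () => resolve());
    });
  }
}
```

A producer that sends many messages in a loop could await this function per message so that it slows down when RabbitMQ or the network cannot keep up.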

The missing piece of our client is the consumer. Let's build it, starting with an interface for the handlers that will process incoming messages.

// src/rabbitmq-client.ts
import { ..., GetMessage } from 'amqplib';
import { fromBinary } from '@bufbuild/protobuf';

export interface HandlerInterface {
  // Handlers may be synchronous or asynchronous.
  (message: Message, originalMessage?: GetMessage): void | Promise<void>;
}
...
export class RabbitMQClient {
  ...
  async consumeOne(
    queue: string,
    handlers: { [key: string]: HandlerInterface },
    autoAck: boolean = true,
  ) {
    if (!this.channel) {
      await this.connect();
    }

    await this.channel.prefetch(1);
    const message = await this.channel.get(queue);
    if (message) {
      console.debug('Processing message');
      try {
        const messageType = message.properties.type;
        const messageBody = message.content;
        if (
          typeof handlers[messageType] == 'undefined'
        ) {
          throw new Error(`No handler ${messageType}`);
        }

        const events = await import('../gen');
        const messageObject 
          = fromBinary(events[`${messageType}Schema`], messageBody);

        const handler = handlers[messageType];
        // Wait for the handler to finish before acknowledging.
        await handler(messageObject, message);

        if (autoAck) {
          console.debug('Acknowledging the message');
          this.channel.ack(message);
        }
      } catch (error) {
        console.error(
          'Error occurred while processing message', 
          { error: error }
        );
        this.channel.nack(message, false, false);
      }
    }
  }
}

Let's go through our implementation:

  • const message = await this.channel.get(queue); fetches the next message to process from our queue through the channel.
  • const messageType = message.properties.type; reads the name of the schema the message was serialized with, which is also the schema we must use to deserialize it.
  • if (typeof handlers[messageType] == 'undefined') {} checks that a handler is registered for this message type.
  • const events = await import('../gen'); loads the generated Protobuf definitions at runtime.
  • const messageObject = fromBinary(events[`${messageType}Schema`], messageBody); converts the published binary payload back into a message object that we can consume.
  • const handler = handlers[messageType]; selects, from our list of handlers, the one that has to process the message.
  • handler(messageObject, message); executes the handler.
  • this.channel.ack(message); acknowledges the message when autoAck is enabled in the call; otherwise the handler has to decide for itself what to do with the message. If processing throws, the message is rejected without requeueing through this.channel.nack(message, false, false).
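
The lookup, decode, and dispatch steps above can be isolated from RabbitMQ, which makes them easy to test. The sketch below replaces the dynamic import of ../gen and fromBinary with a plain registry keyed by message type. The names route and dispatchMessage, and the byte-based stand-in decoder in the usage, are assumptions for illustration only; a real route would decode with fromBinary and the generated schema.

```typescript
type Decoder<T> = (content: Uint8Array) => T;
type Handler<T> = (message: T) => void;
// A route decodes the raw bytes and passes the result to its handler.
type Route = (content: Uint8Array) => void;

// Pairs a decoder with a handler while keeping the message type checked.
export function route<T>(decode: Decoder<T>, handle: Handler<T>): Route {
  return (content) => handle(decode(content));
}

// Finds the route for the message type and runs it, or throws when
// no handler is registered (mirrors the check in consumeOne).
export function dispatchMessage(
  routes: Map<string, Route>,
  messageType: string,
  content: Uint8Array,
): void {
  const selected = routes.get(messageType);
  if (selected === undefined) {
    throw new Error(`No handler ${messageType}`);
  }
  selected(content);
}

// Usage with a stand-in decoder that reads two raw bytes.
const received: Array<{ latitude: number; longitude: number }> = [];
const gpsRoutes = new Map<string, Route>();
gpsRoutes.set(
  'GPSMessage',
  route(
    (bytes) => ({ latitude: bytes[0], longitude: bytes[1] }),
    (message) => { received.push(message); },
  ),
);
dispatchMessage(gpsRoutes, 'GPSMessage', new Uint8Array([12, 34]));
```

Keeping the registry explicit also means an unknown type fails before any decoding is attempted, which is the same order of checks consumeOne uses.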

How can we use the functionality written above?

import { create } from '@bufbuild/protobuf';
import { GPSMessageSchema } from '../gen';
import { 
  RabbitMQClient, 
  RabbitMQClientOptions 
} from './rabbitmq-client';

const opts: RabbitMQClientOptions = {
  user: 'guest',
  password: 'guest',
  host: 'localhost',
  port: 5672,
  vhost: '/',
};

const rabbitMqClient = new RabbitMQClient(opts);

rabbitMqClient
  .connect()
  .then(async () => {
    console.log('Connected to RabbitMQ');

    const message = create(GPSMessageSchema, {
      latitude: '123.123',
      longitude: '123.123',
    });

    // Wait for publish so that errors reach the catch below.
    await rabbitMqClient.publish(
      GPSMessageSchema,
      message,
      'test-exchange',
      'test-key'
    );
  })
  .catch((error) => {
    console.error('Failed to publish message', { error: error });
  });
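
The example above covers only the publishing side. Because consumeOne fetches a single message per call, a consumer needs to call it repeatedly. The sketch below shows one simple way to do that with a bounded polling loop. The names pollQueue, OneShotConsumer, and PollOptions are assumptions introduced for illustration; the interface lists only the method the loop needs, and the RabbitMQClient from this article is assumed to satisfy it.

```typescript
// Minimal structural view of the client (hypothetical interface).
interface OneShotConsumer<H> {
  consumeOne(queue: string, handlers: H): Promise<void>;
}

interface PollOptions {
  iterations: number; // how many times to poll before returning
  delayMs: number;    // pause between two polls
}

// Calls consumeOne a fixed number of times, pausing between calls.
export async function pollQueue<H>(
  consumer: OneShotConsumer<H>,
  queue: string,
  handlers: H,
  options: PollOptions,
): Promise<void> {
  for (let i = 0; i < options.iterations; i++) {
    await consumer.consumeOne(queue, handlers);
    if (i < options.iterations - 1) {
      await new Promise<void>((resolve) => {
        setTimeout(resolve, options.delayMs);
      });
    }
  }
}
```

With the client from this article, a call could look like pollQueue(rabbitMqClient, 'test-queue', { GPSMessage: handler }, { iterations: 10, delayMs: 500 }), where the queue name and the handler are placeholders.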

In the third part we will dive into the configuration of all these components to work together seamlessly in a simple project.

Keep pushing forward and savor every step of your coding journey.