RabbitMQ

RabbitMQ 是一个开源的轻量级消息代理,支持多种消息传递协议。它可以部署在分布式和联邦配置中,以满足高规模、高可用性要求。此外,它是部署最广泛的消息代理,在全球范围内被小型创业公司和大型企业使用。

安装

要开始构建基于 RabbitMQ 的微服务,首先安装所需的包:

$ npm i --save amqplib amqp-connection-manager

概述

要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice() 方法:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});
提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 属性特定于所选的传输器。RabbitMQ 传输器公开以下描述的属性。

urls 要按顺序尝试的连接 URL 数组
queue 服务器将监听的队列名称
prefetchCount 设置通道的预取计数
isGlobalPrefetchCount 启用每个通道的预取
noAck 如果 false,启用手动确认模式
consumerTag 服务器将用于区分消费者的消息传递的名称;不能已在通道上使用。通常更容易省略此选项,在这种情况下,服务器将创建一个随机名称并在回复中提供它。消费者标签标识符(了解更多 此处
queueOptions 其他队列选项(了解更多 此处
socketOptions 其他套接字选项(了解更多 此处
headers 随每条消息一起发送的标头
replyQueue 生产者的回复队列。默认为 amq.rabbitmq.reply-to
persistent 如果为真,消息将在代理重启后仍然存在,前提是它在也能在重启后仍然存在的队列中
noAssert 当为 false 时,队列在消费前不会被断言
wildcards 仅当您要使用主题交换来将消息路由到队列时才设置为 true。启用此功能将允许您使用通配符 (*, #) 作为消息和事件模式
exchange 交换的名称。当 "wildcards" 设置为 true 时,默认为队列名称
exchangeType 交换的类型。默认为 topic。有效值为 directfanouttopicheaders
routingKey 主题交换的附加路由键
maxConnectionAttempts 最大连接尝试次数。仅适用于消费者配置。-1 === 无限

客户端

与其他微服务传输器一样,您有 几种选项 来创建 RabbitMQ ClientProxy 实例。

创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及用作注入令牌的 name 属性。了解更多关于 ClientsModule 的信息 此处

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})

也可以使用其他选项创建客户端(ClientProxyFactory@Client())。您可以在 此处 了解它们。

上下文

在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 RabbitMQ 传输器时,您可以访问 RmqContext 对象。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
提示

@Payload()@Ctx()RmqContext@nestjs/microservices 包导入。

要访问原始 RabbitMQ 消息(带有 propertiesfieldscontent),请使用 RmqContext 对象的 getMessage() 方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

要检索对 RabbitMQ 通道 的引用,请使用 RmqContext 对象的 getChannelRef 方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}

消息确认

为了确保消息永远不会丢失,RabbitMQ 支持 消息确认。消费者会发送确认信息,告诉 RabbitMQ 特定消息已被接收、处理,RabbitMQ 可以自由删除它。如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未完全处理并将其重新排队。

要启用手动确认模式,请将 noAck 属性设置为 false

options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
},

当手动消费者确认开启时,我们必须从工作者发送适当的确认信号,以表明我们已完成任务。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}

记录构建器

要配置消息选项,您可以使用 RmqRecordBuilder 类(注意:这也适用于基于事件的流)。例如,要设置 headerspriority 属性,请使用 setOptions 方法,如下所示:

const message = ':cat:';
const record = new RmqRecordBuilder(message)
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();

this.client.send('replace-emoji', record).subscribe(...);
提示

RmqRecordBuilder 类从 @nestjs/microservices 包导出。

您也可以在服务器端通过访问 RmqContext 来读取这些值,如下所示:

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties: { headers } } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

实例状态更新

要获取连接和底层驱动程序实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动程序的状态更新。对于 RMQ 驱动程序,status 流会发出 connecteddisconnected 事件。

this.client.status.subscribe((status: RmqStatus) => {
  console.log(status);
});
提示

RmqStatus 类型从 @nestjs/microservices 包导入。

同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
  console.log(status);
});

监听 RabbitMQ 事件

在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。要执行此操作,请使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err);
});

同样,您可以监听服务器的内部事件:

server.on<RmqEvents>('error', (err) => {
  console.error(err);
});
提示

RmqEvents 类型从 @nestjs/microservices 包导入。

底层驱动程序访问

对于更高级的用例,您可能需要访问底层驱动程序实例。这对于手动关闭连接或使用特定于驱动程序的方法等场景非常有用。但是,请记住,对于大多数情况,您 不需要 直接访问驱动程序。

要这样做,您可以使用 unwrap() 方法,该方法返回底层驱动程序实例。泛型类型参数应指定您期望的驱动程序实例类型。

const managerRef =
  this.client.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();

同样,您可以访问服务器的底层驱动程序实例:

const managerRef =
  server.unwrap<import('amqp-connection-manager').AmqpConnectionManager>();

通配符

RabbitMQ 支持在路由键中使用通配符,以实现灵活的消息路由。# 通配符匹配零个或多个单词,而 * 通配符恰好匹配一个单词。

例如,路由键 cats.# 匹配 catscats.meowcats.meow.purr。路由键 cats.* 匹配 cats.meow 但不匹配 cats.meow.purr

要在 RabbitMQ 微服务中启用通配符支持,请在选项对象中将 wildcards 配置选项设置为 true

const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost:5672'],
      queue: 'cats_queue',
      wildcards: true,
    },
  },
);

使用此配置,您可以在订阅事件/消息时在路由键中使用通配符。例如,要监听路由键为 cats.# 的消息,您可以使用以下代码:

@MessagePattern('cats.#')
getCats(@Payload() data: { message: string }, @Ctx() context: RmqContext) {
  console.log(`Received message with routing key: ${context.getPattern()}`);

  return {
    message: 'Hello from the cats service!',
  }
}

要发送具有特定路由键的消息,您可以使用 ClientProxy 实例的 send() 方法:

this.client.send('cats.meow', { message: 'Meow!' }).subscribe((response) => {
  console.log(response);
});