微服务之 MQTT 说明

一、安装依赖

nestjs 官方自带 mqttClient,需要安装相关依赖包。

npm install @nestjs/microservices mqtt --save

在 NestJS 中,有两种不同类型的传输方式:

  1. 基于代理的:这一类包括 NATS,以及其他选项如 RedisRabbitMQMQTTKafka
  2. 点对点的:这一组包括 TCP 和 gRPC 通信机制。
微服务控制器类型
  • 代理控制器:@EventPattern
  • 点对点控制器:@MessagePattern

二、引入配置

可以在 main.ts 入口文件引入配置,有两种方式(二选一),第一种是在原服务的基础上增加 mqtt 微服务,第二张是将服务作为独立的 mqtt 微服务使用。

第一种:增加微服务
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport, MqttStatus } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // 重复使用connectMicroservice,可以注册多个微服务
  const server = app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://emqx.microgridcloud.com:1883',
      clientId: 'nestjs'
    },
  });
  server.status.subscribe((status: MqttStatus) => {
    console.log('MQTT连接状态:', status);
  });
  // 启动应用中所有已注册的微服务实例
  app.startAllMicroservices();
  app.listen(3000);
}
bootstrap();
第二种:独立微服务
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
	transport: Transport.MQTT,
    options: {
      url: 'mqtt://emqx.microgridcloud.com:1883',
      clientId: 'nestjs'
    },
  });
  app.listen();
}
bootstrap();

三、订阅和发布消息

nestjs 官方提供的 mqttClient 发布和订阅分别对应两个 MQTT 客户端,clientID 不填时,默认系统会默认生成。

  • 订阅消息

在 app.controller.ts 文件配置订阅消息

import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, MessagePattern, MqttContext, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {

  // 处理消息,并响应请求,对应send方法
  @MessagePattern('sensors/+/temperature/+')
  getTemperature(@Ctx() context: MqttContext, @Payload() payload: any) {
    // 输出示例:Topic: sensors/000/temperature/1
    console.log(`Topic: ${context.getTopic()}`);
    // 输出示例:Packet {cmd: 'public', retain: false, qos: 0, topic: 'test', payload: <Buffer 65 65>}
    console.log(context.getPacket())
   // 输出示例:将payload转为string输出,适合字符或json传输;如需原始值请用context.getPacket().payload
   console.log(payload) // 在使用官方mqttClient发布消息时,默认自动封装额外数据,@Payload() payload会自动去除额外的数据
   // 响应this.client.send()请求
   return { message: '收到消息' }
  }

  // 只处理消息,不响应请求,对应emit方法
  @EventPattern('sensors/+/temperature/+')
  async handleEvent(@Ctx() context: MqttContext, @Payload() data: any) {
    console.log(context.getPacket());
    console.log(context.getPacket().payload.toString('hex'));
  }
}

  • 发布消息
方式一:在业务代码直接引入新 Client

可在 app.controller.ts 或者 app.service.ts 引入额外客户端作为 MQTT 生产者

import { Controller } from '@nestjs/common';
import { Client, ClientMqtt, MqttRecordBuilder, Serializer, Transport } from '@nestjs/microservices';

// 自定义MQTT发布消息时的数据序列化规则
class RawPayloadSerializer implements Serializer {
  serialize(value: any, options?: Record<string, any>): any {
    // 默认封装数据:{"pattern":"sensors/device/0000/up","data":{"type":"Buffer","data":[65]}}
    return value.data.data;
  }
}

@Controller()
export class AppController {
  @Client({
    transport: Transport.MQTT, // 需要指明类型才有效果
    options: {
      host: 'emqx.microgridcloud.com',
      port: 1883,
      // 发布消息默认会带额外信息,可自定义序列化消息
      serializer: new RawPayloadSerializer(),
    },
  })
  client: ClientMqtt; // 发布消息时会启动该客户端发布消息

  async publishMessage() {
    const userProperties = { 'x-version': '1.0.0' };
    // 构建发送消息,可发送Buffer.from([0x65, 0x65])或字符串cat
    const record = new MqttRecordBuilder('cat')
      .setProperties({ userProperties })
      .setQoS(1)
      .build();
    // emit用于发布-订阅模式,不会等待接收方的响应,不需要调用subcribe
    this.client.emit('test/request', record)
	// send用于请求 - 响应模式,会等待接收方的响应,需要调用subcribe。
    this.client.send('test/request', record).subscribe(data => {
      // 接收响应的数据
      console.log(data));
    }
  }
}
方式二:先在注册客户端模块,在注入业务代码
  1. 在模块文件 app.module.ts 先注册客户端
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppService } from './app.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MQTT_SERVICE',
        transport: Transport.MQTT,
        options: {
          host: 'emqx.microgridcloud.com',
          port: 1883
        }
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
  1. 在业务代码如 app.controller.ts 注入模块,需要关联 name 字段
import { Controller, Inject } from '@nestjs/common';
import { ClientMqtt, ClientProxy } from '@nestjs/microservices';

export class AppController {
  @Inject('MATH_SERVICE')
  client: ClientProxy; // ClientProxy是抽象类,ClientMqtt是具体的实现类,ClientProxy是ClientMqtt的父类
}
  • MQTT
    6 引用 • 14 回帖
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    97 引用 • 155 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...