一、安装依赖
nestjs 官方自带 mqttClient,需要安装相关依赖包。
npm install @nestjs/microservices mqtt --save
在 NestJS 中,有两种不同类型的传输方式:
- 基于代理的:这一类包括 NATS,以及其他选项如 Redis、RabbitMQ、MQTT 和 Kafka。
- 点对点的:这一组包括 TCP 和 gRPC 通信机制。
微服务控制器类型
- 代理控制器:@EventPattern
- 点对点控制器:@MessagePattern
二、引入配置
可以在 main.ts 入口文件引入配置,有两种方式(二选一),第一种是在原服务的基础上增加 mqtt 微服务,第二张是将服务作为独立的 mqtt 微服务使用。
第一种:增加微服务
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport, MqttStatus } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// 重复使用connectMicroservice,可以注册多个微服务
const server = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.MQTT,
options: {
url: 'mqtt://emqx.microgridcloud.com:1883',
clientId: 'nestjs'
},
});
server.status.subscribe((status: MqttStatus) => {
console.log('MQTT连接状态:', status);
});
// 启动应用中所有已注册的微服务实例
app.startAllMicroservices();
app.listen(3000);
}
bootstrap();
第二种:独立微服务
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://emqx.microgridcloud.com:1883',
clientId: 'nestjs'
},
});
app.listen();
}
bootstrap();
三、订阅和发布消息
nestjs 官方提供的 mqttClient 发布和订阅分别对应两个 MQTT 客户端,clientID 不填时,默认系统会默认生成。
-
订阅消息
在 app.controller.ts 文件配置订阅消息
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, MessagePattern, MqttContext, Payload } from '@nestjs/microservices';
@Controller()
export class AppController {
// 处理消息,并响应请求,对应send方法
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext, @Payload() payload: any) {
// 输出示例:Topic: sensors/000/temperature/1
console.log(`Topic: ${context.getTopic()}`);
// 输出示例:Packet {cmd: 'public', retain: false, qos: 0, topic: 'test', payload: <Buffer 65 65>}
console.log(context.getPacket())
// 输出示例:将payload转为string输出,适合字符或json传输;如需原始值请用context.getPacket().payload
console.log(payload) // 在使用官方mqttClient发布消息时,默认自动封装额外数据,@Payload() payload会自动去除额外的数据
// 响应this.client.send()请求
return { message: '收到消息' }
}
// 只处理消息,不响应请求,对应emit方法
@EventPattern('sensors/+/temperature/+')
async handleEvent(@Ctx() context: MqttContext, @Payload() data: any) {
console.log(context.getPacket());
console.log(context.getPacket().payload.toString('hex'));
}
}
-
发布消息
方式一:在业务代码直接引入新 Client
可在 app.controller.ts 或者 app.service.ts 引入额外客户端作为 MQTT 生产者
import { Controller } from '@nestjs/common';
import { Client, ClientMqtt, MqttRecordBuilder, Serializer, Transport } from '@nestjs/microservices';
// 自定义MQTT发布消息时的数据序列化规则
class RawPayloadSerializer implements Serializer {
serialize(value: any, options?: Record<string, any>): any {
// 默认封装数据:{"pattern":"sensors/device/0000/up","data":{"type":"Buffer","data":[65]}}
return value.data.data;
}
}
@Controller()
export class AppController {
@Client({
transport: Transport.MQTT, // 需要指明类型才有效果
options: {
host: 'emqx.microgridcloud.com',
port: 1883,
// 发布消息默认会带额外信息,可自定义序列化消息
serializer: new RawPayloadSerializer(),
},
})
client: ClientMqtt; // 发布消息时会启动该客户端发布消息
async publishMessage() {
const userProperties = { 'x-version': '1.0.0' };
// 构建发送消息,可发送Buffer.from([0x65, 0x65])或字符串cat
const record = new MqttRecordBuilder('cat')
.setProperties({ userProperties })
.setQoS(1)
.build();
// emit用于发布-订阅模式,不会等待接收方的响应,不需要调用subcribe
this.client.emit('test/request', record)
// send用于请求 - 响应模式,会等待接收方的响应,需要调用subcribe。
this.client.send('test/request', record).subscribe(data => {
// 接收响应的数据
console.log(data));
}
}
}
方式二:先在注册客户端模块,在注入业务代码
- 在模块文件 app.module.ts 先注册客户端
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { AppService } from './app.service';
@Module({
imports: [
ClientsModule.register([
{
name: 'MQTT_SERVICE',
transport: Transport.MQTT,
options: {
host: 'emqx.microgridcloud.com',
port: 1883
}
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
- 在业务代码如 app.controller.ts 注入模块,需要关联 name 字段
import { Controller, Inject } from '@nestjs/common';
import { ClientMqtt, ClientProxy } from '@nestjs/microservices';
export class AppController {
@Inject('MATH_SERVICE')
client: ClientProxy; // ClientProxy是抽象类,ClientMqtt是具体的实现类,ClientProxy是ClientMqtt的父类
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于