我来帮您了解 Node.js 中订阅 NATS 主题的相关信息,包括基本概念、实现方法和示例代码。
我来为您详细讲解 Node.js 中订阅 NATS 主题的相关知识,包括基础用法和实际示例。
安装 NATS 客户端
npm install nats
基础订阅示例
const { connect, StringCodec } = require('nats');
(async () => {
// 连接到 NATS 服务器
const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();
// 订阅主题
const sub = nc.subscribe("updates");
// 使用异步迭代器接收消息
(async () => {
for await (const msg of sub) {
console.log(`收到消息: ${sc.decode(msg.data)}`);
}
})();
console.log("订阅者已启动,等待消息...");
})();
通配符订阅
// 订阅所有以 'user.' 开头的主题
const sub1 = nc.subscribe("user.*");
// 订阅所有以 'order.' 开头的主题及其子主题
const sub2 = nc.subscribe("order.>");
队列组订阅
// 多个订阅者可以加入同一个队列组实现负载均衡
const sub = nc.subscribe("task", { queue: "workers" });
完整示例
发布者 (publisher.js):
const { connect, StringCodec } = require('nats');
(async () => {
const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();
nc.publish("updates", sc.encode("Hello, NATS!"));
console.log("消息已发布");
await nc.drain();
})();
订阅者 (subscriber.js):
const { connect, StringCodec } = require('nats');
(async () => {
const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();
const sub = nc.subscribe("updates");
(async () => {
for await (const msg of sub) {
console.log(`收到: ${sc.decode(msg.data)}`);
}
})();
console.log("订阅者监听中...");
})();
运行步骤
1. 启动 NATS 服务器:
nats-server
# 或使用 Docker
docker run -d -p 4222:4222 nats
1. 运行订阅者:
node subscriber.js
1. 运行发布者:
node publisher.js
高级特性
- 异步迭代器:推荐使用 for await...of 循环处理消息
- 通配符:支持 *(匹配单个 token)和 >(匹配多个 token)
- 队列组:实现负载均衡的消息消费
- 连接管理:支持自动重连和连接状态监控
NATS 是一个高性能、轻量级的消息系统,非常适合微服务架构和分布式系统。