使用社交账号登录
前言:如果说 Kafka 是重型卡车,NATS 就是 F1 赛车。NATS 是 CNCF(云原生计算基金会)毕业项目,以 简单、安全 和 高性能 著称。它的设计哲学是“做少而精的事”,在云原生和微服务架构中异军突起。
NATS 包含两个核心部分,针对完全不同的场景,理解这一点至关重要:
Core NATS(传统的 NATS):
NATS JetStream (2.0+ 引入):
Core NATS 是 NATS 的基石,它的核心概念是 Subject-Based Addressing(基于主题的寻址)。
NATS 没有 IP 和端口的概念,消费者监听的是 Subject。Subject 是区分大小写的字符串,支持通配符:
time.us.east*:time.*.east (匹配 time.us.east, time.eu.east,但不匹配 time.us.east.atlanta)>:time.> (匹配 time.us,time.us.east,time.us.east.atlanta,必须在末尾)help,自动附带一个临时的 reply-to subject。reply-to。workers)。JetStream 弥补了 Core NATS 不持久化的短板,使其成为一个完整的流处理平台。
Stream 是消息的存储容器,你可以把它看作 Kafka 的 Topic,但配置更灵活。
File:高性能文件存储(生产环境推荐)。Memory:内存存储,重启丢失(测试或临时缓存)。Limits:按大小 (MaxBytes) / 时间 (MaxAge) / 数量 (MaxMsgs) 限制(类似 Kafka)。Interest:所有当前的 Consumer 都确认消费后删除(类似 RabbitMQ)。WorkQueue:消息被任何一个 Consumer 消费后立即删除(做任务队列)。JetStream 的 Consumer 是服务器端的视图,记录了消费进度。
Fetch 或 Next)。适合批处理,能更好地控制流速(Flow Control)。NATS 甚至内置了 KV 存储和对象存储(基于 JetStream 实现),这使得你可以用 NATS 做配置中心或存放大文件,真正实现“All in NATS”。
NATS 的集群设计非常独特,采用 Full Mesh(全网状)连接。
你只需要让新节点知道集群中任意一个现有节点的地址,它就能自动发现所有其他节点,并建立全连接。
对于跨数据中心或跨地域的连接,NATS 使用 Gateway 模式。
虽然 NATS 是用 Go 写的,但它的 Java 客户端也非常成熟。
与其他 MQ 需要依赖 ZooKeeper、Erlang VM 或复杂的 JVM 调优不同,NATS 只有一个二进制文件 nats-server,大小不到 20MB。
NATS 官方提供了 CLI 工具 nats,功能极其强大。
| 场景 | 推荐指数 | 理由 |
|---|---|---|
| 微服务 RPC | ⭐⭐⭐⭐⭐ | 自带服务发现、负载均衡,比 HTTP/gRPC 更快更解耦。 |
| IoT / 边缘计算 | ⭐⭐⭐⭐⭐ | 二进制极小,资源占用极低,支持 WebSocket/MQTT,适合跑在树莓派或网关上。 |
| 云原生中间件 | ⭐⭐⭐⭐ | Kubernetes 友好,配置简单,Sidecar 模式部署方便。 |
| 高吞吐日志 | ⭐⭐⭐⭐ | JetStream 性能强悍,但 Kafka 生态更完善。 |
| 传统企业集成 | ⭐⭐⭐ | 只有 NATS 可能不够,可能需要结合 RabbitMQ 的复杂路由能力。 |
NATS 代表了新一代的基础设施:轻量、单一二进制、云原生友好。如果你厌倦了维护庞大的 JVM 集群,或者在寻找一个既能做消息队列又能做服务发现的统一组件,NATS 绝对值得一试。
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.16.5</version>
</dependency>
import io.nats.client.*;
public class NatsDemo {
public static void main(String[] args) throws Exception {
// 连接 NATS
Connection nc = Nats.connect("nats://localhost:4222");
// 1. Core NATS: 发布订阅
Dispatcher d = nc.createDispatcher((msg) -> {
System.out.println("收到消息: " + new String(msg.getData()));
});
d.subscribe("updates");
nc.publish("updates", "Hello NATS".getBytes());
// 2. JetStream: 持久化流
JetStreamManagement jsm = nc.createJetStreamManagement();
JetStream js = nc.jetStream();
// 创建 Stream (幂等)
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name("ORDERS")
.subjects("orders.*")
.storageType(StorageType.File)
.build();
try {
jsm.addStream(streamConfig);
} catch (JetStreamApiException e) {
// Stream 已存在
}
// 发布消息到 Stream
js.publish("orders.new", "Order #1234".getBytes());
// Pull Consumer 消费
JetStreamSubscription sub = js.subscribe("orders.*", PullSubscribeOptions.builder().durable("my-durable-consumer").build());
for (Message m : sub.fetch(10, Duration.ofSeconds(1))) {
System.out.println("处理订单: " + new String(m.getData()));
m.ack(); // 确认消费
}
nc.close();
}
}
# 启动服务器 (开启 JetStream)
./nats-server -js -m 8222 # 开启监控端口 8222
# 启动集群
./nats-server -cluster nats://0.0.0.0:6222 -routes nats://node-a:6222,nats://node-b:6222
# 查看集群状态
nats server list
# 实时监控流量
nats server report jetstream
# 性能测试 (Bench)
# 1个发布者,1个订阅者,100万条消息
nats bench test-subject --pub 1 --sub 1 --msgs 1000000