别着急,坐和放宽
使用社交账号登录
前言:Apache Pulsar 被称为“下一代云原生消息系统”,最初由 Yahoo 开发。它诞生的使命就是解决 Kafka 在运维扩容、多租户管理上的痛点。它的核心设计哲学是 存储与计算分离。
虽然 Kafka 已经是流处理的霸主,并且在 3.x 版本中引入了 KRaft(移除 ZooKeeper)和 Tiered Storage(分层存储)来尝试解决扩容问题,但在大规模云原生生产环境中,它仍有一些痛点:
Pulsar 就是为了解决这些问题而生的,它不仅仅是一个 MQ,更是一个统一的消息流平台。
Pulsar 采用了分层架构,将无状态的计算层和服务层与有状态的存储层完全解耦。
Pulsar 原生支持将冷数据自动卸载(Offload)到廉价的对象存储(如 AWS S3, Google GCS, HDFS)。
| 特性 | Kafka | Pulsar | 胜出 |
|---|---|---|---|
| 架构模式 | 存算一体 (Broker 即存储) | 存算分离 (Broker + BookKeeper) | Pulsar (更灵活) |
| 扩容/缩容 | 需要数据 Rebalance (重,3.6+ 分层存储有缓解) | 无需迁移数据,秒级扩容 (轻) | Pulsar |
| 消费模型 | 仅 Partition (顺序消费,4.0+ 计划引入共享组) | 统一模型 (支持 Partition, Shared, Key_Shared) | Pulsar |
| 多租户 | 弱 (只有 ACL/Quota) | 原生支持 (Tenant/Namespace 资源隔离) | Pulsar |
| 跨地域复制 | 需 MirrorMaker (外挂) | 原生内置 (Geo-Replication) | Pulsar |
| 运维复杂度 | 中 (依赖 ZK/KRaft) | 高 (ZK + Broker + BookKeeper) | Kafka |
| 生态与社区 | 极其成熟,标准 | 正在快速成长,学习曲线陡峭 | Kafka |
Pulsar 的一大创新是支持多种订阅模式,让它同时具备了 MQ (RabbitMQ) 和 Streaming (Kafka) 的能力。
Pulsar 是一个“集大成者”,它试图在一个系统中同时解决 Kafka(流处理)和 RabbitMQ(队列处理)的问题,并加上了云原生的翅膀(存算分离、多租户、分层存储)。
虽然它的架构复杂度和学习成本较高(需要维护 Broker, BookKeeper, ZK 三套组件),但对于追求极致弹性和统一数据平台的大型企业来说,Pulsar 是目前最先进的选择。
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>3.0.0</version>
</dependency>
import org.apache.pulsar.client.api.*;
public class PulsarDemo {
public static void main(String[] args) throws Exception {
// 创建客户端
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 1. 生产者
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
producer.send("Hello Pulsar".getBytes());
// 2. 消费者 (Key_Shared 模式)
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared) // 指定订阅模式
.subscribe();
while (true) {
// 接收消息
Message msg = consumer.receive();
try {
System.out.println("收到消息: " + new String(msg.getData()));
// 确认消息 (Ack)
consumer.acknowledge(msg);
} catch (Exception e) {
// 消费失败,稍后重试 (Nack)
consumer.negativeAcknowledge(msg);
}
}
}
}