别着急,坐和放宽
使用社交账号登录
前言:RabbitMQ 是最广泛使用的开源消息代理之一,实现了高级消息队列协议(AMQP)。它的核心杀手锏是灵活的路由机制和成熟的可靠性保障。
在深入原理之前,我们先把它跑起来。
强烈建议使用带 management 后缀的镜像,这样会自带一个 Web 控制台。
登录 http://localhost:15672 (账号 admin/admin),你会看到 Virtual Hosts。
Database,是一个逻辑隔离的命名空间。/order, /user),互不干扰。RabbitMQ 与其他 MQ 最大的区别在于引入了 Exchange(交换机) 的概念。生产者通过 Routing Key 将消息发送给 Exchange,Exchange 再根据规则将消息路由到对应的 Queue。
| 类型 | 路由逻辑 | 场景 |
|---|---|---|
| Direct | 精准匹配:Routing Key 必须与 Binding Key 完全一致。 | 一对一处理,如指定发给“日志处理”队列。 |
| Fanout | 广播:忽略 Routing Key,发给所有绑定到该 Exchange 的 Queue。 | 群发通知,如用户注册后同时通知积分、邮件系统。 |
| Topic | 模糊匹配:支持通配符 * (匹配一个单词) 和 # (匹配多个单词)。 | 灵活路由,如 log.* 匹配 log.error, log.info。 |
| Headers | Header匹配:根据消息头的属性进行匹配(较少使用)。 | 特殊业务属性路由。 |
在金融或订单场景中,消息丢失是不可接受的。RabbitMQ 提供了三道防线:
确保消息成功到达 Broker。
publisher-confirm-type: correlated。ConfirmCallback 接口,如果 ack 为 true,说明消息已到达 Exchange。确保 Broker 重启后数据还在。
durable=true。durable=true。deliveryMode=2。确保消息被成功消费业务逻辑。
channel.basicAck();失败调用 basicNack() 并设置 requeue=true 让消息重回队列。如果队列里有 10000 条消息,消费者启动时默认会全部推送到消费者内存,可能导致消费者 OOM。
prefetch_count。spring.rabbitmq.listener.simple.prefetch=1 (能者多劳)。当消息变成“死信”后,会被重新投递到另一个 Exchange(即 DLX)。 消息变成死信的情况:
basicReject/basicNack) 且 requeue=false。用途:排查异常数据,或实现延迟队列。
RabbitMQ 原生不支持延迟队列(虽然有插件),最经典的做法是 TTL + DLX。
Queue_TTL,设置 TTL(如 30分钟),并绑定死信交换机 Exchange_DLX。Queue_TTL。Exchange_DLX。Queue_Target(绑定了 Exchange_DLX),从而实现延迟消费。docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3-management
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: correlated # 开启发送确认
publisher-returns: true # 开启消息回退 (路由失败时返回)
listener:
simple:
acknowledge-mode: manual # 开启手动 Ack
prefetch: 1 # 每次只拉取一条
@Configuration
public class RabbitConfig {
// 1. 配置消息转换器 (使用 JSON 序列化)
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
// 2. 定义死信交换机和死信队列
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange");
}
@Bean
public Queue dlxQueue() {
return new Queue("dlx.queue");
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");
}
// 3. 定义普通队列,绑定死信
@Bean
public Queue myQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routing.key");
return new Queue("my.queue", true, false, false, args);
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
// 必须设置 CorrelationData 才能触发 ConfirmCallback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("my.direct.exchange", "my.routing.key", new User("Tom", 18), correlationData);
}
@Component
public class MyConsumer {
@RabbitListener(queues = "my.queue")
public void receive(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
System.out.println("收到消息: " + user);
// 业务逻辑处理...
// 手动 Ack: 第二个参数 multiple=false 代表只确认当前这条
channel.basicAck(tag, false);
} catch (Exception e) {
try {
// 出现异常,拒绝消息
// 第三个参数 requeue=false: 不重回队列,直接进入死信队列 (DLX)
// 如果是网络抖动等临时异常,可以设为 true 让其重试
channel.basicNack(tag, false, false);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}