别着急,坐和放宽
使用社交账号登录
前言:RocketMQ 由阿里巴巴开源,是 Apache 顶级项目。它诞生于双十一的洪峰之下,专注于金融级的可靠性、事务一致性和低延迟。
RocketMQ 的架构非常简洁,主要由四部分组成:
ZK 是强一致性(CP)系统,当主节点挂掉时,选举期间整个集群不可用。RocketMQ 选择了 AP 模型,NameServer 之间数据允许短暂不一致,保证了极致的可用性。
这是 RocketMQ 区别于其他 MQ 的最大亮点,用于解决 分布式事务 问题(最终一致性)。
用户支付成功后,需要:1. 修改订单状态(本地事务);2. 增加用户积分(下游服务)。如何保证两者同时成功?
RocketMQ 支持严格的消息顺序。
MessageListenerOrderly(而非 MessageListenerConcurrently)。RocketMQ 支持定时消息,但开源版(4.x)不支持任意时间精度,只支持 18 个特定级别:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
SCHEDULE_TOPIC_XXXX 系统 Topic 中,到期后再投递到真实 Topic。RECONSUME_LATER,Broker 会在稍后重试。默认重试 16 次(时间间隔递增)。%DLQ%ConsumerGroup。需人工干预。金融级高可靠推荐配置:SYNC_MASTER + SYNC_FLUSH(或者 ASYNC_FLUSH 配合 SYNC_MASTER)。
// 1. 发送事务消息
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransaction() {
Message<String> msg = MessageBuilder.withPayload("payload").build();
// "tx-group": 事务生产者组
rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", msg, null);
}
// 2. 事务监听器
@RocketMQTransactionListener(txProducerGroup = "tx-group")
class MyTxListener implements RocketMQLocalTransactionListener {
// 执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// updateOrder();
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 事务回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// checkOrderStatus();
return RocketMQLocalTransactionState.COMMIT;
}
}
// 3. 顺序消费监听
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-group", consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("有序处理: " + message);
}
}