一、RabbitMQ 为什么会出现消息重复消费?
先明确重复消费的根源,才能针对性解决:
生产者重试:消息发送后未收到 MQ 确认,生产者重发;
消费者确认超时:消费者消费了消息,但还没给 MQ 回 ACK 就宕机,MQ 会重新投递;
网络波动:ACK 回执丢失,MQ 误以为消费失败,重新投递。
二、什么是幂等性?
在消息队列场景中,幂等性就是:同一条消息被消费多次,业务结果也和消费一次一致(比如不会重复创建订单、重复扣库存)。
举个通俗例子:
非幂等:你给用户转账 100 元,重复执行 2 次,用户会收到 200 元(出错);
幂等:你给用户转账 100 元,无论点多少次,用户最终只收到 100 元(正确)。
三、保证 RabbitMQ 消息幂等性的核心方案(从易到难)
方案 1:唯一标识 + 幂等表(最常用、最稳妥)
1、思路
给每条消息分配唯一 ID(消息唯一标识),消费者消费前先查「幂等表」,确认未消费过再处理,处理完标记为已消费。
2、代码实现:
1)创建幂等表
CREATE TABLE `msg_idempotent` (
`id` bigint NOT NULL AUTO_INCREMENT,
`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
`status` tinyint NOT NULL DEFAULT '0' COMMENT '0-未消费 1-已消费',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_msg_id` (`msg_id`) COMMENT '保证消息ID唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2)生产者:发送带唯一 ID 的消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;
public class IdempotentProducer {
private static final String QUEUE_NAME = "idempotent_queue";
public static void main(String[] args) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
// 2. 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 3. 发送10条测试消息,每条带唯一ID
for (int i = 1; i <= 10; i++) {
String msgId = UUID.randomUUID().toString(); // 消息唯一ID
String message = "订单创建:" + i + ", msgId=" + msgId;
// 把msgId放在消息属性里(推荐),也可以放消息体
channel.basicPublish("", QUEUE_NAME,
com.rabbitmq.client.AMQP.BasicProperties.builder()
.messageId(msgId) // 设置消息唯一ID
.build(),
message.getBytes());
System.out.println("发送消息:" + message);
}
}
}
}3)消费者:消费前校验幂等
import com.rabbitmq.client.*;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class IdempotentConsumer {
private static final String QUEUE_NAME = "idempotent_queue";
private static JdbcTemplate jdbcTemplate;
// 初始化JDBC模板
static {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai");
dataSource.setUsername("root");
dataSource.setPassword("123456");
jdbcTemplate = new JdbcTemplate(dataSource);
}
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 关键:设置手动ACK,避免自动确认导致重复
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
String msgId = properties.getMessageId(); // 获取消息唯一ID
long deliveryTag = envelope.getDeliveryTag();
try {
// 1. 校验幂等:插入幂等表(唯一索引保证重复插入失败)
String sql = "INSERT INTO msg_idempotent (msg_id, status) VALUES (?, 1)";
jdbcTemplate.update(sql, msgId);
// 2. 执行业务逻辑(比如创建订单)
System.out.println("处理业务:" + message);
// 3. 手动确认ACK:消息处理完成,MQ删除消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 唯一索引冲突 → 消息已消费过,直接确认ACK,丢弃消息
if (e.getMessage().contains("uk_msg_id")) {
System.out.println("消息已消费,直接丢弃:" + msgId);
channel.basicAck(deliveryTag, false);
} else {
// 其他异常 → 拒绝消息并重新入队(或死信队列)
channel.basicNack(deliveryTag, false, true);
e.printStackTrace();
}
}
}
});
}
}方案 2:基于 Redis 的 SETNX(高性能)
1、思路
用消息唯一 ID 作为 Redis 的 key,消费前执行 SETNX(只有 key 不存在时才设置成功),设置成功则处理消息,失败则直接丢弃。
2、代码实现
1)消费者:只需调整消费者中的幂等校验逻辑
// 初始化Redis
private static Jedis jedis = new Jedis("localhost", 6379);
// 消费逻辑中替换:
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgId = properties.getMessageId();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, "UTF-8");
// 1. Redis SETNX 校验:key=msg:idempotent:xxx,过期时间避免内存泄漏
String redisKey = "msg:idempotent:" + msgId;
Long result = jedis.setnx(redisKey, "1");
if (result == 1) {
// 设置过期时间(比如1小时),避免Redis堆积
jedis.expire(redisKey, 3600);
// 2. 执行业务逻辑
System.out.println("处理业务:" + message);
// 3. 确认ACK
channel.basicAck(deliveryTag, false);
} else {
// SETNX返回0 → 消息已消费,直接确认ACK
System.out.println("消息已消费,丢弃:" + msgId);
channel.basicAck(deliveryTag, false);
}
}方案 3:业务逻辑天然幂等(最优)
比如:
更新操作:
UPDATE order SET status=1 WHERE id=100(执行 1 次和 100 次结果一样);查询操作:本身就是幂等的。
五、关键注意事项
消息唯一 ID 生成:推荐用 UUID、雪花算法,避免重复;
ACK 机制:必须用「手动 ACK」,自动 ACK 会导致消费失败也无法重试;
幂等表 / Redis 过期:Redis 要设置过期时间,幂等表可定期清理历史数据;
异常处理:消费失败区分「重复消费」和「真异常」,真异常要重试(或入死信队列)。
总结
幂等性核心:同一个操作执行多次结果一致,RabbitMQ 中解决的是消息重复消费问题;
实现思路:给消息加唯一标识,消费前通过「幂等表(MySQL)」或「Redis SETNX」校验是否已消费;
Java 操作关键:生产者设置消息唯一 ID,消费者手动 ACK,消费前校验幂等,重复则直接确认丢弃,正常则执行业务并 ACK。