RabbitMQ 消息重复消费问题处理方案(消息幂等性)

RabbitMQ 消息重复消费问题处理方案(消息幂等性)

一、RabbitMQ 为什么会出现消息重复消费?

先明确重复消费的根源,才能针对性解决:

  1. 生产者重试:消息发送后未收到 MQ 确认,生产者重发;

  2. 消费者确认超时:消费者消费了消息,但还没给 MQ 回 ACK 就宕机,MQ 会重新投递;

  3. 网络波动:ACK 回执丢失,MQ 误以为消费失败,重新投递。

二、什么是幂等性?

在消息队列场景中,幂等性就是:同一条消息被消费多次,业务结果也和消费一次一致(比如不会重复创建订单、重复扣库存)。

举个通俗例子:

  • 非幂等:你给用户转账 100 元,重复执行 2 次,用户会收到 200 元(出错);

  • 幂等:你给用户转账 100 元,无论点多少次,用户最终只收到 100 元(正确)。

三、保证 RabbitMQ 消息幂等性的核心方案(从易到难)

方案 1:唯一标识 + 幂等表(最常用、最稳妥)

1、思路

给每条消息分配唯一 ID(消息唯一标识),消费者消费前先查「幂等表」,确认未消费过再处理,处理完标记为已消费。

2、代码实现:
1)创建幂等表
CREATE TABLE `msg_idempotent` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT '0-未消费 1-已消费',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_msg_id` (`msg_id`) COMMENT '保证消息ID唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2)生产者:发送带唯一 ID 的消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.UUID;

public class IdempotentProducer {
    private static final String QUEUE_NAME = "idempotent_queue";

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 2. 创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // 3. 发送10条测试消息,每条带唯一ID
            for (int i = 1; i <= 10; i++) {
                String msgId = UUID.randomUUID().toString(); // 消息唯一ID
                String message = "订单创建:" + i + ", msgId=" + msgId;
                
                // 把msgId放在消息属性里(推荐),也可以放消息体
                channel.basicPublish("", QUEUE_NAME, 
                        com.rabbitmq.client.AMQP.BasicProperties.builder()
                                .messageId(msgId) // 设置消息唯一ID
                                .build(),
                        message.getBytes());
                System.out.println("发送消息:" + message);
            }
        }
    }
}
3)消费者:消费前校验幂等
import com.rabbitmq.client.*;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class IdempotentConsumer {
    private static final String QUEUE_NAME = "idempotent_queue";
    private static JdbcTemplate jdbcTemplate;

    // 初始化JDBC模板
    static {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai");
        dataSource.setUsername("root");
        dataSource.setPassword("123456");
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 关键:设置手动ACK,避免自动确认导致重复
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                String msgId = properties.getMessageId(); // 获取消息唯一ID
                long deliveryTag = envelope.getDeliveryTag();

                try {
                    // 1. 校验幂等:插入幂等表(唯一索引保证重复插入失败)
                    String sql = "INSERT INTO msg_idempotent (msg_id, status) VALUES (?, 1)";
                    jdbcTemplate.update(sql, msgId);

                    // 2. 执行业务逻辑(比如创建订单)
                    System.out.println("处理业务:" + message);

                    // 3. 手动确认ACK:消息处理完成,MQ删除消息
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    // 唯一索引冲突 → 消息已消费过,直接确认ACK,丢弃消息
                    if (e.getMessage().contains("uk_msg_id")) {
                        System.out.println("消息已消费,直接丢弃:" + msgId);
                        channel.basicAck(deliveryTag, false);
                    } else {
                        // 其他异常 → 拒绝消息并重新入队(或死信队列)
                        channel.basicNack(deliveryTag, false, true);
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

方案 2:基于 Redis 的 SETNX(高性能)

1、思路

用消息唯一 ID 作为 Redis 的 key,消费前执行 SETNX(只有 key 不存在时才设置成功),设置成功则处理消息,失败则直接丢弃。

2、代码实现
1)消费者:只需调整消费者中的幂等校验逻辑
// 初始化Redis
private static Jedis jedis = new Jedis("localhost", 6379);

// 消费逻辑中替换:
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msgId = properties.getMessageId();
    long deliveryTag = envelope.getDeliveryTag();
    String message = new String(body, "UTF-8");

    // 1. Redis SETNX 校验:key=msg:idempotent:xxx,过期时间避免内存泄漏
    String redisKey = "msg:idempotent:" + msgId;
    Long result = jedis.setnx(redisKey, "1");
    if (result == 1) {
        // 设置过期时间(比如1小时),避免Redis堆积
        jedis.expire(redisKey, 3600);
        
        // 2. 执行业务逻辑
        System.out.println("处理业务:" + message);
        
        // 3. 确认ACK
        channel.basicAck(deliveryTag, false);
    } else {
        // SETNX返回0 → 消息已消费,直接确认ACK
        System.out.println("消息已消费,丢弃:" + msgId);
        channel.basicAck(deliveryTag, false);
    }
}

方案 3:业务逻辑天然幂等(最优)

比如:

  • 更新操作:UPDATE order SET status=1 WHERE id=100(执行 1 次和 100 次结果一样);

  • 查询操作:本身就是幂等的。

五、关键注意事项

  1. 消息唯一 ID 生成:推荐用 UUID、雪花算法,避免重复;

  2. ACK 机制:必须用「手动 ACK」,自动 ACK 会导致消费失败也无法重试;

  3. 幂等表 / Redis 过期:Redis 要设置过期时间,幂等表可定期清理历史数据;

  4. 异常处理:消费失败区分「重复消费」和「真异常」,真异常要重试(或入死信队列)。

总结

  1. 幂等性核心:同一个操作执行多次结果一致,RabbitMQ 中解决的是消息重复消费问题;

  2. 实现思路:给消息加唯一标识,消费前通过「幂等表(MySQL)」或「Redis SETNX」校验是否已消费;

  3. Java 操作关键:生产者设置消息唯一 ID,消费者手动 ACK,消费前校验幂等,重复则直接确认丢弃,正常则执行业务并 ACK。

SSL 证书自动续期工作流(Certimate) 2026-03-18
线上CPU飙高如何排查? 2026-03-21

评论区