消息队列MQ
初识MQ
同步调用

同步调用有3个问题:
- 拓展性差,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动
- 性能下降,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和
- 级联失败,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
异步调用
技术选型
RabbitMQ
部署
mq: #消息队列
image: rabbitmq:3.8-management
container_name: mq
restart: unless-stopped
hostname: mq
environment:
TZ: "Asia/Shanghai"
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: "admin"
ports:
- "15672:15672"
- "5672:5672"
volumes:
- mq-plugins:/plugins
# 持久化数据卷,保存用户/队列/交换机等元数据
- ./mq-data:/var/lib/rabbitmq
networks:
- hmall-net
volumes:
mq-plugins:
http://localhost:15672/ 访问控制台
架构图
publisher
:生产者,发送消息的一方consumer
:消费者,消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不存储virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue(每个项目+环境有各自的vhost)
一个队列最多指定给一个消费者!
Spring AMQP
快速开始
交换机和队列都是直接在控制台创建,消息的发送和接收在Java应用中实现!
简单案例:直接向队列发送消息,不经过交换机
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置MQ地址,在publisher
和consumer
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: localhost # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
消息发送:
然后在publisher
服务中编写测试类SpringAmqpTest
,并利用**RabbitTemplate
**实现消息发送:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
convertAndSend
如果 2 个参数,第一个表示队列名,第二个表示消息;
消息接收
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了!
交换机
无论是 Direct、Topic 还是 Fanout 交换机,你都可以用 同一个 Binding Key 把多条队列绑定到同一个交换机上。
1)fanout:广播给每个绑定的队列
发送消息:
convertAndSend
如果 3 个参数,第一个表示交换机,第二个表示RoutingKey
,第三个表示消息。
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
2)Direct交换机
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
注意,RoutingKey不等于队列名称
3)Topic交换机
Topic
类型的交换机与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型交换机可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey一般都是有一个或多个单词组成,多个单词之间以.
分割
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
转发过程:把发送者传来的 Routing Key 按点分成多级,和各队列的 Binding Key(可以带 *
、#
通配符)做模式匹配,匹配上的队列统统都能收到消息。
Routing Key和Binding Key
Routing Key(路由键)
- 由发送者(Producer)在发布消息时指定,附着在消息头上。
- 用来告诉交换机:“我的这条消息属于哪类/哪个主题”。
Binding Key(绑定键)
- 由消费者(在应用启动或队列声明时)指定,是把队列绑定到交换机时用的规则。有些 UI 里 Routing Key 等同于 Binding Key!
- 告诉交换机:“符合这个键的消息,投递到我这个队列”。
交换机本身不设置 Routing Key 或 Binding Key,它只根据类型(Direct/Topic/Fanout/Headers)和已有的“队列–绑定键”关系,把 incoming Routing Key 匹配到对应的队列。
Direct Exchange
- 路由规则:
Routing Key === Binding Key
(完全一致) - 场景:一对一或一对多的精确路由
Topic Exchange
-
路由规则
:支持通配符
*
:匹配一个单词#
:匹配零个或多个单词
-
例:
- Binding Key绑定键
order.*
→ 能匹配order.created
、order.paid
- 绑定键
order.#
→ 能匹配order.created.success
、order
等
- Binding Key绑定键
Fanout Exchange
- 路由规则:忽略 Routing/Binding Key,消息广播到所有绑定队列
- 场景:聊天室广播、缓存失效通知等
消费者处理消息
不同队列: 同一个交换机 + 相同 routing key 绑定到 多个不同的队列 → 每个队列都会收到一份消息,各自独立处理。 👉 相当于多个队列订阅了同类信息,TOPIC
同一个队列: 多个消费者(不管是一个应用里开多个 listener,还是多台实例部署)监听 同一个队列 → 一条消息只会被其中一个消费者消费,起到负载均衡作用。 👉 常用于“任务分摊”。
基于注解声明交换机、队列
前面都是在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。
基于注解方式来声明
type
默认交换机类型为ExchangeTypes.DIRECT
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
检查队列
- 如果 RabbitMQ 中已经有名为
direct.queue1
的队列,就不会重复创建; - 如果不存在,
RabbitAdmin
会自动帮你创建一个。
检查交换机
- 同理,会查看有没有名为
hmall.direct
、类型为direct
的交换机,若不存在就新建。
检查绑定
- 最后再去声明绑定关系:把
direct.queue1
绑定到hmall.direct
,并且 routing-key 为"red"
和"blue"
。 - 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。
消息转换器
使用JSON方式来做序列化和反序列化,替换掉默认方式。
更小或可压缩的消息体、易读、易调试
1)引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2)配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
MQ高级
我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消费者一定要处理消息
发送者的可靠性
发送者重试
修改发送者模块的application.yaml
文件,添加下面的内容:
主要是针对网络连接失败的场景,会自动重试;交换机不存在,不会触发重试。
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
- 阻塞重试,一般不建议开启。
发送者确认机制
一、机制概述
RabbitMQ 提供两种发送者确认机制,确保消息投递的可靠性:
- Publisher Confirm:确认消息是否到达 RabbitMQ 服务器
- Publisher Return:确认消息是否成功路由到队列
二、配置开启
1.在发送者模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启异步confirm机制
publisher-returns: true # 开启return机制
confirm类型说明:
none(默认模式)
:关闭confirm机制,消息由于网络连接失败也不会提醒。simple
:同步阻塞等待MQ的回执correlated
:MQ异步回调返回回执
2.每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 设置全局ReturnCallback
rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息路由失败 - Exchange: {}, RoutingKey: {}, ReplyCode: {}, ReplyText: {}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText());
// 可在此添加告警或重试逻辑
sendAlert(returned);
});
}
}
三、ConfirmCallback 使用
消息发送时设置确认回调CorrelationData
这里的CorrelationData
中包含两个核心的东西:
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆SettableListenableFuture
:回执结果的Future对象
public void sendMessageWithConfirmation(String exchange, String routingKey, Object message) {
// 1. 创建关联数据
CorrelationData correlationData = new CorrelationData();
// 2. 添加确认回调
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
log.info("✅ 消息成功到达MQ服务器");
} else {
log.error("❌ 消息发送失败: {}", result.getReason());
// 可在此添加重试逻辑
}
},
ex -> {
log.error("⚠️ 确认过程发生异常", ex);
}
);
// 3. 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
四、消息投递结果分析
场景 | 网络状态 | 路由状态 | ConfirmCallback | ReturnsCallback | 最终结果 |
---|---|---|---|---|---|
完全成功 | ✅ 成功 | ✅ 成功 | ACK | 不触发 | 消息入队 |
网络失败 | ❌ 失败 | - | NACK | 不触发 | 发送失败 |
路由失败 | ✅ 成功 | ❌ 失败 | ACK | 触发 | 消息丢弃 |
交换机不存在 | ✅ 成功 | ❌ 失败 | ACK | 触发 | 消息丢弃 |
端到端投递保障
- ConfirmCallback 只告诉你:消息“到”了 RabbitMQ 服务器吗?(ACK:到;NACK:没到)
- ReturnsCallback 只告诉你:到达服务器的消息,能“进”队列吗?(能进就不回;进不了就退)
两者都成功,才能确认:“这条消息真的安全地进了队列,等着消费者去拿。”
- 🟢 ACK:消息到达MQ服务器(可能路由失败)
- 🔴 NACK:消息未到达MQ服务器(网络问题)
- 🔵 Return:消息到达但路由失败(配置问题)
通过组合使用这两种机制,可以实现完整的端到端消息投递保障。如果由于网络问题,NACK了,那么会被correlationData.getFuture().addCallback(...)
回调函数捕捉!!!
MQ的可靠性
数据持久化
MQ消息持久化就是指当RabbitMQ服务重启后,消息仍然会保留在队列中不会丢失。
非持久化消息:只存储在内存中;持久化消息:同时存储在内存和磁盘中
为了保证数据的可靠性,必须配置数据持久化(从内存保存到磁盘上),包括:
- 交换机持久化(选Durable)
- 队列持久化(选Durable)
- 消息持久化(选Persistent)
控制台方式:


代码方式,默认都是持久化的,不用变动。
消费者可靠性
消费者确认机制
消费者确认机制 (Consumer Acknowledgement) 是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

上述的NACK状态时,MQ会不断向消费者重投消息,直至被正确处理!!!
在消费者方,通过下面的配置可以修改消费者收到消息后的处理方式:
none:消费者收到消息后,RabbitMQ 立即自动确认(ACK)
manual,手动实现ack;
auto(默认模式),自动档,业务逻辑异常返回nack, 消息解析异常 返回reject,其他ack
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
消费者重试
- 类似发送者的重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
- 重试达到最大次数后,会返回reject,消息会被丢弃
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态(默认);如果业务中包含事务,这里改为false有状态
核心概念:一次事务 vs. 多次事务
想象一下这个场景:你是一个消费者,从MQ收到一条消息,内容是“给用户A的账户增加10元”。你的服务需要执行两个步骤:
- 处理业务逻辑(更新数据库,给用户A加钱)。
- 确认消息(告诉MQ消息处理成功了)。
这个“处理业务逻辑”和“确认消息”的过程,可以放在一个数据库事务里。
特性 | 无状态重试 (stateless: true ) |
有状态重试 (stateless: false ) |
---|---|---|
本质 | 本地方法重试 | 消息重新投递 |
事务范围 | 所有重试在同一个事务中 | 每次重试是独立的事务 |
MQ感知 | MQ完全不知情(只投递1次) | MQ完全知情(多次投递) |
性能 | 高(无网络开销) | 较低(有网络开销) |
安全性 | 低(易导致重复操作) | 高(每次失败都回滚) |
适用场景 | 幂等操作、非DB操作(如HTTP调用) | 非幂等操作、数据库事务操作 |
为什么用了 @Transactional
必须有状态重试?
假设是无状态重试,重试是在同一次方法调用/同一事务里循环进行的(拦截器内部重试)。
第一次失败抛出异常后,当前事务被标记为 rollback-only。
- 接下来即便你第2次、第3次尝试都“业务成功”,提交时也会失败(因为事务早已不可提交)。
结果:不适合与 @Transactional
搭配做数据库更新;更适合无事务或幂等且不涉及DB提交的调用(如外部HTTP、缓存写入等)。
假设是有状态重试(stateless: false
)
- 重试通过把异常抛回给容器,让消息重新投递来实现。
- 每次投递 → 监听方法重新执行 → 新的事务开启。
- 每次失败都会完整回滚该次事务;下一次重试是干净的事务上下文。
- 达到最大次数后,按照你的配置reject(可配合死信队列/失败交换器),从而避免“消息风暴”。
有状态重试相比不开启重试机制:可以配置有限次重试次数,更加灵活。
失败处理策略
只有在开启了消费者重试机制(即配置了 spring.rabbitmq.listener.simple.retry.enabled: true
)时才会生效。
当消息消费重试达到最大次数后,默认会直接丢弃,这在要求高可靠性的场景中不可接受。Spring 提供了 MessageRecoverer
接口来自定义最终处理策略,主要有三种实现:
RejectAndDontRequeueRecoverer
- 默认策略。直接拒绝消息并丢弃。
ImmediateRequeueMessageRecoverer
- 让消息重新进入队列,再次被消费(可能导致循环)。
RepublishMessageRecoverer
✅ 推荐方案- 将消息路由到一个专用的异常交换机,最终进入异常队列。
- 优势:实现故障隔离,便于后续人工干预或自动化修复,是保证消息不丢失的优雅方案。
业务幂等性
在程序开发中,幂等则是指同一个业务,执行一次或多次对业务状态的影响是一致的。如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行:MQ消息的重复投递、页面卡顿时频繁刷新导致表单重复提交、服务间调用的重试
法一:唯一ID
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
法一存在业务侵入,因为mq的消息ID与业务无关,现在却多了一张专门记录 ID 的表或结构
法二:业务判断,基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
- 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
- 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
延迟消息
对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
方案:利用延迟消息实现超时检查
以“订单支付超时时间为30分钟”为例,具体实现流程如下:
- 创建订单时:在订单入库的同时,向消息队列发送一条延迟时间为30分钟的消息。
- 消息等待:此消息不会立即被消费,而是由MQ服务器暂存至延迟时间到期。
- 延迟触发:30分钟后,消息队列自动将该消息投递给消费者服务。
- 执行检查与操作:消费者接收到消息后,查询该订单的当前支付状态:
- 若订单仍为“未支付”:则执行取消订单、释放库存等后续操作。
- 若订单已支付:则忽略此消息,流程结束。
实现延迟消息法一
延迟消息插件
1.下载
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
2.上传插件,由于之前docker部署MQ挂载了数据卷
docker volume ls #查看所有数据卷
docker volume inspect hmall_all_mq-plugins #获取数据卷的目录
#"Mountpoint": "/var/lib/docker/volumes/hmall_all_mq-plugins/_data"
我们上传插件到该目录下。
3.安装插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
额外指定参数 delayed = "true"
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
发送延迟消息
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
实现延迟消息法二
RabbitMQ (TTL + 死信队列)
1.配置类(配置交换机和队列)
类型 | 名称 | 作用 | 路由键 |
---|---|---|---|
交换机 | order.exchange |
业务交换机:接收原始延迟消息 | order.delay.key |
队列 | order.delay.queue |
等待队列:消息在此等待TTL过期 | - |
交换机 | order.delay.exchange |
死信交换机:接收过期消息 | order.delay.key |
队列 | order.process.queue |
处理队列:最终消费消息的队列 | - |
@Configuration
public class RabbitMQDelayConfig {
// 业务交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
// 死信交换机(作为延迟消息的目标)
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange("order.delay.exchange");
}
// 业务队列 - 设置死信参数
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
// 消息到期后转发的死信交换机
args.put("x-dead-letter-exchange", "order.delay.exchange");
// 死信路由键
args.put("x-dead-letter-routing-key", "order.delay.key");
return new Queue("order.delay.queue", true, false, false, args);
}
// 最终消费队列
@Bean
public Queue orderProcessQueue() {
return new Queue("order.process.queue");
}
// 绑定:业务队列 -> 业务交换机
@Bean
public Binding orderDelayBinding() {
return BindingBuilder.bind(orderDelayQueue())
.to(orderExchange())
.with("order.delay.key");
}
// 绑定:最终队列 -> 死信交换机
@Bean
public Binding orderProcessBinding() {
return BindingBuilder.bind(orderProcessQueue())
.to(orderDelayExchange())
.with("order.delay.key");
}
}
2. 发送消息(设置TTL)
@Service
@RequiredArgsConstructor
public class OrderService {
private final RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
// 创建订单逻辑...
// 发送延迟消息(30分钟)
rabbitTemplate.convertAndSend("order.exchange", "order.delay.key", order.getId(), message -> {
// 设置消息的TTL为30分钟
message.getMessageProperties().setExpiration("1800000"); // 毫秒
return message;
});
}
}
3. 消费者
@Component
public class OrderDelayConsumer {
@RabbitListener(queues = "order.process.queue")
public void processExpiredOrder(String orderId) {
// 查询订单状态,如果未支付则取消订单
System.out.println("处理超时订单:" + orderId);
}
}
超时订单问题
死信交换机
- 当消息在一个队列中变成“死信(Dead Letter)”后,能被重新投递到的另一个交换机,就是死信交换机(DLX)。
- 绑定到 DLX 的队列叫死信队列(DLQ),专门用来存放这些“死信”消息。
触发条件
- 消费者拒绝并不再重投(Consumer Rejection)
- “消费者这一端”的情况。当消费者明确拒绝消息(发送
basic.reject
或basic.nack
)并且设置requeue=false
时,消息会成为死信。 - 场景:消费者处理消息时遇到无法处理的错误(如业务逻辑错误、数据格式错误),明确告知MQ不要重新投递了。
- “消费者这一端”的情况。当消费者明确拒绝消息(发送
- 消息过期(Message TTL Expired)
- 这与消费者无关。消息在队列中等待的时间超过了设定的生存时间(TTL),会被自动删除并变成死信。
- 场景:常用于实现延迟队列。例如,下单15分钟未支付订单取消,就可以将消息TTL设为15分钟,过期后成为死信转到DLQ,由DLQ的消费者来处理取消逻辑。
- 队列溢出(Queue Length Limit Exceeded)
- 这也与消费者无关。当队列的消息数量达到上限时,新来的消息或队列头部的消息(取决于配置)会被丢弃并变成死信。
- 场景:用于限制队列容量,防止消息无限堆积,保护系统。
配置
必须用编程式方式来声明,不可用注解式。
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.config.producer.exchange}")
private String businessExchangeName;
@Value("${spring.rabbitmq.config.producer.topic_team_success.queue}")
private String businessQueueName;
@Value("${spring.rabbitmq.config.producer.topic_team_success.routing_key}")
private String businessRoutingKey;
// 1. 定义死信交换机(通常一个应用一个就够了)
@Bean
public TopicExchange dlxExchange() {
return new TopicExchange(businessExchangeName + ".dlx", true, false);
}
// 2. 定义死信队列
@Bean
public Queue dlq() {
return new Queue(businessQueueName + ".dlq", true);
}
// 3. 将死信队列绑定到死信交换机
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlq())
.to(dlxExchange())
.with(businessRoutingKey + ".dead"); // 使用新的路由键
}
// 4. 定义业务交换机
@Bean
public TopicExchange businessExchange() {
return new TopicExchange(businessExchangeName, true, false);
}
// 5. 定义业务队列,并配置死信规则(核心!)
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
// 指定死信交换机
args.put("x-dead-letter-exchange", businessExchangeName + ".dlx");
// 指定死信的路由键(可选,不指定则使用原消息的路由键)
args.put("x-dead-letter-routing-key", businessRoutingKey + ".dead");
// 还可以设置其他导致消息成为死信的参数
// args.put("x-message-ttl", 60000); // 消息60秒过期
// args.put("x-max-length", 1000); // 队列最大长度1000条
return new Queue(businessQueueName, true, false, false, args);
}
// 6. 将业务队列绑定到业务交换机
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(businessRoutingKey);
}
}