1 AMQP协议中的关键概念
Connection
socket连接,它封装了socket协议相关部分逻辑。你可以认为一个Connection就是一个Tcp连接。Channel
生产者与Broker通信的通道,可以把通道理解成共享一个 TCP 连接的多个轻量化连接(通常每个thread创建单独的channel进行通讯)。
Connection
就是一个TCP连接对象,而Channel
相当于连接池
Exchange
交换器,接收消息,按照路由规则将消息路由到一个或者多个队列Queue
消息队列,用来保存消息,供消费者消费Produce/Consumer
消息生产者和消费者Binding
绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。RoutingKey
路由键,生产者将消息发送给交换器的时候,会指定一个 RoutingKey,用来指定路由规则,交换器根据路由键把消息发送到指定队列
2 消息队列模式
2.1 点对点模式
点对点模式特点:一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。
2.2 发布/订阅模式
单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:
- 临时订阅:这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。
- 持久订阅:这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。
3 RabbitMQ原理
RabbbitMQ使用Erlang语言开发,基于AMQP协议实现。AMQP协议由三部分组成,分别是消费者,生产者和服务端。执行流程如下:
生产者连接到Server,建立一个连接,开启一个通道。
生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。
消费者也需要进行建立连接,开启信道等操作,便于接收消息。
生产者发送消息,发送到服务端中的虚拟主机。
虚拟主机中的交换器根据路由键选择路由规则,发送到不同的消息队列中。
订阅了消息队列的消费者就可以获取到消息,进行消费。
📌RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。
相反,生产者只能将消息发送到交换器。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将消息推送到队列。
交换机绑定多个队列,一个队列绑定一个键
3.1 交换机
Rabbitmq队列和交换机是绑定的,每个队列都有一个***routingKey
,发布者每次发布的消息也有一个routingKey
***,交换机会根据不同的匹配规则将消息匹配到与自己绑定的队列中
RabbitMQ常用交换机有四种:
3.1.1 Direct
Direct exchange背后的路由算法很简单 - 消息进入其绑定键与消息的路由键完全匹配的队列。
多重绑定
使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,直接交换的行为将类似于扇出,并将消息广播到所有匹配的队列。
3.1.2 Topic exchange
**topic
**:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词
发送到Topic exchange的 routing_key
它必须是一个由点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例: stock.usd.nyse
、 nyse.vmw
、quick.orange.rabbit
。路由密钥中可以有任意多个单词,最多 255 个字节。
*
可以恰好替代一个单词。#
可以替代零个或多个单词。
当队列与#
绑定键绑定时 - 它将接收所有消息,无论路由键如何 - 就像在扇区交换器中一样。
3.1.3 Fanout
Fanout 不处理路由键,而是广播。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上
Headers
不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。
3.2 队列
3.2.1 死信队列
来自队列的消息可能是“死信的”;也就是说,当发生以下任何事件时,重新发布到交换机:
- 消费者使用
basic.reject
或basic.nack
否定确认消息,并将requeue参数设置为false。 - 由于每条消息 TTL(最大存活时间) 的原因,该消息过期;
- 由于队列超出长度限制,消息被丢弃
应用场景
一般用在较为重要的业务队列中,为了确保未被正确消费的消息不被丢弃,当发生异常时,通过死信队列,可以让未正确处理的消息暂存到另一个队列中。待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了 。
3.2.2 延时队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
3.3 ACK机制
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。如果消费者处理消息时出现异常,RabbitMQ会认为消息未被正常消费,将消息重新放入队列中。可以在消费消息中try-catch或在Consumer的配置文件中配置重试机制来解决这个问题。
3.4 重试机制
在消息消费失败的时候,Spring-AMQP 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,Spring-AMQP 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
为了效率和节省资源,我们一般会选择CONNECTION
模式
4 衡量标准
- 消息顺序:发送到队列的消息,消费时是否可以保证消费的顺序,比如 A 先下单,B 后下单,应该是 A 先去扣库存,B 再去扣,顺序不能反。
- 消息路由:根据路由规则,只订阅匹配路由规则的消息,比如有 A/B 两者规则的消息,消费者可以只订阅 A 消息,B 消息不会消费。
- 消息可靠性:是否会存在丢消息的情况,比如有 A/B 两个消息,最后只有 B 消息能消费,A 消息丢失。
- 消息时序:主要包括 “消息存活时间” 和“延迟 / 预定的消息”,“消息存活时间”表示生产者可以对消息设置 TTL,如果超过该 TTL,消息会自动消失;“延迟 / 预定的消息”指的是可以延迟或者预订消费消息,比如延时 5 分钟,那么消息会 5 分钟后才能让消费者消费,时间未到的话,是不能消费的。
- 消息留存:消息消费成功后,是否还会继续保留在消息队列。
- 容错性:当一条消息消费失败后,是否有一些机制,保证这条消息是一种能成功,比如异步第三方退款消息,需要保证这条消息消费掉,才能确定给用户退款成功,所以必须保证这条消息消费成功的准确性。
- 伸缩:当消息队列性能有问题,比如消费太慢,是否可以快速支持库容;当消费队列过多,浪费系统资源,是否可以支持缩容。
- 吞吐量:支持的最高并发数。