AMQP协议与RabbitMQ、MQ消息队列的应用场景

2023-07-29,,

什么是AMQP?

  在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,
AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

AMQP 中包含的主要元素

生产者(Producer):向Exchange发布消息的应用。

消费者(Consumer):从消息队列queue中消费消息的应用。

消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。

Queue:消息载体;每个消息都会被投入到一个或多个队列。

消息(Message):传输的内容。

交换器(exchange):路由组件,接收Producer发送的消息,并根据Routing Key转发给消息队列queue。

Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue。

虚拟主机(Virtual Host): 用作不同用户的权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的权限的 vhost 中)

Broker :AMQP的服务端称为Broker。

连接(Connection):一个网络连接,比如TCP/IP套接字连接;应用程序与Rabbit之间建立连接的管理器,程序代码中使用ConnectionFactory(连接管理器)。

信道(Channel):消息通道,在客户端的每个Connection连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。

绑定器(Binding):把exchange和queue按照路由规则绑定起来。

exchange 与 Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中;

生产者发消息不需要指定Queue,消费者可以指定Queue绑定到某个RoutingKey和某个Exchange,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


Exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange :


  1. Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。


  2. Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。


  3. topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。

TopicExchange的匹配符号:

#:匹配多个词

*: 匹配一个词


需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。

RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。Producer只能将自己的消息投递到Exchange中,由Exchange按照路由规则将消息投递到对应的Queue中。

 

在Consumer中,声明自己对哪个Exchange感兴趣,并将自己的Queue绑定到自己感兴趣的路由关键字上,建立相应的映射关系;第二,在Producer中,将消息投递一个Exchange中,并指明它的路由关键字。

AMQP 如何实现通信的

(1)建立连接Connection。由producer和consumer分别连接到broker的物理节点上。

(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel;producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。

(3)发送消息。由Producer发送消息到Broker中的exchange中。

(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。

(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。

(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。

消息队列的使用大概过程

(1)客户端连接Connection到消息队列服务器Broker,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

 

RabbitMQ中 exchange、route、queue的关系

MessageQueue、Exchange和Binding构成了AMQP协议的核心。

  声明MessageQueue 

  在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

  a)消费者是无法订阅或者获取不存在的MessageQueue中信息。

  b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

  在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。

  

  (重点) 这里有一点需要明确:

如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的,所以一个队列如果已经存在了,比如消费者如果再次尝试建立已存在的队列,是无效的
比如,你通过SpringBoot程序已经建立了一个queueA,再通过另外一个SpringBoot程序想要更改其queue属性,比如设置队列持久化durable=="true",就再次建立了一个queueA设置属性,是无效的

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:

a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景

b)   Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

c)   Durable:持久化。

d)  其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

  

exchange 与 Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个RoutingKey和某个Exchange,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。


Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。


Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。


topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。


需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。

 

AMQP的应用场景


AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:


异步处理


跨系统的异步通信;比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行;如果不使用MQ,那么开通系统账号的服务就要依次调用发短信服务、发邮件服务、发内部通讯系统消息,以后如果还要分配git账号,那又要在开通系统账号的服务里添加代码,如果使用MQ那么开通账号后发送消息到MQ,对应订阅的消费者消费就行了。
异步处理不需要返回值的耗时操作:https://www.cnblogs.com/theRhyme/p/10796009.html


应用解耦

在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行;如果是实时服务调用,比如人事系统被各个服务调用,人事系统挂了,调用人事系统的服务都会受到影响。
 

死信队列

重要的业务队列如果挂了,可以被重新路由到死信队列进行处理。
 

分布式事务

RocketMQ TODO待写


流量缓冲


在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。


日志处理


将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。
 

SpringBoot+RabbitMQ的简单demo

https://www.cnblogs.com/theRhyme/p/10071781.html

RabbitMQ死信队列的应用场景和代码实现

https://www.cnblogs.com/theRhyme/p/10874409.html

RabbitMQ延迟队列代码实现和应用场景

场景: 订单下单30min如果没有付款就删除该订单

通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列(重定向队列),实现延迟功能;

使用 rabbitmq_delayed_message_exchange 插件实现延迟功能。

代码:https://www.cnblogs.com/theRhyme/p/10986409.html

rabbitmq 怎么避免消息丢失?

生产者Confirm机制(异步,推荐)或是事务方式(同步,不推荐)
MQ服务端将消息持久化
消费者给MQ回复ACK,确认机制
MQ服务端设置集群镜像模式
消费者消费消息补偿机制(如死信队列)

如果生产者弄丢了数据

RabbitMQ 生产者将数据发送到 RabbitMQ 的时候,可能数据在网络传输中搞丢了,这个时候 RabbitMQ 收不到消息,消息就丢了。

RabbitMQ 提供了两种方式来解决这个问题:

事务方式:在生产者发送消息之前,通过`channel.txSelect`开启一个事务,接着发送消息。

如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚`channel.txRollback`,然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务`channel.txCommit`。

但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。

另外一种方式就是通过 Confirm 机制:这个 Confirm 模式是在生产者那里设置的,就是每次发消息的时候会分配一个唯一的 ID,然后 RabbitMQ服务端 收到之后会回传一个 ACK,告诉生产者这个消息 OK 了。

如果 RabbitMQ 没有处理到这个消息,那么就回调一个 Nack 的接口,这个时候生产者就可以重发。

事务机制和 Confirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿。

但是 Confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 Confirm 机制的。

要保证消息持久化成功的条件有哪些?

声明队列必须设置持久化 durable 设置为 true.
消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。
消息已经到达持久化交换器。
消息已经到达持久化队列。

以上四个条件都满足才能保证消息持久化成功。

rabbitmq 持久化有什么缺点?

持久化的缺点就是降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可尽量使用 ssd 硬盘来缓解吞吐量的问题。

RabbitMQ如何保证同一个队列中的消息被顺序消费?

TODO待写

来源:

https://blog.csdn.net/letempsar/article/details/52565020

https://blog.csdn.net/ztx114/article/details/78410727

https://www.cnblogs.com/linkenpark/p/5393666.html

http://techblog.ppdai.com/2018/07/17/20180717/

https://www.toutiao.com/a6698312611185820171/?timestamp=1559696015&app=news_article&group_id=6698312611185820171&tdsourcetag=s_pctim_aiomsg&req_id=2019060508533401002506701591332CD

https://mp.weixin.qq.com/s?__biz=MjM5ODI5Njc2MA==&mid=2655825391&idx=1&sn=f7523195ff08a51085012c736bc002a8&chksm=bd74e0388a03692e49ca3967a03dd2e8164e02741a75cabc8bd56e5b70ba6f2cc19f6fe10fd2&scene=0&xtrack=1&key=1c855a3d2871be72b53c28efecdb6c847aa5d9daffdac4207cc93d6a62948c3ac03b6e8813a35eaa72a54f7668de41b31fb1265ff3066312574ca210769ad2b726b9932dd21a296f9ea91fd6cf367dd7&ascene=1&uin=ODEzMzE3OTc%3D&devicetype=Windows+10&version=62060833&lang=zh_CN&pass_ticket=ZLGuBJ0cY2BIuQpqK%2Be08dQVFm3Htt7htVVelbWP8XE%3D

AMQP协议与RabbitMQ、MQ消息队列的应用场景的相关教程结束。

《AMQP协议与RabbitMQ、MQ消息队列的应用场景.doc》

下载本文的Word格式文档,以方便收藏与打印。