rab**tmq 吞吐能 Rab**tMQ消费者性能优化相关配置说明

seosqwseo4个月前 (09-07)测评日记55

一、Rab**tMQ消费者性能优化相关配置说明

Rab**tMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。

在使用Rab**tMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。

Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:

(1)Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;

(2)Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;

(3)direct类型的Exchange是直接简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)

消费端yml配置:

在消费端,配置prefectch和concurrency参数便可以实现消费端mq并发处理消息,那么这两个参数具有有什么含义呢?

prefetch是每次从一次性从broker里面取的待消费的消息的个数。

每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。

prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。

不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。

对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1。

如果要保证消息的可靠不丢失,当prefetch大于1时,可能会出现因为服务宕机引起的数据丢失,故建议将prefetch=1。

concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数。在上面的yml配置中,concurrency=1,即每个Listener容器将开启一个线程去处理消息。在2.0以后的版本中,可以在注解中配置该参数:

在服务启动后,可以发现在Listener容器中产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。

若一个消费者配置prefetch=10,concurrency=2,会有两个消费者(或是线程)同时监听Queue,但是注意这里的消息只要有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值10,意味着每个消费者每次会预取10个消息准备消费(注意不是两个消费者去共享内存中抓取的消息)。

每个消费者对应的listener有个Exclusive参数,默认为false,如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。

前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。

设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个Rab**tMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。

二、Rab**tMQ-消息堆积&高可用

前置文章:

Rab**tMQ-消息可靠性&延迟消息

一、MQ常见问题

二、消息堆积-惰性队列

1、消息堆积问题

2、解决消息堆积方法

3、惰性队列

三、高可用-MQ集群

1、集群分类

2、普通集群

3、镜像集群

4、冲裁队列

确保发送的消息至少被消费一次;

实现消息的延迟投递;

处理消息无法及时消费的问题;

避免单点MQ故障导致整体不可用;

1、消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。

2、解决消息堆积方法

3、惰性队列

从Rab**tMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。

Ⅰ接收到消息后直接存入磁盘而非内存;

Ⅱ消费者要消费消息时才会从磁盘中读取并加载到内存;

Ⅲ支持数百万条的消息存储。

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。

Ⅰ可以通过命令行将一个运行中的队列修改为惰性队列,如下:

rab**tmqctl set_policy Lazy"^lazy-queue$"'{"queue-mode":"lazy"}'--apply-to queues

Ⅱ用SpringAMQP声明惰性队列,如下:

@Bean注解的形式,如下:

@Rab**tListener注解的形式,如下:

Ⅰ优点

基于磁盘存储,消息上限高;

没有间歇性的page-out,性能比较稳定;

Ⅱ缺点

基于磁盘存储,消息时效性会降低;

性能受限于磁盘的IO。

官方文档: Clustering Guide— Rab**tMQ。

1、集群分类

是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

注意:镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。

在Rab**tMQ的3.8版本以后推出的,底层采用Raft协议确保主从的数据一致性。

2、普通集群

Ⅰ会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息;

Ⅱ当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回;

Ⅲ队列所在节点宕机,队列中的消息就会丢失。

Ⅰ获取Cookie

Rab**tMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个Rab**tMQ节点使用 cookie来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为 Erlang cookie。cookie只是一串多 255个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

首先获取Cookie,指令如下:

其中 YYNCLCJEKVNUFYQFPNZH这一串就是生成的Cookie,如下:

Ⅱ删除现有mq容器

Ⅲ准备rab**tmq.conf配置文件

此处选择在tmp目录下创建,如下:

配置文件内容如下:

Ⅳ准备Cookie记录文件

Ⅴ准备集群目录

Ⅵ拷贝配置文件、Cookie文件到目录

echo:用于字符串的输出,输出字符串到|后面;

-t:表示先打印命令,再执行;

-n 1:表示执行命令时用的args个数为1个。

Ⅶ创建集群网络

Ⅷ运行容器

集群中的节点标示默认都是: rab**t@[hostname]。

Ⅰ往rab**t@mq1添加队列

在mq2、mq3中也可以查看到该队列,因为元信息共享。

Ⅱ往simple.queue添加数据

在mq2、mq3中可以查看到消息,如下:

Ⅲ让mq1宕机

mq2、mq3无法读取到数据,因为只共享元信息,没有同步备份数据,如下:

3、镜像集群

镜像集群官方文档: Classic Queue Mirroring— Rab**tMQ。

普通集群不具备高可用的特性,使用镜像集群可以解决这个问题。

Ⅰ镜像队列结构是一主多从(从就是镜像);

Ⅱ所有*作都是主节点完成,然后同步给镜像节点;

Ⅲ主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失);

Ⅳ不具备负载均衡功能,因为所有*作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)。

Ⅰ设置exactly模式

Ⅱ创建队列

Ⅲ发送消息

Ⅳ让mq1宕机

注意:mq1恢复后,该队列的主节点仍然为mq3。

4、冲裁队列

Ⅰ与镜像队列一样,都是主从模式,支持主从数据同步;

Ⅱ使用非常简单,没有复杂的配置;

Ⅲ主从同步基于Raft协议,强一致。

注意:仲裁队列是3.8版本以后才有的新功能。

+2表示有2个镜像节点,仲裁队列默认镜像数为5,集群节点不足5则都是镜像。

@Bean注解配置

修改配置文件

以上即为Rab**tMQ-消息堆积&高可用的全部内容,感谢阅读。

三、rab**tmq保证消息不丢失

我是在网上粘贴**的。

mq原则

数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。

2.丢失数据场景

丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rab**tmq和kafka分别说一下,丢失数据的场景,

(1)rab**tmq

A:生产者弄丢了数据生产者将数据发送到rab**tmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。

B:rab**tmq自己丢了数据如果没有开启rab**tmq的持久化,那么rab**tmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rab**tmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rab**tmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。

C:消费端弄丢了数据主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rab**tmq就认为你已经消费过了,然后就丢了数据。

3.如何防止消息丢失

(1)rab**tmq

A:生产者丢失消息

①:可以选择使用rab**tmq提供是事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rab**tmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物

channel.txSelect();//开启事物try{//发送消息}catch(Exection e){channel.txRollback();//回滚事物//重新提交}**代码

缺点:rab**tmq事物已开启,就会变为同步阻塞*作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。

②:可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rab**tmq之中,rab**tmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rab**tmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

//开启confirmchannel.confirm();//发送成功回调public void ack(String messageId){}//发送失败回调public void nack(String messageId){//重发该消息}**代码

二者不同事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rab**tmq会回调告知成功与否。一般在生产者这块避免丢失,都是用confirm机制。

B:rab**tmq自己弄丢了数据设置消息持久化到磁盘。设置持久化有两个步骤:

①创建queue的时候将其设置为持久化的,这样就可以保证rab**tmq持久化queue的元数据,但是不会持久化queue里面的数据。

②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rab**tmq就会将消息持久化到磁盘上。必须要同时开启这两个才可以。

而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rab**tmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。

C:消费者弄丢了数据使用rab**tmq提供的ack机制,首先关闭rab**tmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

相关文章

海信电视(现货放心购)55英寸55E3G4K超高清护眼智慧屏超薄悬浮全面屏远场语音智能液晶平板电视机口碑怎么样

海信电视(现货放心购)55英寸55E3G4K超高清护眼智慧屏超薄悬浮全面屏远场语音智能液晶平板电视机口碑怎么样

很多小伙伴在关注海信电视(现货放心购)55英寸55E3G4K超高清护眼智慧屏超薄悬浮全面屏远场语音智能液晶平板电视机怎么样?质量好不好?使用测评如何?本文综合已购用户的客观...

亚美嘉擦镜纸测评分享

亚美嘉擦镜纸测评分享

很多小伙伴在关注亚美嘉擦镜纸怎么样?质量好不好?使用测评如何?本文综合已购用户的客观使用分享和相应的优惠信息,为大家推荐一款高性价比的产品,一起来看看吧。...

正浩EcoFlow【磷酸铁锂】快充移动户外电源220V测评使用介绍

正浩EcoFlow【磷酸铁锂】快充移动户外电源220V测评使用介绍

很多小伙伴在关注正浩EcoFlow【磷酸铁锂】快充移动户外电源220V怎么样?质量好不好?使用测评如何?本文综合已购用户的客观使用分享和相应的优惠信息,为大家推荐一款高性价...

小白D1接入米家APP智能可视门铃无线电子猫眼1080P高清摄像头家用监控器持久续航手机远程查看怎么样?质量测评好不好用?

小白D1接入米家APP智能可视门铃无线电子猫眼1080P高清摄像头家用监控器持久续航手机远程查看怎么样?质量测评好不好用?

很多小伙伴在关注小白D1接入米家APP智能可视门铃无线电子猫眼1080P高清摄像头家用监控器持久续航手机远程查看怎么样?质量好不好?使用测评如何?本文综合已购用户的客观使用...

绿联USB/Type-C读卡器3.0高速好不好用

绿联USB/Type-C读卡器3.0高速好不好用

很多小伙伴在关注绿联USB/Type-C读卡器3.0高速怎么样?质量好不好?使用测评如何?本文综合已购用户的客观使用分享和相应的优惠信息,为大家推荐一款高性价比的产品,一起...

南孚5号电池40粒五号碱性质量测评好不好

南孚5号电池40粒五号碱性质量测评好不好

很多小伙伴在关注南孚5号电池40粒五号碱性怎么样?质量好不好?使用测评如何?本文综合已购用户的客观使用分享和相应的优惠信息,为大家推荐一款高性价比的产品,一起来看看吧。...