rab**tmq 吞吐能 Rab**tMQ消费者性能优化相关配置说明
一、Rab**tMQ消费者性能优化相关配置说明
Rab**tMQ是用Erlang语言编写的分布式消息中间件,常常用在大型网站中作为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。
在使用Rab**tMQ做消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解为交换器,RoutingKey可以理解为路由,Queue作为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:
(1)Exchange为fanout时,生产者往此Exchange发送的消息会发给每个和其绑定的Queue,此时RoutingKey并不起作用;
(2)Exchange为topic时,生产者可以指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey满足此通配符的Queue就会收到消息;
(3)direct类型的Exchange是直接简单的,生产者指定Exchange和RoutingKey,然后往其发送消息,消息只能被绑定的满足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具体名字,那么默认的名字其实是Queue的名字)
消费端yml配置:
在消费端,配置prefectch和concurrency参数便可以实现消费端mq并发处理消息,那么这两个参数具有有什么含义呢?
prefetch是每次从一次性从broker里面取的待消费的消息的个数。
每个customer会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果ack模式为none,则忽略。
prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。
对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1。
如果要保证消息的可靠不丢失,当prefetch大于1时,可能会出现因为服务宕机引起的数据丢失,故建议将prefetch=1。
concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数。在上面的yml配置中,concurrency=1,即每个Listener容器将开启一个线程去处理消息。在2.0以后的版本中,可以在注解中配置该参数:
在服务启动后,可以发现在Listener容器中产生了两个线程去消费queue。如果在Listener配置了exclusive参数,即确定此容器中的单个customer是否具有对队列的独占访问权限。如果为true,则容器的并发性必须为1。
若一个消费者配置prefetch=10,concurrency=2,会有两个消费者(或是线程)同时监听Queue,但是注意这里的消息只要有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值10,意味着每个消费者每次会预取10个消息准备消费(注意不是两个消费者去共享内存中抓取的消息)。
每个消费者对应的listener有个Exclusive参数,默认为false,如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。
前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个Rab**tMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。
二、Rab**tMQ-消息堆积&高可用
前置文章:
Rab**tMQ-消息可靠性&延迟消息
一、MQ常见问题
二、消息堆积-惰性队列
1、消息堆积问题
2、解决消息堆积方法
3、惰性队列
三、高可用-MQ集群
1、集群分类
2、普通集群
3、镜像集群
4、冲裁队列
确保发送的消息至少被消费一次;
实现消息的延迟投递;
处理消息无法及时消费的问题;
避免单点MQ故障导致整体不可用;
1、消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
2、解决消息堆积方法
3、惰性队列
从Rab**tMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
Ⅰ接收到消息后直接存入磁盘而非内存;
Ⅱ消费者要消费消息时才会从磁盘中读取并加载到内存;
Ⅲ支持数百万条的消息存储。
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
Ⅰ可以通过命令行将一个运行中的队列修改为惰性队列,如下:
rab**tmqctl set_policy Lazy"^lazy-queue$"'{"queue-mode":"lazy"}'--apply-to queues
Ⅱ用SpringAMQP声明惰性队列,如下:
@Bean注解的形式,如下:
@Rab**tListener注解的形式,如下:
Ⅰ优点
基于磁盘存储,消息上限高;
没有间歇性的page-out,性能比较稳定;
Ⅱ缺点
基于磁盘存储,消息时效性会降低;
性能受限于磁盘的IO。
官方文档: Clustering Guide— Rab**tMQ。
1、集群分类
是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
注意:镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。
在Rab**tMQ的3.8版本以后推出的,底层采用Raft协议确保主从的数据一致性。
2、普通集群
Ⅰ会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息;
Ⅱ当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回;
Ⅲ队列所在节点宕机,队列中的消息就会丢失。
Ⅰ获取Cookie
Rab**tMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个Rab**tMQ节点使用 cookie来确定它们是否被允许相互通信。
要使两个节点能够通信,它们必须具有相同的共享秘密,称为 Erlang cookie。cookie只是一串多 255个字符的字母数字字符。
每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。
首先获取Cookie,指令如下:
其中 YYNCLCJEKVNUFYQFPNZH这一串就是生成的Cookie,如下:
Ⅱ删除现有mq容器
Ⅲ准备rab**tmq.conf配置文件
此处选择在tmp目录下创建,如下:
配置文件内容如下:
Ⅳ准备Cookie记录文件
Ⅴ准备集群目录
Ⅵ拷贝配置文件、Cookie文件到目录
echo:用于字符串的输出,输出字符串到|后面;
-t:表示先打印命令,再执行;
-n 1:表示执行命令时用的args个数为1个。
Ⅶ创建集群网络
Ⅷ运行容器
集群中的节点标示默认都是: rab**t@[hostname]。
Ⅰ往rab**t@mq1添加队列
在mq2、mq3中也可以查看到该队列,因为元信息共享。
Ⅱ往simple.queue添加数据
在mq2、mq3中可以查看到消息,如下:
Ⅲ让mq1宕机
mq2、mq3无法读取到数据,因为只共享元信息,没有同步备份数据,如下:
3、镜像集群
镜像集群官方文档: Classic Queue Mirroring— Rab**tMQ。
普通集群不具备高可用的特性,使用镜像集群可以解决这个问题。
Ⅰ镜像队列结构是一主多从(从就是镜像);
Ⅱ所有*作都是主节点完成,然后同步给镜像节点;
Ⅲ主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失);
Ⅳ不具备负载均衡功能,因为所有*作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)。
Ⅰ设置exactly模式
Ⅱ创建队列
Ⅲ发送消息
Ⅳ让mq1宕机
注意:mq1恢复后,该队列的主节点仍然为mq3。
4、冲裁队列
Ⅰ与镜像队列一样,都是主从模式,支持主从数据同步;
Ⅱ使用非常简单,没有复杂的配置;
Ⅲ主从同步基于Raft协议,强一致。
注意:仲裁队列是3.8版本以后才有的新功能。
+2表示有2个镜像节点,仲裁队列默认镜像数为5,集群节点不足5则都是镜像。
@Bean注解配置
修改配置文件
以上即为Rab**tMQ-消息堆积&高可用的全部内容,感谢阅读。
三、rab**tmq保证消息不丢失
我是在网上粘贴**的。
mq原则
数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。
2.丢失数据场景
丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rab**tmq和kafka分别说一下,丢失数据的场景,
(1)rab**tmq
A:生产者弄丢了数据生产者将数据发送到rab**tmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rab**tmq自己丢了数据如果没有开启rab**tmq的持久化,那么rab**tmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rab**tmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rab**tmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rab**tmq就认为你已经消费过了,然后就丢了数据。
3.如何防止消息丢失
(1)rab**tmq
A:生产者丢失消息
①:可以选择使用rab**tmq提供是事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rab**tmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物
channel.txSelect();//开启事物try{//发送消息}catch(Exection e){channel.txRollback();//回滚事物//重新提交}**代码
缺点:rab**tmq事物已开启,就会变为同步阻塞*作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
②:可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rab**tmq之中,rab**tmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rab**tmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirmchannel.confirm();//发送成功回调public void ack(String messageId){}//发送失败回调public void nack(String messageId){//重发该消息}**代码
二者不同事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rab**tmq会回调告知成功与否。一般在生产者这块避免丢失,都是用confirm机制。
B:rab**tmq自己弄丢了数据设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rab**tmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rab**tmq就会将消息持久化到磁盘上。必须要同时开启这两个才可以。
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rab**tmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据使用rab**tmq提供的ack机制,首先关闭rab**tmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。