2026/4/18 9:22:34
网站建设
项目流程
化妆品网站建设可行性报告,论坛营销,ideo设计公司官网,成都装修网站制作价格这篇文章#xff0c;我们聊聊如何应对 RocketMQ 消息堆积。1 基础概念消费者在消费的过程中#xff0c;消费的速度跟不上服务端的发送速度#xff0c;未处理的消息会越来越多#xff0c;消息出现堆积进而会造成消息消费延迟。虽然笔者经常讲#xff1a;RocketMQ 、Kafka 具…这篇文章我们聊聊如何应对 RocketMQ 消息堆积。1 基础概念消费者在消费的过程中消费的速度跟不上服务端的发送速度未处理的消息会越来越多消息出现堆积进而会造成消息消费延迟。虽然笔者经常讲RocketMQ 、Kafka 具备堆积的能力但是以下场景需要重点关注消息堆积和延迟的问题业务系统上下游能力不匹配造成的持续堆积且无法自行恢复。业务系统对消息的消费实时性要求较高即使是短暂的堆积造成的消息延迟也无法接受。2 消费原理客户端使用Push 模式启动后消费消息时分为以下两个阶段阶段一拉取消息客户端通过长轮询批量拉取的方式从 Broker 服务端获取消息将拉取到的消息缓存到本地缓冲队列中。客户端批量拉取消息常见内网环境下都会有很高的吞吐量例如1个单线程单分区的低规格机器4C8GB可以达到几万 TPS 如果是多个分区可以达到几十万 TPS 。所以这一阶段一般不会成为消息堆积的瓶颈。阶段二消费消息提交消费线程客户端将本地缓存的消息提交到消费线程中使用业务消费逻辑进行处理。此时客户端的消费能力就完全依赖于业务逻辑的复杂度消费耗时和消费逻辑并发度了。如果业务处理逻辑复杂处理单条消息耗时都较长则整体的消息吞吐量肯定不会高此时就会导致客户端本地缓冲队列达到上限停止从服务端拉取消息。通过以上客户端消费原理可以看出消息堆积的主要瓶颈在于本地客户端的消费能力即消费耗时和消费并发度。想要避免和解决消息堆积问题必须合理的控制消费耗时和消息并发度其中消费耗时的优先级高于消费并发度必须先保证消费耗时的合理性再考虑消费并发度问题。3 消费瓶颈3.1 消费耗时影响消费耗时的消费逻辑主要分为 CPU 内存计算和外部 I/O 操作通常情况下代码中如果没有复杂的递归和循环的话内部计算耗时相对外部 I/O 操作来说几乎可以忽略。外部 I/O 操作通常包括如下业务逻辑读写外部数据库例如 MySQL 数据库读写。读写外部缓存等系统例如 Redis 读写。下游系统调用例如 Dubbo 调用或者下游 HTTP 接口调用。这类外部调用的逻辑和系统容量需要提前梳理掌握每个调用操作预期的耗时这样才能判断消费逻辑中I/O操作的耗时是否合理。通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加。例如某业务消费逻辑中需要调用下游 Dubbo 接口 单次消费耗时为 20 ms平时消息量小未出现异常。业务侧进行大促活动时下游 Dubbo 服务未进行优化消费单条消息的耗时增加到 200 ms业务侧可以明显感受到消费速度大幅下跌。此时通过提升消费并行度并不能解决问题需要大幅提高下游 Dubbo 服务性能才行。3.2 消费并发度绝大部分消息消费行为都属于 IO 密集型即可能是操作数据库或者调用 RPC这类消费行为的消费速度在于后端数据库或者外系统的吞吐量通过增加消费并行度可以提高总的消费吞吐量但是并行度增加到一定程度反而会下降。所以应用必须要设置合理的并行度。如下有几种修改消费并行度的方法同一个 ConsumerGroup 下通过增加 Consumer 实例数量来提高并行度需要注意的是超过订阅队列数的 Consumer 实例无效。可以通过加机器或者在已有机器启动多个进程的方式。提高单个 Consumer 实例的消费并行线程通过修改参数 consumeThreadMin、consumeThreadMax 实现。4 解决策略当面对消息堆积问题时我们需要明确到底哪个环节出现问题了不要慌张也不要贸然动手。4.1 确认消息的消费耗时是否合理首先我们需要查看消费耗时确认消息的消费耗时是否合理。查看消费耗时一般来讲有两种方式1、打印日志public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgs) { long start System.currentTimeMillis(); // TODO 业务逻辑 logger.info(MessageId: messageExt.getMsgId() costTime: (System.currentTimeMillis() - start)); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { logger.error(consumeMessage error:, e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }2、查看消息轨迹当确定好消费耗时后可以根据耗时大小采取不同的措施。若查看到消费耗时较长则需要查看客户端 JVM 堆栈信息排查具体业务逻辑并优化消费逻辑。若查看到消费耗时正常则有可能是因为消费并发度不够导致消息堆积需要逐步调大消费线程或扩容节点来解决。4.2 查看客户端 JVM 的堆栈假如消费耗时非常高需要查看 Consumer 实例 JVM 的堆栈 。通过jps -m或者ps -ef | grep java命令获取当前正在运行的 Java 程序通过启动主类即可获得应用的进程 pid ;通过jstack pid stack.log命令获取线程的堆栈。执行以下命令查看ConsumeMessageThread的信息 。cat stack.log | grep ConsumeMessageThread -A 10 --color常见的异常堆栈信息如下示例1空闲无堆积的堆栈。消费空闲情况下消费线程都会处于WAITING状态等待从消费任务队里中获取消息。示例2消费逻辑有抢锁休眠等待等情况。消费线程阻塞在内部的一个睡眠等待上导致消费缓慢。示例3消费逻辑操作数据库等外部存储卡住。消费线程阻塞在外部的 HTTP 调用上导致消费缓慢。5 总结客户端使用Push模式启动后消费消息时分为以下两个阶段拉取消息和消费消息。客户端消费原理可以看出消息堆积的主要瓶颈在于本地客户端的消费能力即消费耗时和消费并发度。首先分析消费耗时然后根据耗时大小采取不同的措施。若查看到消费耗时较长则查看客户端堆栈信息排查具体业务逻辑并优化消费逻辑。若查看到消费耗时正常则有可能是因为消费并发度不够导致消息堆积需要逐步调大消费线程或扩容节点来解决。