一次RocketMQ ons SDK Bug導致消息不斷堆積到重試隊列的案例分析

背景介紹

系統運行在專有雲,應用運行時環境是EDAS Container( EDAS Container是EDAS 平台 HSF 應用運行的基礎容器,EDAS Container 包含 Ali-Tomcat 和 Pandora),消息處理使用的是【ons SDK】,消息消費者使用【PUSH】方式【批量】消費【普通消息】,MessageModel是【CLUSTERING】。
為了解決RocketMQ Producer某個性能問題,對Pandora進行了升級(主要是升級RocketMQ版本)。
下面從技術角度對升級中遇到的問題及分析過程進行總結,積累經驗以避免類似問題的發生。

問題描述

Pandora升級完成後,我們在RocketMQ控制台看到【消費者狀態】->【實時消息堆積量】有8億條,而每個Consumer實例堆積量是幾十條,如圖1:

圖1

在【消費者狀態】->【連接詳情】有消息消費失敗的情況,如圖2:

圖2

在應用伺服器ons.log也可以實時查看消息消費的指標信息,如圖3:

圖3

這部分的統計指標的實現可以查看:org.apache.rocketMQ.client.stat.ConsumerStatsManager

分析過程

根據我們前面幾篇關於MQ消息堆積的文章,可以知道:

  1. 消息堆積總量與Consumer實例消息堆積量相符的情況下,通常是Consumer消費能力弱導致堆積,詳情見:

圖4

為了便於理解,我們使用流程圖來表達下圖4中代碼主要邏輯,見圖5:

圖5

分析上面流程及代碼,發現ConsumeConcurrentlyContext類的ackIndex變數是分析消息成功與失敗的核心變數。

是否業務處理異常?

RocketMQ框架在業務處理類出現下面情況的時候,認為消息消費失敗:

  1. 業務處理類返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 業務處理類返回null
  3. 業務處理類拋出異常

通過業務處理類日誌可以確定業務沒有返回ConsumeConcurrentlyStatus.RECONSUME_LATER的情況;
從代碼可以看出,當出現2、3情況的時候,框架會將warn日誌列印到ons.log中,通過過濾ons.log中「consumeMessage exception」和「consumeMessage return null」關鍵詞,沒有相應的日誌記錄,所以也不是這兩種情況造成的。
備註:
當出現2、3情況的時候,ons.log日誌中並沒有列印出線程棧信息,如果想具體定位異常產生的位置,可以通過arthas stack命令進行分析。

arthas watch processConsumeResult

既然發送失敗消息到Broker重試隊列是在processConsumeResult方法調用的,那麼我們可以分析下該方法的入參及返回值情況。

watch com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService 
processConsumeResult "{params,returnObj}" "target.consumeGroup=='GID_CID_XXX'" -x 3 -n3

watch正常機器

圖6

watch異常機器

圖7

通過上面的watch,我們找到了問題最關鍵的地方,我們用下面的場景來分析下ackIndex不同值的影響。
場景一

  1. 業務處理類批量消費了【8】條數據,消費成功返回:CONSUME_SUCCESS
  2. ackIndex=Integer.MAX_VALUE
  3. RocketMQ框架分析消費成功了【8】條,失敗【0】條
  4. 因為都消費成功了,不會將消息發送到Broker重試隊列中

場景二

  1. 業務處理類批量消費了【8】條數據,消費成功返回:CONSUME_SUCCESS
  2. ackIndex=0
  3. RocketMQ框架分析消費成功了【1】條,失敗【7】條
  4. 因為有【7】條消費失敗,所以會將【7】條消費失敗的消息發送到Broker重試隊列中

arthas watch setAckIndex

既然有地方在修改ackIndex,先驗證下我們的判斷是否正確。

watch com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"

通過觀察,確實有地方在不斷將ackIndex的值修改為0。

arthas stack setAckIndex

我們繼續定位是什麼地方將ackIndex修改為0的。

stack com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"

圖8

通過線程棧可知BatchConsumerImpl類調用了ConsumeConcurrentlyContext.setAckIndex方法。

arthas jad BatchConsumerImpl

沒有源碼的情況下,我們可以使用arthas jad對類進行反編譯。

jad com.aliyun.openservices.ons.api.impl.rocketmq.BatchConsumerImpl

圖9

ConsumeContext類實例欄位acknowledgeIndex默認值是多少呢?如果是0,問題的原因就找到了。

athas jad ConsumeContext

沒有源碼的情況下,我們可以使用arthas jad對類進行反編譯。

jad com.aliyun.openservices.ons.api.ConsumeContext

圖10

通過上面代碼可以看出,ConsumeContext類實例欄位acknowledgeIndex的默認值是0。

ProcessQueue

通過上面的分析,我們已經定位到了問題,ProcessQueue做下簡單描述,不做具體分析了。

圖11

解決辦法

由上面的分析,這個問題屬於RocketMQ ons SDK的一個Bug,修復就交給相應的產研團隊來fix吧。

經驗總結

1-5-10,1分鐘發現,5分鐘定位,10分鐘恢復。
當故障發生的時候,需要【1】最短時間內發現(監控報警是否做好),需要【10】最快的速度恢復(變更管理和預案是否做好),【5】似乎不是最主要的。