背景介紹
系統運行在專有雲,應用運行時環境是EDAS Container( EDAS Container是EDAS 平台 HSF 應用運行的基礎容器,EDAS Container 包含 Ali-Tomcat 和 Pandora),消息處理使用的是【ons SDK】,消息消費者使用【PUSH】方式【批量】消費【普通消息】,MessageModel是【CLUSTERING】。
為了解決RocketMQ Producer某個性能問題,對Pandora進行了升級(主要是升級RocketMQ版本)。
下面從技術角度對升級中遇到的問題及分析過程進行總結,積累經驗以避免類似問題的發生。
問題描述
Pandora升級完成後,我們在RocketMQ控制台看到【消費者狀態】->【實時消息堆積量】有8億條,而每個Consumer實例堆積量是幾十條,如圖1:
圖1
在【消費者狀態】->【連接詳情】有消息消費失敗的情況,如圖2:
圖2
在應用服務器ons.log也可以實時查看消息消費的指標信息,如圖3:
圖3
這部分的統計指標的實現可以查看:org.apache.rocketMQ.client.stat.ConsumerStatsManager
分析過程
根據我們前面幾篇關於MQ消息堆積的文章,可以知道:
- 消息堆積總量與Consumer實例消息堆積量相符的情況下,通常是Consumer消費能力弱導致堆積,詳情見:
圖4
為了便於理解,我們使用流程圖來表達下圖4中代碼主要邏輯,見圖5:
圖5
分析上面流程及代碼,發現ConsumeConcurrentlyContext類的ackIndex變量是分析消息成功與失敗的核心變量。
是否業務處理異常?
RocketMQ框架在業務處理類出現下面情況的時候,認為消息消費失敗:
- 業務處理類返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 業務處理類返回null
- 業務處理類拋出異常
通過業務處理類日誌可以確定業務沒有返回ConsumeConcurrentlyStatus.RECONSUME_LATER的情況;
從代碼可以看出,當出現2、3情況的時候,框架會將warn日誌打印到ons.log中,通過過濾ons.log中「consumeMessage exception」和「consumeMessage return null」關鍵詞,沒有相應的日誌記錄,所以也不是這兩種情況造成的。
備註:
當出現2、3情況的時候,ons.log日誌中並沒有打印出線程棧信息,如果想具體定位異常產生的位置,可以通過arthas stack命令進行分析。
arthas watch processConsumeResult
既然發送失敗消息到Broker重試隊列是在processConsumeResult方法調用的,那麼我們可以分析下該方法的入參及返回值情況。
watch com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
processConsumeResult "{params,returnObj}" "target.consumeGroup=='GID_CID_XXX'" -x 3 -n3
watch正常機器
圖6
watch異常機器
圖7
通過上面的watch,我們找到了問題最關鍵的地方,我們用下面的場景來分析下ackIndex不同值的影響。
場景一
- 業務處理類批量消費了【8】條數據,消費成功返回:CONSUME_SUCCESS
- ackIndex=Integer.MAX_VALUE
- RocketMQ框架分析消費成功了【8】條,失敗【0】條
- 因為都消費成功了,不會將消息發送到Broker重試隊列中
場景二
- 業務處理類批量消費了【8】條數據,消費成功返回:CONSUME_SUCCESS
- ackIndex=0
- RocketMQ框架分析消費成功了【1】條,失敗【7】條
- 因為有【7】條消費失敗,所以會將【7】條消費失敗的消息發送到Broker重試隊列中
arthas watch setAckIndex
既然有地方在修改ackIndex,先驗證下我們的判斷是否正確。
watch com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"
通過觀察,確實有地方在不斷將ackIndex的值修改為0。
arthas stack setAckIndex
我們繼續定位是什麼地方將ackIndex修改為0的。
stack com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"
圖8
通過線程棧可知BatchConsumerImpl類調用了ConsumeConcurrentlyContext.setAckIndex方法。
arthas jad BatchConsumerImpl
沒有源碼的情況下,我們可以使用arthas jad對類進行反編譯。
jad com.aliyun.openservices.ons.api.impl.rocketmq.BatchConsumerImpl
圖9
ConsumeContext類實例字段acknowledgeIndex默認值是多少呢?如果是0,問題的原因就找到了。
athas jad ConsumeContext
沒有源碼的情況下,我們可以使用arthas jad對類進行反編譯。
jad com.aliyun.openservices.ons.api.ConsumeContext
圖10
通過上面代碼可以看出,ConsumeContext類實例字段acknowledgeIndex的默認值是0。
ProcessQueue
通過上面的分析,我們已經定位到了問題,ProcessQueue做下簡單描述,不做具體分析了。
圖11
解決辦法
由上面的分析,這個問題屬於RocketMQ ons SDK的一個Bug,修復就交給相應的產研團隊來fix吧。
經驗總結
1-5-10,1分鐘發現,5分鐘定位,10分鐘恢復。
當故障發生的時候,需要【1】最短時間內發現(監控報警是否做好),需要【10】最快的速度恢復(變更管理和預案是否做好),【5】似乎不是最主要的。