Background introduction

The system runs on the proprietary cloud, and the application runtime environment is EDAS Container (EDAS Container is the basic container for running HSF applications on the EDAS platform; it contains Ali-Tomcat and Pandora). Message processing uses the [ons SDK]; message consumers use [PUSH] mode to consume [normal messages], and the MessageModel is [CLUSTERING].
To solve a performance problem with the RocketMQ Producer, Pandora was upgraded (mainly to upgrade the RocketMQ version).
This article summarizes the problems encountered during the upgrade and the analysis process from a technical perspective, to accumulate experience and avoid similar problems in the future.
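For context, a minimal batch PUSH consumer setup with the ons SDK looks roughly like the sketch below. This is a hedged sketch assuming a recent ons-client with the batch API (the analysis below shows the batch consumer is in play); the group ID is the masked one from this article, and the credentials, nameserver address, and topic are placeholders.

    import java.util.List;
    import java.util.Properties;

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.batch.BatchConsumer;
    import com.aliyun.openservices.ons.api.batch.BatchMessageListener;

    public class BatchConsumerSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(PropertyKeyConst.GROUP_ID, "GID_CID_XXX");         // masked group ID from this article
            props.put(PropertyKeyConst.AccessKey, "yourAccessKey");      // placeholder
            props.put(PropertyKeyConst.SecretKey, "yourSecretKey");      // placeholder
            props.put(PropertyKeyConst.NAMESRV_ADDR, "nameserver:9876"); // placeholder

            BatchConsumer consumer = ONSFactory.createBatchConsumer(props);
            consumer.subscribe("YOUR_TOPIC", "*", new BatchMessageListener() {
                @Override
                public Action consume(List<Message> messages, ConsumeContext context) {
                    // Business processing of the batch; CommitMessage means
                    // "the whole batch was consumed successfully".
                    return Action.CommitMessage;
                }
            });
            consumer.start();
        }
    }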

Problem Description

After the Pandora upgrade was completed, we saw in the RocketMQ console under [Consumer Status] - [Real-time Message Stacking] a total backlog of 800 million messages, while each Consumer instance showed a backlog of only dozens, as shown in Figure 1:

Figure 1

In [Consumer Status] - [Connection Details], message consumption failures were visible, as shown in Figure 2:

Figure 2

The metrics for message consumption can also be viewed in real time in ons.log on the application server, as shown in Figure 3:

Figure 3

The implementation of these statistical indicators can be found in org.apache.rocketmq.client.stat.ConsumerStatsManager.
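In the open-source client, that class maintains per-topic, per-consumer-group statistics such as CONSUME_OK_TPS, CONSUME_FAILED_TPS, CONSUME_RT, PULL_TPS, and PULL_RT, which are what the ons.log lines in Figure 3 report.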

Analysis process

From our previous articles on MQ message accumulation, we know the following:

  1. When the total message accumulation is consistent with the per-Consumer-instance accumulation, it is usually caused by weak consumer consumption capacity; see the code in Figure 4:

Figure 4

For easier understanding, Figure 5 below uses a flow chart to express the main logic of the code in Figure 4:

Figure 5

Analyzing the above process and code, we found that the ackIndex field of the ConsumeConcurrentlyContext class is the core variable that determines how many messages in a batch are counted as consumed successfully and how many as failed.
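For reference, the relevant accounting, lightly simplified from the open-source ConsumeMessageConcurrentlyService.processConsumeResult (the shaded class inside the ons SDK may differ in detail):

    // Lightly simplified from the open-source rocketmq-client.
    int ackIndex = context.getAckIndex();
    switch (status) {
        case CONSUME_SUCCESS:
            // ackIndex defaults to Integer.MAX_VALUE, so it is normally clamped
            // to size - 1 and the whole batch is counted as successful.
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;                             // counted as succeeded
            int failed = consumeRequest.getMsgs().size() - ok; // counted as failed
            break;
        case RECONSUME_LATER:
            ackIndex = -1; // the whole batch is counted as failed
            break;
    }
    // In CLUSTERING mode, every message after ackIndex is sent back to the
    // Broker retry queue.
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
        this.sendMessageBack(consumeRequest.getMsgs().get(i), context);
    }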

Is the business processing throwing exceptions? The RocketMQ framework considers message consumption to have failed when any of the following occurs in the business processing class (illustrated in the sketch after this list):

  1. The business processing class returns ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. The business processing class returns null
  3. The business processing class throws an exception
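For illustration, a minimal listener on the shaded client showing the three situations (a sketch; handle() is a hypothetical business method):

    import java.util.List;

    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.common.message.MessageExt;

    public class DemoListener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                handle(msg); // situation 3: an exception escaping here counts as failure
            }
            // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // situation 1
            // return null;                                      // situation 2
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    // success path
        }

        private void handle(MessageExt msg) { /* hypothetical business logic */ }
    }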

From the business processing class logs, it can be determined that the business did not return ConsumeConcurrentlyStatus.RECONSUME_LATER.
From the code, it can be seen that when situation 2 or 3 occurs, the framework prints a warning to ons.log. Filtering ons.log for the keywords "consumeMessage exception" and "consumeMessage return null" turned up no matching records, so the failures were not caused by these two situations either. Note: when situation 2 or 3 occurs, the thread stack is not printed to ons.log; to pinpoint where an exception is thrown, you can use the arthas stack command.

arthas watch processConsumeResult

Since sending failed messages to the Broker retry queue happens inside the processConsumeResult method, we can observe the method's input parameters and return value:

watch com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService processConsumeResult "{params,returnObj}" "target.consumeGroup=='GID_CID_XXX'" -x 3 -n3
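In this watch command, {params,returnObj} prints the input parameters and return value, the condition expression target.consumeGroup=='GID_CID_XXX' limits observation to the consumer group in question, -x 3 sets the object expansion depth, and -n3 stops watching after three matched invocations.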

watch on a normal machine

Figure 6

watch on an abnormal machine

Figure 7

Through the above watch, we found the most critical clue to the problem. We use the following scenarios to analyze the impact of different values of ackIndex.

Scenario 1

  1. The business processing class batch-consumes [8] messages; consumption succeeds and returns CONSUME_SUCCESS
  2. ackIndex = Integer.MAX_VALUE
  3. The RocketMQ framework counts [8] messages as consumed successfully and [0] as failed
  4. Because all messages succeeded, none are sent to the Broker retry queue

Scenario 2

  1. The business processing class batch-consumes [8] messages; consumption succeeds and returns CONSUME_SUCCESS
  2. ackIndex = 0
  3. The RocketMQ framework counts [1] message as consumed successfully
  4. The RocketMQ framework counts [7] messages as failed
  5. Because [7] messages are considered failed, those [7] messages are sent to the Broker retry queue (the arithmetic is spelled out after this list)
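This follows directly from the accounting shown earlier: for CONSUME_SUCCESS, ok = ackIndex + 1 and failed = batchSize - ok, so ackIndex = 0 with a batch of 8 yields ok = 1 and failed = 7, and the messages at indices 1 through 7 are sent back to the Broker retry queue.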

arthas watch setAckIndex

Something must be modifying ackIndex somewhere; let's first verify whether our judgment is correct:

watch com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "{params,returnObj}" "params[0]==0"

After observation, something was indeed repeatedly setting the value of ackIndex to 0.

arthas stack setAckIndex

We continue by locating where ackIndex is set to 0:

stack com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext setAckIndex "params[0]==0"

Figure 8

Through the thread stack, we can see that the BatchConsumerImpl class calls the ConsumeConcurrentlyContext.setAckIndex method.

arthas jad BatchConsumerImpl

Without source code, we can use arthas jad to decompile the class.

jad com.aliyun.openservices.ons.api.impl.rocketmq.BatchConsumerImpl

Figure 9

What is the default value of the ConsumeContext class's instance field acknowledgeIndex? If it is 0, we will have found the cause of the problem.

arthas jad ConsumeContext

Again, without source code, we use arthas jad to decompile the class.

jad com.aliyun.openservices.ons.api.ConsumeContext

Figure 10

From the above code, it can be seen that the default value of the ConsumeContext class instance field acknowledgeIndex is 0.
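Putting the decompiled pieces together, the failure mode can be reproduced with minimal stand-ins for the real classes (a hedged reconstruction based on Figures 9 and 10, not the vendor's exact source):

    // Self-contained demonstration of the failure mode, using minimal
    // stand-ins for the decompiled classes.
    public class AckIndexBugDemo {
        /** Stand-in for com.aliyun.openservices.ons.api.ConsumeContext (Figure 10). */
        static class ConsumeContext {
            int acknowledgeIndex = 0; // decompiled default: 0
        }

        /** Stand-in for the shaded ConsumeConcurrentlyContext. */
        static class ConsumeConcurrentlyContext {
            int ackIndex = Integer.MAX_VALUE; // open-source default: "all succeeded"
        }

        public static void main(String[] args) {
            int batchSize = 8;
            ConsumeContext onsContext = new ConsumeContext();
            ConsumeConcurrentlyContext rmqContext = new ConsumeConcurrentlyContext();

            // What BatchConsumerImpl does after the business listener returns
            // Action.CommitMessage (Figure 9): it copies acknowledgeIndex, still
            // at its default of 0, into the rocketmq-client context.
            rmqContext.ackIndex = onsContext.acknowledgeIndex;

            // processConsumeResult accounting for CONSUME_SUCCESS:
            int ackIndex = Math.min(rmqContext.ackIndex, batchSize - 1);
            int ok = ackIndex + 1;
            int failed = batchSize - ok;
            System.out.printf("ok=%d, failed=%d -> %d messages go to the retry queue%n",
                    ok, failed, failed);
            // Prints: ok=1, failed=7 -> 7 messages go to the retry queue
        }
    }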

ProcessQueue

Through the above analysis, we have located the problem. Here we give ProcessQueue a brief description, without a detailed analysis:

Figure 11

Solution

According to the above analysis, this problem is a bug in the RocketMQ ons SDK; the fix is left to the corresponding product and R&D team.

Experience summary

1-5-10: discover in 1 minute, locate in 5 minutes, recover in 10 minutes.
When a fault occurs, it must be discovered in the shortest possible time (is monitoring and alerting in place?) and recovered as quickly as possible (are change management and contingency plans in place?); the [5], locating the root cause, seems less critical by comparison.
