Spring-Kafka @KafkaListener and errorHandler set up monitoring exception handling

2021/07/2608:08:42 technology 1402

Apache Kafka is a message queue product, based on the design of Topic partitions, which can achieve very high message sending and processing performance. Spring created a project Spring -kafka, which encapsulates Apache Kafka-client, which is used to quickly integrate kafka in the Spring project. In addition to simply sending and receiving messages, Spring-kafka also provides many advanced features.

Spring-Kafka @KafkaListener and errorHandler set up monitoring exception handling - DayDayNews


1. Simple integration

First need to introduce dependencies:

   org.springframework.kafka -Idfka 2.2.6.RELEASE  

Then, add the configuration:

  spring.kafka.producer.bootstrap-servers=127.0.0.1:9092  

h113p h113p h113p The use of @KafkaListener

I won’t go into details about the ability of @KafkaListener to receive messages, but the function of @KafkaListener is more than that. Others are more common.The features with more usage scenarios are as follows:

  • explicitly specify which topics and partition messages are consumed;
  • sets the offset of each topic and partition initialization;
  • sets the concurrency of consumption threads;
  • sets messages Exception handler;

Use errorHandler for exception handling, first implement the KafkaListenerErrorHandler interface, and then do some exception handling, such as logging and so on. And the configuration in the annotation is the Name of your custom implementation instance in the Spring context.

  @Componentpublic class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class); @Override public Object handleError(Message message, ListenerExecutionFailedException exception) {LOGGER.error("Message:" + message.toString() + "\n" + "Exception:" + exception.getMessage()); return null; }}  

For example, when calling, fill in bean Name,For example, errorHandler="kafkaDefaultListenerErrorHandler", the specific sample code is as follows:

  @Servicepublic class CargoBookedEventHandler {private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class); @KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVHandler = "DomainEventConstant" ")public void receiveEvent(ConsumerRecord consumerRecord) {CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();LOGGER.info("Recevied key='{}', message='{}'", consumerRecord. key(), consumerRecord.value()); }}  

The following is an error message intentionally sent in Topic, which does not conform to the JSON format parsed by the consumer, as shown in the figure:

Spring-Kafka @KafkaListener and errorHandler set up monitoring exception handling - DayDayNews



The following is the handling of error messages by errorHandler,Output exception log:

2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] crteMyKafkaListenerErrorHandler: Message: GenericMessage [payload=hello world, headers={kafka_offset= 4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1_TOampka_receivedPartitionId=1, kafka_receivedPartitionId=1_kafka_receivedPartitionId=1_d626_receivedKafka_receivedPartitionId=1_kafka_receivedPartitionId=1_d626p969_received_p969 Listener method'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception; nested exception is java .lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1 ] crteMyKafkaListenerErrorHandler: Message: GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.co nsumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_Timestamp_TOPIC, kafka=1626 commendation method of peculiar peculiarty in CARGO_BOOKED_EVENT_TOPIC, kafka 1626. .CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

Spring-Kafka @KafkaListener and errorHandler set up monitoring exception handling - DayDayNews

Kafka technical column "Kafka v2.3 Quick Start and Practice",Starting from actual combat, this column allows beginners to quickly master Kafka-related technical points and invest in actual project development through zero-based entry-environment construction-project case combat.

(The circle card has been added here, please check it in today's headline client)

.

technology Category Latest News