Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理

2021年07月26日08:08:42 科技 1409

Apache Kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建了一個項目Spring-kafka,封裝了Apache Kafka-client,用於在Spring項目里快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了很多高級功能。

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞


一、簡單集成

首先需要引入依賴:

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka</artifactId>

 <version>2.2.6.RELEASE</version>

</dependency>

然後,添加配置:

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

二、@KafkaListener的使用

關於@KafkaListener接收消息的能力就不贅述了,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:

  • 顯式的指定消費哪些Topic和分區的消息;
  • 設置每個Topic以及分區初始化的偏移量;
  • 設置消費線程並發度;
  • 設置消息異常處理器;

使用errorHandler做異常處理,首先實現KafkaListenerErrorHandler介面,然後做一些異常處理,比如記錄日誌等等。而且註解里的配置,是你自定義實現實例在Spring上下文中的Name。

@Component

public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

 private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class);

 @Override

 public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

 LOGGER.error("消息:" + message.toString() + "\n" + "異常:" + exception.getMessage());

 return null;

 }

}

例如,調用的時候,填寫beanName,例如errorHandler=「kafkaDefaultListenerErrorHandler」,具體示例代碼如下所示:

@Service

public class CargoBookedEventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class);

@KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVENT_TOPIC, errorHandler = "myKafkaListenerErrorHandler")

public void receiveEvent(ConsumerRecord<String, Object> consumerRecord) {

CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();

LOGGER.info("Recevied key='{}', message='{}'", consumerRecord.key(), consumerRecord.value());

 }

}

下面是有意在Topic中發送的錯誤消息,不符合消費方consumer解析的JSON格式,如圖所示:

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞



下面是errorHandler針對錯誤消息的處理,輸出的異常日誌:

2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664125969}]

異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664130870}]

異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞

Kafka技術專欄《Kafka v2.3快速入門與實踐》,該專欄從實戰出發,通過零基礎入門-環境搭建-項目案例實戰,讓初學者快速掌握Kafka相關技術要點並投入實際項目開發。

(此處已添加圈子卡片,請到今日頭條客戶端查看)

科技分類資訊推薦

長安與東風重組新進展:朱華榮稱不會改變長安既定戰略 - 天天要聞

長安與東風重組新進展:朱華榮稱不會改變長安既定戰略

2月9日,長安汽車和東風集團股份(00489.HK)同步發布了控股股東「正在與其他國資央企集團籌劃重組事項」的信息。長安汽車的控股股東是兵裝集團,而東風集團股份的控股股東是東風公司。隨即,長安汽車和東風集團這兩家汽車央企將合併重組,成為業內關注的焦點。
公安部出手了!年齡限制放寬10年、送考下鄉,2025年考駕照不難了 - 天天要聞

公安部出手了!年齡限制放寬10年、送考下鄉,2025年考駕照不難了

電動車加強管理以後,要求機動車類型的車輛需要持證上路,但是老年人考駕照卻受阻,一方面有年齡的限制,另一方面偏遠山區考駕照不方便,所以在2025年公安部出手了,年齡限制放寬10年,同時推出送考下鄉服務,還進一步的降低考駕照的費用,2025年起考摩托車駕照不難了。
從「星靈安全守護體系」到昊鉑HL,看懂廣汽科技日 - 天天要聞

從「星靈安全守護體系」到昊鉑HL,看懂廣汽科技日

發布會以技術切入,並全程圍繞安全展開。廣汽集團董事長、總經理馮興亞率先登場,宣布2025年四季度將正式上市支持L3級智能駕駛的車型,他同時強調面向自動駕駛時代對智能駕駛技術、整車安全架構以及突發風險處理能力的要求更高。如何才能滿足更高的要求?馮興亞提到了「廣汽
關稅大棒下,最受傷的車企出現了 - 天天要聞

關稅大棒下,最受傷的車企出現了

特朗普的關稅大棒剛揮出,尚未嚇退「外敵」,卻先刺痛了自己。近日,擁有瑪莎拉蒂、Jeep等14個品牌的全球第四大車企斯泰蘭蒂斯突然宣布裁撤900名美國工人,關閉加拿大和墨西哥兩家工廠,北美生產線陷入癱瘓。幾乎同一時間,捷豹路虎宣布暫停對美出口一個月,奧迪更是直接