Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理

2021年07月26日08:08:42 科技 1409

Apache Kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建了一個項目Spring-kafka,封裝了Apache Kafka-client,用於在Spring項目里快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了很多高級功能。

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞


一、簡單集成

首先需要引入依賴:

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka</artifactId>

 <version>2.2.6.RELEASE</version>

</dependency>

然後,添加配置:

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

二、@KafkaListener的使用

關於@KafkaListener接收消息的能力就不贅述了,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:

  • 顯式的指定消費哪些Topic和分區的消息;
  • 設置每個Topic以及分區初始化的偏移量;
  • 設置消費線程並發度;
  • 設置消息異常處理器;

使用errorHandler做異常處理,首先實現KafkaListenerErrorHandler接口,然後做一些異常處理,比如記錄日誌等等。而且註解里的配置,是你自定義實現實例在Spring上下文中的Name。

@Component

public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

 private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class);

 @Override

 public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

 LOGGER.error("消息:" + message.toString() + "\n" + "異常:" + exception.getMessage());

 return null;

 }

}

例如,調用的時候,填寫beanName,例如errorHandler=「kafkaDefaultListenerErrorHandler」,具體示例代碼如下所示:

@Service

public class CargoBookedEventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class);

@KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVENT_TOPIC, errorHandler = "myKafkaListenerErrorHandler")

public void receiveEvent(ConsumerRecord<String, Object> consumerRecord) {

CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();

LOGGER.info("Recevied key='{}', message='{}'", consumerRecord.key(), consumerRecord.value());

 }

}

下面是有意在Topic中發送的錯誤消息,不符合消費方consumer解析的JSON格式,如圖所示:

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞



下面是errorHandler針對錯誤消息的處理,輸出的異常日誌:

2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664125969}]

異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664130870}]

異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

Spring-Kafka @KafkaListener和errorHandler設置監聽異常處理 - 天天要聞

Kafka技術專欄《Kafka v2.3快速入門與實踐》,該專欄從實戰出發,通過零基礎入門-環境搭建-項目案例實戰,讓初學者快速掌握Kafka相關技術要點並投入實際項目開發。

(此處已添加圈子卡片,請到今日頭條客戶端查看)

科技分類資訊推薦

全球媒體聚焦丨79%全球專利+80%市場份額!外媒從一場救援看中國無人機產業實力 - 天天要聞

全球媒體聚焦丨79%全球專利+80%市場份額!外媒從一場救援看中國無人機產業實力

近日,一段中國無人機在洪水中成功營救被困人員的短視頻在海外社交平台廣泛傳播,多家國際媒體也競相報道,並深入探討中國無人機產業技術發展與創新應用。 《紐約時報》網站截圖 據了解,這段短視頻中的救援發生在廣西柳州三江侗族自治縣一村莊。受上游來水影響,這個村子裏一些處於低洼地帶的房屋被淹。由於水流上漲快,一...
博士天團攻堅激光芯片,拿到3個億融資 - 天天要聞

博士天團攻堅激光芯片,拿到3個億融資

記者|鄢子為編輯|陳曉平7月1日,北京颶芯科技對外官宣,完成3億元B輪融資。颶芯成立於2017年7月,核心團隊由多名經驗豐富的博士組成,主攻氮化鎵激光芯片產業化,實現關鍵核心器件的自主可控。本輪融資,颶芯獲得國家基金、半導體產業方和一線投資機構的認可。3億融資由深創投製造業轉型升級新材料基金(國家製造業轉型升...
臻寶科技科創板IPO獲受理 系半導體零部件製造商 大基金二期等參投 - 天天要聞

臻寶科技科創板IPO獲受理 系半導體零部件製造商 大基金二期等參投

《科創板日報》7月2日訊(記者 黃修眉 實習記者 戴嘉怡) 重慶臻寶科技股份有限公司(下稱「臻寶科技」)科創板IPO申請近日獲上交所受理,輔導機構為中信證券。臻寶科技是國內少數實現集成電路先進制程設備和高世代、高電壓顯示面板製造設備非金屬零部件多品類供應、規模化量產的企業之一。此次IPO,臻寶科技擬募資13.98億...
BW2025即將開展,技嘉AORUS雕妹約你3H|3A08 雕宅見 - 天天要聞

BW2025即將開展,技嘉AORUS雕妹約你3H|3A08 雕宅見

史上規模空前的BilibiliWorld2025將於2025年7月11日-13日在上海國家會展中心開展!知名電競硬件品牌技嘉AORUS已確認參展,為玩家打造遊戲盛宴。現場不僅能體驗新款硬核電競裝備、暢玩熱門遊戲大作,參與激烈的1V1對戰PK,更有甜辣萌趣的雕妹喊你3H|3A08等你來!多重互動火力全開,帶你玩轉整個BW,開啟今夏最燃電競狂歡。...
35項服務可跨境辦理,「澳政易」自助服務機上線珠海市民服務中心 - 天天要聞

35項服務可跨境辦理,「澳政易」自助服務機上線珠海市民服務中心

「十幾分鐘就辦完了,現場的協助人員指導我操作,太方便了!」7月1日上午,澳門居民梁女士來到珠海市民服務中心1號樓3樓的綜合服務廳辦理業務,在工作人員的幫助下,她在港澳跨境服務自助辦理區的「澳政易」自助服務機上很快就辦完了身份證明業務。6月30日,廣州、珠海、中山、江門四個大灣區城市的政務服務中心正式啟用了...
65億美元芯片收購案,遭美國二次調查 - 天天要聞

65億美元芯片收購案,遭美國二次調查

本文由半導體產業縱橫(ID:ICVIEWS)綜合 美國FTC對軟銀收購Ampere展開深度調查。 據知情人士透露,美國聯邦貿易委員會就軟銀擬收購 Arm 服務器處理器廠商Ampe....
DRAM市場,將創新高 - 天天要聞

DRAM市場,將創新高

本文由半導體產業縱橫(ID:ICVIEWS)綜合 傳統通用型DRAM和服務器高價值DRAM量價齊升雙重驅動,2025年DRAM市場有望創新高。 根據CFM最新報告顯示,2025年....
國產晶圓代工,市場巨變! - 天天要聞

國產晶圓代工,市場巨變!

未來十年,將是晶圓代工業的關鍵轉折期。 這一判斷,在近期一組數據中得到了清晰印證。根據 Yole Group 的最新報告,中國大陸有望在 2030 年超越中國台灣,躍居全球最大半導體晶圓代....