Spring-Kafka @KafkaListener和errorHandler设置监听异常处理

2021年07月26日08:08:42 科技 1409

Apache Kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了很多高级功能。

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻


一、简单集成

首先需要引入依赖:

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka</artifactId>

 <version>2.2.6.RELEASE</version>

</dependency>

然后,添加配置:

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

二、@KafkaListener的使用

关于@KafkaListener接收消息的能力就不赘述了,但是@KafkaListener的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:

  • 显式的指定消费哪些Topic和分区的消息;
  • 设置每个Topic以及分区初始化的偏移量;
  • 设置消费线程并发度;
  • 设置消息异常处理器;

使用errorHandler做异常处理,首先实现KafkaListenerErrorHandler接口,然后做一些异常处理,比如记录日志等等。而且注解里的配置,是你自定义实现实例在Spring上下文中的Name。

@Component

public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

 private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class);

 @Override

 public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

 LOGGER.error("消息:" + message.toString() + "\n" + "异常:" + exception.getMessage());

 return null;

 }

}

例如,调用的时候,填写beanName,例如errorHandler=“kafkaDefaultListenerErrorHandler”,具体示例代码如下所示:

@Service

public class CargoBookedEventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class);

@KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVENT_TOPIC, errorHandler = "myKafkaListenerErrorHandler")

public void receiveEvent(ConsumerRecord<String, Object> consumerRecord) {

CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();

LOGGER.info("Recevied key='{}', message='{}'", consumerRecord.key(), consumerRecord.value());

 }

}

下面是有意在Topic中发送的错误消息,不符合消费方consumer解析的JSON格式,如图所示:

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻



下面是errorHandler针对错误消息的处理,输出的异常日志:

2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664125969}]

异常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664130870}]

异常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻

Kafka技术专栏《Kafka v2.3快速入门与实践》,该专栏从实战出发,通过零基础入门-环境搭建-项目案例实战,让初学者快速掌握Kafka相关技术要点并投入实际项目开发。

(此处已添加圈子卡片,请到今日头条客户端查看)

科技分类资讯推荐

全球媒体聚焦丨79%全球专利+80%市场份额!外媒从一场救援看中国无人机产业实力 - 天天要闻

全球媒体聚焦丨79%全球专利+80%市场份额!外媒从一场救援看中国无人机产业实力

近日,一段中国无人机在洪水中成功营救被困人员的短视频在海外社交平台广泛传播,多家国际媒体也竞相报道,并深入探讨中国无人机产业技术发展与创新应用。 《纽约时报》网站截图 据了解,这段短视频中的救援发生在广西柳州三江侗族自治县一村庄。受上游来水影响,这个村子里一些处于低洼地带的房屋被淹。由于水流上涨快,一...
博士天团攻坚激光芯片,拿到3个亿融资 - 天天要闻

博士天团攻坚激光芯片,拿到3个亿融资

记者|鄢子为编辑|陈晓平7月1日,北京飓芯科技对外官宣,完成3亿元B轮融资。飓芯成立于2017年7月,核心团队由多名经验丰富的博士组成,主攻氮化镓激光芯片产业化,实现关键核心器件的自主可控。本轮融资,飓芯获得国家基金、半导体产业方和一线投资机构的认可。3亿融资由深创投制造业转型升级新材料基金(国家制造业转型升...
臻宝科技科创板IPO获受理 系半导体零部件制造商 大基金二期等参投 - 天天要闻

臻宝科技科创板IPO获受理 系半导体零部件制造商 大基金二期等参投

《科创板日报》7月2日讯(记者 黄修眉 实习记者 戴嘉怡) 重庆臻宝科技股份有限公司(下称“臻宝科技”)科创板IPO申请近日获上交所受理,辅导机构为中信证券。臻宝科技是国内少数实现集成电路先进制程设备和高世代、高电压显示面板制造设备非金属零部件多品类供应、规模化量产的企业之一。此次IPO,臻宝科技拟募资13.98亿...
BW2025即将开展,技嘉AORUS雕妹约你3H|3A08 雕宅见 - 天天要闻

BW2025即将开展,技嘉AORUS雕妹约你3H|3A08 雕宅见

史上规模空前的BilibiliWorld2025将于2025年7月11日-13日在上海国家会展中心开展!知名电竞硬件品牌技嘉AORUS已确认参展,为玩家打造游戏盛宴。现场不仅能体验新款硬核电竞装备、畅玩热门游戏大作,参与激烈的1V1对战PK,更有甜辣萌趣的雕妹喊你3H|3A08等你来!多重互动火力全开,带你玩转整个BW,开启今夏最燃电竞狂欢。...
35项服务可跨境办理,“澳政易”自助服务机上线珠海市民服务中心 - 天天要闻

35项服务可跨境办理,“澳政易”自助服务机上线珠海市民服务中心

“十几分钟就办完了,现场的协助人员指导我操作,太方便了!”7月1日上午,澳门居民梁女士来到珠海市民服务中心1号楼3楼的综合服务厅办理业务,在工作人员的帮助下,她在港澳跨境服务自助办理区的“澳政易”自助服务机上很快就办完了身份证明业务。6月30日,广州、珠海、中山、江门四个大湾区城市的政务服务中心正式启用了...
65亿美元芯片收购案,遭美国二次调查 - 天天要闻

65亿美元芯片收购案,遭美国二次调查

本文由半导体产业纵横(ID:ICVIEWS)综合 美国FTC对软银收购Ampere展开深度调查。 据知情人士透露,美国联邦贸易委员会就软银拟收购 Arm 服务器处理器厂商Ampe....
DRAM市场,将创新高 - 天天要闻

DRAM市场,将创新高

本文由半导体产业纵横(ID:ICVIEWS)综合 传统通用型DRAM和服务器高价值DRAM量价齐升双重驱动,2025年DRAM市场有望创新高。 根据CFM最新报告显示,2025年....
国产晶圆代工,市场巨变! - 天天要闻

国产晶圆代工,市场巨变!

未来十年,将是晶圆代工业的关键转折期。 这一判断,在近期一组数据中得到了清晰印证。根据 Yole Group 的最新报告,中国大陆有望在 2030 年超越中国台湾,跃居全球最大半导体晶圆代....