Spring-Kafka @KafkaListener和errorHandler设置监听异常处理

2021年07月26日08:08:42 科技 1409

Apache Kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring-kafka还提供了很多高级功能。

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻


一、简单集成

首先需要引入依赖:

<dependency>

 <groupId>org.springframework.kafka</groupId>

 <artifactId>spring-kafka</artifactId>

 <version>2.2.6.RELEASE</version>

</dependency>

然后,添加配置:

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

二、@KafkaListener的使用

关于@KafkaListener接收消息的能力就不赘述了,但是@KafkaListener的功能不止如此,其他的比较常见的,使用场景比较多的功能点如下:

  • 显式的指定消费哪些Topic和分区的消息;
  • 设置每个Topic以及分区初始化的偏移量;
  • 设置消费线程并发度;
  • 设置消息异常处理器;

使用errorHandler做异常处理,首先实现KafkaListenerErrorHandler接口,然后做一些异常处理,比如记录日志等等。而且注解里的配置,是你自定义实现实例在Spring上下文中的Name。

@Component

public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

 private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class);

 @Override

 public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

 LOGGER.error("消息:" + message.toString() + "\n" + "异常:" + exception.getMessage());

 return null;

 }

}

例如,调用的时候,填写beanName,例如errorHandler=“kafkaDefaultListenerErrorHandler”,具体示例代码如下所示:

@Service

public class CargoBookedEventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class);

@KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVENT_TOPIC, errorHandler = "myKafkaListenerErrorHandler")

public void receiveEvent(ConsumerRecord<String, Object> consumerRecord) {

CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();

LOGGER.info("Recevied key='{}', message='{}'", consumerRecord.key(), consumerRecord.value());

 }

}

下面是有意在Topic中发送的错误消息,不符合消费方consumer解析的JSON格式,如图所示:

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻



下面是errorHandler针对错误消息的处理,输出的异常日志:

2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664125969}]

异常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664130870}]

异常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent

Spring-Kafka @KafkaListener和errorHandler设置监听异常处理 - 天天要闻

Kafka技术专栏《Kafka v2.3快速入门与实践》,该专栏从实战出发,通过零基础入门-环境搭建-项目案例实战,让初学者快速掌握Kafka相关技术要点并投入实际项目开发。

(此处已添加圈子卡片,请到今日头条客户端查看)

科技分类资讯推荐

长安与东风重组新进展:朱华荣称不会改变长安既定战略 - 天天要闻

长安与东风重组新进展:朱华荣称不会改变长安既定战略

2月9日,长安汽车和东风集团股份(00489.HK)同步发布了控股股东“正在与其他国资央企集团筹划重组事项”的信息。长安汽车的控股股东是兵装集团,而东风集团股份的控股股东是东风公司。随即,长安汽车和东风集团这两家汽车央企将合并重组,成为业内关注的焦点。
公安部出手了!年龄限制放宽10年、送考下乡,2025年考驾照不难了 - 天天要闻

公安部出手了!年龄限制放宽10年、送考下乡,2025年考驾照不难了

电动车加强管理以后,要求机动车类型的车辆需要持证上路,但是老年人考驾照却受阻,一方面有年龄的限制,另一方面偏远山区考驾照不方便,所以在2025年公安部出手了,年龄限制放宽10年,同时推出送考下乡服务,还进一步的降低考驾照的费用,2025年起考摩托车驾照不难了。
从“星灵安全守护体系”到昊铂HL,看懂广汽科技日 - 天天要闻

从“星灵安全守护体系”到昊铂HL,看懂广汽科技日

发布会以技术切入,并全程围绕安全展开。广汽集团董事长、总经理冯兴亚率先登场,宣布2025年四季度将正式上市支持L3级智能驾驶的车型,他同时强调面向自动驾驶时代对智能驾驶技术、整车安全架构以及突发风险处理能力的要求更高。如何才能满足更高的要求?冯兴亚提到了“广汽
关税大棒下,最受伤的车企出现了 - 天天要闻

关税大棒下,最受伤的车企出现了

特朗普的关税大棒刚挥出,尚未吓退“外敌”,却先刺痛了自己。近日,拥有玛莎拉蒂、Jeep等14个品牌的全球第四大车企斯泰兰蒂斯突然宣布裁撤900名美国工人,关闭加拿大和墨西哥两家工厂,北美生产线陷入瘫痪。几乎同一时间,捷豹路虎宣布暂停对美出口一个月,奥迪更是直接