Apache Kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建了一個項目Spring-kafka,封裝了Apache Kafka-client,用於在Spring項目里快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了很多高級功能。
一、簡單集成
首先需要引入依賴:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
然後,添加配置:
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
二、@KafkaListener的使用
關於@KafkaListener接收消息的能力就不贅述了,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:
- 顯式的指定消費哪些Topic和分區的消息;
- 設置每個Topic以及分區初始化的偏移量;
- 設置消費線程並發度;
- 設置消息異常處理器;
使用errorHandler做異常處理,首先實現KafkaListenerErrorHandler接口,然後做一些異常處理,比如記錄日誌等等。而且註解里的配置,是你自定義實現實例在Spring上下文中的Name。
@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaListenerErrorHandler.class);
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
LOGGER.error("消息:" + message.toString() + "\n" + "異常:" + exception.getMessage());
return null;
}
}
例如,調用的時候,填寫beanName,例如errorHandler=「kafkaDefaultListenerErrorHandler」,具體示例代碼如下所示:
@Service
public class CargoBookedEventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CargoBookedEventHandler.class);
@KafkaListener(topics = DomainEventConstant.CARGO_BOOKED_EVENT_TOPIC, errorHandler = "myKafkaListenerErrorHandler")
public void receiveEvent(ConsumerRecord<String, Object> consumerRecord) {
CargoBookedEvent routedEvent = (CargoBookedEvent)consumerRecord.value();
LOGGER.info("Recevied key='{}', message='{}'", consumerRecord.key(), consumerRecord.value());
}
}
下面是有意在Topic中發送的錯誤消息,不符合消費方consumer解析的JSON格式,如圖所示:
下面是errorHandler針對錯誤消息的處理,輸出的異常日誌:
2021-07-19 11:08:45.991 ERROR 29308 --- [ntainer#0-1-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@e0bcdd6, kafka_timestampType=CREATE_TIME, type=[B@475a94e, kafka_receivedMessageKey=null, kafka_receivedPartitionId=1, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664125969}]
異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent
2021-07-19 11:08:50.872 ERROR 29308 --- [ntainer#0-0-C-1] c.r.t.e.MyKafkaListenerErrorHandler : 消息:GenericMessage [payload=hello world, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67ecd40c, kafka_timestampType=CREATE_TIME, type=[B@4d795117, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=CARGO_BOOKED_EVENT_TOPIC, kafka_receivedTimestamp=1626664130870}]
異常:Listener method 'public void com.rickie.tracking.eventhandler.CargoBookedEventHandler.receiveEvent(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)' threw exception; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to com.rickie.dto.event.CargoBookedEvent
Kafka技術專欄《Kafka v2.3快速入門與實踐》,該專欄從實戰出發,通過零基礎入門-環境搭建-項目案例實戰,讓初學者快速掌握Kafka相關技術要點並投入實際項目開發。
(此處已添加圈子卡片,請到今日頭條客戶端查看)