SpringBoot集成Kafka并使用多个死信队列详解
目录
- 1. 添加依赖 (pom.xml)
- 2. 配置文件 (application.yml)
- 3. 自定义异常类
- 4. Kafka配置类
- 5. Kafka消费者服务
- 6. 启动类
- 7. 测试步骤
以下是Spring Boot集成Kafka并使用多个死信队列的完整示例,包含代码和配置说明。
1. 添加依赖 (pom.xml)
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
2. 配置文件 (application.yml)
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
3. 自定义异常类
/**
 * Unchecked exception signalling that a message failed business-level processing.
 *
 * <p>Callers can attach the underlying failure through the two-argument
 * constructor so the root cause is not lost when the exception is rethrown.
 */
public class BusinessException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates an exception with a description only.
     *
     * @param message human-readable description of the business failure
     */
    public BusinessException(String message) {
        super(message);
    }

    /**
     * Creates an exception that preserves the underlying failure.
     *
     * @param message human-readable description of the business failure
     * @param cause   the lower-level exception that triggered this one; may be {@code null}
     */
    public BusinessException(String message, Throwable cause) {
        super(message, cause);
    }
}
4. Kafka配置类
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.util.backoff.FixedBackOff; @Configuration @EnableKafka public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; // Kafka生产者配置 @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } // Kafka消费者配置 @Bean public ConsumermDQKtFactory<String, String> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(config); } // 自定义错误处理器(支持多个死信队列) 编程@Bean public CommonErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) { // 重试策略:3次重试,间隔1秒 FixedBackOff backOff = new FixedBackOff(1000L, 3); DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, exception) -> { String dlqTopic = determineDlqTopic(exception); kafkaTemplate.send(dlqTopic, record.key(), record.value()); 
System.out.println("消息发送到死信队列: " + dlqTopic); }, backOff); // 配置需要重试的异常类型 errorHandler.addRetryableExceptions(BusinessException.class); errorHandler.addNotRetryableExceptions(SerializationException.class); return errorHandler; } // 根据异常类型选择死信队列 private String determineDlqTopic(Throwable exception) { if (exception.getCause() instanceof SerializationException) { return "serialization-error-dlq"; } else if (exception.getCause() instanceof BusinessException) { return "business-error-dlq"; } else { return "general-error-dlq"; } } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<android;>(); factory.setConsumerFactory(consumeandroidrFactory()); factory.setCommonErrorHandler(errorHandler(kafkaTemplate())); return factory; } }
5. Kafka消费者服务
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumerService { @KafkaListener(topics = "main-topic") public void consume(String message) { try { if (message.contains("invalid-format")) { throw new SerializationException("消息格式错误"); } else if (message.contains("business-error")) { throw new BusinessException("业务处理失败"); } mDQKt System.out.println("成功处理消息: " + message); } catch (Exception e) { throw new RuntimeException(e); } } }
6. 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the example application.
 */
@SpringBootApplication
public class KafkaApplication {

    /**
     * Boots the Spring context with this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String[]) helper.
        SpringApplication application = new SpringApplication(KafkaApplication.class);
        application.run(args);
    }
}
7. 测试步骤
1.创建Kafka主题:
kafka-topics --create --bootstrap-server localhost:9092 --topic main-topic
kafka-topics --create --bootstrap-server localhost:9092 --topic serialization-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic business-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic general-error-dlq
2.发送测试消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
 * Publishes three sample messages to {@code main-topic}, in this order:
 * {@code valid-message}, {@code invalid-format}, {@code business-error}.
 */
public void sendTestMessages() {
    final String topic = "main-topic";
    final String[] payloads = {"valid-message", "invalid-format", "business-error"};
    for (String payload : payloads) {
        kafkaTemplate.send(topic, payload);
    }
}
3.观察死信队列:
- 格式错误的消息会进入 serialization-error-dlq
- 业务异常的消息会进入 business-error-dlq
- 其他异常进入 general-error-dlq
关键点说明
错误路由逻辑:通过determineDlqTopic方法根据异常类型选择不同的死信队列。
重试机制:通过FixedBackOff配置重试策略(最多重试3次,间隔1秒)。
异常分类:
- SerializationException(序列化问题)直接进入死信队列,不重试。
- BusinessException(业务异常)会触发重试,最终失败后进入死信队列。
到此这篇关于SpringBoot集成Kafka并使用多个死信队列详解的文章就介绍到这了,更多相关SpringBoot Kafka使用死信队列内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章,希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论