开发者

聊聊Spring Boot如何配置多个Kafka数据源

目录
  • 一、配置文件
  • 二、pom依赖
  • 三、生产者、消费者配置
    • 1.第一个kakfa
    • 2.第二个kakfa
  • 四.生产者
    • 五.消费者

      一、配置文件

      application.properties配置文件如下

      #kafka多数据源配置
      #kafka数据源一,日志审计推送
      spring.kafka.one.bootstrap-servers=172.19.12.109:32182
      spring.kafka.one.producer.retries=0
      spring.kafka.one.producer.properties.max.block.ms=5000
      #kafka数据源二,动环数据消费
      spring.kafka.two.bootstrap-servers=172.19.12.109:32182
      spring.kafka.two.producer.retries=0
      spring.kafka.two.producer.properties.max.block.ms=5000
      spring.kafka.two.consumer.group-id=bw-convert-data
      spring.kafka.two.consumer.enable-auto-commit=true

      二、pom依赖

      		<dependency>
      			<groupId>org.springframework.kafka</groupId>
      			<artifactId>spring-kafka</artifactId>
      		</dependency>
      

      三、生产者、消费者配置

      1.第一个kakfa

      package com.gstanzer.convert.config;
      import org.apache.kafka.clients.producer.ProducerConfig;js
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.annotation.EnableKafka;
      import org.springframework.kafka.core.*;
      import Java.util.HashMap;
      import java.util.Map;
      @EnableKafka
      @Configuration
      public class KafkaOneConfig {
          @Value("${spring.kafka.one.bootstrap-servers}")
          private String bootstrapServers;
          @Value("${spring.kafka.one.producer.retries}")
          private String retries;
          @Value("${spring.kafka.one.producer.properties.max.block.ms}")
          private String maxBlockMs;
          @Bean
          public KafkaTemplate<String, String> kafkaOneTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }
          private ProducerFactory<String, String> producerFactory() {
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }
          private Map<String, Object> producerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ProducerConfig.RETRIES_CONFIG, retries);
              props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return props;
          }
      }

      2.第二个kakfa

      package com.gstanzer.convert.config;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.annotation.EnableKafka;
      import org.pythonspringframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
      import org.springframework.kafka.config.KafkaListenerContainerFactory;
      import org.springframework.kafka.core.*;
      import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
      import java.util.HashMap;
      import java.util.Map;
      @Configuration
      @EnableKafka
      public class KafkaTwoConfig {
          @Value("${spring.kafka.two.bootstrap-servers}")
          private String bootstrapServers;
          @Value("${spring.kafka.two.producer.retries}")
          private String retries;
          @Value("${spring.kafka.two.producer.properties.max.block.ms}")
          private String maxBlockMs;
          @Value("${spring.kafka.two.consumer.group-id}")
          private String groupId;
          @Value("${spring.kafka.two.consumer.enable-auto-commit}")
       www.devze.com   private boolean enableAutoCommit;
          @Bean
          public KafkaTemplate<String, String> kafkaTwoTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }
          @Bean
          KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory());
              factory.setConcurrency(3);
              factory.getContainerProperties().setPollTimeout(3000);
              return factory;
          }
          private ProducerFactory<String, String> producerFactory() {
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }
          public ConsumerFactory<Integer, String> consumerFactory() {
              return new DefaultKafkaConsumerFactory<>(consumerConfigs());
          }
          private Map<String, Object> producerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ProducerConfig.RETRIES_CONFIG, retries);
              props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return props;
          }
          private Map<String, Object> consumerConfigs() {
              Map&http://www.devze.comlt;String, Object> props = new HashMap<>();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              return props;
          }
      }

      四.生产者

      @Controller
      public class TestController {
          @Autowired
          private KafkaTemplate kafkaOneTemplate;
          @Autowired
          private KafkaTemplate kafkaTwoTemplate;
          @RequestMapping("/send")
          @ResponseBody
          public String send() {
              final String TOPIC = "TOPIC_1";
              kafkaOneTemplate.send(TOPIC, "kafka one");
              kafkaTwoTemplate.send(TOPIC, "kafka two");
              return "success";
          }
      }

      五.消费者

      @Component
      public class Kafka编程客栈Consumer {
          private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
          final String TOPIC = "TOPIC_1";
          // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
          @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")
          public void listenerOne(ConsumerRecord<?, ?> record) {
              LOGGER.info(" kafka one 接收到消息:{}", record.value());
          }
          @KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")
          public void listenerTwo(ConsumerRecord<?, ?> record) {
              LOGGER.info(" kafka two 接收到消息:{}", record.value());
          }
      }

      备注:

      生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

      Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘

      到此这篇关于Spring Boot配置多个Kafka数据源的文章就介绍到这了,更多相关Spring Boot配置Kafka数据源内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜