开发者

Spring Cloud Stream实现数据流处理

目录
  • 1.什么是Spring Cloud Stream?
  • 2.环境准备
    • kafka-map
  • 3.代码工程
    • 实验目标
    • pom.XML
    • 处理流
    • 配置文件
  • 4.测试

    1.什么是Spring Cloud Stream?

    我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

    Spring Cloud Stream实现数据流处理

    我们很容易总结出每个模块的流程:

    • 从上一个模块拉取数据
    • 处理数据
    • 将处理完成的数据发给下一个模块

    其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafyZfcivUhdJka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:**提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。**但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

    2.环境准备

    采用docker-compose搭建kafaka环境

    version: '3'
    
    networks:
      kafka:
        ipam:
          driver: default
          config:
            - subnet: "172.22.6.0/24"
    
    services:
      zookepper:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
        container_name: zookeeper-server
        restart: unless-stopped
        volumes:
          - "/etc/localtime:/etc/localtime"
        environment:
          ALLOW_ANONYMOUS_LOGIN: yes
        ports:
          - "2181:2181"
        networks:
          kafka:
            ipv4_address: 172.22.6.11
    
      kafka:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
        container_name: kafka
        restart: unless-stopped
        volumes:
          - "/etc/localtime:/etc/localtime"
        environment:
          ALLOW_PLAINTEXT_LISTENER编程客栈: yes
          KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
        ports:
          - "9092:9092"
        depends_on:
          - zookepper
        networks:
          kafka:
            ipv4_address: 172.22.6.12
    
      kafka-map:
        image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
        container_name: kafka-map
        restart: unless-stopped
        volumes:
          - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
        environment:
          DEFAULT_USERNAME: admin
          DEFAULT_PASSWORD: 123456
        ports:
          - "9080:8080"
        depends_on:                         
          - kafka
        networks:
          kafka:
            ipv4_address: 172.22.6.13
    

    run

    docker-compose -f docker-compose-kafka.yml -p kafka up -d
    

    kafka-map

    https://github.com/dushixiang/kafka-map

    • 访问:http://127.0.0.1:9080
    • 账号密码:admin/123456

    3.代码工程

    Spring Cloud Stream实现数据流处理

    实验目标

    • 生成UUID并将其发送到Kafka主题BATch-in
    • batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
    • 监听batch-out主题并打印接收到的消息。

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springcloud-demo</artifactId>
            <groupId>com.et</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>spring-cloud-stream-kafaka</artifactId>
    
        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>
        <dependencies>
            <!-- Spring Boot Starter Web -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Spring Boot Starter Test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId&gpythont;lombok</artifactId>
            </dependency>
    
        </dependencies>
    
    </project>
    

    处理流

    /*
     * Copyright 2023 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      https://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.et;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.Spri编程客栈ngBootApplication;
    import org.springframework.context.annotation.Bean;
    imporjavascriptt org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    
    import Java.util.List;
    import java.util.UUID;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    /**
     * @author Steven Gantz
     */
    @SpringBootApplication
    public class CloudStreamsFunctionBatch {
    
       public static void main(String[] args) {
          SpringApplication.run(CloudStreamsFunctionBatch.class, args);
       }
    
       @Bean
       public Supplier<UUID> stringSupplier() {
          return () -> {
             var uuid = UUID.randomUUID();
             System.out.println(uuid + " -> batch-in");
             return uuid;
          };
       }
    
       @Bean
       public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
          return idBatch -> {
             System.out.println("Removed digits from batch of " + idBatch.size());
             return idBatch.stream()
                .map(UUID::toString)
                // Remove all digits from the UUID
                .map(uuid -> uuid.replaceAll("\\d",""))
                .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
                .toList();
          };
       }
    
       @KafkaListener(id = "batch-out", topics = "batch-out")
       public void listen(String in) {
          System.out.println("batch-out -> " + in);
       }
    
    }
    
    • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。

    • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。

    • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

    配置文件

    spring:
      cloud:
        function:
          definition: stringSupplier;digitRemovingConsumer
        stream:
          bindings:
            stringSupplier-out-0:
              destination: batch-in
            digitRemovingConsumer-in-0:
              destination: batch-in
              group: batch-in
              consumer:
                batch-mode: true
            digitRemovingConsumer-out-0:
              destination: batch-out
          kafka:
            binder:
              brokers: localhost:9092
            bindings:
              digitRemovingConsumer-in-0:
                consumer:
                  configuration:
                    # Forces consumer to wait 5 seconds before polling for messages
                    fetch.max.wait.ms: 5000
                    fetch.min.bytes: 1000000000
                    max.poll.records: 10000000
    

    参数解释

    spring:
      cloud:
        function:
          definition: stringSupplier;digitRemovingConsumer
    

    spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。

    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
    

    stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in

        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
    
    • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in

    • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。

    • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。

        digitRemovingConsumer-out-0:
          destination: batch-out
    
    • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

    以上只是一些关键代码

    4.测试

    启动弄Spring Boot应用,可以看到控制台输出日志如下:

    291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
    c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
    a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
    db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
    b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
    Removed digits from batch of 5
    batch-out -> eacc-ee-dfb-b-dead
    batch-out -> cbae-e-f-c-acfb
    batch-out -> ab-dd---adade
    batch-out -> db-fb-f-bbb-bfdec
    batch-out -> bdb--d-ad-bbbb
    

    以上就是Spring Cloud Stream实现数据流处理的详细内容,更多关于Spring Cloud Stream数据流处理的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜