开发者

使用SpringBoot+InfluxDB实现高效数据存储与查询

目录
  • 1、项目介绍
  • 2、 InfluxDB 介绍
  • 3、Spring Boot 配置 InfluxDB
  • 4、InfluxDB 连接配置
  • 5、Service 层:数据写入 & 查询
    • 5.1 单条数据写入
    • 5.2 批量写入(异步)
    • 5.3 查询数据
  • 6、Controller 层:API 设计
    • 7、运行 & 测试
      • 7.1 启动项目
      • 7.2 使用 Postman 进行测试
        • 1、写入单条数据
        • 2、批量写入
        • 3、查询数据

    1、项目介绍

    本项目使用 Spring Boot + InfluxDB 2.x 来存储和查询时间序列数据,适用于 物联网(IoT)、实时监控、日志分析 等场景。

    2、 InfluxDB 介绍

    InfluxDB 是一个高性能的时间序列数据库(TSDB),适用于存储温度、传感器数据、日志、监控指标等。

    特点

    • 采用 Flux 查询语言
    • 高吞吐量,支持 批量写入
    • Tag(索引)+ Field(数据) 结构,提高查询效率
    • 精确时间戳(支持纳秒级)

    3、Spring Boot 配置 InfluxDB

    application.yml 中配置 InfluxDB 连接:

    # 编程客栈InfluxDB 独立配置
    influxdb:
      url: http://192.168.1.1xx:28086/  # InfluxDB 服务器地址
      token: _7FZlXGJJcd8Ayox-F-hvbDdXb_a5SI3530x1DdFKZfQ65uOhnpQciJWHpd7ULhpAOcgj5oV2jsR-Xf0qTtAxg==
      org: xxx     # 组织名称
      bucket: xxx  # 存储桶名称
      # InfluxDB 客户端日志级别
      # ERROR: 仅js记录错误日志
      # WARN: 记录警告和错误日志
      # INFO: 记录普通信息、警告和错误日志
      # DEBUG: 记录调试级别的详细日志
      # BODY: 记录完整的 HTTP 请求和响应主体
      # TRACE: 记录极其详细的跟踪日志
      # ALL: 记录所有日志级别(视客户端而定)
      logLevel: BODY

    4、InfluxDB 连接配置

    InfluxDBConfig.Java 中配置 InfluxDB 客户端:

    import com.influxdb.client.InfluxDBClient;
    import com.influxdb.client.InfluxDBClientFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
     
    @Configuration
    public class InfluxDBConfig {
     
        @Value("${influxdb.url}")
        private String url;
     
        @Value("${influxdb.token}")
        private String token;
     
        @Value("${influxdb.org}")
        private String org;
     
        @Value("${influxdb.bucket}")
        private String bucket;
     
        @Bean
        public InfluxDBClient influxDBClient() {
            return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
        }
    }

    说明

    • InfluxDBClientFactory.create(url, token, org, bucket) 创建 InfluxDB 客户端
    • @Value 读取 application.yml 配置

    5、Service 层:数据写入 & 查询

    5.1 单条数据写入

    public void writeSingleData(TemperatureDTO temperatureDTO) {
        WriteApiblocking writeApi = influxDBClient.getWriteApiBlocking();
        
        Point point = Point.measurement("temperature")
                .addTag("location", temperatureDTO.getLocation()) // 添加标签(索引)
                .addField("value", temperatureDTO.getValue()) // 添加字段(数据)
                .time(Instant.now(), WritePrecision.NS); // 记录当前时间戳
     
        writeApi.writePoint(point);
    }

    5.2 批量写入(异步)

    public void writeBATchData(List<TemperatureDTO> temperatureDTOs) {
        WriteApi writeApi = influxDBClient.makeWriteApi(); // 获取异步 API
     
        List<Point> points = temperatureDTOs.stream()
                .map(dto -> Point.measurement("temperature")
                        .addTag("location", dto.getLocation())
                        .addField("value", dto.getValue())
                        .time(Instant.now(), WritePrecision.NS))
                .collect(Collectors.toList());
     
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> writeApi.writePoints(points));
        future.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println(" 写入失败:" + error.getMessage());
            } else {
                writeApi.close();  // 关闭 API 避免资源泄露
                log.info("✅ 批量数据写入成功(异步)");
            }
        });
    }

    说明

    • 异步写入 不会阻塞主线程,提高吞吐量
    • 异常回调 捕获写入失败的信息
    • 使用 writeApi.close() 避免资源泄露

    5.3 查询数据

    public List<TemperatureVO> queryTemperatureData() {
        String query = "from(bucket: \"test\") |> range(start: -1h)";
        QueryApi queryApi = influxDBClient.getQueryApi();
     
        return queryApi.query(query)
                .stream()
                .flatMjuFXaap(fluxTable -> fluxTable.getRecords().stream()) // 遍历 FluxTable
                .map(record -> {
                    TemperatureVO vo = new TempythonperatureVO();
                    vo.setLocation((String) record.getValueByKey("location")); // 获取标签信息
                    Object valueObj = record.getValueByKey("_value");
                    vo.setValue(valueObj != null ? ((Number) valueObj).doubleValue() : 0.0);
                    vo.setTimestamp(record.getTime().toString());
                    return vo;
                })
                .collect(Collectors.toList());
    }

    说明

    • Flux 查询 过去 1h 内的数据
    • 遍历 FluxTable 提取 标签 + 字段 数据

    6、Controller 层:API 设计

    @RestController
    @RequestMapping("/api/influxdb")
    public class InfluxDBController {
     
       编程 @Autowired
        private TestService influxDBService;
     
        @PostMapping("/write")
        public String writeData(@RequestBody TemperatureDTO temperatureDTO) {
            influxDBService.writeSingleData(temperatureDTO);
            return "✅ 单条数据写入成功!";
        }
     
        @PostMapping("/write-batch")
        public String writeBatchData() {
            List<TemperatureDTO> data = generateTestData(10000);
            influxDBService.writeBatchData(data);
            return "✅ 10,000 条数据成功写入!";
        }
     
        @GetMapping("/query")
        public List<TemperatureVO> queryTemperatureData() {
            return influxDBService.queryTemperatureData();
        }
     
        private List<TemperatureDTO> generateTestData(int count) {
            List<TemperatureDTO> dataList = new ArrayList<>();
            Random random = new Random();
            for (int i = 0; i < count; i++) {
                TemperatureDTO dto = new TemperatureDTO();
                dto.setLocation("office-" + (random.nextInt(1000) + 1));
                dto.setValue(15 + (random.nextDouble() * 10));
                dataList.add(dto);
            }
            return dataList;
        }
    }

    说明

    • /write单条写入
    • /write-batch生成 10,000 条数据并写入
    • /query查询过去 1 小时数据

    7、运行 & 测试

    7.1 启动项目

    mvn spring-boot:run 

    7.2 使用 Postman 进行测试

    1、写入单条数据

    POST http://localhost:8080/api/influxdb/write

    {
      "location": "office-1",
      "value": 22.5
    }

    2、批量写入

    POST http://localhost:8080/api/influxdb/write-batch

    3、查询数据

    GET http://localhost:8080/api/influxdb/query

    以上就是使用SpringBoot+InfluxDB实现高效数据存储与查询的详细内容,更多关于SpringBoot InfluxDB数据存储与查询的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜