开发者

MySQL 迁移至 Doris 最佳实践方案(最新整理)

目录
  • 一、编程客栈JDBC Catalog 联邦查询方案(适合跨库实时查询)
    • 1. 方案概述
    • 2. 环境要求
    • 3. 实施步骤
      • 3.1 安装 JDBC 驱动(所有 FE/BE 节点)
      • 3.2 创建 JDBC Catalog
      • 3.3 创建查询或者导入
    • 4. 注意事项
    • 二、Binlog 实时同步方案(适合增量数据同步)
      • 1. 方案概述
        • 2. 环境要求
          • 3. 实施步骤
            • 3.1 配置 mysql 开启 Binlog
            • 3.2 部署 Canal 服务
            • 3.3 在 Doris 中创建同步任务
          • 4. 注意事项
          • 三、Flink CDC 流式同步方案(适合高实时性场景)
            • 1. 方案概述
              • 2. 环境要求
                • 3. 实施步骤
                  • 3.1 构建 Flink 作业
                  • 3.2 提交 Flink 作业
                • 4. 注意事项
                • 四、Datax 同步方案(适合全量 / 批量数据迁移)
                  • 1. 方案概述
                    • 2. 环境要求
                      • 3. 实施步骤
                        • 3.1 安装 Datax
                        • 3.2 编写同步配置文件
                        • 3.3 执行同步任务
                      • 4. 注意事项
                      • 五、方案对比与选型建议
                        • 五、数据迁移最佳实践要点

                          在数据架构不断演进的背景下,从 MySQL 迁移至 Doris 成为许多企业提升数据处理效率的关键选择。本文将深入剖析三种经过实践验证的 MySQL 迁移至 Doris 的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于 CDC(Change Data Capture)的实时迁移。每种方案都将从技术原理、实施步骤、适用场景、资源消耗等维度展开分析,并通过对比选型帮助读者结合自身业务需求与技术栈,找到最契合的迁移路径,实现数据平稳过渡与性能跃升。

                          一、JDBC Catalog 联邦查询方案(适合跨库实时查询)

                          1. 方案概述

                          通过 Doris 1.2 + 引入的JDBC Catalog功能,直接通过标准 JDBC 协议连接 MySQL 数据库,实现跨库联邦查询和数据写入。相比 ODBC,JDBC 接口更统一、兼容性更强,且无需复杂的驱动安装和版本匹配。

                          2. 环境要求

                          组件版本要求说明
                          Doris1.2.0+支持 JDBC Catalog 功能
                          MySQLMySQL 5.7, 8.0 或更高版本需开启远程访问权限
                          MySQL JDBC8.0.28+驱动下载:MySQL JDBC 驱动

                          3. 实施步骤

                          3.1 安装 JDBC 驱动(所有 FE/BE 节点)

                          步骤 1:创建驱动目录

                          mkdir -p /your_path/doris/jdbc_drivers 

                          步骤 2:上传驱动包下载 MySQL JDBC 驱动(如mysql-connector-Java-8.0.31.jar),并上传至所有 FE/BE 节点的/your_path/doris/jdbc_drivers目录。

                          步骤 3:配置驱动路径编辑fe.confbe.conf,添加以下配置(如果你使用的是默认的路径就不需要修改这个了):

                          jdbc_drivers_dir = /opt/doris/jdbc_drivers 

                          步骤 4:重启服务

                          # 重启FE 
                          cd /opt/doris/fe 
                          bin/stop_fe.sh && bin/start_fe.sh --daemon 
                          # 重启BE 
                          cd /opt/doris/be 
                          bin/stop_be.sh && bin/start_be.sh --daemon 

                          3.2 创建 JDBC Catalog

                          使用 MySQL 客户端连接 Doris,执行以下 SQL 创建 Catalog:

                          CREATE CATALOG mysql PROPERTIES (
                              "type"="jdbc",
                              "user"="root",
                              "password"="secret",
                              "jdbc_url" = "jdbc:mysql://example.net:3306",
                              "driver_url" = "mysql-connector-j-8.3.0.jar",
                              "driver_class" = "com.mysql.cj.jdbc.Driver"
                          )

                          jdbc_url:MySQL 连接地址,可指定数据库(如jdbc:mysql://host:port/dbname)。

                          driver_url:驱动文件名,需与/opt/doris/jdbc_drivers目录下的文件一致,默认就是/your_path/doris/jdbc_drivers

                          3.3 创建查询或者导入

                          执行联邦查询

                          -- 直接查询mysql表 
                          SELECT * FROM mysql_catalog.mysql_db.user_table LIMIT 10; 
                          -- 与Doris表关联查询 
                          SELECT a.id, a.name, b.sales 
                          FROM mysql_catalog.mysql_db.user_table a 
                          JOIN doris_table b ON a.id = b.user_id; 

                          执行数据导入

                          -- 建表
                          create table as select  mysql_catalog.mysql_db.user_table
                          -- 导入
                          insert into doris_table select * from mysql_catalog.mysql_db.user_table

                          4. 注意事项

                          数据类型映射

                          MySQL 的BIT类型映射为 Doris 的STRING,需在查询时转换(如CAST(bit_col AS BOOLEAN))。

                          大字段类型(如BLOB)建议通过VARCHAR映射,避免查询性能问题。

                          权限控制

                          为 Doris 创建专用 MySQL 用户,仅授予SELECT权限,避免数据泄露。

                          在 Doris 中通过GRANT语句限制外部表访问权限。

                          连接安全

                          如果您使用数据源上安装的全局信任证书配置了 TLS,则可以通过将参数附加到在 jdbc_url 属性中设置的 JDBC 连接字符串来启用集群和数据源之间的 TLS。

                          二、Binlog 实时同步方案(适合增量数据同步)

                          1. 方案概述

                          通过 Canal 解析 MySQL Binlog,实时同步增量数据至 Doris,支持 INSERT/UPDATE/DELETE 操作,适用于实时数据仓库场景。

                          2. 环境要求

                          组件版本要求说明
                          MySQL5.7+需开启 Binlog(ROW 模式)
                          Canal1.1.5+下载地址:Canal 官网
                          Doris1.2.0+支持 BATCH_DELETE 特性

                          3. 实施步骤

                          3.1 配置 MySQL 开启 Binlog

                          步骤 1:编辑 my.cnf

                          [mysqld] 
                          log-bin=mysql-bin         # 开启Binlog 
                          binlog-format=ROW          # 使用ROW模式 
                          binlog-row-image=FULL      # 记录完整行数据 
                          server-id=1                # 唯一服务器ID 

                          步骤 2:重启 MySQL 服务

                          sudo systemctl restart mysqld 

                          3.2 部署 Canal 服务

                          步骤 1:下载并解压

                          wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz 
                          tar -zxvf canal.deployer-1.1.5.tar.gz 

                          步骤 2:创建 Instance 配置

                          mkdir conf/demo 
                          cp conf/example/instance.properties conf/demo/ 
                          vim conf/demo/instance.properties 

                          关键配置

                          canal.instance.master.address=192.168.1.100:3306   # MySQL地址 
                          canal.instance.dbUsername=canal                  # 具有Binlog读取权限的用户 
                          canal.instance.dbPassword=canal                  # 用户密码 
                          canal.destination=demo                           # Instance名称 

                          步骤 3:启动 Canal

                          sh bin/startup.sh 
                          # 验证日志:cat logs/demo/demo.log,显示"start successful"表示启动成功 

                          3.3 在 Doris 中创建同步任务

                          步骤 1:创建 Doris 目标表(需开启 BATCH_DELETE)

                          --create Mysql table
                          CREATE TABLE `test.source_test` (
                            `id` int(11) NOT NULL COMMENT "",
                            `name` int(11) NOT NULL COMMENT ""
                          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
                          -- create Doris table
                          CREATE TABLE `target_test` (
                            `id` int(11) NOT NULL COMMENT "",
                            `name` int(11) NOT NULL COMMENT ""
                          ) ENGINE=OLAP
                          UNIQUE KEY(`id`)
                          COMMENT "OLAP"
                          DISTRIBUTED BY HASH(`id`) BUCKETS 8
                          PROPERTIES (
                          "replication_allocation" = "tag.location.default: 3"
                          );

                          步骤 2:创建同步作业

                          CREATE SYNC demo_job ( 
                             FROM mysql.test_table INTO target_test (id, name) 
                          ) 
                          FROM BINLOG ( 
                             "type" = "canal", 
                             "canal.server.ip" = "192.168.1.101",   # Canal服务器IP 
                             "canal.server.port" = "11111",         # Canal默认端口 
                             "canal.destination" = "demo",          # Canal Instance名称 
                             "canal.username" = "canal", 
                             "canal.password" = "canal", 
                             "sync.mode" = "incremental"            # 增量同步模式 
                          ); 

                          4. 注意事项

                          权限要求:需为 Canal 创建专用用户(如 canal),并授予REPLICATION SLAVE权限:

                          CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; 
                          GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%'; 
                          FLUSH PRIVILEGES; 

                          数据一致性:建议在 Doris 目标表中使用UNIQUE KEY 模型,并确保 MySQL 源表与 Doris 表字段一一对应。

                          三、Flink CDC 流式同步方案(适合高实时性场景)

                          1. 方案概述

                          基于 Flink CDC(Change Data Capture)实时捕获 MySQL 变更数据,通过 Flink-Doris 连接器写入 Doris,支持秒级延迟,适用于实时分析、实时报表场景。

                          2. 环境要求

                          组件版本要求说明
                          Flink1.13+支持 Flink CDC 2.0+
                          Doris1.2.0+支持 Delete 操作
                          依赖库flink-cdc-2.1.0, flink-doris-connector-1.1.0需下载对应版本 JAR 包

                          3. 实施步骤

                          3.1 构建 Flink 作业

                          步骤 1:添加 Maven 依赖

                          <dependency>
                            <groupId>js;org.apache.doris</groupId>
                            <artifactId>flink-doris-connector-1.16</artifactId>
                            <version>25.1.0</version>
                          </dependency> 

                          步骤 2:编写 Flink SQL 脚本

                          -- enable checkpoint
                          SET 'execution.checkpointing.interval' = '30s';
                          -- 创建MySQL CDC源表 
                          CREATE TABLE mysql_cdc_source ( 
                             id INT PRIMARY KEY NOT ENFORCED, 
                             name STRING, 
                             age INT, 
                             ts TIMESTAMP(3) METADATA FROM 'source_ts' VIRTUAL 
                          ) WITH ( 
                             'connector' = 'mysql-cdc', 
                             'hostname' = '192.168.1.100', 
                             'port' = '3306', 
                             'username' = 'root', 
                             'password' = 'your_mysql_password', 
                             'database-name' = 'test', 
                             'table-name' = 'user_table' 
                          ); 
                          -- 创建Doris目标表 
                          CREATE TABLE doris_sink ( 
                             id INT, 
                             name STRING, 
                             age INT, 
                             update_time TIMESTAMP(3) 
                          ) WITH ( 
                             'connector' = 'doris', 
                             'fenodes' = 'doris-fe:8030', 
                             'table.identifier' = 'test.user_table', 
                             'username' = 'root', 
                             'password' = '', 
                             'sink.properties.format' = 'json', 
                             'sink.enable-delete' = 'true', 
                             'sink.label-prefix' = 'flink_cdc_' 
                          ); 
                          -- 启动数据同步 
                          INSERT INTO doris_sink SELECT id, name, age, ts FROM mysql_cdc_source; 

                          3.2 提交 Flink 作业

                          <FLINK_HOME>bin/flink run \
                              -Dexecution.checkpointing.interval=10s \
                              -Dparallelism.default=1 \
                              -c org.apache.doris.flink.tools.cdc.CdcTools \
                              lib/flink-doris-connector-1.16-24.0.1.jar \
                              mysql-sync-database \
                              --database test_db \
                              --mysql-conf hostname=127.0.0.1 \
                              --mysql-conf port=3306 \
                              --mysql-conf username=root \
                              --mysql-conf password=123456 \
                              --mysql-conf database-name=mysql_db \
                              --including-tables "tbl1|test.*" \
                              --sink-conf fenodes=127.0.0.1:8030 \
                              --sink-conf username=root \
                              --sink-conf password=123456 \
                              --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
                              --sink-conf sink.label-prefix=label \
                              --table-conf replication_num=1 

                          4. 注意事项

                          并行度优化:根据 MySQL 表数量和数据量调整 Flink 作业并行度,避免单任务压力过大。

                          写入机制:开启 Flink Checkpoint,Doris Flink Connpythonector 提供了两种攒批模式,默认使用基于 Flink Checkpoint 的流式写入方式。

                          写入方式流式写入批量写入
                          触发条件依赖 Flink 的 Checkpoint,跟随 Flink 的 Checkpoint 周期写入到 Doris 中基于 Connector 内的时间阈值、数据量阈值进行周期性提交,写入到 Doris 中
                          一致性Exactly-OnceAt-Least-Once,基于主键模型可以保证 Exactly-Once
                          延迟受 Checkpoint 时间间隔限制,通常较高独立的批处理机制,灵活调整
                          容错与恢复与 Flink 状态恢复完全一致依赖外部去重逻辑(如 Doris 主键去重)

                          四、Datax 同步方案(适合全量 / 批量数据迁移)

                          1. 方案概述

                          Datax 是一款异构数据源之间数据同步工具,通过编写 JSON 格式的配置文件,实现 MySQL 与 Doris 之间的数据抽取、转换和加载(ETL)。该方案适用于全量数据迁移、定期批量数据同步场景,对实时性要求不高,但配置灵活,可自定义数据处理逻辑。

                          2. 环境要求

                          组件版本要求说明
                          Datax3.0+下载地址:Datax 官网
                          MySQL5.6+支持标准 JDBC 协议
                          Doris1.0+支持通过 JDBC 或 Broker 导入数据

                          3. 实施步骤

                          3.1 安装 Datax

                          # 下载Datax压缩包
                          wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
                          tar -zxvf datax.tar.gz

                          3.2 编写同步配置文件

                          创建mysql_to_doris.json文件,示例配置如下:

                          {
                              "job": {
                                  "content": [
                                      {
                                          "reader": {
                                              "name": "mysqlreader",
                                              "parameter": {
                                                  "column": [xxx],
                                                  "connection": [
                                                      {
                                                          "jjsdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
                                                          "table": ["table_name"]
                                                      }
                                                  ],
                                                  "username": "root",
                                                  "password": "xxxxx",
                                                  "where": ""
                                              }
                                          },
                                          "writer": {
                                              "name": "doriswriter",
                                              "parameter": {
                                                  "loadUrl": ["127.0.0.1:8030"],
                                                  "column": [xxxx],
                                                  "username": "root",
                                                  "password": "xxxxxx",
                                                  "postSql": ["select count(1) from table_name"],
                                                  "preSql": [],
                                                  "flushInterval":30000,
                                                  "connection": [
                                                    {
                                                      "jdbcUrl": "jdbc:mysql://127.0.0.1:9030/demo",
                                                      "selectedDatabase": "demo",
                                                      "table": ["table_name"]
                                                    }
                                                  ],
                                                  "loadProps": {
                                                      "format": "json",
                                                      "strip_outer_array":"true",
                                                      "line_delimiter": "\\x02"
                                                  }
                                              }
                                          }
                                      }
                                  ],
                                  "setting": {
                                      "speed": {
                                          "channel": "1"
                                      }
                                  }
                              }
                          }

                          3.3 执行同步任务

                          python bin/datax.py mysql_to_doris.json

                          4. 注意事项

                          数据类型映射:确保 MySQL 与 Doris 字段类型兼容,如 MySQL 的DATETIME对应 Doris 的DATETIME,避免类型转换错误。

                          性能调优:通过调整channel参数控制并发度,但过高的并发可能导致资源耗尽,建议根据数据库负载测试后设置。

                          五、方案对比与选型建议

                          方案适用场景复杂度资源依赖
                          JDBC Catalog跨库实时查询、联邦分析★★☆☆☆依赖 Doris JDBC 功能
                          Binlog 同步增量数据实时同步★★★☆☆需部署 Canal 及相关组件
                          Flink CDC 同步高实时性流式处理、复杂清洗★★★★☆依赖 Flink 集群
                          Datax 同步全量 / 批量数据迁移、离线同步★★★☆☆轻量级工具,独立部署

                          优先选择 JDBC Catalog:当需要快速验证跨库查询功能,或需要与 Doris 本地表进行关联分析时。

                          推荐 Binlog 同步:常规增量同步场景,兼顾实时性和部署复杂度。

                          选择 Flink CDC:对实时性要求高(如实时看板),或需要复杂数据清洗(通过 Flink Shttp://www.devze.comQL 实现)。

                          使用 Datax 同步:适合一次性全量迁移,或周期性批量同步历史数据。

                          五、数据迁移最佳实践要点

                          全量初始化与增量同步结合

                          首次迁移时,先通过DataX导入 MySQL 全量数据,再启动 Binlog 或 Flink CDC 同步增量数据,避免历史数据积压。

                          数据类型兼容性测试

                          重点验证 MySQL 的ENUMSETBIT等类型在 Doris 中的映射是否符合预期。

                          监控与告警体系

                          结合 Prometheus + Grafana 监控或者Doris Manager工具观察资源使用情况。

                          这份文档已涵盖多种迁移方式。若还想对某个迁移方式补充更多细节,或增加其他方面的内容,欢迎讨论。

                          到此这篇关于MySQL 迁移至 Doris 最佳实践方案的文章就介绍到这了,更多相关MySQL 迁移至 Doris内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

                          0

                          上一篇:

                          下一篇:

                          精彩评论

                          暂无评论...
                          验证码 换一张
                          取 消

                          最新数据库

                          数据库排行榜