Java使用Canal同步MySQL数据到Redis
目录
- 一、引言
- 二、工作原理
- 1. mysql主备复制原理
- 2. canal 工作原理
- 三、环境准备
- 1. 安装和配置 MySQL
- 2. 安装和配置 Canal
- 3. 安装和配置 Redis
- 四、开发 Java 应用
- 1. 添加依赖
- 2. 编写 Canal 客户端代码
- 3. 运行和测试
- 五、注意事项
- 六、结论
一、引言
在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。
项目地址:alibaba/canal
二、工作原理
1. MySQL主备复制原理
MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2. canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
三、环境准备
1. 安装和配置 MySQL
Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:
[mysqld] server-id=1 log-bin=mysql-bin binlog-format=ROW
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
重启 MySQL 服务以使配置生效:
sudo service mysql restart
2. 安装和配置 Canal
下载并解压 Canal 服务端:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal cd /opt/canal
编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:
canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset=UTF-8 canal.instance.filter.regex=.*\\..*
启动 Canal 服务:
sh bin/startup.sh
查看 server 日志
vi logs/canal/canal.log</pre> 2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111] 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看 instance 的日志
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。
3. 安装和配置 Redis
确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:
redis-cli
ping
四、开发 Java 应用
1. 添加依赖
在你的 pom.XML 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:
<dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>5.1.5</version> </dependency> </dependencies>
2. 编写 Canal 客户端代码
创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:
package org.hbin.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import redis.clients.jedis.Jedis; import java.net.InetSocketAddress; import java.util.List; javascript public class CanalToRedisSync { public static void main(String[] args) { // 创建 Canal 连接 InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111); CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", ""); // 连接到 Canal 服务 connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); // 创建 Redis 客户端 Jedis jedis = new Jedis("127.0.0.1", 6379); while (true) { Message message = connector.getWithoutAck(100); // 获取最多 100 条记录 long BATchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { handleEntry(message.getEntries(), jedis); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } } private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getjavascriptHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().jsgetTableName()); } else if (eventType == CanalEntry.EventType.INSERT) { syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); } else { System.out.println("-------> before"); syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); System.out.println("-------> after"); syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName()); } } } } private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) { StringBuilder key = new StringBuilder(); StringBuilder value = new StringBuilder(); for (CanalEntry.Column column : columns) { if (column.getName().equals("id")) { key.append(column.getValue()); } else { value.append(column.getName()).append(":").append(column.getValue()).append(","); } } System.out.println("Insert: " + key.toString() + " -> " + value.toString()); jedis.hset(schema + ":" + table, key.toString(), value.toString()); } private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) { StringBuilder key = new StringBuilder(); StringBuilder value = new StringBuilder(); for (CanalEntry.Column column : columns) { if (column.getName().equals("id")) { key.append(column.getValue()); } else { value.append(column.getName()).append(":").append(column.getValue()).append(","); } } System.out.println("Update: " + key.toString() + " -> " + value.toString()); jedis.hset(schema + ":" + table, key.toString(), value.toString()); } private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) { StringBuilder key = new StringBuilder(); for (CanalEntry.Column column : columns) { if (column.getName().equals("id")) { key.append(column.getValue()); } } System.out.println("Delete: " + key.toString()); jedis.hdel(schema + ":" + table, key.toString()); } }
3. 运行和测试
3.1 启动 Canal 服务:
sh /opt/canal/bin/startup.sh
3.2 启动 Redis 服务:
确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:
redis-cli
ping
3.3 启动 Java 应用:
编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。
3.4 测试数据同步:
在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。
相关SQL如下:
drop database if exists canal; create database canal; use canal; drop table if exists user; create table user( `id` bigint AUTO_INCREMENT primary key, `name` varchar(20) NOT NULL, `age` tinyint DEFAULT 0, `detail` varchar(100) DEFAULT '', `create_time` date, `update_time` date ); insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07'); insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07'); insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07'); update user set age=26 where id=2; delete from user where id=3;
输出信息:
================> binlog[binlog.000008:6390] , name[caphpnal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERTInsert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERTInsert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERTInsert: 3 -> name:Tom3,age:25,detail:canal,crhttp://www.devze.comeate_time:2024-11-07,update_time:2024-11-07,================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE-------> beforeUpdate: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,-------> afterUpdate: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETEDelete: 3
五、注意事项
性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。
错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。
安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。
六、结论
通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。
以上就是Java使用Canal同步MySQL数据到Redis的详细内容,更多关于Java Canal同步MySQL数据的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论