Canal入门使用小结
目录
- Canal介绍
- Canal安装
- Canal使用
- 总结
说明:canal [k’næl],译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费(官方介绍)。一言以蔽之,Canal是一款实现数据同步的组件。可以实现数据库之间、数据库与Redis、ES之间的数据同步。本文介绍Canal的入门使用。
Canal介绍
Canal实现原理是伪装成MySQL主节点的从节点,接收主节点的binlog日志,解析、提取数据库操作,将对数据库的操作通过代码更新到其他组件中,如其他数据库、ES、Redis等,官方解释如下:
官方提供的结构图如下:
Canal安装
首先,从官网上下载Canal服务器,地址:https://github.com/alibaba/canal/releases
下载下来,解压,如下:
canal配置文件暂时不用管,先修改一下example
文件中监测的目前节点配置,修改成自己需要监测的MySQL配置,如下:
修改完,启动canal服务,双击startup.BAT
文件,如下:
Canal使用
只要你的MySQL服务器的IP、账号密码没输错,且测试过能用Navicat或其他数据库连接工具成功连接数据库,那么就可以进行下面的编码工作了。
首先,创建一个Maven项目,pom.XML如下,导个canal依赖就行了
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hezy</groupId> <artifactId>canal_demo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!--canal客户端--> <dependency> <groupId>top.Javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId>编程; <version>1.2.1-RELEASE</version> </dependency> </dependencies> </project>
测试代码如下,用来连接canal服务器,打印canal监测到的数据内容;
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; /** * Canal处理器 * 作用:打印canal服务器监测到的数据 */ public class CanalHandler { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.创建连接 CanalConnector canalConnector = CanalConnectors .newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); // 2.抓取数据 while (true) { // 3.开始连接 canalConnector.connect(); // 4.订阅数据,所有的库和表 canalConnector.subscribe(".*\\..*"); // 5.抓取数据,每次抓取100条 Message message = canalConnector.get(100); // 6.获取entry集合 List<CanalEntry.Entry> entries = message.getEntries(); // 7.判断是否有数据 if (entries.size() == 0) { System.out.println(">>>暂无数据<<<"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 8.解析数据 www.devze.com for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName = entry.getHeader().getTableName(); // 获取操作类型 CanalEntry.Entrwww.devze.comyType entryType = entry.getEntryType(); // 判断entryType是否为ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // 序列化数据 ByteString storeValue = entry.getStoreValue(); // 反序列化数据 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); // 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); // 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); // 遍历打印 for (CanalEntry.RowData rowData : rowDatasList) { // 获取拉取前后的数据 List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); // 用Map存储每条数据 HashMap<String, Object> beforeMap = new HashMap<>(); HashMap<String, Object> afterMap = new HashMap<>(); // 获取不同操作的数据 if (CanalEntry.EventType.INSERT.equals(eventType)) { System.out.println("【" + tableName + "】表插入数据"); for (CanalEntry.Column column : afterColumnsList) { afterMap.put(column.getName(), column.getValue()); } System.out.println("新增数据:" + afterMap); } else if (CanalEntry.EventType.UPDATE.equals(eventType)) { System.out.println("【" + tableName + "】表更新数据"); for (CanalEntry.ColuPDkGJmn column : beforeColumnsList) { beforeMap.put(column.getName(), column.getValue()); } System.out.println("更新前:" + beforeMap); System.out.println("----"); for (CanalEntry.Column column : afterColumnsList) { afterMap.put(column.getName(), column.getValue()); } System.out.println("更新后:" + afterMap); } else if (CanalEntry.EventType.DELETE.equals(eventType)) { System.out.println("【" + tableName + "】表删除数据"); for (CanalEntry.Column column : beforeColumnsList) { beforeMap.put(column.getName(), column.getValue()); } System.out.println("被删除的数据:" + beforeMap); } } } } } } } }
启动程序,查看控制台,检测中……
使用Navicat连接数据库,查看数据库test库,i_user表内容;
此时,我们新增一条数据,看控制台,canal成功接收到了这次修改;
更新数据;
删除数据;
头能过身体就能过,接下来不就好办了。将Canal接收到的数据转为对象,根据不同的操作类型分发给自己想要同步的组件,同步给从MySQL,就调用对应的Mapper;同步给Redis,就调用Redis对应的方法,ES同样。
总结
本文介绍了Canal入门使用,参考B站视频:Canal极简入门:一小时让你快速上手Canal数据同步神技~
到此这篇关于Canal入门使用小结的文章就介绍到这了python,更多相关Canal入门内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支
精彩评论