一、简介1、介绍Canal 是用 Java 开发的基于数据库增量日志解析提供增量数据订阅消费的中间件。 目前Canal 主要支持了MySQL的Binlog解析解析完成后利用Canal Client来处理获得相关数据。数据库同步需要阿里的Otter中间件基于Canal。GitHub地址https://github.com/alibaba/canal2、MySQL的binlog1) 什么是binlogMySQL 的二进制日志可以说MySQL最重要的日志了它记录了所有的DDL和DML(除了数据查询语句)语句以事件形式记录还包含语句所执行的消耗的时间MySQL的二进制日志是事务安全型的。一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:其一MySQL Replication在Master端开启binlogMaster 把它的二进制日志传递给slaves 来达到master-slave 数据一致的目的。其二数据恢复通过使用mysqlbinlog工具来使恢复数据。二进制日志包括两类文件二进制日志索引文件文件名后缀为.index用于记录所有的二进制文件二进制日志文件文件名后缀为.00000*记录数据库所有的DDL和DML(除了数据查询语句)语句事件。2) binlog的开启MySQL配置文件的位置Linux: /etc/my.cnf 如果/etc目录下没有可以通过locate my.cnf查找位置Windows: \my.ini在mysql的配置文件下,修改配置 在[mysqld]区块添加log-binmysql-bin这个表示binlog日志的前缀是mysql-bin以后生成的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成每次mysql重启或者到达单个文件大小的阈值时新生一个 文件按顺序编号。3) binlog的分类设置mysql的binlog格式有三种分别是STATEMENT,MIXED,ROW。 在配置文件中可以选择配置binlog_format statement|mixed|row三种格式的区别a、statement 语句级binlog会记录每次一执行写操作的语句。 相对row模式节省空间但是可能产生不一致性比如 update test set create_datenow(); 如果用binlog日志进行恢复由于执行时间不同可能产生的数据就不同。优点 节省空间 缺点 有可能造成数据不一致。b、row 行级binlog会记录每次操作后每行记录的变化。优点保持数据的绝对一致性。因为不管sql是什么引用了什么函数他只记录执行后的效果。 缺点占用较大空间。c、mixed 混合级别statement的升级版一定程度上解决了statement模式因为一些情况而造成的数据不一致问题。默认还是statement在某些情况下譬如当函数中包含 UUID() 时包含 AUTO_INCREMENT 字段的表被更新时执行 INSERT DELAYED 语句时用 UDF 时会按照 ROW的方式进行处理优点节省空间同时兼顾了一定的一致性。 缺点还有些极个别情况依旧会造成不一致另外statement和mixed对于需要对 binlog监控的情况都不方便。3、工作原理就是把自己伪装成Slave从Master复制数据。二、安装1、MySQL环境准备1Binlog设置修改mysql的配置文件开启MySQL Binlog设置sudo vim /etc/my.cnf在[mysqld]模块下添加一下内容[mysqld]server_id1log-binmysql-binbinlog_formatrow# 需要监控的库binlog-do-dbtest_maxwell并重启Mysql服务sudo systemctl restart mysqld登录mysql并查看是否修改完成mysql -uroot -p123456mysql show variables like %binlog%;查看下列属性binlog_format | ROWwin2查看binlog文件进入/var/lib/mysql目录查看MySQL生成的binlog文件注MySQL生成的binlog文件初始大小一定是154字节前缀是log-bin参数配置的后缀是默认从.000001然后依次递增。除了binlog文件文件以外MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。3创建账号分配一个账号可以操作该数据库GRANT ALL ON *.* TO canal% IDENTIFIED BY 123456;GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO canal%;刷新mysql表权限flush privileges;2、安装Canal1上传并解压注意canal解压后是分散的我们在指定解压目录的时候需要将canal指定上mkdir /opt/module/canaltar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal/2修改配置文件1修改canal.properties的配置文件......canal.port 11111......# tcp, kafka, RocketMQcanal.serverMode tcp......########################################################## destinations ##############################################################canal.destinations example说明canal端口号默认就是11111修改canal的输出model默认tcp改为输出到kafka多实例配置如果创建多个实例一个canal服务中可以有多个instanceconf/下的每一个example即是一个实例每个实例下面都有独立的配置文件。默认只有一个实例example如果需要多个实例处理不同的MySQL数据的话直接拷贝出多个example并对其重新命名命名和配置文件中指定的名称一致修改 canal.properties中的canal.destinations实例1实例2实例3。2修改instance.properties配置文件修改conf/example目录下的配置文件如果是多个实例可以配置多个配置文件。## mysql serverId , v1.0.26 will autoGencanal.instance.mysql.slaveId10canal.instance.master.address192.168.10.139:3306......# username/passwordcanal.instance.dbUsernamerootcanal.instance.dbPasswordrootcanal.instance.connectionCharset UTF-8canal.instance.defaultDatabaseName test# enable druid Decrypt database passwordcanal.instance.enableDruidfalse3启动./bin/startup.sh三、实时监控1、TCP监控1数据结构2创建maven项目3添加依赖?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdcom.hk/groupId artifactIdhadoopDemo/artifactId version1.0-SNAPSHOT/version /parent artifactIdcanalDemo/artifactId properties maven.compiler.source8/maven.compiler.source maven.compiler.target8/maven.compiler.target project.build.sourceEncodingUTF-8/project.build.sourceEncoding /properties dependencies dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.client/artifactId version1.1.2/version /dependency dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version2.4.1/version /dependency /dependencies /project4编写客户端代码package com.hk; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class Main { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.获取canal连接对象 CanalConnector canalConnector CanalConnectors.newSingleConnector(new InetSocketAddress(hd01, 11111), example, , ); // 循环监听 while (true) { // 获取连接 canalConnector.connect(); // 要监听的数据库和表 canalConnector.subscribe(test_maxwell.*); // 获取message一次获取10条修改 Message message canalConnector.get(10); // 获取entry ListCanalEntry.Entry entries message.getEntries(); // 遍历entry if(entries.size() 0) { try { System.out.println(暂无数据修改......); Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } } else { for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName entry.getHeader().getTableName(); // 获取entry类型 CanalEntry.EntryType entryType entry.getEntryType(); // 判断entryType是否为ROWDATA if(entryType CanalEntry.EntryType.ROWDATA) { // 序列化数据 ByteString storeValue entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(storeValue); // 获取事件类型 CanalEntry.EventType eventType rowChange.getEventType(); // 获取具体的数据 ListCanalEntry.RowData rowDatasList rowChange.getRowDatasList(); // 打印数据 for (CanalEntry.RowData rowData : rowDatasList) { // 获取修改前的数据 ListCanalEntry.Column beforeColumnsList rowData.getBeforeColumnsList(); JSONObject beforeData new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } // 获取修改后的数据 ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList(); JSONObject afterData new JSONObject(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println(TableName: tableName ,EventType: eventType ,Before: beforeData ,After: afterData); } } } } } } }2、发送到Kafka1启动Kafka和zookeeper2修改配置文件修改canal.properties中canal的输出model默认tcp改为输出到kafka# tcp, kafka, RocketMQcanal.serverMode kafka......########################################################### MQ ###############################################################canal.mq.servers hd01:6667,hd02:6667,hd03:6667修改instance.properties输出到Kafka的主题以及分区数# mq configcanal.mq.topiccanal_testcanal.mq.partitionsNum1# hash partition config#canal.mq.partition0#canal.mq.partitionHashmytest.person:id,mytest.role:id注意默认还是输出到指定Kafka主题的一个kafka分区因为多个分区并行可能会打 乱binlog的顺序如果要提高并行度首先设置kafka的分区数1,然后设置 canal.mq.partitionHash属性3启动Canalbin/startup.sh4测试向MySQL中插入数据后查看消费者控制台插入数据INSERT INTO test VALUES(1001,zhangsan),(1002,lisi);Kafka 消费者控制台{data:[{id:1001,name:zhangsan},{id:1002,name:lisi}],database:test-maxwwell,es:1639360729000,id:1,isDdl:false,mysqlType:{id: varchar(255),name:varchar(255)},old:null,sql:,sqlType:{id:12,name:12,table:test,ts:1639361038454,type:INSERT}