Canal在windows下的安装和监听初体验
一、在MySQL创建一个子账号最小权限原则创建账号给权限拉binary log查看主库的权限等-- 账号canal 密码 canal create user canal% identified by canal; -- 授权 grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to canal% identified by canal;二、MySQL开启binlog1. 找到 MySQL 的安装文件。2. 与bin目录同级创建my.cnf3. 写如下配置[mysqld] log-bitnmysql-bin binlog-formatROW server_id1三、下载Canalhttps://github.com/alibaba/canal/releases找到最新版本解压即可windows打开git bash即可执行命令tar -xzf canal.deployer-1.1.8.tar.gz解压得到如下文件夹四、配置Canal1. 进入conf目录下的example对instance.properties进行配置。2. 主库地址配置好记得看一下端口和ipcanal.instance.master.address127.0.0.1:33063. canal的账号密码配置好canal.instance.dbUsernamecanal 与canal.instance.dbPasswordcanal五、启动Canal1. 进入bin目录打开cmd执行startup.bat命令看到如下信息执行成功如果遇到报错遇到错误才看1. 如果你不是java8的环境请你编辑startup.bat,并且去掉其中的-XX:PermSize128m并且之后再次执行该文件。六、java后端引入依赖1. 引入pom.xml如下信息dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.client/artifactId version1.1.8/version /dependency dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.protocol/artifactId version1.1.8/version /dependency2. 编写一段监听代码public static void main(String args[]) { // 创建链接 CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), example, , ); int batchSize 1000; int emptyCount 0; try { connector.connect(); connector.subscribe(.*\\..*); connector.rollback(); int totalEmptyCount 120; while (emptyCount totalEmptyCount) { Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId message.getId(); int size message.getEntries().size(); if (batchId -1 || size 0) { emptyCount; System.out.println(empty count : emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount 0; // System.out.printf(message[batchId%s,size%s] \n, batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println(empty too many times, exit); } finally { connector.disconnect(); } } private static void printEntry(ListEntry entrys) { for (Entry entry : entrys) { if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) { continue; } RowChange rowChage null; try { rowChage RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(), e); } EventType eventType rowChage.getEventType(); System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s, entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println(-------gt; before); printColumn(rowData.getBeforeColumnsList()); System.out.println(-------gt; after); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(ListColumn columns) { for (Column column : columns) { System.out.println(column.getName() : column.getValue() update column.getUpdated()); } }3. mysql插入一条数据测试通过mysql插入的数据被监听成功了完成。七、安装MQ1. 输入https://github.com/rabbitmq/rabbitmq-server/releases2. 选择对应的版本解压。3. 解压进入到sbin目录执行15672控制面板执行enable命令rabbitmq-plugins enable rabbitmq_management八、Canal推送MQ1. 进入canal的配置文件