canal是什么 canal〔knl〕,译意为水道管道沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,目前主要支持了mysql。工作原理 原理图canal模拟MySQLslave的交互协议,伪装自己为MySQLslave,向MySQLmaster发送dump协议MySQLmaster收到dump请求,开始推送binarylog给slave(即canal)canal解析binarylog对象(原始为byte流)使用场景 基于日志增量订阅消费支持的业务:数据库镜像数据库实时备份多级索引(卖家和买家各自分库索引)searchbuild业务cache刷新价格变化等重要业务消息快速开始 准备 要使用canal必须要开启binlog,通过下面语句可以查看binlog是否开启,关于binlog的文章请移步到showvariableslikelogbin 开启配置 OFF表示没有开启,那么就需要开启binlog,找到mysql配置文件,windows系统配置文件名为my。ini,Linux系统配置文件名为my。conf。 如果不知道配置文件的位置在哪,可以通过SELECTdatadir语句查看文件位置 查询文件位置 配置文件会在datadir的上一级目录中。 找到配置文件之后,添加如下配置表示开启binloglogbinmysqlbin开启binlogbinlogformatROW选择ROW模式 修改完配置文件之后重启mysql,再使用showvariableslikelogbin语句查看binlog是否开启,ON表示开启。 查询是否开启 下载 下载链接:https:github。comalibabacanalreleases,目前最新的版本是1。1。6 下载版本 配置 下载之后解压,得到如下文件夹 解压文件 进入到confexample文件加下,修改instance。properties配置positioninfo,需要改成自己的数据库信息canal。instance。master。address127。0。0。1:3306canal。instance。master。journal。namecanal。instance。master。positioncanal。instance。master。timestampcanal。instance。standby。addresscanal。instance。standby。journal。namecanal。instance。standby。positioncanal。instance。standby。timestampusernamepassword,需要改成自己的数据库信息canal。instance。dbUsernamerootcanal。instance。dbPassword1234canal。instance。connectionCharsetUTF8tableregex数据库表的过滤canal。instance。filter。regexdbtest。 这里配置的是本地数据dbtest,里面有张user表CREATETABLEuser(idbigint(20)NOTNULLAUTOINCREMENTCOMMENTid,usernamevarchar(40)NOTNULLDEFAULTCOMMENT全名,gendertinyint(2)NOTNULLDEFAULT0COMMENT性别,createdatedatetimeNOTNULLCOMMENT创建时间,PRIMARYKEY(id))ENGINEInnoDBAUTOINCREMENT14DEFAULTCHARSETutf8mb4COMMENT用户表 启动 进入到解压后的bin文件夹下,双击startup。bat或者通过命令行启动 启动 然后进入logscanal文件夹下查看canal。log文件,出现以下界面表示成功 查询日志文件实操 canal1。1。1版本之后,默认支持将canalserver接收到的binlog数据直接投递到MQ,目前默认支持的MQ系统有:kafka、RocketMQ、RabbitMQ、pulsarmq。 本篇文章将使用springbootcanalRocketMQ进行一次简单的实操,如果项目中用不到消息队列也可参考官网示例使用https:github。comalibabacanalwikiClientExample,这里就不多赘述。 准备实操前要先把rocketmq安装好,关于rocketmq的安装这里也不过多赘述,可以看这篇文章RocketMq系列】rocketmq安装教程修改instance配置文件confexampleinstance。propertiesmqconfig要发送到的topiccanal。mq。topicconsumertopic针对库名或者表名发送动态topiccanal。mq。dynamicTopicmytest1。user,topic2:mytest2。。,。。。canal。mq。partition0hashpartitionconfigcanal。mq。enableDynamicQueuePartitionfalsecanal。mq。partitionsNum3canal。mq。dynamicTopicPartitionNumtest。:4,mycanal:6库名。表名:唯一主键,多个表之间用逗号分隔canal。mq。partitionHashtest。table:idname,。。。修改canal配置文件confcanal。properties默认为tcp,可选项:tcp,kafka,rocketMQ,rabbitMQ,pulsarMQcanal。serverModerocketMQRocketMQ生产者组rocketmq。producer。groupcanalproducergrouprocketmq。enable。message。tracefalserocketmq。customized。trace。topicrocketmq。namespacerocketmq。namesrv。addr127。0。0。1:9876rocketmq。retry。times。when。send。failed0rocketmq。vip。channel。enabledfalserocketmqtag标签rocketmq。tag 修改完重启一下canal。 集成到springboot中新建一个springboot项目,引入依赖 canal把数据都发送到了mq里面,所以这里可以不引入canal的依赖,直接从rockemq中获取数据即可。dependencygroupIdorg。apache。rocketmqgroupIdrocketmqspringbootstarterartifactIdversion2。1。1versiondependency修改配置server。port8090rocketmq。nameserverlocalhost:9876消费者组rocketmq。producer。groupcanalproducergroup消息发送rocketmq。producer。retrytimeswhensendasyncfailed2消息发送失败重试次数,默认为2rocketmq。producer。retrytimeswhensendfailed2rocketmq。consumer。topicconsumertopicrocketmq。consumer。groupconsumergroup新建一个RocketMQListener实现类RocketMQMessageListener(topic{rocketmq。consumer。topic},consumerGroup{rocketmq。consumer。group})ComponentpublicclassCanalListenerimplementsRocketMQListenerMessageExt{OverridepublicvoidonMessage(MessageExtmessage){finalbyte〔〕bodymessage。getBody();System。out。println(newString(body,StandardCharsets。UTF8));}}测试 修改表中一条记录信息UPDATEdbtest。userSETid13,usernameusername13,gender1,createdate2022090910:13:53WHERE(id13); 控制台会打印一条如下数据:{data:〔{createdate:2022090910:13:53,gender:1,id:13,username:username13}〕,database:dbtest,es:1665633148000,id:8,isDdl:false,mysqlType:{createdate:datetime,gender:tinyint(2),id:bigint(20),username:varchar(40)},old:〔{createdate:2022090910:18:53,username:username133}〕,pkNames:〔id〕,sql:,sqlType:{createdate:93,gender:6,id:5,username:12},table:user,ts:1665633148529,type:UPDATE} 数据解释:data:修改前的字段和数据database:数据库名old:修改字段和数据table:表名type:事件类型。INSERT、UPDATE、DELETE、CREATE等mysqlType:字段名及数据结构isDdl:是否是DDL操作pkNames:主键名 至此,一个canal的简单入门就完成了,感兴趣的可以更深入的了解一下。 参考资料: https:github。comalibabacanalwiki