How Canal Works
- Canal masquerades as a MySQL replica and sends a dump protocol request to the MySQL primary
- Once the MySQL primary receives the dump request, it starts pushing binlog to Canal
- Canal receives and parses these binlog change events, then forwards them to other services (e.g. ES, Redis)
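To make the last step concrete, the sketch below (illustrative only, not Canal's actual client API; the field names loosely follow the type/database/table/data shape of Canal's JSON messages) shows the kind of row-change event a downstream consumer ends up handling:

```java
import java.util.List;
import java.util.Map;

public class BinlogEventSketch {
    // Hypothetical stand-in for a parsed binlog event; the field names loosely
    // mirror Canal's JSON message shape (type, database, table, data).
    record RowChange(String type, String database, String table,
                     List<Map<String, String>> data) {}

    public static void main(String[] args) {
        // An UPDATE on one row, as Canal might deliver it downstream:
        RowChange event = new RowChange(
                "UPDATE", "jzo2o-foundations", "serve_sync",
                List.of(Map.of("id", "101", "price", "25.0")));
        // A downstream consumer typically routes on the event type:
        // INSERT/UPDATE -> save to the target store, DELETE -> remove.
        boolean isSave = !"DELETE".equals(event.type());
        System.out.println(event.table() + " save=" + isSave);
    }
}
```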
Environment Setup
The MySQL-Canal-MQ-ES environment has essentially been set up already.
Configuring MySQL
- Create a user in MySQL and grant it the required privileges
```shell
docker exec -it mysql /bin/bash
mysql -u root -p
```

```sql
create user 'canal'@'%' identified WITH mysql_native_password by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
- Edit the MySQL configuration file /usr/mysql/conf/my.cnf to enable binlog
```ini
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
expire_logs_days=3
max_binlog_size = 100m
max_binlog_cache_size = 512m
```
After modifying the configuration file, restart the MySQL container (for example `docker restart mysql`) so the changes take effect.
- Check the current MySQL status
```sql
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW BINARY LOGS;
SHOW MASTER STATUS;
```
Configuring RabbitMQ
The provided server already includes the RabbitMQ software; once started it can be accessed directly.
URL: http://192.168.101.68:15672/ Username: czri Password: czri1234
The VM has been prepared in advance with:
- Virtual host: /xzb
- Account and password for the virtual host: xzb
- A topic exchange: exchange.canal-jzo2o, along with the corresponding queue bindings

Configuring Canal
The Canal container is already provided in the VM; what remains is to configure it.
The end result: after data in the serve_sync table of the jzo2o-foundations database is modified, Canal sends the change information to MQ.
- Configure the MySQL connection settings by editing /data/soft/canal/instance.properties
```properties
canal.instance.master.address=192.168.101.68:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
```
- Configure the RabbitMQ connection settings by editing /data/soft/canal/canal.properties
```properties
canal.serverMode = rabbitMQ
rabbitmq.host = 192.168.101.68
rabbitmq.virtual.host = /xzb
rabbitmq.exchange = exchange.canal-jzo2o
rabbitmq.username = xzb
rabbitmq.password = xzb
rabbitmq.deliveryMode = 2
```
- Configure which MySQL tables to monitor and the MQ destinations to sync to, by editing /data/soft/canal/instance.properties
```properties
canal.instance.filter.regex=jzo2o-orders-1\\.orders_dispatch,jzo2o-orders-1\\.orders_seize,jzo2o-foundations\\.serve_sync,jzo2o-customer\\.serve_provider_sync,jzo2o-orders-1\\.serve_provider_sync,jzo2o-orders-1\\.history_orders_sync,jzo2o-orders-1\\.history_orders_serve_sync,jzo2o-market\\.activity
canal.mq.dynamicTopic=canal-mq-jzo2o-orders-dispatch:jzo2o-orders-1\\.orders_dispatch,canal-mq-jzo2o-orders-seize:jzo2o-orders-1\\.orders_seize,canal-mq-jzo2o-foundations:jzo2o-foundations\\.serve_sync,canal-mq-jzo2o-customer-provider:jzo2o-customer\\.serve_provider_sync,canal-mq-jzo2o-orders-provider:jzo2o-orders-1\\.serve_provider_sync,canal-mq-jzo2o-orders-serve-history:jzo2o-orders-1\\.history_orders_serve_sync,canal-mq-jzo2o-orders-history:jzo2o-orders-1\\.history_orders_sync,canal-mq-jzo2o-market-resource:jzo2o-market\\.activity
```
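The dynamicTopic setting is effectively a topic-to-table routing map. The sketch below (a hypothetical helper, simplified to an exact schema.table lookup rather than Canal's real regex matching; the "default-topic" fallback is an assumption, real Canal falls back to its configured default topic) shows the intent:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicTopicSketch {
    // Inverted view of canal.mq.dynamicTopic: schema.table -> topic.
    // Only two of the configured entries are shown here for brevity.
    static final Map<String, String> TOPIC_BY_TABLE = new LinkedHashMap<>();
    static {
        TOPIC_BY_TABLE.put("jzo2o-foundations.serve_sync", "canal-mq-jzo2o-foundations");
        TOPIC_BY_TABLE.put("jzo2o-orders-1.orders_dispatch", "canal-mq-jzo2o-orders-dispatch");
    }

    // Hypothetical helper: which topic does a change on schema.table go to?
    static String topicFor(String schema, String table) {
        return TOPIC_BY_TABLE.getOrDefault(schema + "." + table, "default-topic");
    }

    public static void main(String[] args) {
        // A change on jzo2o-foundations.serve_sync is routed to the
        // canal-mq-jzo2o-foundations queue via the topic exchange.
        System.out.println(topicFor("jzo2o-foundations", "serve_sync"));
    }
}
```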
Testing MySQL-Canal-MQ
Before testing, reset the binlog so that Canal can start reading from the mysql-bin.000001 file.
- Reset the MySQL binlog
```sql
reset master;
show master status;
```
- Delete the meta.dat file and restart Canal
```shell
docker stop canal
rm -rf /data/soft/canal/conf/meta.dat
docker start canal
```
Installing ES
The data is now synced to MQ; next, write a program that listens to MQ and writes the data into ES.
The ES instance already has the corresponding index serve_aggregation created, with the following structure:
```json
PUT /serve_aggregation
{
  "mappings": {
    "properties": {
      "city_code": { "type": "keyword" },
      "detail_img": { "type": "text", "index": false },
      "hot_time_stamp": { "type": "long" },
      "id": { "type": "keyword" },
      "is_hot": { "type": "short" },
      "price": { "type": "double" },
      "serve_item_icon": { "type": "text", "index": false },
      "serve_item_id": { "type": "keyword" },
      "serve_item_img": { "type": "text", "index": false },
      "serve_item_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" },
      "serve_item_sort_num": { "type": "short" },
      "serve_type_icon": { "type": "text", "index": false },
      "serve_type_id": { "type": "keyword" },
      "serve_type_img": { "type": "text", "index": false },
      "serve_type_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart" },
      "serve_type_sort_num": { "type": "short" }
    }
  }
}
```
Start the Kibana software in the VM
```shell
docker start kibana7.17.7
```
Then access Kibana at the following address:
http://192.168.101.68:5601
Finally, inspect the current index:
```
# View the index
GET /serve_aggregation

# View the documents in the index
GET /serve_aggregation/_search
{
  "query": {
    "match_all": {}
  }
}

# View the document with id 1686352662791016449
GET /serve_aggregation/_doc/1686352662791016449
```
Writing the Sync Program
- Add the following dependencies to the foundations project
```xml
<dependency>
    <groupId>com.jzo2o</groupId>
    <artifactId>jzo2o-canal-sync</artifactId>
</dependency>
<dependency>
    <groupId>com.jzo2o</groupId>
    <artifactId>jzo2o-es</artifactId>
</dependency>
```
- Modify the foundations configuration file to connect to ES and MQ

- Modify the ES configuration in Nacos

- Modify the RabbitMQ configuration in Nacos

- Implement the data sync
Add com.jzo2o.foundations.handler.ServeCanalDataSyncHandler to the jzo2o-foundations module.
It listens to MQ and syncs the messages into ES.
```java
package com.jzo2o.foundations.handler;

import com.jzo2o.canal.listeners.AbstractCanalRabbitMqMsgListener;
import com.jzo2o.es.core.ElasticSearchTemplate;
import com.jzo2o.foundations.constants.IndexConstants;
import com.jzo2o.foundations.model.domain.ServeSync;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Component
public class ServeCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener<ServeSync> {

    @Resource
    private ElasticSearchTemplate elasticSearchTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "canal-mq-jzo2o-foundations", arguments = {
                    @Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")
            }),
            exchange = @Exchange(name = "exchange.canal-jzo2o", type = ExchangeTypes.TOPIC),
            key = "canal-mq-jzo2o-foundations"))
    public void onMessage(Message message) throws Exception {
        parseMsg(message);
    }

    @Override
    public void batchSave(List<ServeSync> data) {
        elasticSearchTemplate.opsForDoc().batchInsert(IndexConstants.SERVE, data);
    }

    @Override
    public void batchDelete(List<Long> ids) {
        elasticSearchTemplate.opsForDoc().batchDelete(IndexConstants.SERVE, ids);
    }
}
```
Template Method Design Pattern
When it applies:
A task is completed through a fixed series of steps, but the concrete implementation of some steps is left open.
For example: a teacher and a student go through the same steps in the morning: eat breakfast, work, eat lunch.
But the content of the "work" step differs: the teacher's work is teaching, the student's work is studying.
How to implement it:
- Define an abstract class as the parent, and provide the template method in it
- In the template method, lay out the steps of the flow: implement the behaviors that are fixed, and declare the undetermined behaviors as abstract methods
- For each distinct behavior, define a subclass that extends the abstract class and overrides the abstract methods to supply its own behavior
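The teacher/student example above can be sketched in a few lines of Java (hypothetical class names). The final morning() method is the template method; work() is the step each subclass supplies. The AbstractCanalRabbitMqMsgListener used earlier follows the same pattern: parseMsg is the fixed flow, while batchSave and batchDelete are the steps each subclass fills in.

```java
public class TemplateMethodDemo {
    static abstract class Person {
        // Template method: the fixed sequence of steps.
        final String morning() {
            return "breakfast -> " + work() + " -> lunch";
        }
        // The undetermined step, left abstract for subclasses.
        abstract String work();
    }

    static class Teacher extends Person {
        @Override String work() { return "teach"; }
    }

    static class Student extends Person {
        @Override String work() { return "study"; }
    }

    public static void main(String[] args) {
        System.out.println(new Teacher().morning()); // breakfast -> teach -> lunch
        System.out.println(new Student().morning()); // breakfast -> study -> lunch
    }
}
```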