前言
如何保证数据一致性已然成为面试中高频出现的问题,也是我们项目中经常出现的问题,现在各大大小小的项目比如一个正常规模下的电商平台就会有mysql做为数据的持久化层,redis做数据缓存层,还会使用到es搜索引擎去加强用户对商品搜索的体验,那数据一致性就已然成为该架构的一个通病了,如mysql与redis缓存层的数据一致性,还有mysql的商品数据与es的数据一致性,一般没了解过数据一致性问题的小伙伴一般都会使用最简单粗暴的方法在修改数据库的时候去同步修改redis和es,这种方法抛开性能不说,风险也是极大的,那么今天我们来学一个阿里巴巴开发的开源项目-Canal,然后我们如何使用canal去解决数据一致性问题
需要的环境/版本注释
jdk - 因为canal是依据java开发的所以需要jdk环境 现在新版本支持的jdk一般是1.8或者jdk11
mysql - 本文测试用的5.7版本
canal安装包 本文用的1.1.7版本,2023-4-23最新版本
Canal的运行原理
canal是基于mysql的主从模式实现的,所以必须先开启mysql的binlog日志,具体运行原理如下:
- canal模拟mysql slave(数据库的从机)的交互协议,伪装自己为mysql slave(数据库从机),向mysql master(数据库主机)发送dump协议,
- mysql master(主机)收到dump请求,会向slave(也就是canal)开始推送binary log(数据库更改日志),slave会将master主库推送的binary log events拷贝到它的中继日志(relay log)
- canal解析binary log(日志)对象(原始为byte流),然后获取数据库更改记录
开启mysql-binlog日志
- 进入mysql服务查看mysql是否开启了binlog日志功能
show variables like '%log_bin%';
发现binlog日志是关闭的,如果是on的可以直接跳过开启binlog日志这一步,如果和我一样是off的就需要先打开binlog日志
- 打开mysql-binlog日志
#先退出mysql服务
exit
#修改/etc/下的my.cnf配置文件也就是mysql的配置文件
vim /etc/my.cnf
在结尾新增以下配置
log_bin=mysql-bin
binlog-format=ROW
server-id=1
- binlog-format : binlog的格式有三种:STATEMENT,MIXED,ROW对比如下
- server-id 服务的id,这里定义为1是防止与canal的service-id配置冲突
保存-重启mysql服务,然后再去查看mysql的binlog日志信息
#重启mysql
systemctl restart mysqld
#进入mysql服务
mysql -u [mysql用户名] -p [mysql密码]
#查看binlog信息
show variables like '%log_bin%';
可以看见binlog日志已经开启了
部署Canal
可以将共享云盘中的canal压缩包下载下来,或者直接去官网下载最新的压缩包 : https://github.com/alibaba/canal/releases
新建一个文件夹来存放canal
mkdir /usr/local/canal
把下载下来的压缩包使用xftp工具上传到服务器/usr/local/canal文件夹中
然后解压
cd /usr/local/canal
tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz
因为canal是伪装成mysql的从机进行数据同步请求的,所以我们需要创建一个mysql账号专门用来帮住canal获取mysql授权使用,首先我们进入mysql服务,然后使用用户创建命令创建一个mysql用户,下面示例创建的mysql用户名为 canal 密码为 Canal(888),你们可以根据自己喜好自定义用户名密码
CREATE USER canal IDENTIFIED BY 'Canal(888)';
然后进行授权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
然后重新加载权限表
FLUSH PRIVILEGES;
然后我们在修改canal实例的配置文件
cd /usr/local/canal/conf/example
vim instance.properties
修改以下配置
#监听的数据库ip地址和端口号
canal.instance.master.address=127.0.0.1:3306
#上面新建的mysql用户的用户名
canal.instance.dbUsername=canal
#上面新建的mysql用户的密码
canal.instance.dbPassword=Canal(888)
#数据库编码
canal.instance.connectionCharset = UTF-8
配置完成后保存即可
启动Canal
先进入canal的bin文件夹下
cd /usr/local/canal/bin
启动canal
./startup.sh
关闭canal
./stop.sh
使用Canal
方法一 直接使用canal对接MQ消息队列进行数据监听
现在的canal已经集成了现比较大众的mq,比如 RocketMQ , RabbitMQ , Kafka , canal都支持直接将mysql的数据变动发送到mq中,然后我们只需要在项目中监听对应队列即可
修改canal配置文件
cd /usr/local/canal/conf
vim canal.properties
修改以下参数
# 修改canal的执行端口 , 可以选 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
上面选的rabbitmq,下面就找rabbitmq的配置
然后进行修改
/
canal默认发送的信息带的路由key是example,如果要修改在/usr/local/canal/conf/example/instance.properties配置中修改
然后就可以使用mq就可以监听饿了
mq监听测试
我们新建了一个名为canal的exchanges,然后再新建一个叫canal的队列,然后将队列绑定到交换机上面,路由key就设置example
然后我们修改一个数据库,看一下mq是否能监听到数据的变动,下面是我模拟的一个数据库
我们把王五的年龄从17变成20,看一下mq的情况
发现有一条消息进来了,我们去队列哪里看一下收到的消息是什么样子的
看见是一条json格式的数据,我们进行一下格式化看一下
可以清楚的看见data是最新的数据,old是修改之前的数据,表是student,操作类型是update,那我们直接使用mq对接canal就完成了,其他队列我就不一一去给你们演示了,就靠你们举一反三了!!!
如果有的小伙伴项目没有用到队列的话,可以参考一下第二种方法使用默认的tcp进行监听
方法二 使用canal的TCP模式集成到SpringBoot中进行监听mysql
首先我们还是需要把canal.properties配置中的canal.serverMode改成默认的tcp (canal.serverMode = tcp)
- 先项目中创建一个canal微服务
- 在resources中新增lib文件夹,并把共享云盘中的starter-canal.zip下载,然后把压缩文件解压出的文件复制到lib文件下
- 在resources中新增application.yml配置文件,添加以下配置
canal:
client:
instances:
example:
host: #[你服务器的ip地址]
port: 11111
batchSize: 1000
server:
port: 8007
spring:
application:
name: canal
- 把lib下的jar引入pom
<dependencies>
<dependency>
<groupId>com.starter-canal</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/lib/starter-canal-0.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>com.canal.protocol</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.0.25</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/lib/canal.protocol-1.0.25.jar</systemPath>
</dependency>
<dependency>
<groupId>com.canal.client</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.25</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/lib/canal.client-1.0.25.jar</systemPath>
</dependency>
<dependency>
<groupId>com.protobuf-java</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/lib/protobuf-java-3.6.1.jar</systemPath>
</dependency>
<dependency>
<groupId>canal.common</groupId>
<artifactId>canal.common</artifactId>
<version>1.0.25</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/lib/canal.common-1.0.25.jar</systemPath>
</dependency>
<!--web起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
</dependencies>
- 编写启动类
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
@EnableCanalClient //声明当前服务是canal的客户端
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
- 新增监听类
@CanalEventListener //声明当前的类是canal的监听类
public class canalListener {
/**
*
* @param eventType 当前操作数据库的类型
* @param rowData 当前操作数据库的数据
*/
//监听具体的数据库
@ListenPoint(destination = "example", schema = "sangmuen.com")
//监听精准到表
//@ListenPoint(destination = "example", schema = "sangmuen.com", table = "student")
//监听具体请求
//@ListenPoint(destination = "example", schema = "sangmuen.com", table = "student",eventType = CanalEntry.EventType.INSERT)
public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
System.out.println("学生表发生变化");
//获取改变之前的数据
rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改变前的数据:"+c.getName()+"::"+c.getValue()));
//获取改变之后的数据
rowData.getAfterColumnsList().forEach((c)-> System.out.println("改变之后的数据:"+c.getName()+"::"+c.getValue()));
}
canal微服务监听测试
进入启动类把项目起起来,然后我在我自己的数据库中新增了一个student表,然后手动使用navicat修改该表,看看这里会不会监听到,把王五的年龄从18改成19
看项目的控制台
这样子我们就使用springboot去监听到mysql变化了就不需要使用mq队列去实现数据监听了
小结
经过这一章的学习,后面项目中的数据一致性问题应该大家都有应对的方法了吧,如果大家有什么疑惑或者有什么问题出现什么bug之类的都可以评论区提问,我会在第一时间帮住大家解决的!