进学阁

业精于勤荒于嬉,行成于思毁于随

0%

Canal

什么是Canal

canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
从这句话理解到了什么?
基于MySQL,并且通过MySQL日志进行的增量解析,这也就意味着对原有的业务代码完全是无侵入性的。

工作原理:解析MySQL的binlog日志,提供增量数据。

Canal能做什么

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
官方文档:https://github.com/alibaba/canal

Canal数据如何传输?

先来一张官方图:

Canal分为服务端和客户端,这也是阿里常用的套路:

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

目前为止支持的消息中间件很全面了,比如Kafka、RocketMQ,RabbitMQ。

数据同步还有其他中间件吗?

有,当然有,还有一些开源的中间件也是相当不错的,比如Bifrost。
常见的几款中间件的区别如下:

当然要我选择的话,首选阿里的中间件Canal。

Canal的实战,集成到SpringBoot

Canal服务端安装

服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases
image.png
下载稳定版本v1.1.5
image.png
下载完成解压,目录如下:

本文使用Canal+RabbitMQ进行数据的同步,因此下面步骤完全按照这个base进行。

打开MySQL的binlog日志

请参考https://www.yuque.com/u25495771/yqh657/cv1548

设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

1
2
3
4
5
6
7
8
9
10
# url
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# 监听的数据库
canal.instance.defaultDatabaseName=test

# 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
canal.instance.filter.regex=.*\\..*

设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。
这里需要修改两处配置文件,如下;

  1. canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 传输方式:tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host =/
# exchange
rabbitmq.exchange =canal.exchange
# 用户名、密码
rabbitmq.username =guest
rabbitmq.password =guest
## 是否持久化
rabbitmq.deliveryMode = 2
  1. canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

1
canal.mq.topic=canal.routing.key

RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange(必须和配置中的相同)的exchange和一个名称为 canal.queue(名称随意)的队列。
其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:

启动服务端

点击bin目录下的脚本,windows直接双击startup.bat,启动成功如下:

测试

在本地数据库test中的oauth_client_details插入一条数据,如下:
INSERT INTO oauth_client_details VALUES (‘myjszl’, ‘res1’, ‘$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W’, ‘all’, ‘password,refresh_token,authorization_code,client_credentials,implicit’, ‘http://www.baidu.com‘, NULL, 1000, 1000, NULL, ‘false’);

此时查看MQ中的canal.queue已经有了数据,如下:
其实就是一串JSON数据,这个JSON如下:
{
 “data”: [{
  “client_id”: “myjszl”,
  “resource_ids”: “res1”,
  “client_secret”: “$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W”,
  “scope”: “all”,
  “authorized_grant_types”: “password,refresh_token,authorization_code,client_credentials,implicit”,
  “web_server_redirect_uri”: “http://www.baidu.com“,
  “authorities”: null,
  “access_token_validity”: “1000”,
  “refresh_token_validity”: “1000”,
  “additional_information”: null,
  “autoapprove”: “false”
 }],
 “database”: “test”,
 “es”: 1640337532000,
 “id”: 7,
 “isDdl”: false,
 “mysqlType”: {
  “client_id”: “varchar(48)”,
  “resource_ids”: “varchar(256)”,
  “client_secret”: “varchar(256)”,
  “scope”: “varchar(256)”,
  “authorized_grant_types”: “varchar(256)”,
  “web_server_redirect_uri”: “varchar(256)”,
  “authorities”: “varchar(256)”,
  “access_token_validity”: “int(11)”,
  “refresh_token_validity”: “int(11)”,
  “additional_information”: “varchar(4096)”,
  “autoapprove”: “varchar(256)”
 },
 “old”: null,
 “pkNames”: [“client_id”],
 “sql”: “”,
 “sqlType”: {
  “client_id”: 12,
  “resource_ids”: 12,
  “client_secret”: 12,
  “scope”: 12,
  “authorized_grant_types”: 12,
  “web_server_redirect_uri”: 12,
  “authorities”: 12,
  “access_token_validity”: 4,
  “refresh_token_validity”: 4,
  “additional_information”: 12,
  “autoapprove”: 12
 },
 “table”: “oauth_client_details”,
 “ts”: 1640337532520,
 “type”: “INSERT”
}

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值…..
客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

新建项目

打开IDEA创建项目canal-toos
image.png
引用POM文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
 <!-- Mybatis plus代码生产插件 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.5.2</version>
</dependency>
<!-- Mybatisplus代码生成模板 -->
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>2.3</version>
</dependency>
<!-- Mybatis基础依赖 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<!--mybatisplus 依赖插件-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<!--mysql依赖插件-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- rabbitmq 服务调用依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.3</version>
</dependency>
<!-- jackosn基础依赖 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>

创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Canal消息接收实体类
*/
@NoArgsConstructor
@Data
public class CanalMessage<T> {
@JsonProperty("type")
private String type;

@JsonProperty("table")
private String table;

@JsonProperty("data")
private List<T> data;

@JsonProperty("database")
private String database;

@JsonProperty("es")
private Long es;

@JsonProperty("id")
private Integer id;

@JsonProperty("isDdl")
private Boolean isDdl;

@JsonProperty("old")
private List<T> old;

@JsonProperty("pkNames")
private List<String> pkNames;

@JsonProperty("sql")
private String sql;

@JsonProperty("ts")
private Long ts;
}

添加RabbitMQ消费端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 监听MQ获取Canal增量的数据消息
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handleDataChange(String message) {
//将message转换为CanalMessage
CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
String tableName = canalMessage.getTable();
log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
String tableName =canalMessage.getTable();
if (canalMessage.getData()!=null){
for (int i=0;i<canalMessage.getData().size();i++ ) {
Map<String,Object> map= mapper.readValue(canalMessage.getData().get(i).toString(),Map.class);

if (canalMessage.getType().contains("INSERT")){
sava(map,tableName);
}
else if (canalMessage.getType().contains("UPDATE")){
update(map,tableName);
}
else if (canalMessage.getType().contains("DELETE")){
delect(map,tableName);
}
}
}
}
public String underlineToHump(String str) {
if (StringUtils.isEmpty(str)){
return null;
}
//正则匹配下划线及后一个字符,删除下划线并将匹配的字符转成大写
Matcher matcher = UNDERLINE_PATTERN.matcher(str);
StringBuffer sb = new StringBuffer(str);
if (matcher.find()) {
sb = new StringBuffer();
//将当前匹配的子串替换成指定字符串,并且将替换后的子串及之前到上次匹配的子串之后的字符串添加到StringBuffer对象中
//正则之前的字符和被替换的字符
matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
//把之后的字符串也添加到StringBuffer对象中
matcher.appendTail(sb);
} else {
//去除除字母之外的前面带的下划线
return sb.toString().replaceAll("_", "");
}
return underlineToHump(sb.toString());
}
public void sava(Map<String,Object> oldTable,String tableName) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InvocationTargetException {
Class tableClass = Class.forName("com.koalas.canal.tools.base.entity."+tableName);
Object tableObjet = tableClass.newInstance();
for (String key :oldTable.keySet()){
Method[] methods = tableObjet.getClass().getMethods();
Object mapValue=oldTable.get(key);

String methodName = underlineToHump("set"+upperFirstCase(key.toLowerCase()));
for (Method method:methods){
if (method.getName().equals(methodName)){
if (method.getParameterTypes()[0].equals(String.class)) {
if (ObjectUtils.isEmpty(mapValue)){
mapValue =" ";
}
method.invoke(tableObjet, mapValue.toString());
} else if (method.getParameterTypes()[0].equals(Date.class)|| method.getParameterTypes()[0].equals(LocalDateTime.class)) {
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
method.invoke(tableObjet, LocalDateTime.parse(mapValue.toString(),formatter));
} else if (method.getParameterTypes()[0].equals(Integer.class)||method.getParameterTypes()[0].equals(int.class)) {
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
method.invoke(tableObjet, Integer.parseInt(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Long.class)||method.getParameterTypes()[0].equals(long.class)) {
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
method.invoke(tableObjet, Long.parseLong(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Double.class)||method.getParameterTypes()[0].equals(double.class)) {
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
method.invoke(tableObjet, Double.parseDouble(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Boolean.class)||method.getParameterTypes()[0].equals(boolean.class)) {
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
method.invoke(tableObjet, Boolean.parseBoolean(mapValue.toString()));
}
break;
}
}
}
// Object pojo = mapper.readValue(mapper.writeValueAsString(oldTable), tableClass.getClass());

String newTable= lowerFirstCase(tableName+"ServiceImpl");

ServiceImpl service = (ServiceImpl) SpringBeanUtils.getBeanByString(newTable);

service.save(tableObjet);
}
public void update(Map<String,Object> oldTable,String tableName) throws ClassNotFoundException, IOException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchMethodException, NoSuchFieldException {
Class tableClass = Class.forName("com.koalas.canal.tools.base.entity."+tableName);
Object tableObjet = tableClass.newInstance();
for (String key :oldTable.keySet()) {
Method[] methods = tableObjet.getClass().getMethods();
Object mapValue = oldTable.get(key);
if (ObjectUtils.isEmpty(mapValue)){
continue;
}
String methodName = underlineToHump("set" + upperFirstCase(key.toLowerCase()));
for (Method method : methods) {
if (method.getName().equals(methodName)) {
if (method.getParameterTypes()[0].equals(String.class)) {
method.invoke(tableObjet, mapValue.toString());
} else if (method.getParameterTypes()[0].equals(Date.class) || method.getParameterTypes()[0].equals(LocalDateTime.class)) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
method.invoke(tableObjet, LocalDateTime.parse(mapValue.toString(), formatter));
} else if (method.getParameterTypes()[0].equals(Integer.class) || method.getParameterTypes()[0].equals(int.class)) {
method.invoke(tableObjet, Integer.parseInt(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Long.class) || method.getParameterTypes()[0].equals(long.class)) {
method.invoke(tableObjet, Long.parseLong(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Double.class) || method.getParameterTypes()[0].equals(double.class)) {
method.invoke(tableObjet, Double.parseDouble(mapValue.toString()));
} else if (method.getParameterTypes()[0].equals(Boolean.class) || method.getParameterTypes()[0].equals(boolean.class)) {
method.invoke(tableObjet, Boolean.parseBoolean(mapValue.toString()));
}
//method.invoke(tableObjet,oldTable.get(key));
break;
}
}
}
String newTable = lowerFirstCase(tableName + "ServiceImpl");

ServiceImpl service = (ServiceImpl) SpringBeanUtils.getBeanByString(newTable);
service.updateById(tableObjet);
}
public void delect(Map<String,Object> oldTable,String tableName) throws ClassNotFoundException, IOException {
ObjectMapper mapper = new ObjectMapper();

Class tableClass = Class.forName("com.koalas.canal.tools.base.entity."+tableName);

QueryWrapper wrapper = new QueryWrapper();
wrapper.eq("ID",oldTable.get("ID"));
String newTable= lowerFirstCase(tableName+"ServiceImpl");

ServiceImpl service = (ServiceImpl) SpringBeanUtils.getBeanByString(newTable);
service.remove(wrapper);
}
}