First Canal
Canal 是阿里巴巴开源的 MySQL binlog 增量订阅 & 消费组件。
官方仓库:https://github.com/alibaba/canal/
以下是根据官方的 Wiki 文档,在本地(Windows)搭建 Canal Server 的过程。
拉取 Canal Server 镜像
bash
docker pull canal/canal-server:v1.1.7
1
拉取不下来的可以参考 这篇博客,也可以直接使用下面的私有镜像。
bash
docker pull registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7
docker tag registry.cn-hangzhou.aliyuncs.com/pusher/canal-server:v1.1.7 canal/canal-server:v1.1.7
1
2
2
配置 MySQL
NOTE
这里的 MySQL 8.0 是部署在 K8s 上的。清单见后面的 附 1. MySQL 8.0 的部署清单,仅供参考。
新增 binlog 配置
我这里是修改 mysql-8-config 中的 my.cnf 配置项,然后重启 MySQL 服务即可。
ini
[mysqld]
log-bin=mysql-bin #添加这一行就 ok
binlog-format=ROW #选择 row 模式
server_id=1 #配置 mysql replication 需要定义,不能和 canal 的 slaveId 重复
1
2
3
4
2
3
4
创建 canal 用户
sql
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
1
2
3
4
2
3
4
启动 Canal Server
官方文档 里提供了启动的脚本,可以通过如下命令下载并启动。
下载运行脚本
bash
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
1
构建一个 destination name 为 test 的队列
bash
# Start a Canal Server whose single destination is named "test".
# The last option line carries no trailing backslash; a dangling "\" would
# leave the shell waiting for a continuation line.
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=127.0.0.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
Windows 下启动镜像
我这里由于是 Windows,只好手动拼 `docker run` 的参数。
bash
docker run -d --privileged=true -it -h 127.0.0.1 -e canal.instance.mysql.slaveId=2 -e canal.auto.scan=false -e canal.destinations=test -e canal.instance.master.address=127.0.0.1:3306 -e canal.instance.dbUsername=canal -e canal.instance.dbPassword=canal -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false --name=canal-server-test-8 -p 11110:11110 -p 11111:11111 -p 11112:11112 -p 9100:9100 -m 4096m canal/canal-server:v1.1.7
1
我这边启动后日志如下:
txt
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Failed to get D-Bus connection: Operation not permitted
Failed to get D-Bus connection: Operation not permitted
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
出现了两个 Failed to get D-Bus connection: Operation not permitted 的错误,但貌似不影响使用。
运行 Canal Client
这里是直接运行了官方示例 `SimpleCanalClientTest`。由于上面指定了 `canal.destinations` 为 test,所以示例里的 `destination` 也要修改为一致的名字。
java
package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
/**
 * Stand-alone (single server) example client.
 *
 * <p>Connects straight to one Canal server by IP address, so there is no HA
 * failover.
 *
 * @author jianghang 2013-4-15 04:19:20 PM
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination) {
        super(destination);
    }

    /**
     * Builds a single-server connector, starts the client and registers a JVM
     * shutdown hook that stops it again.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the connection directly from the IP; no HA support.
        // The destination has to match the canal.destinations the server was started with.
        final String destination = "test";
        final InetSocketAddress serverAddress = new InetSocketAddress(AddressUtils.getHostIp(), 11111);
        final CanalConnector connector =
            CanalConnectors.newSingleConnector(serverAddress, destination, "canal", "canal");

        final SimpleCanalClientTest client = new SimpleCanalClientTest(destination);
        client.setConnector(connector);
        client.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown(client)));
    }

    /** Stops the given client, logging (never propagating) anything that goes wrong. */
    private static void shutdown(SimpleCanalClientTest client) {
        try {
            logger.info("## stop the canal client");
            client.stop();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping canal:", e);
        } finally {
            logger.info("## canal client is down.");
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
启动后在 DB 中执行了如下几个操作:
- 创建数据库 `canal_test`
- 创建表 `t_user`
- 插入数据
- 修改数据
- 删除数据
Canal Client 打印的日志如下:
txt
****************************************************
* Batch Id: [1] ,count : [1] , memsize : [181] , Time : 2024-09-25 11:00:05
* Start : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)]
* End : [mysql-bin.000001:979:1727233201000(2024-09-25 11:00:01)]
****************************************************
----------------> binlog[mysql-bin.000001:979] , name[,] , eventType : QUERY , executeTime : 1727233201000(2024-09-25 11:00:01) , gtid : () , delay : 4793 ms
ddl : true , sql ----> CREATE DATABASE `canal_test` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'
****************************************************
* Batch Id: [2] ,count : [1] , memsize : [307] , Time : 2024-09-25 11:01:33
* Start : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)]
* End : [mysql-bin.000001:1239:1727233293000(2024-09-25 11:01:33)]
****************************************************
----------------> binlog[mysql-bin.000001:1239] , name[canal_test,t_user] , eventType : CREATE , executeTime : 1727233293000(2024-09-25 11:01:33) , gtid : () , delay : 823 ms
ddl : true , sql ----> CREATE TABLE `canal_test`.`t_user` (
`id` bigint UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(80) NULL COMMENT '姓名',
PRIMARY KEY (`id`)
) ENGINE = InnoDB COMMENT = '用户表'
****************************************************
* Batch Id: [3] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:01:45
* Start : [mysql-bin.000001:1625:1727233305000(2024-09-25 11:01:45)]
* End : [mysql-bin.000001:1823:1727233305000(2024-09-25 11:01:45)]
****************************************************
================> binlog[mysql-bin.000001:1625] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 711ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:1770] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 718 ms
id : 1 type=bigint unsigned update=true
name : 用户 1 type=varchar(80) update=true
----------------
END ----> transaction id: 688
================> binlog[mysql-bin.000001:1823] , executeTime : 1727233305000(2024-09-25 11:01:45) , gtid : () , delay : 729ms
****************************************************
* Batch Id: [4] ,count : [3] , memsize : [165] , Time : 2024-09-25 11:02:20
* Start : [mysql-bin.000001:1933:1727233340000(2024-09-25 11:02:20)]
* End : [mysql-bin.000001:2131:1727233340000(2024-09-25 11:02:20)]
****************************************************
================> binlog[mysql-bin.000001:1933] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2078] , name[canal_test,t_user] , eventType : INSERT , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814 ms
id : 2 type=bigint unsigned update=true
name : 用户 2 type=varchar(80) update=true
----------------
END ----> transaction id: 707
================> binlog[mysql-bin.000001:2131] , executeTime : 1727233340000(2024-09-25 11:02:20) , gtid : () , delay : 814ms
****************************************************
* Batch Id: [5] ,count : [3] , memsize : [195] , Time : 2024-09-25 11:02:23
* Start : [mysql-bin.000001:2241:1727233343000(2024-09-25 11:02:23)]
* End : [mysql-bin.000001:2469:1727233343000(2024-09-25 11:02:23)]
****************************************************
================> binlog[mysql-bin.000001:2241] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2395] , name[canal_test,t_user] , eventType : UPDATE , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 640 ms
id : 1 type=bigint unsigned
name : 用户 1-1 type=varchar(80) update=true
----------------
END ----> transaction id: 710
================> binlog[mysql-bin.000001:2469] , executeTime : 1727233343000(2024-09-25 11:02:23) , gtid : () , delay : 641ms
****************************************************
* Batch Id: [6] ,count : [3] , memsize : [165] , Time : 2024-09-25 12:00:18
* Start : [mysql-bin.000001:2579:1727236818000(2024-09-25 12:00:18)]
* End : [mysql-bin.000001:2777:1727236818000(2024-09-25 12:00:18)]
****************************************************
================> binlog[mysql-bin.000001:2579] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 302ms
BEGIN ----> Thread id: 128
----------------> binlog[mysql-bin.000001:2724] , name[canal_test,t_user] , eventType : DELETE , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 303 ms
id : 2 type=bigint unsigned
name : 用户 2 type=varchar(80)
----------------
END ----> transaction id: 2105
================> binlog[mysql-bin.000001:2777] , executeTime : 1727236818000(2024-09-25 12:00:18) , gtid : () , delay : 305ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
附 1. MySQL 8.0 的部署清单
点击查看清单详细内容
yaml
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
deployment.kubernetes.io/revision: "7"
creationTimestamp: "2024-04-09T09:37:59Z"
generation: 9
labels:
app: mysql-8
release: mysql-8
name: mysql-8
namespace: common
spec:
progressDeadlineSeconds: 600
replicas: 1
revisionHistoryLimit: 2
selector:
matchLabels:
app: mysql-8
release: mysql-8
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql-8
release: mysql-8
spec:
containers:
- env:
- name: TZ
value: Asia/Shanghai
- name: MYSQL_ROOT_PASSWORD
valueFrom:
secretKeyRef:
key: mysql-root-password
name: mysql-8-secret
image: mysql:8.0.31
imagePullPolicy: IfNotPresent
livenessProbe:
exec:
command:
- sh
- -c
- mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
failureThreshold: 3
initialDelaySeconds: 30
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 5
name: mysql-8
ports:
- containerPort: 3306
name: mysql
protocol: TCP
readinessProbe:
exec:
command:
- sh
- -c
- mysqladmin ping -u root -p${MYSQL_ROOT_PASSWORD}
failureThreshold: 3
initialDelaySeconds: 5
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
resources:
requests:
cpu: 100m
memory: 256Mi
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/lib/mysql
name: data
- mountPath: /etc/mysql/conf.d/
name: my-conf-volume
readOnly: true
dnsPolicy: ClusterFirst
initContainers:
- command:
- rm
- -fr
- /var/lib/mysql/lost+found
image: busybox:1.32
imagePullPolicy: IfNotPresent
name: remove-lost-found
resources:
requests:
cpu: 10m
memory: 10Mi
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/lib/mysql
name: data
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
serviceAccount: default
serviceAccountName: default
terminationGracePeriodSeconds: 30
volumes:
- name: data
persistentVolumeClaim:
claimName: pvc-common-mysql-8
- configMap:
defaultMode: 420
items:
- key: my.cnf
path: my.cnf
name: mysql-8-config
name: my-conf-volume
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
附 2. Canal Client example 代码
SimpleCanalClientTest.java
点击查看代码
javapackage com.alibaba.otter.canal.example; import java.net.InetSocketAddress; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; /** * 单机模式的测试例子 * * @author jianghang 2013-4-15 下午04:19:20 * @version 1.0.4 */ public class SimpleCanalClientTest extends AbstractCanalClientTest { public SimpleCanalClientTest(String destination){ super(destination); } public static void main(String args[]) { // 根据ip,直接创建链接,无HA的功能 String destination = "test"; String ip = AddressUtils.getHostIp(); CanalConnector connector = CanalConnectors .newSingleConnector(new InetSocketAddress(ip, 11111), destination, "canal", "canal"); final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination); clientTest.setConnector(connector); clientTest.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { logger.info("## stop the canal client"); clientTest.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal:", e); } finally { logger.info("## canal client is down."); } })); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
AbstractCanalClientTest.java
点击查看代码
javapackage com.alibaba.otter.canal.example; import org.slf4j.MDC; import org.springframework.util.Assert; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.Message; /** * 测试基类 * * @author jianghang 2013-4-15 下午04:17:12 * @version 1.0.4 */ public class AbstractCanalClientTest extends BaseCanalClientTest { public AbstractCanalClientTest(String destination){ this(destination, null); } public AbstractCanalClientTest(String destination, CanalConnector connector){ this.destination = destination; this.connector = connector; } protected void start() { Assert.notNull(connector, "connector is null"); thread = new Thread(this::process); thread.setUncaughtExceptionHandler(handler); running = true; thread.start(); } protected void stop() { if (!running) { return; } running = false; if (thread != null) { try { thread.join(); } catch (InterruptedException e) { // ignore } } MDC.remove("destination"); } protected void process() { int batchSize = 5 * 1024; while (running) { try { MDC.put("destination", destination); connector.connect(); connector.subscribe(); while (running) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // } } else { printSummary(message, batchId, size); printEntry(message.getEntries()); } if (batchId != -1) { connector.ack(batchId); // 提交确认 } } } catch (Throwable e) { logger.error("process error!", e); try { Thread.sleep(1000L); } catch (InterruptedException e1) { // ignore } connector.rollback(); // 处理失败, 回滚数据 } finally { connector.disconnect(); MDC.remove("destination"); } } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
BaseCanalClientTest.java
点击查看代码
javapackage com.alibaba.otter.canal.example; import java.io.UnsupportedEncodingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.google.protobuf.InvalidProtocolBufferException; public class BaseCanalClientTest { protected final static Logger logger = LoggerFactory .getLogger(AbstractCanalClientTest.class); protected static final String SEP = SystemUtils.LINE_SEPARATOR; protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; protected volatile boolean running = false; protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e); protected Thread thread = null; protected CanalConnector connector; protected static String context_format = null; protected static String row_format = null; protected static String transaction_format = null; protected String destination; static { context_format = SEP + "****************************************************" + SEP; context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP; context_format += "* Start : [{}] " + SEP; context_format += "* End : [{}] " + SEP; context_format += "****************************************************" + SEP; row_format = SEP + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms" + SEP; transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms" + SEP; } protected void printSummary(Message message, long batchId, int size) { long memsize = 0; for (Entry 
entry : message.getEntries()) { memsize += entry.getHeader().getEventLength(); } String startPosition = null; String endPosition = null; if (!CollectionUtils.isEmpty(message.getEntries())) { startPosition = buildPositionForDump(message.getEntries().get(0)); endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1)); } SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT); logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition, endPosition }); } protected String buildPositionForDump(Entry entry) { long time = entry.getHeader().getExecuteTime(); Date date = new Date(time); SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT); String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")"; if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) { position += " gtid(" + entry.getHeader().getGtid() + ")"; } return position; } protected void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { long executeTime = entry.getHeader().getExecuteTime(); long delayTime = new Date().getTime() - executeTime; Date date = new Date(entry.getHeader().getExecuteTime()); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) { TransactionBegin begin = null; try { begin = TransactionBegin.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } // 打印事务头信息,执行的线程id,事务耗时 logger.info(transaction_format, new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), 
simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime) }); logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId()); printXAInfo(begin.getPropsList()); } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) { TransactionEnd end = null; try { end = TransactionEnd.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } // 打印事务提交信息,事务id logger.info("----------------\n"); logger.info(" END ----> transaction id: {}", end.getTransactionId()); printXAInfo(end.getPropsList()); logger.info(transaction_format, new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime) }); } continue; } if (entry.getEntryType() == EntryType.ROWDATA) { RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("parse event has an error , data:" + entry.toString(), e); } EventType eventType = rowChange.getEventType(); logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date), entry.getHeader().getGtid(), String.valueOf(delayTime) }); if (eventType == EventType.QUERY || rowChange.getIsDdl()) { logger.info("ddl : " + rowChange.getIsDdl() + " , sql ----> " + rowChange.getSql() + SEP); continue; } printXAInfo(rowChange.getPropsList()); for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } 
else { printColumn(rowData.getAfterColumnsList()); } } } } } protected void printColumn(List<Column> columns) { for (Column column : columns) { StringBuilder builder = new StringBuilder(); try { if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB") || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) { // get value bytes builder.append( column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8")); } else { builder.append(column.getName() + " : " + column.getValue()); } } catch (UnsupportedEncodingException e) { } builder.append(" type=" + column.getMysqlType()); if (column.getUpdated()) { builder.append(" update=" + column.getUpdated()); } builder.append(SEP); logger.info(builder.toString()); } } protected void printXAInfo(List<Pair> pairs) { if (pairs == null) { return; } String xaType = null; String xaXid = null; for (Pair pair : pairs) { String key = pair.getKey(); if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) { xaType = pair.getValue(); } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) { xaXid = pair.getValue(); } } if (xaType != null && xaXid != null) { logger.info(" ------> " + xaType + " " + xaXid); } } public void setConnector(CanalConnector connector) { this.connector = connector; } /** * 获取当前Entry的 GTID信息示例 * * @param header * @return */ public static String getCurrentGtid(CanalEntry.Header header) { List<CanalEntry.Pair> props = header.getPropsList(); if (props != null && props.size() > 0) { for (CanalEntry.Pair pair : props) { if ("curtGtid".equals(pair.getKey())) { return pair.getValue(); } } } return ""; } /** * 获取当前Entry的 GTID Sequence No信息示例 * * @param header * @return */ public static String getCurrentGtidSn(CanalEntry.Header header) { List<CanalEntry.Pair> props = header.getPropsList(); if (props != null && props.size() > 0) { for (CanalEntry.Pair pair : props) { if ("curtGtidSn".equals(pair.getKey())) { return pair.getValue(); } } } return ""; } /** * 获取当前Entry的 GTID Last 
Committed信息示例 * * @param header * @return */ public static String getCurrentGtidLct(CanalEntry.Header header) { List<CanalEntry.Pair> props = header.getPropsList(); if (props != null && props.size() > 0) { for (CanalEntry.Pair pair : props) { if ("curtGtidLct".equals(pair.getKey())) { return pair.getValue(); } } } return ""; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267