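Overview
Data synchronization falls into two categories: offline (batch) synchronization and real-time synchronization. Real-time synchronization is implemented mainly by reading the database's change logs (for example, the MySQL binlog).
Common data synchronization tools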
- chunjun
- datax
- Flink CDC
Deploying ChunJun
ChunJun on GitHub: https://github.com/DTStack/chunjun
The steps below use a single-node Flink deployment as an example.
1. Install Flink
wget https://dlcdn.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz
tar xf flink-1.12.7-bin-scala_2.12.tgz
Set the environment variables
vi /etc/profile
export FLINK_HOME=/data/flink-1.12.7
export CHUNJUN_DIST=/data/chunjun
source /etc/profile
Configure Flink
vi $FLINK_HOME/conf/flink-conf.yaml
Add the following setting
taskmanager.host: localhost
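Editing the file by hand works, but the same change can be scripted so that rerunning the setup does not add the line twice. The following is a minimal sketch; set_flink_conf is a hypothetical helper name, not something shipped with Flink, and it assumes the file uses flat "key: value" lines like the default flink-conf.yaml.

```shell
# set_flink_conf is a hypothetical helper (not part of Flink).
# It appends "key: value" to a config file unless an uncommented line
# with exactly that key already exists.
set_flink_conf() {
  conf_file="$1"
  key="$2"
  value="$3"
  # awk compares the text before the first ":" literally, so dots in
  # the key are not treated as regex wildcards and "# key: ..." lines
  # (commented-out defaults) do not count as a match.
  if awk -F: -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$conf_file" 2>/dev/null; then
    echo "already set: ${key}"
  else
    printf '%s: %s\n' "$key" "$value" >> "$conf_file"
    echo "added: ${key}"
  fi
}
```

For the setting above, the call would be: set_flink_conf "$FLINK_HOME/conf/flink-conf.yaml" taskmanager.host localhost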
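2. Install ChunJun
Download the prebuilt package chunjun-dist-1.12-SNAPSHOT.tar.gz from GitHub
Extract it into the chunjun directory
mkdir -p /data/chunjun
tar zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz -C /data/chunjun
Copy the dependency packages into Flink
cp -r /data/chunjun/chunjun-dist $FLINK_HOME/lib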
3. Start Flink
bash $FLINK_HOME/bin/start-cluster.sh
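After starting the cluster it can help to confirm that the JobManager is actually answering before submitting anything. The sketch below polls Flink's REST endpoint /overview with curl. It assumes the REST port is still the default 8081 and that curl is installed; wait_for_flink is a hypothetical helper name.

```shell
# wait_for_flink is a hypothetical helper (not part of Flink).
# It polls a Flink REST URL once per second until it responds or the
# retry budget is used up.
# Assumption: the default REST address http://localhost:8081 is unchanged.
wait_for_flink() {
  url="${1:-http://localhost:8081/overview}"
  tries="${2:-10}"
  i=0
  while [ "$i" -lt "$tries" ]; do
    # -f makes curl fail on HTTP errors; --max-time bounds each attempt.
    if curl -fsS --max-time 3 "$url" >/dev/null 2>&1; then
      echo "Flink REST endpoint is up: $url"
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "Flink REST endpoint not reachable: $url" >&2
  return 1
}
```

Calling wait_for_flink with no arguments checks the default address; a non-zero return code suggests looking at the files under $FLINK_HOME/log.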
Using ChunJun
1. Run the example demo (from the ChunJun directory, $CHUNJUN_DIST)
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
2. Synchronize data from MySQL to MySQL
Create the job definition file
vi mysql-job.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://192.168.100.101:3306/test01?useSSL=false&allowPublicKeyRetrieval=true"
                ],
                "table": [
                  "test_user01"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://192.168.100.101:3306/test02?useSSL=false&allowPublicKeyRetrieval=true",
                "table": [
                  "test_user02"
                ]
              }
            ],
            "writeMode": "insert",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}
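Because the job file is long and edited by hand, a quick syntax check before submitting can save a failed run. The sketch below uses Python's built-in json.tool module, so it assumes python3 is installed; check_job_json is a hypothetical helper name. It checks JSON syntax only and does not validate ChunJun's reader or writer parameters.

```shell
# check_job_json is a hypothetical helper (not part of ChunJun).
# It verifies that a job file is syntactically valid JSON.
# Assumption: python3 is available on the PATH.
check_job_json() {
  job_file="$1"
  if python3 -m json.tool "$job_file" >/dev/null 2>&1; then
    echo "valid JSON: $job_file"
  else
    echo "invalid JSON: $job_file" >&2
    return 1
  fi
}
```

For the file above, the call would be: check_job_json ./mysql-job.json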
Run the job
sh bin/chunjun-standalone.sh -job ./mysql-job.json
Check the job status in the Flink web console (http://localhost:8081 by default)
References
Flink CDC: https://ververica.github.io/flink-cdc-connectors/release-2.1/index.html