数据同步工具chunjun(原Flinkx)

mtain 2023年08月18日 398次浏览

说明

数据同步分为:离线数据同步和实时数据同步。实时数据同步主要是通过数据库日志来实现。

常见的数据库同步工具

  • chunjun
  • datax
  • Flink CDC

部署chunjun

chunjun github:https://github.com/DTStack/chunjun

以部署单节点flink为例

wget https://dlcdn.apache.org/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz

tar xf flink-1.12.7-bin-scala_2.12.tgz

配置环境变量

vi /etc/profile

export FLINK_HOME=/data/flink-1.12.7
export CHUNJUN_DIST=/data/chunjun

source /etc/profile

配置flink
vi $FLINK_HOME/conf/flink-conf.yaml
增加配置
taskmanager.host: localhost

2. 安装chunjun

github下载编译后的包:chunjun-dist-1.12-SNAPSHOT.tar.gz

解压到chunjun目录
tar zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz

复制依赖包到flink
cp -r /data/chunjun/chunjun-dist $FLINK_HOME/lib
bash bash $FLINK_HOME/bin/start-cluster.sh

使用chunjun

1. 运行示例demo

sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json

2. 同步数据mysql->mysql

创建job定义文件

vi mysql-job.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://192.168.100.101:3306/test01?useSSL=false&allowPublicKeyRetrieval=true"
                ],
                "table": [
                  "test_user01"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://92.168.100.101:3306/test02?useSSL=false&allowPublicKeyRetrieval=true",
                "table": [
                  "test_user02"
                ]
              }
            ],
            "writeMode": "insert",
            "column": [
              {
                "name": "id",
                "type": "int"
              },
              {
                "name": "name",
                "type": "string"
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}


执行job

sh bin/chunjun-standalone.sh -job ./mysql-job.json

flink控制台查看状态
image.png

参考文章

Flink CDC:https://ververica.github.io/flink-cdc-connectors/release-2.1/index.html