Flink SQL 实现 MySQL 到 MySQL 的 CDC 同步(未成功)

mtain 2023年08月30日 140次浏览

SQL Api方式

依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Flink job module org.example:flink-job:1.0-SNAPSHOT. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Java 8 source/target level, plus the Flink and Flink CDC versions shared by the dependencies below. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.0</flink.version>
        <flinkcdc.version>2.3.0</flinkcdc.version>
    </properties>

    <dependencies>
        <!-- Flink core, streaming and Table API artifacts, all pinned to ${flink.version}.
             Artifacts with scope "provided" are available at compile time only; Maven does not
             package them, so the runtime classpath has to supply them. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- JDBC connector, also "provided".
             NOTE(review): no MySQL JDBC driver is declared explicitly in this file; presumably it
             arrives transitively through flink-connector-mysql-cdc. Confirm before relying on it. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <!-- Flink CDC connectors for MySQL and PostgreSQL, pinned to ${flinkcdc.version}. -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>

        <!-- JSON library. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!-- PostgreSQL JDBC driver. -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.6.0</version>
        </dependency>

        <!-- Lombok and the Logback logging modules (core, access, classic), all at fixed versions. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.2</version>
            </plugin>
            <!-- Runs the "shade" goal during the "package" phase with the plugin's default configuration. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码

package job.demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that copies the MySQL table {@code blog.posts} into the MySQL table
 * {@code tpch.posts}.
 *
 * <p>The job registers two Flink tables and then submits a single {@code INSERT INTO ... SELECT}:
 * <ul>
 *   <li>{@code t_posts} - source, {@code mysql-cdc} connector on 192.168.100.101:3306,
 *       database {@code blog}, incremental snapshot disabled;</li>
 *   <li>{@code s_posts} - sink, {@code jdbc} connector on 192.168.100.102:3306/tpch,
 *       table {@code posts}, primary key {@code id} (not enforced).</li>
 * </ul>
 */
public class MySQLCDCTable {
    /**
     * Builds the table environment, registers the source and sink tables, submits the
     * INSERT statement and prints its {@link TableResult}.
     *
     * @param args command-line arguments; not used
     * @throws Exception if any of the SQL statements fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table backed by the MySQL CDC connector.
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_posts ( " +
                "     type  INT   " +
                "    , id  INT   " +
                "    , create_time  TIMESTAMP " +
                "    , update_time  TIMESTAMP " +
                "    , disallow_comment  BOOLEAN " +
                "    , edit_time  TIMESTAMP " +
                "    , editor_type  INT " +
                "    , format_content  STRING " +
                "    , likes  BIGINT " +
                "    , meta_description  STRING " +
                "    , meta_keywords  STRING " +
                "    , original_content  STRING " +
                "    , password  STRING " +
                "    , slug  STRING " +
                "    , status  INT " +
                "    , summary  STRING " +
                "    , template  STRING " +
                "    , thumbnail  STRING " +
                "    , title  STRING " +
                "    , top_priority  INT " +
                "    , url  STRING " +
                "    , visits  BIGINT " +
                "    , word_count  BIGINT " +
                ") WITH ( " +
                "    'connector' = 'mysql-cdc',    " +
                "    'hostname' = '192.168.100.101',    " +
                "    'port' = '3306',    " +
                "    'username' = 'root',    " +
                "    'password' = 'password',    " +
                "    'database-name' = 'blog',    " +
                // 'table-name' must hold the bare table name: the connector prefixes it with
                // 'database-name' when building its table filter, so the former value
                // 'blog.posts' produced the pattern blog.blog.posts, which matches no table
                // and therefore yields no rows and no error.
                "    'table-name' = 'posts',    " +
                "    'scan.incremental.snapshot.enabled' = 'false'   " +
                ");"
        );

        // Sink table backed by the JDBC connector; the declared primary key is `id`.
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS s_posts ( " +
                "    `type` INT " +
                "    ,`id` INT NOT NULL " +
                "    ,`create_time` TIMESTAMP " +
                "    ,`update_time` TIMESTAMP " +
                "    ,`disallow_comment` BOOLEAN " +
                "    ,`edit_time` TIMESTAMP " +
                "    ,`editor_type` INT " +
                "    ,`format_content` STRING " +
                "    ,`likes` BIGINT " +
                "    ,`meta_description` STRING " +
                "    ,`meta_keywords` STRING " +
                "    ,`original_content` STRING " +
                "    ,`password` STRING " +
                "    ,`slug` STRING " +
                "    ,`status` INT " +
                "    ,`summary` STRING " +
                "    ,`template` STRING " +
                "    ,`thumbnail` STRING " +
                "    ,`title` STRING " +
                "    ,`top_priority` INT " +
                "    ,`url` STRING " +
                "    ,`visits` BIGINT " +
                "    ,`word_count` BIGINT " +
                "    ,primary key (id) not enforced" +
                ") WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.100.102:3306/tpch?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',   " +
                " 'table-name' = 'posts',   " +
                " 'username' = 'root',   " +
                " 'password' = 'password'  " +
                ");");

        // Copy every source column into the sink; the column order matches the s_posts schema.
        TableResult tableResult = tableEnv.executeSql("INSERT INTO s_posts   " +
                "SELECT " +
                "     type  " +
                "    , id  " +
                "    , create_time  " +
                "    , update_time  " +
                "    , disallow_comment  " +
                "    , edit_time  " +
                "    , editor_type  " +
                "    , format_content  " +
                "    , likes  " +
                "    , meta_description  " +
                "    , meta_keywords  " +
                "    , original_content  " +
                "    , password  " +
                "    , slug  " +
                "    , status  " +
                "    , summary  " +
                "    , template  " +
                "    , thumbnail  " +
                "    , title  " +
                "    , top_priority  " +
                "    , url  " +
                "    , visits  " +
                "    , word_count  " +
                " FROM t_posts; ");

        tableResult.print();
    }
}

运行结果:无报错,但也没有任何结果输出,原因未知。

sql-client.sh中执行sql

-- Source table: change stream of the MySQL table blog.posts, read through the
-- mysql-cdc connector with the incremental snapshot disabled.
CREATE TABLE IF NOT EXISTS t_posts (
    type              INT,
    id                INT,
    create_time       TIMESTAMP,
    update_time       TIMESTAMP,
    disallow_comment  BOOLEAN,
    edit_time         TIMESTAMP,
    editor_type       INT,
    format_content    STRING,
    likes             BIGINT,
    meta_description  STRING,
    meta_keywords     STRING,
    original_content  STRING,
    password          STRING,
    slug              STRING,
    status            INT,
    summary           STRING,
    template          STRING,
    thumbnail         STRING,
    title             STRING,
    top_priority      INT,
    url               STRING,
    visits            BIGINT,
    word_count        BIGINT
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.100.101',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'blog',
    -- Bare table name only: the connector prefixes it with 'database-name' when
    -- matching, so the former value 'blog.posts' matched no table at all.
    'table-name' = 'posts',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- Sink table: written through the JDBC connector to table posts in database tpch
-- on 192.168.100.102; the declared (not enforced) primary key is id.
CREATE TABLE IF NOT EXISTS s_posts (
    `type`              INT,
    `id`                INT NOT NULL,
    `create_time`       TIMESTAMP,
    `update_time`       TIMESTAMP,
    `disallow_comment`  BOOLEAN,
    `edit_time`         TIMESTAMP,
    `editor_type`       INT,
    `format_content`    STRING,
    `likes`             BIGINT,
    `meta_description`  STRING,
    `meta_keywords`     STRING,
    `original_content`  STRING,
    `password`          STRING,
    `slug`              STRING,
    `status`            INT,
    `summary`           STRING,
    `template`          STRING,
    `thumbnail`         STRING,
    `title`             STRING,
    `top_priority`      INT,
    `url`               STRING,
    `visits`            BIGINT,
    `word_count`        BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://192.168.100.102:3306/tpch?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'posts',
    'username' = 'root',
    'password' = 'password'
);

-- Continuously copy every column of the CDC source t_posts into the JDBC sink s_posts.
-- The projection order matches the column order of s_posts.
INSERT INTO s_posts
SELECT
    type,
    id,
    create_time,
    update_time,
    disallow_comment,
    edit_time,
    editor_type,
    format_content,
    likes,
    meta_description,
    meta_keywords,
    original_content,
    password,
    slug,
    status,
    summary,
    template,
    thumbnail,
    title,
    top_priority,
    url,
    visits,
    word_count
FROM t_posts;

执行时报各种错误,主要是缺少类;最终将 Maven 项目中的 jar 全部放到 Flink 的 lib 目录中。

从 Maven 项目导出所有依赖:
mvn dependency:copy-dependencies -DoutputDirectory=lib

之后仍然报错,暂时放弃
image.png