SQL Api方式
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-job</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.0</flink.version>
<flinkcdc.version>2.3.0</flinkcdc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
代码
package job.demo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class MySQLCDCTable {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS t_posts ( " +
" type INT " +
" , id INT " +
" , create_time TIMESTAMP " +
" , update_time TIMESTAMP " +
" , disallow_comment BOOLEAN " +
" , edit_time TIMESTAMP " +
" , editor_type INT " +
" , format_content STRING " +
" , likes BIGINT " +
" , meta_description STRING " +
" , meta_keywords STRING " +
" , original_content STRING " +
" , password STRING " +
" , slug STRING " +
" , status INT " +
" , summary STRING " +
" , template STRING " +
" , thumbnail STRING " +
" , title STRING " +
" , top_priority INT " +
" , url STRING " +
" , visits BIGINT " +
" , word_count BIGINT " +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.100.101', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'password', " +
" 'database-name' = 'blog', " +
" 'table-name' = 'blog.posts', " +
" 'scan.incremental.snapshot.enabled' = 'false' " +
");"
);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS s_posts ( " +
" `type` INT " +
" ,`id` INT NOT NULL " +
" ,`create_time` TIMESTAMP " +
" ,`update_time` TIMESTAMP " +
" ,`disallow_comment` BOOLEAN " +
" ,`edit_time` TIMESTAMP " +
" ,`editor_type` INT " +
" ,`format_content` STRING " +
" ,`likes` BIGINT " +
" ,`meta_description` STRING " +
" ,`meta_keywords` STRING " +
" ,`original_content` STRING " +
" ,`password` STRING " +
" ,`slug` STRING " +
" ,`status` INT " +
" ,`summary` STRING " +
" ,`template` STRING " +
" ,`thumbnail` STRING " +
" ,`title` STRING " +
" ,`top_priority` INT " +
" ,`url` STRING " +
" ,`visits` BIGINT " +
" ,`word_count` BIGINT " +
" ,primary key (id) not enforced" +
") WITH ( " +
" 'connector' = 'jdbc', " +
" 'driver' = 'com.mysql.cj.jdbc.Driver', " +
" 'url' = 'jdbc:mysql://192.168.100.102:3306/tpch?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false', " +
" 'table-name' = 'posts', " +
" 'username' = 'root', " +
" 'password' = 'password' " +
");");
TableResult tableResult = tableEnv.executeSql("INSERT INTO s_posts " +
"SELECT " +
" type " +
" , id " +
" , create_time " +
" , update_time " +
" , disallow_comment " +
" , edit_time " +
" , editor_type " +
" , format_content " +
" , likes " +
" , meta_description " +
" , meta_keywords " +
" , original_content " +
" , password " +
" , slug " +
" , status " +
" , summary " +
" , template " +
" , thumbnail " +
" , title " +
" , top_priority " +
" , url " +
" , visits " +
" , word_count " +
" FROM t_posts; ");
tableResult.print();
}
}
运行结果,无报错,无结果,原因未知
Flink SQL客户端模式
在sql-client.sh
中执行sql
CREATE TABLE IF NOT EXISTS t_posts (
type INT
, id INT
, create_time TIMESTAMP
, update_time TIMESTAMP
, disallow_comment BOOLEAN
, edit_time TIMESTAMP
, editor_type INT
, format_content STRING
, likes BIGINT
, meta_description STRING
, meta_keywords STRING
, original_content STRING
, password STRING
, slug STRING
, status INT
, summary STRING
, template STRING
, thumbnail STRING
, title STRING
, top_priority INT
, url STRING
, visits BIGINT
, word_count BIGINT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.100.101',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'blog',
'table-name' = 'blog.posts',
'scan.incremental.snapshot.enabled' = 'false'
);
CREATE TABLE IF NOT EXISTS s_posts (
`type` INT
,`id` INT NOT NULL
,`create_time` TIMESTAMP
,`update_time` TIMESTAMP
,`disallow_comment` BOOLEAN
,`edit_time` TIMESTAMP
,`editor_type` INT
,`format_content` STRING
,`likes` BIGINT
,`meta_description` STRING
,`meta_keywords` STRING
,`original_content` STRING
,`password` STRING
,`slug` STRING
,`status` INT
,`summary` STRING
,`template` STRING
,`thumbnail` STRING
,`title` STRING
,`top_priority` INT
,`url` STRING
,`visits` BIGINT
,`word_count` BIGINT
,primary key (id) not enforced
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.100.102:3306/tpch?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'posts',
'username' = 'root',
'password' = 'password'
);
INSERT INTO s_posts
SELECT
type
, id
, create_time
, update_time
, disallow_comment
, edit_time
, editor_type
, format_content
, likes
, meta_description
, meta_keywords
, original_content
, password
, slug
, status
, summary
, template
, thumbnail
, title
, top_priority
, url
, visits
, word_count
FROM t_posts;
报各种错误,缺少类,最终将maven项目中的jar,全部放在flink的lib中
maven项目导出所有依赖
mvn dependency:copy-dependencies -DoutputDirectory=lib
之后仍然报错,暂时放弃