说明
使用阿里云Flink,实时同步mysql到hologres。
参考地址:https://help.aliyun.com/zh/flink/getting-started/ingest-data-into-data-warehouses-in-real-time
操作过程
一、资源创建
- 云数据库 RDS MySQL
- 实时数仓Hologre
- 实时计算Flink版
二、初始化数据库
-
初始化mysql
创建数据库、账户、添加白名单、mysql开启binlog -
初始化hologres
创建数据库、添加白名单,账户(增加superuser权限)
三、mysql导入源数据库
-
数据表
-
order数据量
四. Flink中创建Catalog(元数据)
分别创建mysql和hologres的Catalog
五、Flink中SQL开发
代码如下:
mysql-hologres-sql
USE CATALOG hologres;
BEGIN STATEMENT SET;
-- 同步mysql指定表到Hologres的指定表中。
CREATE TABLE IF NOT EXISTS bigdata_sink.orders
AS TABLE mysql.`bigdata-db`.`orders`;
END;
还可以直接同步所有表
CREATE DATABASE IF NOT EXISTS tpc_ds
AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
六、部署Flink作业
- 调试并部署作业
- 启动作业
七、查看运行结果
Flink作业运行一段时间后,Hologres中就可以看到同步过来的历史数据
八、新数据同步
-
MySQL中新增一条数据
-
Hologres中查看条数
九、关联查询同步数据(宽表)
- hologres中创建目标表
CREATE TABLE public.cust_orders (
"O_ORDERKEY" integer NOT NULL,
"O_CUSTKEY" integer NOT NULL,
"C_NAME" character varying(25) NOT NULL,
"C_ADDRESS" character varying(40) NOT NULL,
"O_ORDERSTATUS" character(1) NOT NULL,
"O_TOTALPRICE" numeric(15,2) NOT NULL,
"O_ORDERDATE" date NOT NULL,
"O_ORDERPRIORITY" character(15) NOT NULL,
"O_CLERK" character(15) NOT NULL,
"O_SHIPPRIORITY" integer NOT NULL,
"O_COMMENT" character varying(79) NOT NULL
,PRIMARY KEY ("O_ORDERKEY")
);
- Flink作业代码
insert into hologres.bigdata_sink.cust_orders
select
t1.O_ORDERKEY,
t1.O_CUSTKEY,
t2.C_NAME,
t2.C_ADDRESS,
t1.O_ORDERSTATUS,
t1.O_TOTALPRICE,
t1.O_ORDERDATE,
t1.O_ORDERPRIORITY,
t1.O_CLERK,
t1.O_SHIPPRIORITY,
t1.O_COMMENT
from mysql.`bigdata-db`.`orders` t1 LEFT JOIN mysql.`bigdata-db`.`customer` t2 on t1.O_CUSTKEY=t2.C_CUSTKEY;
-
运行Flink作业
-
mysql中新增数据
-
hologres查看结果