阿里云实时计算Flink-实现数据库实时入仓(mysql->hologres)

mtain 2023年08月22日 279次浏览

说明

使用阿里云Flink,实时同步mysql到hologres。

参考地址:https://help.aliyun.com/zh/flink/getting-started/ingest-data-into-data-warehouses-in-real-time

操作过程

一、资源创建

  • 云数据库 RDS MySQL
  • 实时数仓Hologre
  • 实时计算Flink版

二、初始化数据库

  1. 初始化mysql
    创建数据库、账户、添加白名单、mysql开启binlog

  2. 初始化hologres
    创建数据库、添加白名单,账户(增加superuser权限)

image.png

三、mysql导入源数据库

  1. 数据表
    image.png

  2. order数据量
    image.png

四. Flink中创建Catalog(元数据)

分别创建mysql和hologres的Catalog
image.png

五、Flink中SQL开发

代码如下:
mysql-hologres-sql

USE CATALOG hologres;

BEGIN STATEMENT SET;

-- 同步mysql指定表到Hologres的指定表中。
CREATE TABLE IF NOT EXISTS bigdata_sink.orders
AS TABLE mysql.`bigdata-db`.`orders`;

END;

还可以直接同步所有表

CREATE DATABASE IF NOT EXISTS tpc_ds
AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES

六、部署Flink作业

  1. 调试并部署作业
    image.png
  2. 启动作业
    image.png

image.png

七、查看运行结果

Flink作业运行一段时间后,Hologres中就可以看到同步过来的历史数据
image.png

八、新数据同步

  • MySQL中新增一条数据
    微信截图_20230822174042.png

  • Hologres中查看条数
    微信截图_20230822174058.png

九、关联查询同步数据(宽表)

  1. hologres中创建目标表
CREATE TABLE public.cust_orders (
    "O_ORDERKEY" integer NOT NULL,
    "O_CUSTKEY" integer NOT NULL,
    "C_NAME" character varying(25) NOT NULL,
    "C_ADDRESS" character varying(40) NOT NULL,
    "O_ORDERSTATUS" character(1) NOT NULL,
    "O_TOTALPRICE" numeric(15,2) NOT NULL,
    "O_ORDERDATE" date NOT NULL,
    "O_ORDERPRIORITY" character(15) NOT NULL,
    "O_CLERK" character(15) NOT NULL,
    "O_SHIPPRIORITY" integer NOT NULL,
    "O_COMMENT" character varying(79) NOT NULL
    ,PRIMARY KEY ("O_ORDERKEY")
);
  1. Flink作业代码
insert into hologres.bigdata_sink.cust_orders
select 
t1.O_ORDERKEY,
t1.O_CUSTKEY,
t2.C_NAME,
t2.C_ADDRESS,
t1.O_ORDERSTATUS,
t1.O_TOTALPRICE,
t1.O_ORDERDATE,
t1.O_ORDERPRIORITY,
t1.O_CLERK,
t1.O_SHIPPRIORITY,
t1.O_COMMENT 
from mysql.`bigdata-db`.`orders` t1 LEFT JOIN mysql.`bigdata-db`.`customer` t2 on t1.O_CUSTKEY=t2.C_CUSTKEY;
  1. 运行Flink作业
    image.png

  2. mysql中新增数据
    image.png

  3. hologres查看结果
    image.png