Flink流处理示例项目从代码到部署

mtain 2023年08月21日 89次浏览

一、说明

Flink支持两种计算模式:批处理流处理,流处理又分为:有界流无界流

批处理是指将一批数据集合在一起,一次性输入到计算系统中进行处理的方式。批处理通常是离线处理,即不需要实时响应用户请求,而是按照一定的时间间隔、容量大小或者规则定期进行批量处理。

流处理是指对实时数据流进行连续的、实时的处理方式。与批处理不同,流处理通常需要以毫秒或者微秒级别响应事件的发生,并及时产生相应的反馈和输出。

DataSet API用于批处理,DataStream API用于流式处理。

本示例说明

WordCount是大数据处理系统的“Hello World”。它计算文本集合中单词的频率。该算法分两步进行:首先,文本将文本分成单个单词。其次,对单词进行分组和计
数。

本实例实现无界流处理WordCount。

二、创建MAVEN工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.7</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
</project>

logback.xml

<configuration>
    <property name="CONSOLE_LOG_PATTERN"
              value="%yellow(%d{yyyy-MM-dd HH:mm:ss.SSS}) [%blue(%thread)] %highlight(%-5level) %green(%logger{60}) %blue(%file:%line) %X{sourceThread} - %cyan(%msg%n)"/>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="console"/>
    </root>


    <logger name="akka" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.kafka" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.hadoop" level="INFO">
        <appender-ref ref="console"/>
    </logger>
    <logger name="org.apache.zookeeper" level="INFO">
        <appender-ref ref="console"/>
    </logger>

    <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR">
        <appender-ref ref="console"/>
    </logger>
</configuration>

WordCountUnboundedDemo.java

package job.demo;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * dataStream 实现wordCount 读socket(无界流)
 */
public class WordCountUnboundedDemo {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取数据:从文件中读取
        // 在主机上安装netcat,然后启动服务,输入各种单词
        // yum install -y netcat
        // nc -lk 7777
        DataStreamSource<String> socketDS = env.socketTextStream("192.168.100.101", 7777);

        // 3. 处理数据:切分、转换、分组、聚合
        socketDS
                .flatMap(
                        (String s, Collector<Tuple2<String, Integer>> collector) -> {
                            String[] words = s.split(" ");
                            for (String word : words) {
                                collector.collect(Tuple2.of(word, 1));
                            }
                        }

                )
                // 这里由于Lambda表达式存在类型擦除,所以必须指定返回元素的类型
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(stringIntegerTuple2 -> stringIntegerTuple2.f0)
                .sum(1)
                .print();

        // 4. 输出

        // 5、执行
        env.execute();
    }
}

三、本地运行

服务端运行nc模拟数据源

yum install -y netcat
nc -lk 7777

idea中运行

  1. flink的依赖服务器打包部署是需要排除的,maven依赖中设置了<scope>provided</scope>, 本地依赖,需要做如下设置
    image.png

  2. 运行类
    image.png

  3. nc模拟数据源
    image.png

  4. idea的控制台中,就会出现分词统计结果

16> (flink,1)
6> (hello,1)
11> (world,1)
6> (hello,2)
4> (java,1)
6> (hello,3)

四、服务器运行

  1. 打包maven工程
    image.png

  2. 上传jar包到flink
    image.png

  3. 运行jar包中的任务类
    注意先关闭本地idea中运行的类
    image.png

image.png

  1. nc模拟数据源
    image.png

  2. flink中查看运行日志
    image.png

image.png

image.png