01-Flink安装部署及入门案例(仅供学习)(2),2024年最新2024-2024历年字节跳动大数据开发面试真题解析_stopping taskexecutor daemon (pid: 24457) on host -程序员宅基地

技术标签: 2024年程序员学习  flink  学习  大数据  

先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7

深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
img

正文

vim /etc/profile
	添加内容:
	export HADOOP\_CONF\_DIR=/export/server/hadoop/etc/hadoop
# 执行生效
source /etc/profile

6)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-standalone/lib目录
在这里插入图片描述

[root@node1 ~]# cd /export/server/flink-standalone/lib/

[root@node1 lib]# rz
	commons-cli-1.4.jar
	flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar

7)、分发到集群其他机器

scp -r /export/server/flink-standalone root@node2:/export/server

scp -r /export/server/flink-standalone root@node3:/export/server

。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
接下来,启动服务进程,运行批处理程序:词频统计WordCount。
1)、启动HDFS集群,在node1上执行如下命令

start-dfs.sh

2)、启动集群,执行如下命令

# 一键启动所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.

在这里插入图片描述3)、访问Flink UI界面:http://node1:8081/#/overview
在这里插入图片描述在这里插入图片描述4)、执行官方测试案例

# 准备测试数据
[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/

在这里插入图片描述

运行程序,使用--input指定处理数据文件路径
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt

在这里插入图片描述

# 使用--output指定处理结果数据存储目录
/export/server/flink-standalone/bin/flink run \
/export/server/flink-standalone/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt \
--output hdfs://node1:8020/wordcount/output/result

[root@node1 ~]# hdfs dfs -text /wordcount/output/result

在这里插入图片描述
5)、关闭Standalone集群服务

# 一键停止所有服务JobManager和TaskManagers
[root@node1 ~]# /export/server/flink-standalone/bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 6600) on host node1.
Stopping taskexecutor daemon (pid: 3016) on host node2.
Stopping taskexecutor daemon (pid: 3034) on host node3.
Stopping standalonesession daemon (pid: 6295) on host node1.

补充:Flink Standalone集群启动与停止,也可以逐一服务启动

# 每个服务单独启动
# 在node1上启动
/export/server/flink-standalone/bin/jobmanager.sh start
# 在node1、node2、node3.
/export/server/flink-standalone/bin/taskmanager.sh start  # 每台机器执行

# ===============================================================
# 每个服务单独停止
# 在node1上停止
/export/server/flink-standalone/bin/jobmanager.sh stop
# 在node1、node2、node3
/export/server/flink-standalone/bin/taskmanager.sh stop 

07-安装部署之Standalone HA

从Standalone架构图中,可发现JobManager存在单点故障(SPOF),一旦JobManager出现意外,整个集群无法工作。为了确保集群的高可用,需要搭建Flink的Standalone HA。
在这里插入图片描述Flink Standalone HA集群,类似YARN HA 集群安装部署,可以启动多个主机点JobManager,使用Zookeeper集群监控JobManagers转态,进行选举leader,实现自动故障转移。
在这里插入图片描述 在 Zookeeper 的协助下,一个 Standalone的Flink集群会同时有多个活着的 JobManager,其中**只有一个处于Active工作状态,其他处于 Standby 状态。**当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选一个新的 JobManager 来接管 Flink 集群。
1)、集群规划
在这里插入图片描述

# 在node1上复制一份standalone
[root@node1 ~]# cd /export/server/
[root@node1 server]# cp -r flink-standalone flink-ha

# 删除日志文件
[root@node1 ~]# rm -rf /export/server/flink-ha/log/\*

2)、启动ZooKeeper,在node1上启动

start-zk.sh

3)、启动HDFS,在node1上启动,如果没有关闭,不用重启

start-dfs.sh

4)、停止集群,在node1操作,进行HA高可用配置

/export/server/flink-standalone/bin/stop-cluster.sh 

5)、修改flink-conf.yaml,在node1操作

vim /export/server/flink-ha/conf/flink-conf.yaml
	修改内容:
jobmanager.rpc.address: node1	

high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_standalone

state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://node1:8020/flink/checkpoints
state.savepoints.dir: hdfs://node1:8020/flink/savepoints

6)、修改masters,在node1操作

vim /export/server/flink-ha/conf/masters
	修改内容:
	node1:8081
	node2:8081

7)、分发到集群其他机器,在node1操作

scp -r /export/server/flink-ha root@node2:/export/server/
scp -r /export/server/flink-ha root@node3:/export/server/

8)、修改node2上的flink-conf.yaml

[root@node2 ~]# vim /export/server/flink-ha/conf/flink-conf.yaml 
	修改内容:33 行
	jobmanager.rpc.address: node2

9)、重新启动Flink集群

# node1和node2上执行
/export/server/flink-ha/bin/jobmanager.sh start

# node1和node2、node3执行
/export/server/flink-ha/bin/taskmanager.sh start  # 每台机器执行

在这里插入图片描述

08-Flink on YARN之运行流程

​ 在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的Workload,因此 Flink 也支持在 Yarn 集群运行。

为什么使用Flink on Yarn或Spark on Yarn?

  • 1)、Yarn的资源可以按需使用,提高集群的资源利用率
  • 2)、Yarn的任务有优先级,根据优先级运行作业
  • 3)、基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

当应用程序(MR、Spark、Flink)运行在YARN集群上时,可以实现容灾恢复。

09-Flink on YARN之安装部署

Flink on YARN安装配置,此处考虑高可用HA配置,集群机器安装软件框架示意图:
在这里插入图片描述1)、关闭YARN的内存检查(node1操作)

# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml

添加如下内容:

<!-- 关闭yarn内存检查 -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>	

2)、 配置Application最大的尝试次数(node1操作)

# yarn-site.xml中添加配置
vim /export/server/hadoop/etc/hadoop/yarn-site.xml

添加如下内容:

<property>
	<name>yarn.resourcemanager.am.max-attempts</name>
	<value>4</value>
</property>

3)、同步yarn-site.xml配置文件(node1操作)

cd /export/server/hadoop/etc/hadoop
scp -r yarn-site.xml root@node2:$PWD
scp -r yarn-site.xml root@node3:$PWD

4)、启动HDFS集群和YARN集群(node1操作)

[root@node1 ~]# start-dfs.sh

[root@node1 ~]# start-yarn.sh

5)、添加HADOOP_CONF_DIR环境变量(集群所有机器

# 添加环境变量
 vim /etc/profile

添加内容:

export HADOOP\_CONF\_DIR=/export/server/hadoop/etc/hadoop

环境变量生效

source /etc/profile

6)、上传软件及解压(node1操作)

[root@node1 ~]# cd /export/software/
[root@node1 software]# rz
	上传软件包:flink-1.13.1-bin-scala_2.11.tgz
	
[root@node1 software]# chmod u+x flink-1.13.1-bin-scala\_2.11.tgz
[root@node1 software]# tar -zxf flink-1.13.1-bin-scala\_2.11.tgz -C /export/server/ 

[root@node1 ~]# cd /export/server/
[root@node1 server]# chown -R root:root flink-1.13.1
[root@node1 server]# mv flink-1.13.1 flink-yarn

7)、将Flink依赖Hadoop 框架JAR包上传至/export/server/flink-yarn/lib目录
在这里插入图片描述

[root@node1 ~]# cd /export/server/flink-yarn/lib/
[root@node1 lib]# rz
	commons-cli-1.4.jar
	flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar

8)、配置HA高可用,依赖Zookeeper及重试次数(node1操作)

# 修改配置文件
vim /export/server/flink-yarn/conf/flink-conf.yaml

添加如下内容:

high-availability: zookeeper
high-availability.storageDir: hdfs://node1:8020/flink/yarn-ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink-yarn-ha
high-availability.cluster-id: /cluster_yarn

yarn.application-attempts: 10

9)、集群所有机器,同步分发Flink 安装包,便于任意机器提交运行Flink Job。

scp -r /export/server/flink-yarn root@node2:/export/server/

scp -r /export/server/flink-yarn root@node3:/export/server/

10)、启动Zookeeper集群(node1操作)

start-zk.sh

在Flink中执行应用有如下三种部署模式(Deployment Mode):
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/28d9a859154f9ba2e2fd90d2110735.png#pic_center

10-Flink on YARN之Session模式运行

Flink on YARN :Session 模式,表示多个Flink Job运行共享Standalone集群资源。

​ 先向Hadoop YARN申请资源,启动运行服务JobManager和TaskManagers,再提交多个Job到Flink 集群上执行。
在这里插入图片描述

  • 无论JobManager还是TaskManager,都是运行NodeManager Contanier容器中,以JVM 进程方式运行;
  • 提交每个Flink Job执行时,找的就是JobManager(AppMaster),找运行在YARN上应用ID;

Session 会话模式:arn-session.sh(开辟资源) + flink run(提交任务)

  • 第一、Hadoop YARN 运行Flink 集群,开辟资源,使用:yarn-session.sh
    • 在NodeManager上,启动容器Container运行JobManager和TaskManagers
  • 第二、提交Flink Job执行,使用:flink run

准备测试数据,测试运行批处理词频统计WordCount程序

[root@node1 ~]# vim /root/words.txt

添加数据

spark python spark hive spark hive
python spark hive spark python
mapreduce spark hadoop hdfs hadoop spark
hive mapreduce

数据文件上传

[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input/
[root@node1 ~]# hdfs dfs -put /root/words.txt /wordcount/input/

在这里插入图片描述

  • 第一步、在yarn上启动一个Flink会话,node1上执行以下命令
export HADOOP\_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2

# 参数说明
-d:后台执行
-s:	每个TaskManager的slot数量
-jm:JobManager的内存(单位MB)
-tm:每个TaskManager容器的内存(默认值:MB)

# 提交flink 集群运行yarn后,提示信息
JobManager Web Interface: http://node1:44263
..................................................................
$ echo "stop" | ./bin/yarn-session.sh -id application_1633441564219_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1633441564219_0001

  • 第二步、查看UI界面,http://node1:8088/cluster/apps
    在这里插入图片描述 JobManager提供WEB UI:http://node1:8088/proxy/application_1614756061094_0002/#/overview

在这里插入图片描述
此时,没有任何TaskManager运行在容器Container中,需要等待有Flink Job提交执行时,才运行TaskManager。

  • 第三步、使用flink run提交任务
/export/server/flink-yarn/bin/flink run \
-t yarn-session \
-Dyarn.application.id=application_1652168669227_0001 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input/words.txt

在这里插入图片描述

  • 第四步、通过上方的ApplicationMaster可以进入Flink的管理界面
    在这里插入图片描述
  • 第五步、关闭yarn-session
# 优雅 停止应用,如果设置重启次数,即使停止应用,也会重启,一直到超过次数以后,才能真正停止应用
echo "stop" | /export/server/flink-yarn/bin/yarn-session.sh -id application_1633441564219_0001

# kill 命令,直接将运行在yarn应用杀死,毫不留情
yarn application -kill application_1633441564219_0001

11-Flink on YARN之PerJob模式运行

每个Flink Job提交运行到Hadoop YARN集群时,根据自身的情况,单独向YARN申请资源,直到作业执行完成

在这里插入图片描述

​ 在Hadoop YARN中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。

采用Job分离模式,每个Flink Job运行,都会申请资源,运行属于自己的Flink 集群

  • 第一步、直接提交job
export HADOOP\_CLASSPATH=`hadoop classpath`
/export/server/flink-yarn/bin/flink run \
-t yarn-per-job -m yarn-cluster \
-yjm 1024 -ytm 1024 -ys 1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input

# 参数说明
-m:指定需要连接的jobmanager(主节点)地址,指定为 yarn-cluster,启动一个新的yarn-session
-yjm:JobManager可用内存,单位兆
-ytm:每个TM所在的Container可申请多少内存,单位兆
-ys:每个TM会有多少个Slot
-yd:分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

在这里插入图片描述

  • 第二步、查看UI界面:http://node1:8088/cluster
    在这里插入图片描述
    提交Flink Job在Hadoop YARN执行时,最后给出如下错误警告:
    在这里插入图片描述
解决办法: 在 flink 配置文件里 flink-conf.yaml设置
	classloader.check-leaked-classloader: false

12-Flink on YARN之Application模式运行

Flink 1.11 引入了一种新的部署模式,即 Application 模式,目前可以支持基于 Hadoop YARN 和 Kubernetes 的 Application 模式。

# 1、Session 模式:
	所有作业Job共享1个集群资源,隔离性差,JM 负载瓶颈,每个Job中main 方法在客户端执行。

# 2、Per-Job 模式:
	每个作业单独启动1个集群,隔离性好,JM 负载均衡,Job作业main 方法在客户端执行。

在这里插入图片描述 以上两种模式,main方法都是在客户端执行,需要获取 flink 运行时所需的依赖项,并生成 JobGraph,提交到集群的操作都会在实时平台所在的机器上执行,那么将会给服务器造成很大的压力。此外,提交任务的时候会把本地flink的所有jar包先上传到hdfs上相应的临时目录,带来大量的网络的开销,所以如果任务特别多的情况下,平台的吞吐量将会直线下降。

Application 模式下,用户程序的 main 方法将在集群中运行,用户将程序逻辑和依赖打包进一个可执行的 jar 包里,集群的入口程序 (ApplicationClusterEntryPoint) 负责调用其中的 main 方法来生成 JobGraph。
在这里插入图片描述

Application 模式为每个提交的应用程序创建一个集群,并在应用程序完成时终止。Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main 可以[节省所需的 CPU 周期],还可以[节省本地下载依赖项所需的带宽]。
Application 模式==使用 bin/flink run-application提交作业,本质上是Session和Per-Job模式的折衷。

  • 通过 -t 指定部署环境,目前支持部署在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);
  • 通过 -D 参数指定通用的运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等。
export HADOOP\_CLASSPATH=`hadoop classpath`

/export/server/flink-yarn/bin/flink run-application \
-t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=1 \
/export/server/flink-yarn/examples/batch/WordCount.jar \
--input hdfs://node1:8020/wordcount/input


由于MAIN方法在JobManager(也就是NodeManager的容器Container)中执行,当Flink Job执行完成以后,启动MRJobHistoryServer历史服务器,查看AppMaster日志信息。

# node1 上启动历史服务
[root@node1 ~]# mr-jobhistory-daemon.sh start historyserver 

第二步、查看UI界面:http://node1:8088/cluster
在这里插入图片描述

测试Flink Job不同运行模式时,注意事项如下
在这里插入图片描述

第三部分:Flink入门案例

13-Flink入门案例之编程模型

基于Flink计算引擎,分别实现批处理(Batch)和流计算(Streaming )中:词频统计WordCount。

第一点:Flink API== ,提供四个层次API,越在下面API,越复杂和灵活;越在上面API,使用越简单和抽象
在这里插入图片描述
第二点:编程模型==,无论编写批处理还是流计算程序,分为三个部分:Data Source、Transformations和Data Sink

# 第一步、从数据源DataSource获取数据
	流计算:DataStream
	批处理:DataSet

# 第二步、对数据进行转换处理
	
# 第三步、结果数据输出DataSink

无论批处理Batch,还是流计算Stream,首先需要创建执行环境ExecutionEnvironment对象,类似Spark中SparkSession或者SparkContext

在这里插入图片描述
创建整个Flink基础课程Maven Project,设置MAVEN Repository仓库目录及Maven安装目录
在这里插入图片描述
约定:每天创建一个Maven Module](),创建第1天Maven Module,模块结构:
在这里插入图片描述
POM文件添加如下内容:

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
        <repository>
            <id>central_maven</id>
            <name>central maven</name>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
 zip -d learn\_spark.jar META-INF/\*.RSA META-INF/\*.DSA META-INF/\*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                 
                               </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

日志配置文件:log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

14-Flink入门案例之WordCount【批处理】

首先,基于Flink计算引擎,[实现离线批处理Batch:从文本文件读取数据,词频统计]。
在这里插入图片描述
批处理时词频统计思路如下伪代码所示:

					spark flink flink flink spark
								|
								| flatMap
								|
			 3-1. 分割单词 spark, flink, flink, flink, spark
			 					|
			                    | map
			                    |
			 3-2. 转换二元组 (spark, 1) (flink, 1) (flink, 1) (flink, 1) (spark, 1)
			 					|
			                    | groupBy(0)
			                    |
			 3-3. 按照单词分组
			        spark -> [(spark, 1) (spark, 1)]
			        flink -> [(flink, 1) (flink, 1) (flink, 1) ]
			        			|
			                    |sum(1)
			                    |
			 3-4. 组内数据求和,第二元素值累加
			        spark -> 1 + 1 = 2
			        flink -> 1 + 1 + 1 =3

基于Flink编写批处理或流计算程序步骤如下:(5个步骤)

1.执行环境-env
2.数据源-source
3.数据转换-transformation
4.数据接收器-sink
5.触发执行-execute

编写批处理词频统计:BatchWordCount,创建Java类

package cn.itqzd.flink.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/\*\*
 \* 使用Flink计算引擎实现离线批处理:词频统计WordCount
 \* 1.执行环境-env
 \* 2.数据源-source
 \* 3.数据转换-transformation
 \* 4.数据接收器-sink
 \* 5.触发执行-execute
 \*/
                                            public class BatchWordCount {

	public static void main(String[] args) throws Exception {
		// 1.执行环境-env
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment() ;

		// 2.数据源-source
		DataSource<String> inputDataSet = env.readTextFile("datas/words.txt");

		// 3.数据转换-transformation
		/\*
 spark flink spark hbase spark
 |flatMap
 分割单词: spark, flink, spark
 |map
 转换二元组:(spark, 1) (flink, 1) (spark, 1), TODO:Flink Java API中提供元组类Tuple
 |groupBy(0)
 分组:spark -> [(spark, 1), (spark, 1)] flink -> [(flink, 1)]
 |sum(1)
 求和:spark -> 1 + 1 = 2, flink = 1
 \*/
		// 3-1. 分割单词
		FlatMapOperator<String, String> wordDataSet = inputDataSet.flatMap(new FlatMapFunction<String, String>() {
			@Override
			public void flatMap(String line, Collector<String> out) throws Exception {
				String[] words = line.trim().split("\\s+");
				for (String word : words) {
					out.collect(word);
				}
			}
		});

		// 3-2. 转换二元组
		MapOperator<String, Tuple2<String, Integer>> tupleDataSet = wordDataSet.map(new MapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public Tuple2<String, Integer> map(String word) throws Exception {
				return Tuple2.of(word, 1);
			}
		});

		// 3-3. 分组及求和, TODO: 当数据类型为元组时,可以使用下标指定元素,从0开始
		AggregateOperator<Tuple2<String, Integer>> resultDataSet = tupleDataSet.groupBy(0).sum(1);

		// 4.数据接收器-sink
		resultDataSet.print();

		// 5.触发执行-execute, TODO:批处理时,无需触发,流计算必须触发执行
		//env.execute("BatchWordCount") ;
	}

}


15-Flink入门案例之WordCount【流计算】

编写Flink程序,接收TCP Socket的单词数据,并以空格进行单词拆分,分组统计单词个数
在这里插入图片描述

package cn.itqzd.flink.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/\*\*
 \* 使用Flink计算引擎实现实时流计算:词频统计WordCount,从TCP Socket消费数据,结果打印控制台。
 \* 1.执行环境-env
 \* 2.数据源-source
 \* 3.数据转换-transformation
 \* 4.数据接收器-sink
 \* 5.触发执行-execute
 \*/
public class StreamWordCount {

	public static void main(String[] args) throws Exception {
		// 1.执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2.数据源-source
		DataStreamSource<String> inputDataStream = env.socketTextStream("node1", 9999);

		// 3.数据转换-transformation
		SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
			// 3-1. 分割单词
			.flatMap(new FlatMapFunction<String, String>() {
				@Override
				public void flatMap(String line, Collector<String> out) throws Exception {
					for (String word : line.trim().split("\\s+")) {
						out.collect(word);
					}
				}
			})
			// 3-2. 转换二元组
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String word) throws Exception {
					return new Tuple2<>(word, 1);
				}
			})
			// 3-3. 分组和组内求和
			.keyBy(0).sum(1);



**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**

**需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)**
![img](https://img-blog.csdnimg.cn/img_convert/1b81a88fce4c35e9b0e26ce532d00816.png)

**一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**

.执行环境-env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2.数据源-source
		DataStreamSource<String> inputDataStream = env.socketTextStream("node1", 9999);

		// 3.数据转换-transformation
		SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
			// 3-1. 分割单词
			.flatMap(new FlatMapFunction<String, String>() {
				@Override
				public void flatMap(String line, Collector<String> out) throws Exception {
					for (String word : line.trim().split("\\s+")) {
						out.collect(word);
					}
				}
			})
			// 3-2. 转换二元组
			.map(new MapFunction<String, Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> map(String word) throws Exception {
					return new Tuple2<>(word, 1);
				}
			})
			// 3-3. 分组和组内求和
			.keyBy(0).sum(1);



**网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。**

**需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)**
[外链图片转存中...(img-ogttqxTk-1713686056777)]

**一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/2303_79055785/article/details/138036321

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签