Apache Spark【从无到有从有到无】【编程指南】【AS6】Spark Streaming编程指南_as6 编程-程序员宅基地

技术标签: Apache Spark  

目录

1.概观

2.一个简单的例子

3.基本概念

3.1.链接

3.2.初始化StreamingContext

3.3.离散流(DStreams)

3.4.输入DStreams和Receivers

3.4.1.基本来源

3.4.2.高级资源

3.4.3.自定义来源

3.4.4.接收器可靠性

3.5.DStreams的转换

3.5.1.UpdateStateByKey操作

3.5.2.变换操作

3.5.3.窗口操作

3.5.4.加入运营

3.6.DStreams的输出操作

3.6.1.使用foreachRDD的设计模式

3.7.DataFrame和SQL操作

3.8.MLlib运营

3.9.缓存/持久性(Caching / Persistence)

3.10.检查点

3.10.1.何时启用检查点

3.10.2.如何配置检查点

3.11.累加器,广播变量和检查点(Accumulators, Broadcast Variables, and Checkpoints)

3.12.部署应用程序

3.12.1.要求(Requirements)

3.12.2.升级应用程序代码

3.13.监控应用

4.性能调优

4.1.减少批处理时间

4.1.1.数据接收中的并行度

4.1.2.数据处理中的并行度

4.1.3.数据序列化

4.1.4.任务启动开销

4.2.设置正确的批次间隔

4.3.内存调整

5.容错语义

5.1.背景.

5.2.定义

5.3.基本语义

5.4.接收数据的语义

5.4.1.使用文件

5.4.2.使用基于Receiver的源

5.4.2.使用Kafka Direct API

5.5.输出操作的语义


参考:官方文档

1.概观

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP套接字)中提取,并且可以使用以高级函数表示的复杂算法进行处理,例如map,reducejoinwindow。最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。实际上,您可以在数据流上应用Spark的 机器学习和 图形处理算法。

Spark Streaming

在内部,它的工作原理如下。Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理以批量生成最终结果流。

Spark Streaming

Spark Streaming提供称为离散流DStream的高级抽象,表示连续的数据流。DStream可以从来自Kafka,Flume和Kinesis等源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列 RDD

 

2.一个简单的例子

假设我们想要计算从TCP套接字上监听的数据服务器接收的文本数据中的字数。

首先,我们创建一个 JavaStreamingContext对象,它是所有流功能的主要入口点。我们使用两个执行线程创建一个本地StreamingContext,批处理间隔为1秒。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

使用此上下文,我们可以创建一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

linesDStream表示将从数据服务器接收的数据流。此流中的每条记录都是一行文本。然后,我们想要将空格分割为单词。

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

flatMap是一个DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每行将被分成多个单词,单词流表示为 wordsDStream。请注意,我们使用FlatMapFunction对象定义了转换 。正如我们将要发现的那样,Java API中有许多这样的便利类可以帮助定义DStream转换。

接下来,我们要计算这些单词。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

使用PairFunction 对象将wordsDStream进一步映射(一对一转换)到(word, 1)对的DStream。然后,使用Function2对象将其缩小以获得每批数据中的单词频率。最后,wordCounts.print()将打印每秒生成的一些计数。

请注意,执行这些行时,Spark Streaming仅设置它在启动后将执行的计算,并且尚未启动实际处理。要在设置完所有转换后开始处理,我们最终调用start方法。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完整的代码可以在Spark Streaming示例 JavaNetworkWordCount中找到。 

如果您已经下载构建了 Spark,则可以按如下方式运行此示例。您首先需要使用Netcat(在大多数类Unix系统中找到的小实用程序)作为数据服务器运行

$ nc -lk 9999

然后,在不同的终端中,您可以使用启动示例 

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

然后,在运行netcat服务器的终端中键入的任何行将被计数并每秒在屏幕上打印。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)

 

3.基本概念

 

3.1.链接

与Spark类似,Spark Streaming可通过Maven Central获得。要编写自己的Spark Streaming程序,必须将以下依赖项添加到SBT或Maven项目中

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.3</version>
    <scope>provided</scope>
</dependency>

要从Kafka,Flume和Kinesis等源中提取Spark Streaming核心API中不存在的数据,您必须将相应的工件添加spark-streaming-xyz_2.12到依赖项中。

Source Artifact
Kafka spark-streaming-kafka-0-10_2.12
Flume spark-streaming-flume_2.12
Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]

有关最新列表,请参阅 Maven存储库 以获取受支持的源和工件的完整列表。

 

3.2.初始化StreamingContext

要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。

JavaStreamingContext对象可以从被创建SparkConf对象。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

appName参数是应用程序在集群UI上显示的名称。 masterSpark,Mesos或YARN群集URL,或在本地模式下运行的特殊“local [*]”字符串。实际上,当在群集上运行时,您不希望master在程序中进行硬编码,而是启动应用程序spark-submit并在那里接收它。但是,对于本地测试和单元测试,您可以传递“local [*]”以在进程中运行Spark Streaming。请注意,这会在内部创建一个JavaSparkContext(所有Spark功能的起点),可以将其作为ssc.sparkContext

必须根据应用程序的延迟要求和可用的群集资源设置批处理间隔。有关 更多详细信息,请参见性能调整部分。

JavaStreamingContext目的还可以从现有的创建JavaSparkContext

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

定义上下文后,您必须执行以下操作。

  1. 通过创建输入DStreams来定义输入源。
  2. 通过将转换和输出操作应用于DStream来定义流式计算。
  3. 开始接收数据并使用它进行处理streamingContext.start()
  4. 等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()
  5. 可以使用手动停止处理streamingContext.stop()

要记住的要点:

  • 一旦启动了上下文,就不能设置或添加新的流式计算。
  • 上下文停止后,无法重新启动。
  • 在JVM中只能同时激活一个StreamingContext。
  • StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将stop()的可选参数stopSparkContext设置为false。
  • 只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext),就可以重复使用SparkContext创建多个StreamingContexts。

 

3.3.离散流(DStreams)

Discretized StreamDStream是Spark Streaming提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。

Spark Streaming

应用于DStream的任何操作都转换为底层RDD上的操作。例如,在先前将行流转换为单词的示例中,flatMap操作应用于linesDStream中的每个RDD 以生成DStream的 wordsRDD。如下图所示。

Spark Streaming

这些底层RDD转换由Spark引擎计算。DStream操作隐藏了大部分细节,并为开发人员提供了更高级别的API以方便使用。

 

3.4.输入DStreams和Receivers

输入DStream是表示从流源接收的输入数据流的DStream。在快速示例中lines输入DStream是表示从netcat服务器接收的数据流。每个输入DStream(文件流除外)都与Receiver (Scala doc, Java doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。

Spark Streaming提供两类内置流媒体源。

  • 基本来源:StreamingContext API中直接提供的。示例:文件系统和套接字连接。
  • 高级资源:Kafka,Flume,Kinesis等资源可通过额外的实用程序类获得。这些需要链接额外的依赖关系,如 链接部分所述。

请注意,如果要在流应用程序中并行接收多个数据流,可以创建多个输入DStream(在“ 性能调整”部分中进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker / executor是一个长期运行的任务,因此它占用了分配给Spark Streaming应用程序的其中一个核心。因此,重要的是要记住,Spark Streaming应用程序需要分配足够的内核(或线程,如果在本地运行)来处理接收的数据,以及运行接收器。

要记住的要点

  • 在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL。这两种方法都意味着只有一个线程将用于本地运行任务。如果您正在使用基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么将使用单个线程来运行接收器,而不留下用于处理接收数据的线程。因此,在本地运行时,始终使用“local [ n ]”作为主URL,其中n >要运行的接收器数量(有关如何设置主服务器的信息,请参阅Spark属性)。

  • 将逻辑扩展到在集群上运行时,分配给Spark Streaming应用程序的核心数必须大于接收器数。否则系统将接收数据,但无法处理数据。

3.4.1.基本来源

我们已经ssc.socketTextStream(...)快速示例 中查看了通过TCP套接字连接接收的文本数据创建DStream的示例。除了套接字之外,StreamingContext API还提供了从文件创建DStream作为输入源的方法。

文件流

对于从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件读取数据,可以通过StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]创建DStream。

文件流不需要运行接收器,因此不需要为接收文件数据分配任何内核。

对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)

streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

 对于文本文件

streamingContext.textFileStream(dataDirectory);

如何监控目录

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的任何文件。

  • 可以监视一个简单的目录,例如"hdfs://namenode:8040/logs/"。直接在这种路径下的所有文件将在发现时进行处理。
  • POSIX glob pattern可以被支持,,例如 "hdfs://namenode:8040/logs/2017/*"。这里,DStream将包含与模式匹配的目录中的所有文件。那就是:它是目录的模式,而不是目录中的文件。
  • 所有文件必须采用相同的数据格式。
  • 根据文件的修改时间而不是创建时间,文件被视为时间段的一部分。
  • 处理完毕后,对当前窗口中文件的更改不会导致重新读取文件。那就是:忽略更新
  • 目录下的文件越多,扫描更改所需的时间就越长 - 即使没有修改过任何文件。
  • 如果使用通配符来标识目录,例如"hdfs://namenode:8040/logs/2016-*",重命名整个目录以匹配路径,则会将该目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
  • 调用FileSystem.setTimes() 修复时间戳是一种在稍后的窗口中拾取文件的方法,即使其内容未更改。

使用对象存储作为数据源

“完整”文件系统(如HDFS)会在创建输出流后立即在其文件上设置修改时间。打开文件时,即使在数据完全写入之前,它也可能包含在DStream- 之后 - 将忽略同一窗口中文件的更新。即:可能会遗漏更改,并从流中省略数据。

要确保在窗口中选择更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将拾取新数据。

相比之下,Amazon S3和Azure Storage等对象存储通常具有较慢的重命名操作,因为实际上是复制了数据。此外,重命名的对象可能将rename()操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。

需要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming所期望的一致。可能是直接写入目标目录是通过所选对象存储流传输数据的适当策略。

有关此主题的更多详细信息,请参阅Hadoop文件系统规范

 

基于自定义接收器的流

可以使用通过自定义接收器接收的数据流创建DStream。有关更多详细信息,请参阅自定义接收器指南

 

RDD作为流的队列

为了测试带有测试数据的Spark Streaming应用程序,还可以使用基于RDD队列创建DStream streamingContext.queueStream(queueOfRDDs)。推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。

有关从套接字和文件流的详细信息, see the API documentations of the relevant functions in StreamingContext for Scala, JavaStreamingContext for Java, and StreamingContext for Python.

 

3.4.2.高级资源

Python API 从Spark 2.4.3开始,在这些来源中,Kafka,Kinesis和Flume在Python API中可用。

此类源需要与外部非Spark库连接,其中一些库具有复杂的依赖性(例如,Kafka和Flume)。因此,为了最大限度地减少与依赖项版本冲突相关的问题,从这些源创建DStream的功能已移至可在必要时显式链接的单独库。

请注意,Spark shell中不提供这些高级源,因此无法在shell中测试基于这些高级源的应用程序。如果您真的想在Spark shell中使用它们,则必须下载相应的Maven工件JAR及其依赖项,并将其添加到类路径中。

其中一些先进的来源如下。

  • Kafka: Spark Streaming 2.4.3与Kafka经纪人版本0.8.2.1或更高版本兼容。有关更多详细信息,请参阅Kafka集成指南

  • Flume: Spark Streaming 2.4.3与Flume 1.6.0兼容。有关详细信息,请参阅Flume集成指南

  • Kinesis: Spark Streaming 2.4.3与Kinesis Client Library 1.2.1兼容。有关详细信息,请参阅Kinesis集成指南

 

3.4.3.自定义来源

Python API Python 尚不支持此功能

输入DStream也可以从自定义数据源创建。您所要做的就是实现一个用户定义的接收器,它可以从自定义源接收数据并将其推送到Spark。有关详细信息,请参阅自定义接收器指南

 

3.4.4.接收器可靠性

根据其可靠性,可以有两种数据源。来源(如Kafka和Flume)允许传输数据得到确认。如果从这些可靠来源接收数据的系统正确地确认接收到的数据,则可以确保不会因任何类型的故障而丢失数据。这导致两种接收器:

  1. 可靠的接收器 - 可靠的接收器在接收到数据并将其存储在带复制的Spark中时,正确地将确认发送到可靠的源。
  2. 不可靠的接收器 -一个不可靠的接收器没有发送确认的资源等。这可以用于不支持确认的源,甚至可以用于不需要或需要进入确认复杂性的可靠源。

“ 自定义接收器指南”中讨论了如何编写可靠接收器的详细信息 。

 

3.5.DStreams的转换

与RDD类似,转换允许修改来自输入DStream的数据。DStreams支持普通Spark RDD上可用的许多转换。一些常见的如下。

转换 含义
map(func) 通过将源DStream的每个元素传递给函数func来返回一个新的DStream 。
flatMap(func) 与map类似,但每个输入项可以映射到0个或更多输出项。
filter(func) 通过仅选择func返回true 的源DStream的记录来返回新的DStream 。
repartition(numPartitions) 通过创建更多或更少的分区来更改此DStream中的并行度级别。
union(otherStream) 返回一个新的DStream,它包含源DStream和otherDStream中元素的 并 
count() 通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream。
reduce(func) 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream 。该函数应该是关联的和可交换的,以便可以并行计算。
countByValue() 当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率。
reduceByKey(func, [numTasks]) 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism)进行分组。您可以传递可选numTasks参数来设置不同数量的任务。
join(otherStream, [numTasks]) 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每个键的所有元素对。
cogroup(otherStream, [numTasks]) 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream。
transform(func) 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作。
updateStateByKey(func) 返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态。这可用于维护每个密钥的任意状态数据。

3.5.1.UpdateStateByKey操作

updateStateByKey操作允许您在使用新信息不断更新时保持任意状态。要使用它,您必须执行两个步骤。

  1. 定义状态 - 状态可以是任意数据类型。
  2. 定义状态更新功能 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。

在每个批处理中,Spark都会对所有现有密钥应用状态更新功能,无论它们是否在批处理中都有新数据。如果更新函数返回,None则将删除键值对。

让我们举一个例子来说明这一点。假设您要维护文本数据流中看到的每个单词的运行计数。这里,运行计数是状态,它是一个整数。我们将更新功能定义为:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

这应用于包含字的数据流(也就是说,快速示例中包含(word,1)对的对数据流)。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

将为每个单词调用更新函数,newValues其序列为1(来自(word, 1)成对)并runningCount具有前一个计数。有关完整的Java代码,请查看示例JavaStatefulNetworkWordCount.java

请注意,使用updateStateByKey需要配置检查点目录,这将在检查点部分中详细讨论。

 

3.5.2.变换操作

transform操作(及其变体transformWith)允许在DStream上应用任意RDD到RDD功能。它可用于应用未在DStream API中公开的任何RDD操作。例如,将数据流中的每个批次与另一个数据集连接的功能不会直接在DStream API中公开。但是,您可以轻松地使用它transform来执行此操作。这使得非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可以使用Spark生成)连接,然后根据它进行过滤,来进行实时数据清理。

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

请注意,在每个批处理间隔中都会调用提供的函数。这允许您进行时变RDD操作,即RDD操作,分区数,广播变量等可以在批次之间进行更改。

 

3.5.3.窗口操作

Spark Streaming还提供窗口计算,允许您在滑动数据窗口上应用转换。下图说明了此滑动窗口。

Spark Streaming

如图所示,每次窗口在源数据流上滑动时,窗口中的源RDD都被组合并操作,以生成窗口数据流的RDD。在这种特定情况下,该操作将应用于过去3个时间单位的数据,并按2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

  • 窗口长度 - 窗口的持续时间(图中的3)。
  • 滑动间隔 - 执行窗口操作的间隔(图中的2)。

这两个参数必须是源DStream的批处理间隔的倍数(图中的1)。

让我们用一个例子来说明窗口操作。比如说,您希望通过每隔10秒生成最后30秒数据的字数来扩展 前面的示例。为此,我们必须在最后30秒的数据reduceByKey上对pairsDStream (word, 1)对应用操作。这是使用该操作完成的reduceByKeyAndWindow

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
转换 含义
windowwindowLengthslideInterval 返回一个新的DStream,它是根据源DStream的窗口批次计算的。
countByWindowwindowLengthslideInterval 返回流中元素的滑动窗口数。
reduceByWindowfuncwindowLengthslideInterval 返回一个新的单元素流,通过使用func在滑动间隔内聚合流中的元素而创建。该函数应该是关联的和可交换的,以便可以并行正确计算。
reduceByKeyAndWindowfuncwindowLengthslideInterval,[ numTasks ])

当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数func 在滑动窗口中的批次聚合每个键的值。

注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism)进行分组。您可以传递可选 numTasks参数来设置不同数量的任务。

reduceByKeyAndWindowfuncinvFuncwindowLengthslideInterval,[ numTasks ])

上述更有效的版本,reduceByKeyAndWindow()其中使用前一窗口的reduce值逐步计算每个窗口的reduce值。这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来完成的。一个例子是当窗口滑动时“添加”和“减去”键的计数。但是,它仅适用于“可逆减少函数”,即那些具有相应“反向减少”函数的减函数(作为参数invFunc)。同样reduceByKeyAndWindow,reduce任务的数量可通过可选参数进行配置。请注意,必须启用检查点才能使用此操作。

countByValueAndWindowwindowLength, slideInterval,[numTasks ]) 当在(K,V)对的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在滑动窗口内的频率。同样 reduceByKeyAndWindow,reduce任务的数量可通过可选参数进行配置。

3.5.4.加入运营

最后,值得强调的是,您可以轻松地在Spark Streaming中执行不同类型的连接。

流连接

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

这里,在每个批处理间隔中,生成的RDD stream1将与生成的RDD连接stream2。你也可以做leftOuterJoinrightOuterJoinfullOuterJoin。此外,在流的窗口上进行连接通常非常有用。这也很容易

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

流数据集连接

在解释DStream.transform操作时已经显示了这一点。这是将窗口流与数据集连接的另一个示例。

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

实际上,您还可以动态更改要加入的数据集。提供给的函数在transform每个批处理间隔进行评估,因此将使用dataset引用指向的当前数据集。

API文档中提供了完整的DStream转换列表。对于Scala API,请参阅DStream 和PairDStreamFunctions。对于Java API,请参阅JavaDStream 和JavaPairDStream。对于Python API,请参阅DStream

 

3.6.DStreams的输出操作

输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有DStream转换的实际执行(类似于RDD的操作)。目前,定义了以下输出操作:

输出操作 含义
print() 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前十个元素。这对开发和调试很有用。 
Python API这在Python API中称为 pprint()
saveAsTextFiles前缀,[ 后缀 ]) 将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀后缀生成:“prefix-TIME_IN_MS [.suffix]”
saveAsObjectFiles前缀,[ 后缀 ]) 将此DStream的内容保存为SequenceFiles序列化Java对象。每个批处理间隔的文件名基于前缀和 后缀生成:“prefix-TIME_IN_MS [.suffix]”。 
Python API这在Python API中不可用。
saveAsHadoopFiles前缀,[ 后缀 ]) 将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀后缀生成:“prefix-TIME_IN_MS [.suffix]”。 
Python API这在Python API中不可用。
foreachRDDfunc 最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。

3.6.1.使用foreachRDD的设计模式

dstream.foreachRDD是一个功能强大的原语,允许将数据发送到外部系统。但是,了解如何正确有效地使用此原语非常重要。一些常见的错误要避免如下。

通常将数据写入外部系统需要创建连接对象(例如,与远程服务器的TCP连接)并使用它将数据发送到远程系统。为此,开发人员可能无意中尝试在Spark驱动程序中创建连接对象,然后尝试在Spark工作程序中使用它来保存RDD中的记录

dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

这是不正确的,因为这需要连接对象被序列化并从驱动程序发送到worker。这种连接对象很少跨机器传输。此错误可能表现为序列化错误(连接对象不可序列化),初始化错误(需要在worker处初始化连接对象)等。正确的解决方案是在worker处创建连接对象。

但是,这可能会导致另一个常见错误 - 为每条记录创建一个新连接。例如,

dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

通常,创建连接对象会产生时间和资源开销。因此,为每个记录创建和销毁连接对象可能会产生不必要的高开销,并且可能显着降低系统的总吞吐量。更好的解决方案是使用 rdd.foreachPartition- 创建单个连接对象并使用该连接发送RDD分区中的所有记录。

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

这会分摊许多记录的连接创建开销。

最后,通过在多个RDD /批处理中重用连接对象,可以进一步优化这一点。由于多个批次的RDD被推送到外部系统,因此可以维护连接对象的静态池,而不是可以重用的连接对象,从而进一步减少了开销。

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

请注意,池中的连接应根据需要延迟创建,如果暂时不使用,则会超时。这实现了最有效的数据发送到外部系统。

要记住的其他要点:

  • DStreams由输出操作延迟执行,就像RDD由RDD操作延迟执行一样。具体而言,DStream输出操作中的RDD操作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者输出操作dstream.foreachRDD()没有任何RDD操作,那么就不会执行任何操作。系统将简单地接收数据并将其丢弃。

  • 默认情况下,输出操作一次执行一次。它们按照应用程序中定义的顺序执行。

 

3.7.DataFrame和SQL操作

您可以轻松地对流数据使用DataFrames和SQL操作。您必须使用StreamingContext正在使用的SparkContext创建SparkSession。此外,必须这样做,以便可以在驱动器故障时重新启动。这是通过创建一个延迟实例化的SparkSession单例实例来完成的。这在以下示例中显示。它修改了早期的单词计数示例,以使用DataFrames和SQL生成单词计数。每个RDD都转换为DataFrame,注册为临时表,然后使用SQL进行查询。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD((rdd, time) -> {
  // Get the singleton instance of SparkSession
  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

  // Convert RDD[String] to RDD[case class] to DataFrame
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
    JavaRow record = new JavaRow();
    record.setWord(word);
    return record;
  });
  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words");

  // Do word count on table using SQL and print it
  DataFrame wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word");
  wordCountsDataFrame.show();
});

查看完整的源代码

您还可以对从不同线程(即,与正在运行的StreamingContext异步)的流数据上定义的表运行SQL查询。只需确保将StreamingContext设置为记住足够数量的流数据,以便查询可以运行。否则,不知道任何异步SQL查询的StreamingContext将在查询完成之前删除旧的流数据。例如,如果要查询最后一批,但查询可能需要5分钟才能运行,则调用streamingContext.remember(Minutes(5))(在Scala中,或在其他语言中等效)。

有关DataFrame的详细信息,请参阅DataFrames和SQL指南。

 

3.8.MLlib运营

您还可以轻松使用MLlib提供的机器学习算法。首先,有流媒体机器学习算法(例如流媒体线性回归流媒体KMeans等),它们可以同时学习流数据以及将模型应用于流数据。除此之外,对于更大类的机器学习算法,您可以离线学习学习模型(即使用历史数据),然后在线将数据应用于流数据。有关详细信息,请参阅MLlib指南。

 

3.9.缓存/持久性(Caching / Persistence)

与RDD类似,DStreams还允许开发人员将流的数据保存在内存中。也就是说,persist()在DStream上使用该方法会自动将该DStream的每个RDD保留在内存中。如果DStream中的数据将被多次计算(例如,对相同数据进行多次操作),这将非常有用。对于像reduceByWindow和这样的基于窗口的操作和 reduceByKeyAndWindow基于状态的操作updateStateByKey,这是隐含的。因此,基于窗口的操作生成的DStream会自动保留在内存中,而无需开发人员调用persist()

对于通过网络接收数据的输入流(例如,Kafka,Flume,套接字等),默认持久性级别设置为将数据复制到两个节点以实现容错。

请注意,与RDD不同,DStreams的默认持久性级别会将数据序列化为内存。“ 性能调整”部分对此进行了进一步讨论。有关不同持久性级别的更多信息,请参阅“ Spark编程指南”

 

3.10.检查点

流应用程序必须全天候运行,因此必须能够适应与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。为了实现这一点,Spark Streaming需要将足够的信息检查到容错存储系统,以便它可以从故障中恢复。检查点有两种类型的数据。

  • 元数据检查点 (Metadata checkpointing)- 将定义流式计算的信息保存到容错存储(如HDFS)。这用于从运行流应用程序的驱动程序的节点的故障中恢复。元数据包括:
    • 配置(Configuration ) - 用于创建流应用程序的配置。
    • DStream操作 - 定义流应用程序的DStream操作集。
    • 不完整的批次 (Incomplete batches )- 其工作排队但尚未完成的批次。
  • 数据检查点 (Data checkpointing)- 将生成的RDD保存到可靠的存储。在一些跨多个批次组合数据的有状态转换中,这是必需的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖关系链的长度随时间增加。为了避免恢复时间的这种无限增加(与依赖链成比例),有状态变换的中间RDD周期性地检查点到可靠存储(例如HDFS)以切断依赖链。

总而言之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用状态转换,即使对于基本功能也需要数据或RDD检查点。

 

3.10.1.何时启用检查点

必须为具有以下任何要求的应用程序启用检查点:

  • 有状态转换的用法 - 如果在应用程序中使用了updateStateByKeyreduceByKeyAndWindow(和反函数),则必须提供检查点目录以允许定期RDD检查点。
  • 从运行应用程序的驱动程序的故障中恢复 - 元数据检查点用于使用进度信息进行恢复。

请注意,可以在不启用检查点的情况下运行没有上述有状态转换的简单流应用程序。在这种情况下,驱动程序故障的恢复也将是部分的(某些已接收但未处理的数据可能会丢失)。这通常是可以接受的,并且许多以这种方式运行Spark Streaming应用程序。预计对非Hadoop环境的支持将在未来得到改善。

 

3.10.2.如何配置检查点

可以通过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用检查点,检查点信息将保存到该文件系统中。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。这将允许您使用上述有状态转换。此外,如果要使应用程序从驱动程序故障中恢复,则应重写流应用程序以使其具有以下行为。

  • 当程序第一次启动时,它将创建一个新的StreamingContext,设置所有流然后调用start()。
  • 当程序在失败后重新启动时,它将从检查点目录中的检查点数据重新创建StreamingContext。
// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

如果checkpointDirectory存在,则将从检查点数据重新创建上下文。如果目录不存在(即第一次运行),则将contextFactory调用该函数以创建新上下文并设置DStream。请参阅Java示例 JavaRecoverableNetworkWordCount。此示例将网络数据的字数附加到文件中

除了使用getOrCreate之外,还需要确保驱动程序进程在失败时自动重启。这只能通过用于运行应用程序的部署基础结构来完成。这在“ 部署”部分中进一步讨论 。

请注意,RDD的检查点会导致节省可靠存储的成本。这可能会导致RDD被检查点的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(例如1秒)时,每批次的检查点可能会显着降低操作吞吐量。相反,检查点过于频繁会导致谱系和任务大小增长,这可能会产生不利影响。对于需要RDD检查点的有状态转换,默认间隔是批处理间隔的倍数,至少为10秒。它可以通过使用dstream.checkpoint(checkpointInterval)来设置。通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置。

 

3.11.累加器,广播变量和检查点(Accumulators, Broadcast Variables, and Checkpoints)

无法从Spark Streaming中的检查点恢复累加器广播变量。如果启用了检查点并使用 累加器广播变量 ,则必须为累加器广播变量创建延迟实例化的单例实例, 以便在驱动程序重新启动失败后重新实例化它们。这在以下示例中显示。

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

查看完整的源代码

 

3.12.部署应用程序

3.12.1.要求(Requirements)

要运行Spark Streaming应用程序,您需要具备以下条件。

  • 具有集群管理器的集群 - 这是任何Spark应用程序的一般要求,并在部署指南中进行了详细讨论。
  • 打包应用程序JAR - 您必须将流应用程序编译为JAR。如果您正在使用spark-submit启动应用程序,那么您将不需要在JAR中提供Spark和Spark Streaming。但是,如果您的应用程序使用高级源(例如Kafka,Flume),则必须将它们链接的额外工件及其依赖项打包在用于部署应用程序的JAR中。例如,使用的应用程序KafkaUtils 必须包含spark-streaming-kafka-0-10_2.12应用程序JAR中的所有传递依赖项。
  • 为执行程序配置足够的内存 - 由于接收的数据必须存储在内存中,因此必须为执行程序配置足够的内存来保存接收的数据。请注意,如果您正在进行10分钟的窗口操作,则系统必须至少将最后10分钟的数据保留在内存中。因此,应用程序的内存要求取决于其中使用的操作。
  • 配置检查点 - 如果流应用程序需要它,则必须将Hadoop API兼容容错存储中的目录(例如HDFS,S3等)配置为检查点目录,并以检查点信息可以写入的方式编写流应用程序用于故障恢复。有关详细信息,请参阅检查点部分。
  • 配置应用程序驱动程序的自动重新启动 - 要自动从驱动程序故障中恢复,用于运行流应用程序的部署基础结构必须监视驱动程序进程并在驱动程序失败时重新启动驱动程序。不同的集群管理器 有不同的工具来实现这一点
  1. Spark Standalone - 可以提交Spark应用程序驱动程序以在Spark Standalone集群中运行(请参阅 集群部署模式),即应用程序驱动程序本身在其中一个工作节点上运行。此外,可以指示独立集群管理器监督驱动程序,如果驱动程序由于非零退出代码而失败,或者由于运行驱动程序的节点故障,则重新启动它。有关详细信息,请参阅Spark Standalone指南中的 集群模式监督
  2. YARN - Yarn支持类似的机制来自动重启应用程序。有关更多详细信息,请参阅YARN文档。
  3. Mesos - Marathon已被用于与Mesos实现这一目标。
  • 配置预写日志 - 自Spark 1.2起,我们引入了预写日志以实现强大的容错保证。如果启用,则从接收器接收的所有数据都将写入配置检查点目录中的预写日志。这可以防止驱动程序恢复时的数据丢失,从而确保零数据丢失(在容错语义部分中详细讨论 )。这可以通过设置来启用配置参数spark.streaming.receiver.writeAheadLog.enable=true。然而,这些更强的语义可能以单个接收器的接收吞吐量为代价。这可以通过并行运行更多接收器来纠正 增加总吞吐量。此外,建议在启用预写日志时禁用Spark中接收数据的复制,因为日志已存储在复制存储系统中。这可以通过将输入流的存储级别设置为来完成StorageLevel.MEMORY_AND_DISK_SER。使用S3(或任何不支持刷新的文件系统)进行预写日志时,请记得启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关详细信息,请参阅 Spark Streaming配置。请注意,启用I / O加密时,Spark不会加密写入预写日志的数据。如果需要加密预写日志数据,则应将其存储在本机支持加密的文件系统中。
  • 设置最大接收速率 - 如果群集资源不足以使流应用程序以接收数据的速度处理数据,则可以通过设置记录/秒的最大速率限制来限制接收器。请参阅接收器和 Direct Kafka方法的配置参数 。在Spark 1.5中,我们引入了一项称为背压(backpressure )的功能,无需设置此速率限制,因为Spark Streaming会自动计算出速率限制,并在处理条件发生变化时动态调整它们。这个背压可以通过设置来启用配置参数来。spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition spark.streaming.backpressure.enabledtrue

3.12.2.升级应用程序代码

如果需要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机

  • 升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的(接收与旧的数据相同的数据)已经预热并准备好黄金时间,旧的可以被放下。请注意,这可以用于支持将数据发送到两个目标(即早期和升级的应用程序)的数据源。

  • 现有应用程序正常关闭(请参阅 StreamingContext.stop(...) 或JavaStreamingContext.stop(...) 用于正常关闭选项),确保在关闭之前完全处理已接收的数据。然后可以启动升级的应用程序,该应用程序将从早期应用程序停止的同一点开始处理。请注意,这只能通过支持源端缓冲的输入源(如Kafka和Flume)来完成,因为在前一个应用程序关闭且升级的应用程序尚未启动时需要缓冲数据。并且无法从早期检查点重新启动升级前代码的信息。检查点信息基本上包含序列化的Scala / Java / Python对象,并且尝试使用新的修改类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级的应用程序,要么删除以前的检查点目录。

 

3.13.监控应用

除了Spark的监控功能外,还有Spark Streaming特有的其他功能。使用StreamingContext时, Spark Web UI会显示一个附加Streaming选项卡,其中显示有关运行接收器的统计信息(接收器是否处于活动状态,接收的记录数,接收器错误等)和已完成的批处理(批处理时间,排队延迟等)。 )。这可用于监视流应用程序的进度。

Web UI中的以下两个指标尤为重要:

  • 处理时间(Processing Time) - 处理每批数据的时间。
  • 计划延迟(Scheduling Delay) - 批处理在队列中等待处理先前批处理完成的时间。

如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表明系统无法以最快的速度处理批次并且落后。在这种情况下,请考虑 减少批处理时间。

还可以使用StreamingListener接口监视Spark Streaming程序的进度,该 接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员API,未来可能会对其进行改进(即报告更多信息)。

 

4.性能调优

从群集上的Spark Streaming应用程序中获得最佳性能需要进行一些调整。本节介绍了许多可以调整以提高应用程序性能的参数和配置。在高层次上,您需要考虑两件事:

  1. 通过有效使用群集资源减少每批数据的处理时间。

  2. 设置正确的批量大小,以便可以像接收到的那样快速处理批量数据(即,数据处理与数据提取保持同步)。

 

4.1.减少批处理时间

可以在Spark中进行许多优化,以最大限度地缩短每个批处理的处理时间。这些已在“ 调整指南”中详细讨论过。本节重点介绍一些最重要的内容。

4.1.1.数据接收中的并行度

过网络接收数据(如Kafka,Flume,socket等)需要将数据反序列化并存储在Spark中。如果数据接收成为系统中的瓶颈,则考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收单个数据流的接收器(在工作机器上运行)。因此,可以通过创建多个输入DStream并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个数据主题的单个Kafka输入DStream可以分成两个Kafka输入流,每个输入流只接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个DStream可以组合在一起以创建单个DStream。然后,可以在统一流上应用在单个输入DStream上应用的转换。这样做如下。

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

应考虑的另一个参数是接收器的块间隔,它由配置参数决定 spark.streaming.blockInterval。对于大多数接收器,接收的数据在存储在Spark的内存中之前合并为数据块。每批中的块数决定了在类似地图的转换中用于处理接收数据的任务数。每批每个接收器的任务数量大约是(批处理间隔/块间隔)。例如,200 ms的块间隔将每2秒批次创建10个任务。如果任务数量太少(即,小于每台计算机的核心数),那么效率将会很低,因为所有可用核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是,建议的块间隔最小值约为50 ms,低于该值时,任务启动开销可能会出现问题。

使用多个输入流/接收器接收数据的替代方案是显式地重新分区输入数据流(使用inputStream.repartition(<number of partitions>))。这会在进一步处理之前将收到的批量数据分布到群集中指定数量的计算机上。

有关直接流,请参阅Spark Streaming + Kafka集成指南

 

4.1.2.数据处理中的并行度

如果在计算的任何阶段中使用的并行任务的数量不够高,则可能未充分利用群集资源。例如,对于像reduceByKey 和的分布式reduce操作reduceByKeyAndWindow,默认的并行任务数由spark.default.parallelism 配置属性控制。您可以将并行级别作为参数传递(请参阅PairDStreamFunctions 文档),或者设置spark.default.parallelism 配置属性以更改默认值。

 

4.1.3.数据序列化

通过调整序列化格式可以减少数据序列化的开销。在流式传输的情况下,有两种类型的数据被序列化。

  • 输入数据(Input data):默认情况下,通过Receiver接收的输入数据通过StorageLevel.MEMORY_AND_DISK_SER_2存储在执行程序的内存中。也就是说,数据被序列化为字节以减少GC开销,并且为了容忍执行器故障而被复制。此外,数据首先保存在内存中,并且仅在内存不足以保存流式计算所需的所有输入数据时才溢出到磁盘。这种序列化显然有开销 - 接收器必须反序列化接收的数据并使用Spark的序列化格式重新序列化。
  • 流式传输操作生成的持久RDD(Persisted RDDs generated by Streaming Operations):流式计算生成的RDD可以保留在内存中。例如,窗口操作将数据保留在内存中,因为它们将被多次处理。但是,与StorageLevel.MEMORY_ONLY的Spark Core默认值不同,流式计算生成的持久RDD 默认使用StorageLevel.MEMORY_ONLY_SER(即序列化)保留,以最大限度地减少GC开销。

在这两种情况下,使用Kryo序列化可以减少CPU和内存开销。有关详细信息,请参阅Spark Tuning Guide。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅“ 配置指南”中的Kryo相关配置)

在需要为流应用程序保留的数据量不大的特定情况下,将数据(两种类型)保存为反序列化对象可能是可行的,而不会产生过多的GC开销。例如,如果您使用几秒钟的批处理间隔而没有窗口操作,则可以尝试通过相应地显式设置存储级别来禁用持久数据中的序列化。这将减少由于序列化导致的CPU开销,可能在没有太多GC开销的情况下提高性能。

 

4.1.4.任务启动开销

果每秒启动的任务数量很高(例如,每秒50或更多),则向从属设备发送任务的开销可能很大,并且将难以实现亚秒级延迟。通过以下更改可以减少开销:

  • 执行模式(Execution mode):在独立模式或粗粒度Mesos模式运行Spark可以获得比细粒度Mesos模式更好的任务启动时间。有关更多详细信息,请参阅Running on Mesos指南

这些更改可以将批处理时间减少100毫秒,从而允许亚秒级批量大小可行。

 

4.2.设置正确的批次间隔

要使群集上运行的Spark Streaming应用程序保持稳定,系统应该能够以接收数据的速度处理数据。换句话说,批处理数据应该在生成时尽快处理。通过监视流式Web UI中的处理时间可以找到是否适用于应用程序 ,其中批处理时间应小于批处理间隔。

根据流式计算的性质,所使用的批处理间隔可能对应用程序在固定的一组集群资源上可以维持的数据速率产生重大影响。例如,让我们考虑一下早期的WordCountNetwork示例。对于特定数据速率,系统可能能够每2秒(即,2秒的批处理间隔)跟上报告字数,但不是每500毫秒。因此需要设置批处理间隔,以便可以维持生产中的预期数据速率。

确定适合您的应用程序批量大小的好方法是使用保守的批处理间隔(例如,5-10秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,您可以检查每个已处理批处理所遇到的端到端延迟的值(在Spark驱动程序log4j日志中查找“总延迟”,或使用 StreamingListener 接口)。如果延迟保持与批量大小相当,则系统稳定。否则,如果延迟不断增加,则意味着系统无法跟上,因此不稳定。一旦了解了稳定的配置,就可以尝试提高数据速率和/或减小批量。注意,只要延迟减小到低值(即,小于批量大小),由于临时数据速率增加引起的延迟的瞬时增加可能是正常的。

 

4.3.内存调整

“调优指南”中详细讨论了调整 Spark应用程序的内存使用情况和GC行为。强烈建议您阅读。在本节中,我们将特别在Spark Streaming应用程序的上下文中讨论一些调优参数。

Spark Streaming应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果要在最后10分钟的数据上使用窗口操作,那么您的群集应该有足够的内存来在内存中保存10分钟的数据。或者,如果您想使用updateStateByKey大量的键,那么必要的内存将很高。相反,如果你想做一个简单的map-filter-store操作,那么必要的内存就会很低。

通常,由于通过接收器接收的数据与StorageLevel.MEMORY_AND_DISK_SER_2一起存储,因此不适合内存的数据将溢出到磁盘。这可能会降低流应用程序的性能,因此建议您根据流应用程序的需要提供足够的内存。最好尝试小规模地查看内存使用情况并进行相应估算。

内存调整的另一个方面是垃圾收集。对于需要低延迟的流应用程序,不希望由JVM垃圾收集引起大的暂停。

有一些参数可以帮助您调整内存使用和GC开销:

  • DStream的持久性级别:如前面数据序列化部分所述,输入数据和RDD默认持久化为序列化字节。与反序列化持久性相比,这减少了内存使用和GC开销。启用Kryo序列化可进一步减少序列化大小和内存使用量。通过压缩(参见Spark配置spark.rdd.compress)可以实现内存使用的进一步减少,但代价是CPU时间。

  • 清除旧数据:默认情况下,DStream转换生成的所有输入数据和持久RDD都会自动清除。Spark Streaming根据使用的转换决定何时清除数据。例如,如果您使用10分钟的窗口操作,那么Spark Streaming将保留最后10分钟的数据,并主动丢弃旧数据。通过设置,可以将数据保留更长的时间(例如,交互式查询旧数据)streamingContext.remember

  • CMS垃圾收集器:强烈建议使用并发标记和清除GC,以保持GC相关的暂停始终较低。尽管已知并发GC会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用--driver-java-options输入spark-submit)和执行程序(使用Spark配置spark.executor.extraJavaOptions)上设置CMS GC 。

  • 其他提示:为了进一步降低GC开销,这里有一些尝试的提示。

    • 使用OFF_HEAP存储级别保留RDD 。请参阅Spark编程指南中的更多详细信息。
    • 使用具有较小堆大小的更多执行程序。这将降低每个JVM堆中的GC压力。

要记住的要点:

  • DStream与单个接收器相关联。为了获得读取并行性,需要创建多个接收器,即多个DStream。接收器在执行器内运行。它占据一个核心。确保在预订接收器插槽后有足够的内核进行处理,即spark.cores.max应考虑接收器插槽。接收器以循环方式分配给执行器。

  • 当从流源接收数据时,接收器创建数据块。每隔blockInterval毫秒生成一个新的数据块。在batchInterval期间创建N个数据块,其中N = batchInterval / blockInterval。这些块由当前执行程序的BlockManager分发给其他执行程序的块管理器。之后,将在驱动程序上运行的网络输入跟踪器通知块位置以进行进一步处理。

  • 在驱动程序上为batchInterval期间创建的块创建RDD。batchInterval期间生成的块是RDD的分区。每个分区都是spark中的任务。blockInterval == batchinterval意味着创建了一个分区,并且可能在本地处理它。

  • 块中的映射任务在执行器中处理(一个接收块,另一个块复制块),具有块而不管块间隔,除非非本地调度启动。具有更大的blockinterval意味着更大的块。较高的值会spark.locality.wait增加在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保在本地处理更大的块。

  • 您可以通过调用来定义分区数,而不是依赖于batchInterval和blockInterval inputDstream.repartition(n)。这会随机重新调整RDD中的数据以创建n个分区。是的,为了更大的并行性。虽然以洗牌为代价。RDD的处理由驾驶员的jobcheduler作为工作安排。在给定的时间点,只有一个作业处于活动状态。因此,如果一个作业正在执行,则其他作业将排队。

  • 如果您有两个dstream,将形成两个RDD,并且将创建两个将一个接一个地安排的作业。为了避免这种情况,你可以结合两个dstreams。这将确保为dstream的两个RDD形成单个unionRDD。然后,此unionRDD被视为单个作业。但是,RDD的分区不受影响。

  • 如果批处理时间超过批处理间隔,那么显然接收者的内存将开始填满并最终导致抛出异常(最可能是BlockNotFoundException)。目前,没有办法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。

 

5.容错语义

5.1.背景.

要理解Spark Streaming提供的语义,让我们记住Spark的RDD的基本容错语义。

  1. RDD是一个不可变的,确定性可重新计算的分布式数据集。每个RDD都会记住在容错输入数据集上用于创建它的确定性操作的沿袭。
  2. 如果由于工作节点故障导致RDD的任何分区丢失,则可以使用操作系列从原始容错数据集重新计算该分区。
  3. 假设所有RDD转换都是确定性的,那么无论Spark集群中的故障如何,最终转换后的RDD中的数据总是相同的。

Spark对容错文件系统(如HDFS或S3)中的数据进行操作。因此,从容错数据生成的所有RDD也是容错的。但是,Spark Streaming不是这种情况,因为大多数情况下的数据是通过网络接收的(除非 fileStream使用时)。要为所有生成的RDD实现相同的容错属性,接收的数据将在群集中的工作节点中的多个Spark执行程序之间进行复制(默认复制因子为2)。这导致系统中需要在发生故障时恢复的两种数据:

  1. 接收和复制的数据 - 此数据在单个工作节点发生故障时仍然存在,因为其副本存在于其他节点之一上。
  2. 接收到的数据但缓冲用于复制 - 由于未复制,因此恢复此数据的唯一方法是从源中再次获取数据。

此外,我们应该关注两种失败:

  1. 工作节点失败 - 运行执行程序的任何工作节点都可能失败,并且这些节点上的所有内存数据都将丢失。如果任何接收器在故障节点上运行,则它们的缓冲数据将丢失。
  2. 驱动程序节点失败 - 如果运行Spark Streaming应用程序的驱动程序节点出现故障,那么SparkContext显然会丢失,并且所有带有内存数据的执行程序都将丢失。

 

5.2.定义

流系统的语义通常根据系统处理每条记录的次数来捕获。系统可以在所有可能的操作条件下提供三种类型的保证(尽管出现故障等)

  1. 最多一次:每条记录将被处理一次或根本不处理。
  2. 至少一次:每条记录将被处理一次或多次。这比最多一次因为它确保不会丢失任何数据。但可能有重复。
  3. 恰好一次:每条记录只处理一次 - 不会丢失数据,也不会多次处理数据。这显然是三者的最强保证。

 

5.3.基本语义

在任何流处理系统中,从广义上讲,处理数据有三个步骤。

  1. 接收数据:使用接收器或其他方式从数据源接收数据。

  2. 转换数据:使用DStream和RDD转换转换接收的数据。

  3. 推出数据:最终转换的数据被推送到外部系统,如文件系统,数据库,仪表板等。

如果流应用程序必须实现端到端的一次性保证,那么每个步骤都必须提供一次性保证。也就是说,每个记录必须只接收一次,转换一次,然后推送到下游系统一次。让我们在Spark Streaming的上下文中理解这些步骤的语义。

  1. 接收数据:不同的输入源提供不同的保证。

  2. 转换数据:由于RDD提供的保证,所有已接收的数据将只处理一次。即使存在故障,只要接收到的输入数据可访问,最终转换的RDD将始终具有相同的内容。

  3. 推出数据:默认情况下输出操作至少确保一次语义,因为它取决于输出操作的类型(幂等或不是)和下游系统的语义(支持是否支持事务)。但是用户可以实现自己的事务机制来实现一次性语义。

 

5.4.接收数据的语义

不同的输入源提供不同的保证,范围从至少一次恰好一次

5.4.1.使用文件

如果所有输入数据都已存在于HDFS等容错文件系统中,则Spark Streaming可以始终从任何故障中恢复并处理所有数据。这给出 了一次性语义,这意味着无论失败什么,所有数据都将被处理一次

 

5.4.2.使用基于Receiver的源

对于基于接收器的输入源,容错语义取决于故障情形和接收器类型。正如我们前面讨论过的,有两种类型的接收器:

  1. 可靠的接收器 - 这些接收器仅在确保已复制接收的数据后才确认可靠的源。如果此类接收器发生故障,则源将不会收到对缓冲(未复制)数据的确认。因此,如果重新启动接收器,源将重新发送数据,并且不会因失败而丢失数据。
  2. 不可靠的接收器 - 此类接收器发送确认,因此在由于工作人员或驱动程序故障而失败时可能会丢失数据。

根据使用的接收器类型,我们实现以下语义。如果工作节点发生故障,则可靠接收器不会丢失数据。对于不可靠的接收器,接收但未复制的数据可能会丢失。如果驱动程序节点出现故障,那么除了这些丢失之外,在内存中接收和复制的所有过去数据都将丢失。这将影响有状态转换的结果。

为了避免丢失过去收到的数据,Spark 1.2引入了预写日志,将接收到的数据保存到容错存储中。通过启用预写日志和可靠的接收器,数据丢失为零。在语义方面,它提供至少一次保证。

下表总结了失败时的语义:

部署方案(Deployment Scenario) 工作者失败(Worker Failure) 驱动失败(Driver Failure)
Spark 1.1或更早版本,
Spark 1.2或更高版本,没有预写日志
使用不可靠的接收器丢失缓冲数据使用可靠的接收器实现
零数据丢失
至少一次语义
使用不可靠的接收器丢失缓冲数据
过去的数据在所有接收器中丢失
未定义的语义
Spark 1.2或更高版本,具有预写日志 使用可靠的接收器实现零数据丢失
至少一次语义
使用可靠的接收器和文件实现零数据丢失
至少一次语义

 

5.4.2.使用Kafka Direct API

在Spark 1.3中,我们引入了一个新的Kafka Direct API,它可以确保Spark Streaming只接收一次所有Kafka数据。除此之外,如果您实现一次性输出操作,您可以实现端到端的一次性保证。“ 卡夫卡集成指南”中进一步讨论了这种方法。

 

5.5.输出操作的语义

输出操作(例如foreachRDD至少具有一次语义,即,在工作失败的情况下,转换的数据可能被多次写入外部实体。虽然这对于使用saveAs***Files操作保存到文件系统是可以接受的 (因为文件将被简单地用相同的数据覆盖),但是可能需要额外的努力来实现精确一次的语义。有两种方法。

  • 幂等更新:多次尝试始终写入相同的数据。例如,saveAs***Files始终将相同的数据写入生成的文件。

  • 事务性更新:所有更新都是以事务方式进行的,以便以原子方式完成更新。一种方法是做到这一点。

    • 使用批处理时间(可用foreachRDD)和RDD的分区索引来创建标识符。该标识符唯一地标识流应用程序中的blob数据。
    • 使用标识符以事务方式(即,一次,原子地)使用此blob更新外部系统。也就是说,如果标识符尚未提交,则以原子方式提交分区数据和标识符。否则,如果已经提交,请跳过更新。

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012549626/article/details/98488391

智能推荐

攻防世界_难度8_happy_puzzle_攻防世界困难模式攻略图文-程序员宅基地

文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文

达梦数据库的导出(备份)、导入_达梦数据库导入导出-程序员宅基地

文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作  导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释:   cwy_init/init_123..._达梦数据库导入导出

js引入kindeditor富文本编辑器的使用_kindeditor.js-程序员宅基地

文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js

STM32学习过程记录11——基于STM32G431CBU6硬件SPI+DMA的高效WS2812B控制方法-程序员宅基地

文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6

计算机网络-数据链路层_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输

软件测试工程师移民加拿大_无证移民,未受过软件工程师的教育(第1部分)-程序员宅基地

文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...

随便推点

Thinkpad X250 secure boot failed 启动失败问题解决_安装完系统提示secureboot failure-程序员宅基地

文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure

C++如何做字符串分割(5种方法)_c++ 字符串分割-程序员宅基地

文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割

2013第四届蓝桥杯 C/C++本科A组 真题答案解析_2013年第四届c a组蓝桥杯省赛真题解答-程序员宅基地

文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答

基于供需算法优化的核极限学习机(KELM)分类算法-程序员宅基地

文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。

metasploitable2渗透测试_metasploitable2怎么进入-程序员宅基地

文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入

Python学习之路:从入门到精通的指南_python人工智能开发从入门到精通pdf-程序员宅基地

文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf