Java-Spark系列10-Spark性能调优概述_spark.sql.files.minpartitionnum-程序员宅基地

技术标签: Spark优化  Spark数据倾斜  Spark参数调整  大数据和数据仓库  # Spark  

一.Spark 性能优化概述

首先笔者能力优先,使用Spark有一段时间,如下是笔者的工作经验的总结。

Spark任务运行图:
image.png

Spark的优化思路:
一般是从3个层面进行Spark程序的优化:

  1. 运行环境优化
  2. RDD算子优化
  3. 参数微调

二.运行环境优化

2.1 数据本地性

我们知道HDFS的数据文件存储在不同的datanode,一般数据副本数量是3,因为Spark计算的数据量比较大,如果数据不在本节点,需要通过网络去其它的datanode读取数据。

所以此时我们可以通过提高数据本地性,减少网络传输,来达到性能优化的目的。

  1. 计算和存储同节点(executor和HDFS的datanode、hbase的region server同节点)
  2. executor数目合适: 如果100个数据界定,3个计算节点,就有97份网络传递,所以此种情况可以适当增加计算节点。
  3. 适当增加数据副本数量

2.2 数据存储格式

推荐使用列式存储格式: parquet.
parquet存在如下优先:

  1. 相同数据类型的数据有很高压缩比
  2. Hive主要支持ORC、也支持parquet

三.RDD算子优化

3.1 尽可能复用同一个RDD

每创建一个RDD都会带来性能的开销,尽可能的对同一个RDD做算子操作,而不要频繁创建新的
RDD。

3.2 对多次使用的RDD进行持久化

如果RDD的算子特别多,需要频繁多次操作同一个RDD,最好的办法是将该RDD进行持久化,

四.参数微调

  1. num-executors
    参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

  2. executor-cores
    参数说明:该参数用于设置每个Executor进程的CPU core数量。

  3. driver-memory
    参数说明:该参数用于设置Driver进程的内存。

  4. spark.default.parallelism
    参数说明:该参数用于设置每个stage的默认task数量。

  5. spark.storage.memoryFraction
    参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。

  6. spark.shuffle.memoryFraction
    参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。

资源参数参考示例:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

五.数据倾斜

绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。

数据倾斜图例:
image.png

解决数据倾斜一般有如下几种常用方法:

  1. 使用Hive ETL预处理数据
    先使用Hive进行预处理数据,也就是使用Hive先计算一层中间数据,Spark从中间层数据开始计算。

  2. 过滤少数导致倾斜的key
    如果发生导致倾斜的key非常少,可以将Spark任务拆分为包含 导致倾斜的key的任务和不包含key的任务。

  3. sample采样倾斜key单独进行join
    通过采样,提前预估会发生数据倾斜的key,然后将一个join拆分为两个join,其中一个不包含该key,一个只包含该key,最后将结果集进行union。

  4. 调整并行度
    调整Shuffle并行度,数据打散

  5. 广播小数据集
    适用于一个大表,一个小表
    不用join连接操作,而改用Broadcast变量与map模拟join操作,完全规避shuffle操作
    spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600

  6. 增加随机前缀
    对发生倾斜的RDD增加随机前缀
    对另外一个RDD等量扩容
    如果少量的key发生倾斜,可以先过滤出一个单独的RDD,对另外一个RDD同理吹,join之后再合并

六. Spark常用的调优参数

6.1 在内存中缓存数据

Spark SQL可以通过调用Spark.catalog.cachetable (“tableName”)或DataFrame.cache()来使用内存中的columnar格式缓存表。然后Spark SQL将只扫描所需的列,并自动调优压缩以最小化内存使用和GC压力。你可以调用spark.catalog.uncacheTable(“tableName”)从内存中删除表。

内存缓存的配置可以在SparkSession上使用setConf方法或者使用SQL运行SET key=value命令来完成。

参数名 默认值 参数说明 启始版本
spark.sql.inMemoryColumnarStorage.compressed true 当设置为true时,Spark SQL会根据数据统计自动为每列选择压缩编解码器。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制柱状缓存的批大小。更大的批处理大小可以提高内存利用率和压缩,但在缓存数据时可能会带来OOMs风险。 1.1.1

6.2 其它配置项

还可以使用以下选项调优查询执行的性能。随着更多的优化被自动执行,这些选项可能会在未来的版本中被弃用。

参数名 默认值 参数说明 启始版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时装入单个分区的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开一个文件的估计成本,由可以在同一时间扫描的字节数来衡量。当将多个文件放入一个分区时使用。最好是高估,那么带有小文件的分区将比带有大文件的分区更快(这是首先安排的)。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 2.0.0
spark.sql.files.minPartitionNum Default Parallelism 建议的(不是保证的)最小分割文件分区数。如果没有设置,默认值是’ spark.default.parallelism '。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 3.1.0
spark.sql.broadcastTimeout 300 broadcast join 等待时间的超时(秒) 1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置在执行联接时将广播到所有工作节点的表的最大字节大小。通过将此值设置为-1,可以禁用广播。注意:目前统计只支持运行ANALYZE TABLE COMPUTE statistics noscan命令的Hive Metastore表。 1.1.0
spark.sql.shuffle.partitions 200 配置将数据变换为连接或聚合时要使用的分区数量。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置阈值以启用作业输入路径的并行列出。如果输入路径数大于该阈值,Spark将通过Spark分布式作业列出文件。否则,它将退回到顺序列表。此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列出并行度。如果输入路径的数量大于这个值,它将被降低到使用这个值。与上面一样,此配置仅在使用基于文件的数据源(如Parquet、ORC和JSON)时有效。 2.1.1

6.3 SQL查询连接的hint

join策略提示BROADCAST、MERGE、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,在将指定的关系加入到另一个关系时,指示Spark对每个指定的关系使用暗示策略。例如,在表’ t1 '上使用BROADCAST提示时,广播加入(广播散列连接或广播嵌套循环联接取决于是否有等值连接键)与t1的构建方面将由火花即使大小的优先表t1的建议的统计配置spark.sql.autoBroadcastJoinThreshold之上。

当连接两端指定了不同的连接策略提示时,Spark会优先考虑BROADCAST提示而不是MERGE提示,优先考虑SHUFFLE_HASH提示而不是SHUFFLE_REPLICATE_NL提示。当双方都指定了BROADCAST提示或SHUFFLE_HASH提示时,Spark将根据连接类型和关系的大小选择构建端。

请注意,不能保证Spark会选择提示中指定的连接策略,因为特定的策略可能不支持所有的连接类型。

-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

Coalesce hint允许Spark SQL用户控制输出文件的数量,就像Dataset API中的Coalesce、repartition和repartitionByRange一样,它们可以用于性能调优和减少输出文件的数量。COALESCE hint只有一个分区号作为参数。“REPARTITION”提示有一个分区号、列或它们都作为参数。“REPARTITION_BY_RANGE”提示必须有列名,分区号是可选的。

SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

6.4 自适应查询执行

Adaptive Query Execution (AQE)是Spark SQL中的一种优化技术,它利用运行时统计信息来选择最高效的查询执行计划。默认情况下AQE是禁用的。Spark SQL可以使用Spark.SQL.adaptive.enabled的伞配置来控制是否打开/关闭。从Spark 3.0开始,AQE中有三个主要特性,包括合并shuffle后分区、将排序合并连接转换为广播连接以及倾斜连接优化。

6.5 合并分区后重新组合

当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都为true时,该特性根据map输出统计信息来合并post shuffle分区。这个特性简化了运行查询时shuffle分区号的调优。您不需要设置合适的shuffle分区号来适合您的数据集。一旦您通过Spark .sql. adaptive.coalescepartitions . initialpartitionnum配置设置了足够大的初始shuffle分区数,Spark就可以在运行时选择适当的shuffle分区号。

参数名 默认值 参数说明 启始版本
spark.sql.adaptive.coalescePartitions.enabled true 当true和Spark .sql. adaptive_enabled为true时,Spark会根据目标大小(由Spark .sql. adaptive_advisorypartitionsizeinbytes指定)合并连续的shuffle分区,以避免过多的小任务。 3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionNum Default Parallelism 合并后的最小洗牌分区数。如果不设置,则默认为Spark集群的默认并行度。此配置仅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同时启用时有效。 3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum 200 合并前的初始shuffle分区数。默认情况下它等于spark.sql.shuffle.partitions。此配置仅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同时启用时有效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自适应优化期间shuffle分区的建议大小(当spark.sql. adaptive_enabled为true时)。当Spark对小shuffle分区或斜shuffle分区进行合并时生效。 3.0.0

6.6 将排序合并联接转换为广播联接

当任何连接侧的运行时统计数据小于广播散列连接阈值时,AQE将排序合并连接转换为广播散列连接。这不是一样有效规划一个广播散列连接首先,但这总比继续做分类合并加入,我们可以节省连接双方的排序,并在本地读取洗牌文件节省网络流量(如果spark.sql.adaptive.localShuffleReader.enabled被设置为true)

6.7 优化倾斜连接

数据倾斜会严重降低连接查询的性能。该特性通过将倾斜任务拆分(如果需要的话还可以复制)为大小大致相同的任务,动态处理排序-合并连接中的倾斜任务。当spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置同时启用时生效。

参数名 默认值 参数说明 启始版本
spark.sql.adaptive.skewJoin.enabled true 当true和Spark .sql.adaptive.enabled为true时,Spark通过拆分(并在需要时复制)倾斜分区来动态处理排序-合并连接中的倾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 如果一个分区的大小大于这个因子乘以中值分区大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则认为该分区是倾斜的。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分区的字节大小大于这个阈值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分区中值大小,则认为该分区是倾斜的。理想情况下,该配置应该设置为大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 3.0.0

参考:

  1. http://spark.apache.org/docs/latest/rdd-programming-guide.html
  2. https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
  3. https://blog.csdn.net/meihao5/article/details/81084876
  4. http://spark.apache.org/docs/latest/sql-performance-tuning.html
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u010520724/article/details/120708086

智能推荐

关于ribboon负载均衡失败的低级错误java.net.UnknownHostException: user-service_java.net.unknownhostexception: userservice-程序员宅基地

文章浏览阅读3.7k次。关于ribboon负载均衡失败的低级错误java.net.UnknownHostException: user-service今天在做springCloud练习的时候,在实现负载均衡均衡时,犯了一个低级错误,下面展示下部分代码注册中心Eureka启动类@SpringBootApplication@EnableEurekaServer//声明是一个注册服务中心public class E..._java.net.unknownhostexception: userservice

IOS从客户端实现下载断点续传,可暂停可继续下载_苹果商店app断点续传-程序员宅基地

文章浏览阅读912次。网上找了很多例子,还是这个比较靠谱,直接拿来就能用,已测试过,断点续传,可暂停可继续下载,把里面的链接随便换一下就可以。可以多个同步下载,但是考虑到土豆优酷都没有采用多个同步下载的方式,我建议还是不要使用。ASIHttpRequest:创建队列、下载请求、断点续传、解压缩http://ryan.easymorse.com/?p=12_苹果商店app断点续传

TCP连接及可靠性、提高性能详解_tcp连接成功率-程序员宅基地

文章浏览阅读109次。TCP连接及可靠性、提高性能详解_tcp连接成功率

六、基于FFmpeg和SDL的视频播放器(快进和暂停)_ffmpeg + sdl 多线程解码播放怎么实现快进-程序员宅基地

文章浏览阅读1.1k次。快进功能的实现主要通过使用av_seek_frame函数来实现快进的功能一、函数原型int av_seek_frame(AVFormatContext *s, int stream_index, int64_t timestamp, int flags);二、参数解释AVFormatContext *s 解码的格式上下文in..._ffmpeg + sdl 多线程解码播放怎么实现快进

npm install 错误问题 gyp ERR! configure error gyp ERR! stack Error: Command failed: C:\Users\Zhan_npm err! gyp err! configure error npm err! gyp err-程序员宅基地

文章浏览阅读2.1w次,点赞3次,收藏8次。刚开始使用vscode打开一个已写好的vue。1.根据网上教程,下载并配置好nodejs2.利用vscode打开已有vue文件3.使用命令行,输入npm install4.然后就报出如下错误localhost:react-first changwei$ npm install --save react-routernpm WARN deprecated [email protected]:..._npm err! gyp err! configure error npm err! gyp err! stack error: command fai

ContentProvider 与Content Resolver_怎么知道contentresolver update数据库失败-程序员宅基地

文章浏览阅读512次。ContentProvider 与Content Resolver_怎么知道contentresolver update数据库失败

随便推点

python常用代码-python常用代码-程序员宅基地

文章浏览阅读109次。常用代码片段及技巧自动选择GPU和CPUdevice = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# model and tensor to devicevgg = models.vgg16().to(device)切换当前目录import ostry:os.chdir(os.path.join(os.getcwd()..._python 读wts

STM32的USB通信资料_ep1_out_callback-程序员宅基地

文章浏览阅读2.1k次。作者:cy757转自:http://blog.csdn.net/cy757/article/details/5089309以下资料由网上收集usb的传输字节问题(来自computer00) STM32的USB端点缓冲不是固定的,由寄存器设置。而我的程序中,是如下定义的,端点大小为0x40,端点2做为输出端点,0xD8+0x40=0x118,也就是说,端点1缓冲前_ep1_out_callback

HTTP 错误 500.21 - Internal Server Error 处理程序“PageHandlerFactory-ISAPI-4.0_32bit”在其模块_http 错误 500.21 - internal server error 处理程序“eshand-程序员宅基地

文章浏览阅读1.9k次。copy:http://www.cnblogs.com/ahao214/archive/2013/08/02/3233494.html问题: 系统是win7。今天把我做过的项目发布后,在IIS上运行时一直出现一个错误,HTTP 错误500.21-Internal Server Error.处理程序“PageHandlerFactory-ISAPI--4.0_32BIT”在其模_http 错误 500.21 - internal server error 处理程序“eshandlerfactory”在其模

NEMU PA2实验思路-程序员宅基地

文章浏览阅读1.4w次,点赞21次,收藏144次。PA2实验思路版权归zzy所有,不许外传!本文主要是提供PA2思路,为了避免踩了一堆坑而浪费时间。若想copy代码请移步他处,本文仅供学习交流用,谢谢!阅读前请确保仔细阅读了PA2实验指导书的有关内容!TIPQ:为什么HIT BAD TRAP了?A:这是我的一些总结,当然因人而异了。(1)未仔细阅读i386手册以及勘误手册,导致某个jcc命令的判断条件的&&与||写错;(2)call指令和ret指令跳转地址时出现错误,导致$eip无法跳转到正确的地址;(3)未能仔细理_nemu pa2

CC2530入门篇————实现四盏灯全亮_cc2530控制四盏灯的亮灭-程序员宅基地

文章浏览阅读3.8k次。#include<iocc2530.h>//四个引脚分别对应板子上四个小灯#define LED1 P1_0#define LED2 P1_5#define LED3 P1_3#define LED4 P1_4void Init_LED(void){P1SEL&=~0x39;//功能寄存器 :0为普通IO口,1为第二功能–外设P1DIR|=0x39;//方向寄存器 :0为输入 , 1为输出P1&=~0x39;;//将四个灯熄灭}main(){Init__cc2530控制四盏灯的亮灭

android 中右上角的数字BadgeView_android badge 数字-程序员宅基地

文章浏览阅读5.2k次。转载自[https://github.com/jgilfelt/android-viewbadger]最近项目中有用到右上角有数字的效果,上网查了一下记录下来以供以后需要 BadgeView主要是继承了TextView,底层放了一个label,可以自定义背景图,自定义背景颜色,是否显示,显示进入的动画效果以及显示的位置等等;两张效果图 构造方法: View target = findVie_android badge 数字