Mastering-Spark-SQL学习笔记01 SparkSQL-程序员宅基地

技术标签: scala  json  大数据  

SparkSQL可以让开发人员使用关系化查询对大规模结构化数据进行处理。

像Apache Spark一样,Spark SQL特别适合大规模的分布式内存计算。SparkSQL将关系型处理与Spark的函数式编程API进行整合。

SparkSQL和SparkCore的计算模型的主要区别是注入、查询和持久化(半)结构化数据的关系化框架,使用可以由SQL(带有许多HiveQL的功能)和高级的类SQL的函数式定义的DataSet API进行关系型查询(即结构化查询)。

这里的(半)结构化数据是指可以由列名、列类型以及列是否为空组成的Schema所描述的数据集。

当使用SQL或Query DSL时,查询都会成为一个有强制Encoder的DataSet。

 

DataSet是带有transformation和action算子进行结构化查询执行管道编程的接口。结构化查询在内部是一棵(逻辑的和物理的)关系型算子和表达式组成的Catalyst tree。

当一个action算子(如直接的show、count,间接的save、saveAsTable)在DataSet上被执行时,(在DataSet背后的)结构化查询会通过以下执行阶段:

1)Logical Analysis 逻辑分析

2)Caching Replacement 缓存替换

3)Logical Query Optimization,逻辑查询优化使用 rule-based和cost-based优化器

4)Physical Planning 物理计划

5)Physical Optimization 物理优化(如 Whole-Stage Java Code Generation 或 Adaptive Query Execution)

6)Constructing the RDD of Internal Binary Rows(根据Spark Core的RDD API表示结构化查询)

 

从Spark 2.0开始,Spark SQL实际上就是Spark底层内存分布式平台的主要和功能丰富的接口。将Spark Core的RDDs隐藏在更高层次的抽象之后,即使没有您的同意,也可以使用逻辑和物理查询优化策略。

换句话说,Spark SQL的Dataset API描述了一个分布式计算,它最终将被转换为RDDs的DAG(有向无环图)来执行。在幕后,结构化查询被自动编译成相应的RDD操作。

Spark SQL支持批处理和流模式下的结构化查询(后者是Spark SQL的独立模块,称为Spark Structured Streaming)

 

Spark SQL编程模型:// Define the schema using a case class

case class Person(name: String, age: Int)
// you could read people from a CSV file
// It's been a while since you saw RDDs, hasn't it?
// Excuse me for bringing you the old past.
import org.apache.spark.rdd.RDD
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("Jacek",10)))
// Convert RDD[Person] to Dataset[Person] and run a query
// Automatic schema inferrence from existing RDDs
scala> val people = peopleRDD.toDS
people: org.apache.spark.sql.Dataset[Person] = [name: string, ag
e: int]
// Query for teenagers using Scala Query DSL
scala> val teenagers = people.where('age >= 10').where('age <= 19').select('name').as[String]
teenagers: org.apache.spark.sql.Dataset[String] = [name: string]
scala> teenagers.show
+-----+
| name|
+-----+
|Jacek|
+-----+// You could however want to use good ol' SQL, couldn't you?
// 1. Register people Dataset as a temporary view in Catalog 要先将DataSet注册为View,才可以使用SQL
people.createOrReplaceTempView("people") // 2. Run SQL query val teenagers = sql("SELECT * FROM people WHERE age >= 10 AND age <= 19") scala> teenagers.show +-----+---+ | name|age| +-----+---+ |Jacek| 10| +-----+---+

Spark SQL提供使用了基于规则的查询优化器Wole-Stage Codegen(全流程代码生成,在运行时动态生成比手写更好的代码)、使用内部二进制行格式的Tungsten执行引擎

自Spark SQL 2.2起,结构化查询可以使用Hint框架进一步优化。

Spark SQL引入了一个名为Dataset(以前是DataFrame)的表格数据抽象。DataSet数据抽象的目的是使在Spark基础设施上处理大量结构化表格数据更加简单和快速。

// 处理JSON文件并将它的子集存为CSV
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
// 结构化流式处理代码
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

在Spark 2.0中,Spark SQL的主要数据抽象是Dataset。它表示具有已知模式的记录的结构化数据。这个结构化数据表示数据集支持使用压缩列格式的紧凑二进制表示,这种格式存储在JVM堆之外的托管对象中。

它应该通过减少内存的使用和GC来加快计算速度

Spark SQL支持谓词下推来优化DataSet查询的性能,也可以在运行时生成经过优化的代码。

 

Spark SQL附带了不同的API:

1. Dataset API(前身是DataFrame API)带有强类型的类LINQ的查询特定领域语言

2. 结构化流API(即流Dataset)用于持续增量地执行结构化查询

3. 非程序员可能会通过与Hive的直接集成来使用SQL作为查询语言。

4. JDBC/ODBC爱好者可以使用JDBC接口(通过Thrift JDBC/ODBC)并将它们的工具连接到Spark的分布式查询引擎。

 

Spark SQL使用特定的DataFrameReader和DataFrameWriter对象提供了统一的接口以访问分布式存储,如Cassandra、HDFS(Hive、Parquet、JSON)。

你可以使用类SQL在HDFS、S3上的大量数据,能访问的数据可以来自不同的数据源(文件或表)。

 

Spark SQL定义了以下类型的函数:
1. 标准函数或用户定义函数(UDF),从单个行获取值作为输入,为每个输入行生成单个返回值。

2. 对一组行进行操作并计算每个组的单个返回值的基本聚合函数

3. 窗口聚合函数,对一组行进行操作,并为一组中的每一行计算单个返回值。

 

有两个支持的目录实现—内存(默认)和hive—您可以使用spark.sql.catalogImplementation来设置。

 

您可以解析来自外部数据源的数据,并让模式推断者推断模式。

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
scala> df.show()
+---+---+
|  i|  j|
+---+---+
|  1|  2|
+---+---+

val query = df.groupBy("i").agg(max("j").as("aggOrdering")).orderBy(sum("j")).as[(Int, Int)]
scala> query.show()
+---+-----------+                                                               
|  i|aggOrdering|
+---+-----------+
|  1|          2|
+---+-----------+
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+

 

Spark SQL支持两种“模式”来编写结构化查询:Dataset API和SQL。RuntimeReplaceable表达式只能使用SQL模式,如nvl、nvl2、ifnull、nullif等SQL函数。

 

Spark SQL应用开发的需要以下步骤:

1. 建立开发环境(IDEA,Scala 和 sbt) 

2. 指定库依赖关系

3. 创建 SparkSession

4. 从外部数据源加载 Dataset

5. 转变Dataset

6. 保存Dataset以持久化存储

转载于:https://www.cnblogs.com/sunspeedzy/p/9431577.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_30678349/article/details/96394865

智能推荐

解决STM32F1 系列复位后RTC日期为2000-01-01_stm32f1 复位后rtc日期不对-程序员宅基地

文章浏览阅读5.2k次,点赞3次,收藏12次。问题:STM32F1中RTC 不像 F4中,是一个单独模块。其就是一个计数器,查看HAL库中时间和日期的设置发现,在日期设置的时候,HAL库并没有将日期换算为计数器的值。库源码如下日期设置:HAL_StatusTypeDef HAL_RTC_SetTime(RTC_HandleTypeDef *hrtc, RTC_TimeTypeDef *sTime, uint32_t Format){ uin..._stm32f1 复位后rtc日期不对

MERGE语法详解-程序员宅基地

文章浏览阅读635次,点赞2次,收藏15次。merge语法是根据源表对目标表进行匹配查询,匹配成功时更新,不成功时插入。其基本语法规则是merge into 目标表 ausing 源表 bon(a.条件字段1=b.条件字段1 and a.条件字段2=b.条件字段2 ……) when matched then update set a.更新字段=b.字段when not macthed then inse..._merge into限制目标表字段

pythonocc基础使用:1.读取/写入brep,iges,step,stl文件_pythonocc-core 读取step-程序员宅基地

文章浏览阅读1.2w次,点赞2次,收藏32次。待写_pythonocc-core 读取step

Java数据类型-程序员宅基地

文章浏览阅读2.4w次,点赞60次,收藏62次。文章目录定义分类计算机存储单元Java基本数据类型图数据类型转换定义Java语言是强类型语言,对于每一种数据都定义了明确的具体的数据类型,在内存中分配了不同大小的内存空间。分类基本数据类型数值型整数类型(byte,short,int,long)浮点类型(float,double)字符型(char)布尔型(boolean)引用数据类型类(class)接口(..._java数据类型

mysql hivedb_hive使用mysql localstore方式连接Access denied for user 'hive'@'localhost' to database...-程序员宅基地

文章浏览阅读252次。mysql> create user 'hive' identified by 'hive'; --创建用户Query OK, 0 rows affected (0.00 sec)mysql> grant all privileges on *.* to 'hive@localhost' with grant option; --localhost不可少,否则启动bin..._access denied for user 'hive'@'localhost

ABAP 去除字符串某个字段(去掉的金额逗号例子)_abap怎么去掉千分符-程序员宅基地

文章浏览阅读5.5k次,点赞5次,收藏14次。ABAP 去除字符串某个字段(去掉的金额逗号例子)实际程序中,我们有很多需求是要把excel的数据导入SAP数据之中去,但是这金额是带有千分位逗号的,比如(2,602,568.71)如果要把这个值赋值给金额字段,那程序会直接报错。解决方法一 :解决数据源头上的问题,直接把数据的格式换成数值。解决方式二:在ABAP代码上进行数据处理和校验(业务有时候就是那么无理取闹)1.先上代码DATA: lv_je TYPE string VALUE '2,602,568.71' .DO 10 TIMES. _abap怎么去掉千分符

随便推点

Python爬虫实战:高清二次元女朋友图抓取_爬虫 女朋友-程序员宅基地

文章浏览阅读488次,点赞6次,收藏7次。主要运用到以下知识点:1、 requests请求三大要素2、功能函数的封装.3、os对文件路径处理4、图集分类保存效果动图展示:_爬虫 女朋友

100道python经典练习题_100道 python练习下载-程序员宅基地

文章浏览阅读6.8w次,点赞140次,收藏596次。链接:https://pan.baidu.com/s/1K0iuZKJukLoGQ8OBy7xq1Q提取码:2s6q链接长期有效,如有疑问,欢迎评论区交流。_100道 python练习下载

大麦路由器dw22d不拆机刷breed和openwrt_大麦dw22d改无线打印服务器-程序员宅基地

文章浏览阅读1.6w次,点赞17次,收藏29次。@[TOC]大麦路由器DW22D不拆机刷Breed和OpenWrt大麦路由器DW22D不拆机刷Breed和OpenWrt1.进入http://192.168.10.1/upgrade.html2.开启ssh,在密码框中输入以下内容,最后面有一个空格123 | echo 6c216b27c8c9b051106c969e2077d4e9 > /ezwrt/bin/upgrade_passwd 3.点确定,然后提示密码错误没关系4. 再次打开 http://192.168.10.1/upgra_大麦dw22d改无线打印服务器

re学习笔记(75)BUUCTF - re - [ACTF新生赛2020]Splendid_MineCraft_buuctfsplendid-mine-程序员宅基地

文章浏览阅读454次。[ACTF新生赛2020]Splendid_MineCraftmain函数strtok是用分隔符划分字符串,根据+5 +9差4个,再加上+9强转为WORD,得到每组字符是4+2=6个flag格式为ACTF{xxxxxx_xxxxxx_xxxxxx}36行跳转到数据段先是对数据段进行异或0x72h解密解密后,F5得到两个数异或+35与第一段输入进行比较第二段也是一个smc自解密第二段主要代码。ebx为用户输入与(0x83+i)异或得到的下标根据eax进行索引查表。得到的bl与第二_buuctfsplendid-mine

《深入理解Java虚拟机 JVM高级特性与最佳实践》读书笔记--JAVA自旋锁与自适应自旋锁_为什么自旋等待的方式避免了线程忙等-程序员宅基地

文章浏览阅读315次。自旋锁释义:请求锁的线程(假设为线程A)再未获得锁的时候,不进入阻塞状态,而是让它「再执行一会」即占用CPU一会,看看持有锁的线程是否很快释放锁资源。但是为了让这个线程A进入「等待」的状态,需要让它执行一个忙循环(自旋),这项技术称为自旋锁。自旋锁的优劣势分析线程A的状态切换是由系统进行的,而这个过程则会消耗系统资源的,如果请求锁的「忙循环」时在一个很小的时间片之内就得到锁,..._为什么自旋等待的方式避免了线程忙等

西门子S7系列PLC与现场设备(仪表)通讯解决方案_西门子sm422-程序员宅基地

文章浏览阅读1.1k次。针对西门子S7系列的PLC,SiboTech智能、多功能、紧凑型通用串口/PROFIBUS-DP网关(PM-160)为建立西门子PLC与现场RS232/485/422设备的连接提供了理想解决方案:PM-160能够解决西门子PLC对现场各种RS232/485/422设备的实时监控,如:具有Modbus协议(或者用户自定义非标协议)接口的变频器、电机启动保护装置、智能高低压电器、电量测量装置、各种变送器、智能现场测量设备及仪表等等;PM-160也能够解决现场DCS系统(Modbus主站设备)和PLC之间的实_西门子sm422

推荐文章

热门文章

相关标签