Delta File Fomat 2:扩展Spark读取Delta文件_wankunde的博客-程序员宝宝

技术标签: spark  

DataSource

DataSource 是Spark用来描述对应的数据文件格式的入口,对应的Delta也是一种数据文件格式,所以了解DataSource实现原理,是了解 Delta 的基础。

Spark 对外暴漏的读写文件的入口:

// 创建一个writer再调用writer.save()方法保存 DF
val writer = df.write.format("delta").mode("append")
writer.save(path)

// 创建一个DataFrameReader,再调用 load() 方法,返回DF
sparkSession.read.format("delta").load(path)

在上面的save方法和load方法中有一部分相同的代码,val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) 这个是根据用户指定的文件格式来找到对应的 DataSource 入口;

  • 如果writer的DataSource 是 DataSourceV2 子类,返回case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan)(WriteToDataSourceV2 已经被废弃,已经被AppendData 这样具体的实现类代替);
  • 如果writer的DataSource 是 v1 版本,是根据相关元数据信息,生成一个 DataSource对象,然后来执行数据写入;
  • 如果reader的DataSource 是 DataSourceV2 子类,会根据dataSource类和数据文件路径,还有相关元信息,创建一个DataSourceV2Relation对象,然后通过 Dataset.ofRows() 方法来读取;
  • 如果reader的DataSource 是 v1 版本,创建一个根据相关元数据信息,创建一个BaseRelation 对象,再继续封装为 LogicalRelation 对象,再解析和读取;

writer.save() 方法

def save(): Unit = {
    
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "write files of Hive data source directly.")
    }

    assertNotBucketed("save")

    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    
      val source = cls.newInstance().asInstanceOf[DataSourceV2]
      source match {
    
        case ws: WriteSupport =>
          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
            source,
            df.sparkSession.sessionState.conf)
          val options = sessionOptions ++ extraOptions

          val writer = ws.createWriter(
            UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
            new DataSourceOptions(options.asJava))

          if (writer.isPresent) {
    
            runCommand(df.sparkSession, "save") {
    
              WriteToDataSourceV2(writer.get, df.logicalPlan)
            }
          }

        // Streaming also uses the data source V2 API. So it may be that the data source implements
        // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving
        // as though it's a V1 source.
        case _ => saveToV1Source()
      }
    } else {
    
      saveToV1Source()
    }
  }

  private def saveToV1Source(): Unit = {
    
    //....

    // Code path for data source v1.
    runCommand(df.sparkSession, "save") {
    
      DataSource(
        sparkSession = df.sparkSession,
        className = source,
        partitionColumns = partitioningColumns.getOrElse(Nil),
        options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
    }
  }

DataFrameReader.load() 方法

def load(paths: String*): DataFrame = {
    
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    
      val ds = cls.newInstance().asInstanceOf[DataSourceV2]
      if (ds.isInstanceOf[ReadSupport]) {
    
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = ds, conf = sparkSession.sessionState.conf)
        val pathsOption = {
    
          val objectMapper = new ObjectMapper()
          DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
        }
        Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
          ds, sessionOptions ++ extraOptions.toMap + pathsOption,
          userSpecifiedSchema = userSpecifiedSchema))
      } else {
    
        loadV1Source(paths: _*)
      }
    } else {
    
      loadV1Source(paths: _*)
    }
  }

  private def loadV1Source(paths: String*) = {
    
    // Code path for data source v1.
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

java.util.ServiceLoader

在继续讲解之前,先学习Java的一个工具类ServiceLoader
如果我们定义了一个接口,并在我们系统内部实现了一部分子类,满足了系统的需求,可是过了一段时间,我们有了一个新的计算逻辑,或者一个新的外部扩展需要实现,这个时候,我们需要修改我们的原始代码,通过类名来反射实例化出我们需要的实现类的对象,再操作吗?如果项目很大,修改原始代码就变的太重了,而且,我们可能还有很多第三方的实现,扩展功能将在第三方变的封闭;
幸好,Java给我们提供了ServiceLoader 这个工具类,来看一下Java是如果来实现系统功能的自动扩展的把:

  • 创建Java接口或scala trait,这里使用的是 trait DataSourceRegister
  • 在项目资源文件夹下的META-INF/services/文件夹创建以类名为名字的配置文件 $delta/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
    在配置文件中保存当前接口的所有实现类 org.apache.spark.sql.delta.sources.DeltaDataSource
  • 代码中load当前接口的所有实现类 ServiceLoader.load(classOf[DataSourceRegister], loader)
  • 返回结果serviceLoader 实现了Iterable接口,可以进行遍历返回结果

扩展Spark 支持的DataSource

Spark在最开始设计的时候,就预备了不仅支持内部自定义的数据文件格式,还提供了扩展 DataSource来支持各种文件格式的入口;还是从上面的加载DataSource类的入口出发:val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)

现在看这个代码,逻辑就非常清晰了:

  • 通过 ServiceLoader 去加载所有的DataSource 子类,并根据类中的 shortName() 方法和外部传入的文件格式进行过滤
    • 如果找到一个,直接返回该类
    • 如果找到多个,需要返回类名以 org.apache.spark 的那一个,如果还是有多个,查找失败
  • 如果用户传入的是一种 FileFormat 子类,则尝试直接去加载该子类或该子类的DefaultSource对应类,加载成功,直接返回
  // DataSource.scala
/** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    
    // 通过传入的参数,找到对应的 FileFormatClass类,传入的参数也可以是orc, 或者类似 org.apache.spark.sql.parquet 这样的简写
    // orc 在native 和 hive 中使用不同的FileFormat来存取
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
    
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    // 使用 ServiceLoader 类来加载实现类 DeltaDataSource 类
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
    
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
    
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
    
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
    
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) =>
                //...
            }
          } catch {
    
            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
              // NoClassDefFoundError's class name uses "/" rather than "." for packages
              val className = e.getMessage.replaceAll("/", ".")
              if (spark2RemovedClasses.contains(className)) {
    
                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                  "Please check if your library is compatible with Spark 2.0", e)
              } else {
    
                throw e
              }
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
          // There are multiple registered aliases for the input. If there is single datasource
          // that has "org.apache.spark" package in the prefix, we use it considering it is an
          // internal datasource within Spark.
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
    
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
    
            throw new AnalysisException(s"Multiple sources found for $provider1 " +
              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
          }
      }
    } catch {
    
      // ...
    }
  }

backwardCompatibilityMap 中的 DataSource 映射关系

0 = {[email protected]} "(org.apache.spark.sql.hive.orc.DefaultSource,org.apache.spark.sql.hive.orc.OrcFileFormat)"
1 = {[email protected]} "(org.apache.spark.sql.execution.datasources.json,org.apache.spark.sql.execution.datasources.json.JsonFileFormat)"
2 = {[email protected]} "(org.apache.spark.sql.execution.streaming.RateSourceProvider,org.apache.spark.sql.execution.streaming.sources.RateStreamProvider)"
3 = {[email protected]} "(org.apache.spark.sql.execution.datasources.json.DefaultSource,org.apache.spark.sql.execution.datasources.json.JsonFileFormat)"
4 = {[email protected]} "(org.apache.spark.ml.source.libsvm.DefaultSource,org.apache.spark.ml.source.libsvm.LibSVMFileFormat)"
5 = {[email protected]} "(org.apache.spark.ml.source.libsvm,org.apache.spark.ml.source.libsvm.LibSVMFileFormat)"
6 = {[email protected]} "(org.apache.spark.sql.execution.datasources.orc.DefaultSource,org.apache.spark.sql.execution.datasources.orc.OrcFileFormat)"
7 = {[email protected]} "(org.apache.spark.sql.jdbc.DefaultSource,org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider)"
8 = {[email protected]} "(org.apache.spark.sql.json.DefaultSource,org.apache.spark.sql.execution.datasources.json.JsonFileFormat)"
9 = {[email protected]} "(org.apache.spark.sql.json,org.apache.spark.sql.execution.datasources.json.JsonFileFormat)"
10 = {[email protected]} "(org.apache.spark.sql.execution.datasources.jdbc,org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider)"
11 = {[email protected]} "(org.apache.spark.sql.parquet,org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat)"
12 = {[email protected]} "(org.apache.spark.sql.parquet.DefaultSource,org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat)"
13 = {[email protected]} "(org.apache.spark.sql.execution.datasources.parquet.DefaultSource,org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat)"
14 = {[email protected]} "(com.databricks.spark.csv,org.apache.spark.sql.execution.datasources.csv.CSVFileFormat)"
15 = {[email protected]} "(org.apache.spark.sql.hive.orc,org.apache.spark.sql.hive.orc.OrcFileFormat)"
16 = {[email protected]} "(org.apache.spark.sql.execution.datasources.orc,org.apache.spark.sql.execution.datasources.orc.OrcFileFormat)"
17 = {[email protected]} "(org.apache.spark.sql.jdbc,org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider)"
18 = {[email protected]} "(org.apache.spark.sql.execution.datasources.jdbc.DefaultSource,org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider)"
19 = {[email protected]} "(org.apache.spark.sql.execution.streaming.TextSocketSourceProvider,org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider)"
20 = {[email protected]} "(org.apache.spark.sql.execution.datasources.parquet,org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat)"
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wankunde/article/details/102639822

智能推荐

matlab破解方法_小蒋小蒋快乐成长的博客-程序员宝宝

matlab 不能使用大都都是上次破解的时间到了,现在给出如下破解方法:1.更改电脑系统时间到上次破解时间之前,应考虑到改变后会给我们带来很多不方便所以不建议使用。2.

C/C++编程语言中指针类型空间占用情况介绍_liitdar的博客-程序员宝宝_指针类型的大小

本文主要介绍 C/C++ 编程语言中指针类型的空间占用情况。1 示例程序这里使用一个示例程序进行讲解,示例程序代码(pointer_sizeof.c)内容如下:#include <stdio.h>int main(){ // 定义指向char类型的指针s char* s = "hello world."; // 定义指向int类型的指针a int x = 11; int* a = &x; // 打印指针s所指向的字符串...

lua脚本加载解析机制_huzilinitachi的博客-程序员宝宝

blog的源码参考来自于lua5.3.4lua字节指令处理脚本指令处理过程一般分为 加载脚本、解析脚本、生成虚拟机能处理的指令、根据生成的指令执行相应的功能。lua虚拟机支持.lua脚本文件加载解析以及lua脚本字符串的解析处理。lua字节码指令加载解析处理的时序如下图lua加载脚本lua在API接口层面提供了两种加载脚本的方式:luaL_loadbufferx加载lua脚本字符串,lu...

C语言中指针定义的时候初始化的必要性_爱编程的小屁孩的博客-程序员宝宝_c语言初始化的重要性

定义一个指针时,不进行初始化,系统会为这个变量自动分配一个地址,这个地址的值就是指针的值可能为0X1234,不初始化这个指针的值的话,操作这个指针就想当于操作这个0X1234这个地方的内容,就会出现问题,因为你并不知道这个指针指向地址的内容,可能为段系统代码,错改的话,可能会让系统崩溃。比如 int *p;*p=100;让p这个指针指向地址的值为100,有可能这个指针指向地址的值为系统的一个重...

指针定义的几种形式_nanashiro的博客-程序员宝宝_各种指针定义

1、int(*p)[4];------ptr为指向含4个元素的一维整形数组的指针变量(是指针)2、int *p[4];-------定义指针数组p,它由4个指向整型数据的指针元素组成(是数组)3、int(*)[4];--------实际上可以看作是一种数据类型。也就是第一个(int(*p)[4];)中定义的p的数据类型其实你要看这种到底是什么,就是要看他最先和谁结合。 比如1中p先与*结合,

LeetCode算法题-Plus One(Java实现)_weixin_34392435的博客-程序员宝宝

这是悦乐书的第156次更新,第158篇原创01 看题和准备今天介绍的是LeetCode算法题中Easy级别的第15题(顺位题号是66)。给定一个非空数字数组来表示一个非负整数,并给其加1。该数组已经排序,并且最高位有效数字位于数组的开头,并且数组中的每个元素都表示单个数字。假设该整数不以零开头,除了数字0本身。例如:输入:[1,2,3]输出:[1,2,4]说明:数组表示整数123...

随便推点

PowerPC上电复位的过程描述_qingfengtsing的博客-程序员宝宝

说明:PowerPC 的启动不同于普通的CPU启动,由于其为了适应硬件系统的不同需求,设定了两类启动的地址跳转。而CPU的启动除了涉及到地址跳转的问题外,还牵扯启动源 的选择、硬件配置字的设定以及仿真器的配置等等相关一系列问题。在这系列博客中上述问题都会涉及到,本文重点描述PowerPC上电复位后的启动流程。其实要充分实在的理解PowerPC的启动过程,最可靠的方法就是将官方的芯片手册相关部分读懂

python基本语法1.1--十进制与十六进制数之间的相互转换_shengerjianku的博客-程序员宝宝_python将十进制转化为十六进制

#大端与小端print((1024).to_bytes(2, byteorder = 'big'))print((65536).to_bytes(8, byteorder = 'little'))#有符号与无符号print((-1024).to_bytes(4, byteorder = 'big', signed = True))#b'\xff\xff\xfc\x00'print((...

Vulnhub_Chill-Hack_NowSec的博客-程序员宝宝

发现目标主机使用nmap扫描整个网段,发现目标主机nmap 192.168.254.0/24通过简单的扫描发现目标主机开放了21、22、80端口然后我们再进行一个详细的端口探测nmap...

std::ios::sync_with_stdio(false);详解_CN_BIT的博客-程序员宝宝_std::ios::sync_with_stdio(false);

在阅读学习别人的代码的过程中,我们有时会发现这么一行:std::ios::sync_with_stdio(false);这是由于cin比scanf要慢很多,在需要大量读入时,用此行代码可以使cin更快。为什么cin比scanf更慢呢?标准 C++ 流与标准 C 流在每次输入/输出操作后同步,同步的 C++ 流为无缓冲,而每次 C++ 流上的 I/O 都立即应用到对应 C 流的缓冲区...

FPGA型号命名_chushengdeniudu的博客-程序员宝宝

举例:EP2C20F484C6EP 工艺2C cyclone2 (S代表stratix。A代表arria)20 2wLE数量F484 FBGA484pin 封装C6 八速LE数量在同等器件信号的同时越多的越好。同时越贵管脚数量在同等情况下越多越好。器件速度越快越好。在数字电路中:LE 是latch-enable的缩写,即锁存...

以后 没准会用到 先留着保存学习_Vdownloadjust的博客-程序员宝宝

调试之剑:从堆里抢救丢失的博客文章作者: wuzhimin分类:坊间人语  阅读:1,915 次添加评论文/张银奎 很多使用计算机的人都曾经遇到过丢失数据的尴尬。记得我读大学时,很多文字编辑软件还没有自动存盘功能,而且寝室里偶尔会因为用电超过负荷而跳闸停电。每次断电时,如果赶上有人在电脑前写代码或者编辑文字,那么常常听到那位先生狠狠一跺脚(或者使劲一拍大腿)

推荐文章

热门文章

相关标签