黑洞

这里藏着一些独特的想法

0%

Pyspark学习之概念篇

本文只是站长的学习记录,内容并不完善。

RDD

定义

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变(只读)、可分区、可并行计算的集合。

  • Dataset:一个数据集合,用于存放数据的。
  • Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
  • Resilient:RDD中的数据可以存储在内存中或者磁盘中。

五大特性

  1. 分区列表

    • 每个RDD都至少拥有一个分区或多个分区;
    • 对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度;
    • 用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值;
  2. 一个函数会被作用在每一个分区上

    • Spark中RDD的计算是以分区为单位的,每一个算子都会被作用到每个分区上;
  3. 一个RDD会依赖于其他多个RDD

    • RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算(Spark的容错机制);
  4. 对于Key-Value类型的RDD会有一个Partitioner,即RDD的分区函数;

    • 当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner;
    • 只有Key-Value类型的RDD,才会有Partitioner,非Key-Value的RDD,其Parititioner的值是None;
    • Partitioner不但决定了RDD本身的分片数量,也决定了父RDD(Parent RDD)Shuffle输出时的分区数量;
  5. 分区位置优先级列表

    • Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算;
    • 对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置;

宽依赖与窄依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种:一种是窄依赖,RDDs之间分区是一一对应的;另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。

dependencies.png

算子

RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。由一个RDD转换到另一个RDD,可以通过丰富的算子实现。

RDD的操作算子包括两类,一类叫做transformations,它是用来将RDD进行转化,构建RDD的血缘关系;另一类叫做actions,它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。

  • 常见转换算子(Transformations):map() filter() reduceByKey()

    • 返回一个新的RDD。
    • 所有Transformation函数都是惰性求值,只会建立RDD间的关系;不会立即执行,需要Action函数触发。
  • 常见行动算子(Actions):collect() count() saveAsFile()

    • 返回计算结果或无返回值
    • Transformation操作只是建立计算关系,而Action操作才是实际的执行者。每个Action操作都会调用SparkContext的runJob方法向集群正式提交请求,所以每个Action操作对应一个Job。

共享变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark提供了两种类型的变量:

  1. 广播变量(Broadcast Variables)
    • 广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本;
  2. 累加器(Accumulators)
    • 累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

广播变量

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点输入大数据集副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

broadcast.png

累加器

在算子中我们可以使用driver程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的新副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果。

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。

注意事项

由于RDD仅记录一系列转换操作,如果在使用累加器后没有及时缓存(cache),每次执行action算子都会从上游RDD开始计算,重复累加得到错误的结果。

持久化

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

Cache/Persist

Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集。当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,在此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

Checkpoint

RDD的数据可以缓存,把数据放在内存中,虽然是快速的,但是也是最不可靠的;把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint的诞生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在HDFS上。借助HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

持久化和Checkpoint的区别

  1. 存储位置

    • Persist和Cache只能保存在本地的磁盘和内存中(或者堆外内存);
    • Checkpoint可以保存数据到HDFS这类可靠的存储上;
  2. 生命周期

    • 使用Cache和Persist的RDD(计算结果)会在程序结束后被清除或者手动调用unpersist方法;
    • 使用Checkpoint的RDD(计算结果)在程序结束后依然存在,不会被删除;
  3. 依赖链

    • Persist和Cache不会丢掉RDD间的依赖链关系,因为这种缓存是不可靠的,如果出现了一些错误(例如Executor宕机),需要通过回溯依赖链重新计算出来;
    • Checkpoint会丢弃依赖链,因为Checkpoint会把结果保存在HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链;

Spark容错机制:首先会查看RDD是否被Cache,如果被Cache到内存或磁盘,直接获取;否则查看Checkpoint所指定的HDFS中是否存在相应数据,如果都没有则直接从父RDD开始重新计算还原。

Dataframe

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。DataFrame中每条数据封装在Row中,Row表示每行数据。

rdd dataframe contrast.png

如果觉得文章写得不错或对您有帮助,请我喝杯柠檬茶吧!