弹性分布式数据集RDD(概念)

云计算 waitig 545℃ 百度已收录 0评论

RDD概述:
RDD(Resilient DistributedDataset)叫做分布式数据集
是Spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合
RDD具有数据流模型的特点:自动容错;位置感知性调度和可伸缩性.
RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度

RDD的属性:
1.一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度.用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值.默认值就是程序所分配到的CPU Core的数目

2.一个计算每个分区的函数.Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的.compute函数会对迭代器进行复合,不需要保存每次计算的结果

3.RDD之间的依赖关系.RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系.在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

4.一个Partitioner,即RDD的分片函数;当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner.只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None;Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量

5.一个列表,存储存取每个Partitioner的优先位置(Preferred location).对于一个HDFS文件来说,这个列表保存的就是每个Partitioner所在的块的位置;按照”移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置

创建RDD
1.由一个已经存在的Scala集合创建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))

2.由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集
val rdd2 = sc.textFile(“hdfs://minimaster:9000/words”)

RDD编程API
Transformation
RDD中的所有转换都是延迟加载,也就是说,它们并不会直接计算结果;它们只是记住这些应用到基础数据集(一个文件)上的转换动作.只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行

常用的Transformation:
mapPartitions(func): 独立地在RDD事务每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func): 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

宽依赖与窄依赖的误区??
join间的宽依赖与窄依赖

RDD的缓存
Spark速度非常快的原因:在不同操作中可以在内存中持久化或缓存数据集.当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用.这使得后续的动作变得更加迅速.RDD相关的持久化和缓存,是Spark最重要的特征之一;可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键

将经常用到的结果数据或者shuffle以后的数据往往先缓存起来:
1.便于以后快捷访问
2.提高结果数据的安全性

RDD缓存方式:
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行.通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition

DAG的生成
DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DGA,根据RDD之间的依赖关系的不同将DGA划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算;对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据
DAG的生成与Stage的划分

Stage划分:
为什么划分Stage??
因为要把RDD最终生成一个个的task提交到Executor执行,所以需要把RDD先stage划分再生成task
Stage划分
stage划分的依据??
查看父RDD是否发生了宽依赖,然后通过递归,从最后一个Action类型的RDD开始,从后往前推,当遇到发生宽依赖的RDD,就把前面所有的RDD划分一个Stage,最后所有的RDD再划分一个stage,直到所有的RDD都划分完

RDD的生成,stage的划分,任务的生成,任务的提交这4个阶段
4个阶段

Yarn和Spark的提交任务:
ResourceManager相当于Master,负责任务调度
NodeManager相当于Worker,负责创建一个容器和启动自己的子进程(YarnChild和Executor)
Client相当于Driver,用来提交任务
YarnChild:相当于Executor,直接参与计算的进程
Yarn和Spark

Accumulator(累加器)
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享操作
他提供了多个task对一个变量并行的操作的功能
task只能对Accumulator进行累加的操作,不能读取其值
只有Driver程序可以读取其值

Task的生成依据Stage的划分,在Stage中会首先划分pipeline,然后根据pipeline生成Task

shuffle Read: 把父RDD的数据读取到子RDD中,发生在Shuffle之后

Shuffle Write: 把中间结果数据写到磁盘,为了保证数据的安全性

为什么Shuffle Write到磁盘而不是内存?
1.避免因为结果数据太大而占用太多的内存资源,造成内存溢出
2.保存到磁盘可以保证数据的安全性

取消cache() → unpersist()

为什么要checkpoint??
运行处的中间结果往往很重要,所以为了保证数据的安全性,要把数据做检查点,最好把数据checkpoint到HDFS,便于该集群所有节点访问到
在checkpoint之前最好先cache一下,就是先把数据缓存到内存,这样便于运行任务时调用,也便于在checkpoint的时候直接从缓存获取数据

在什么时候做checkpoint??
在发生shuffle之后做checkpoint;

checkpoint的步骤:
1.创建checkpoint存储目录
sc.setCheckpointDir(“hdfs://…..”)
2.把数据cache起来
rdd.cache()
3.checkpoint
rdd.checkpoint()
rdd.collect


本文由【waitig】发表在等英博客
本文固定链接:弹性分布式数据集RDD(概念)
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)