演示文稿
2009年,Spark诞生于加州大学伯克利分校的AMP实验室(算法、机器与人实验室),2010年开源。2013年,Spark向Apache Software Foundation捐款,成为2014年Apache顶级项目。
如今,十年过去了,Spark已经成为大大小小的企业和研究机构常用的工具之一,仍然受到很多开发者的喜爱。如果你是初入江湖的“小虾米”,想了解和学习Spark,那么由InfoQ和飞轮技术专家Leo合著的专题系列文章——Spark:原理详解和开发实践,绝对适合你!
本文是专题系列的第二篇。
继上一本书之后,在上一本书《内存计算的起源——RDD》中,我们从虚拟和真实两个方面介绍了RDD的基本构成。d是由端到端的依赖关系和计算属性组成的计算路径。技术术语是血统-血统,也称为DAG(有向无环图)。为什么一个概念有两个名字?这两个不同的名字有什么区别和联系?简单来说,lineage和DAG从两个不同的角度描述了同一个东西。谱系,侧重于从数据的角度描述不同rdd之间的依赖关系;DAG从计算的角度描述了不同rdd之间的转换逻辑。如果说RDD是Spark对分布式数据模型的抽象,那么相应的,DAG就是Spark对分布式计算模型的抽象。
顾名思义,DAG是一种图。图计算模型的使用由来已久,早在上个世纪就被应用于图数据库的实现。任何图都包含两个基本元素:顶点和边。节点通常用于表示实体,而边表示实体之间的关系。比如《倚天屠龙记》社交 *** 的好友关系中,每个节点代表一个特定的人,每条边意味着两端的实体之间已经建立了好友关系。
易途龙社交 ***
在上述社交 *** 中,朋友是相互的。比如张无忌和周芷若是朋友,所以图中的边没有方向性;另外,细心的同学可能已经发现,上面的图结构中还有“环”,比如张无忌、谢逊与白眉鹰王形成的关系环,张无忌、谢逊、紫衫龙王与小昭的关系环,等等。像上面这样的图结构叫做“无向循环图”。没有比较,就没有歧视。有向无环图(DAG)自然是一种具有方向性,没有“环”结构的图模型。读者们,你们还记得土豆工坊的例子吗?
马铃薯车间山
在上面的土豆加工DAG中,每一个节点都是一个RDD,每一条边都代表了不同rdd之间的父子关系——父子关系天然是单向的,所以整个画面是有方向的。此外,我们注意到在整个图中没有环结构。这样的土豆加工线可以说是最简单的有向无环图。每个节点的入度和出度都是1,整个图只有一个分支。
但工业应用中的Spark DAG要比这复杂得多,往往是由不同rdd的关联和拆分生成的具有多个分支的有向无环图。为了说明这一点,我们以土豆工坊为例。“原味”薯片推向市场一段时间后,作坊老板发现季度销量骤降,老板着急又无奈。这时有人向他建议:“为什么不多推出一些不同口味的薯片来迎合大众多样化的选择呢?”于是老板下了一道命令,工人们对装配线做了如下改动。
马铃薯车间先进生产线
与以前相比,新工艺增加了三条调味线,用于分配不同的调味粉。来自新装配线的辣椒粉被分配到收集小薯片的装配线,孜然粉被分配到中型薯片装配线,番茄粉被分配到大型薯片装配线。改造后的土豆工坊现在可以生产三种口味不同大小的薯片,分别是香辣小薯片、孜然中薯片、番茄大薯片。如果我们用flavoursRDD抽象调味品,那么新车间流程对应的DAG就会演化成有向无环图,有两个分支,如下图所示。
具有多个分支的Dag
在上一篇文章中,我们讨论了星火核心内功的之一精髓——RDD。在本文中,我们来谈谈内功的第二个秘密——DAG。
RDD运算符Dag的边缘在上篇文章《内存计算的起源RDDs》的最后,我们以WordCount为例,展示了不同RDD之间转换形成的DAG计算图。通读代码,从开发的角度,我们发现DAG形成的关键在于RDD算子的调用。与Hadoop MapReduce不同,Spark提供了丰富的RDD算子供开发者灵活排列组合,实现了多样化的数据处理逻辑。那么问题来了,Spark提供了哪些算子?
来源:https://spark . Apache . org/docs/latest/rdd-programming-guide . html
从表中我们可以看出,Spark的RDD运营商是如此的丰富,令人眼花缭乱。对于刚接触Spark的同学来说,如果不稍微分类的话,真的无法像明星一样对很多运营商入门。Apache Spark官网将RDD算子分为两种:转换和动作,这是各种Spark技术博客中常见的分类方法。为了说明转换和操作符之间的本质区别,我们必须提到Spark计算模型的延迟计算(也称为延迟计算)特性。
掌握一个新概念最有效的方法之一就是找到它的对立面——相对于“懒惰计算”,大多数传统编程语言和编程框架的评估策略是“热切评估”。比如大家熟悉的C,C和Java,每条指令都会试图调度CPU,占用时钟周期,触发计算的执行。同时,CPU寄存器需要与内存通信,完成数据交换和数据缓存。在传统编程模式下,每一条指令都是“迫不及待”的,每个人都恨不得被调度到“前线”去参战。
惰性的计算模式并不是Spark所特有的,绝大多数RDD操作者都是“稳定的”并且特别冷静。他们会明确地告诉DAGScheduler,“哥们,你先走吧,别理我,我先伸个懒腰抽包烟。队伍前排是我们大哥。没有他的命令,我们不会轻举妄动。”有了懒算和前期求值的基础知识,再来说说变换和动作的区别。Spark的RDD算子中,Transformations算子都是懒求值运算,只参与DAG计算图的构造和表示计算逻辑,不会被立即调度和执行。评估的特征在于,当且仅当数据需要被具体化时,计算才会被触发。RDD的Actions操作符提供了各种数据物化操作,其主要职责是触发整个DAG计算链的执行。并且只有当Actions操作符触发计算时,从DAG的开头到结尾的所有操作符(之前用于构建DAG的转换操作符)才会按照依赖关系的顺序依次被调度和执行。
说到这里,你不禁要问:Spark的懒评计算模型有什么优点?或者反过来说,为什么Spark不采用传统的早期评测?不知道各位读者有没有听说过“延迟满足效应”(又称“糖果效应”),是指为了获得长远的、更大的利益,主动延迟甚至放弃当下的、更小的满足。俗话说“云想衣裳花想好看,猪想有福,人想红”。火花是一个不仅天赋异禀,而且从小就相当精明的孩子。他原来的内功并不是为了赢现在这一招,而是着眼于整个武林。太远了。让我们把它拿回来。总的来说,懒惰计算为Spark执行引擎的整体优化提供了广阔的[/K0/]区间。至于懒计算如何帮助Spark做全局优化——讲故事的人一张嘴说不出两句话,还是后书慢慢展开吧。
回到RDD算子,除了常见的按变换和作用分类的方法,作者还从适用范围和用途两个维度对老铁进行了分类。毕竟人脑喜欢结构化的知识,官网的长蛇阵列表总是让人昏昏欲睡。有了这个表,我们知道*ByKey的运算一定作用在成对的RDD上。所谓成对的RDD指的是RDD,他的图式清楚地区分(键,值)对。相反,任意RDD指的是没有图式或有任意图式的RDD。从使用的角度来看,RDD算子的分类比较分散,长度的原因这里就不介绍了。我们各取所需吧。
值得一提的是,对于同一个计算场景,不同算子带来的执行性能可能会有很大差异。在接下来的性能调优章节中,我们将详细分析具体问题。好吧,越来越多的洞被挖了。请放轻松。先说一下刚才提到的根据FIFO原理的hot DAGScheduler。
DAG计划程序-DAG的向导DAGScheduler是Spark分布式调度系统的重要组成部分之一。其他组件包括TaskScheduler、MapOutputTracker、SchedulerBackend等。DAGScheduler的主要职责是根据RDD依赖关系将DAG划分为阶段,以阶段粒度提交任务并跟踪任务的进度。如果把DAG看作是Spark操作的执行路径或者“战略地形”,那么DAGScheduler就是这个地形的引导官。这个引导官从头到尾负责搞清楚地形,根据地形特点安排兵力。更形象的说,回到土豆车间的例子,DAGScheduler要做的是将抽象的土豆加工DAG转化为车间流水线上具体的薯片加工作业任务。那么问题来了,DAGScheduler是用什么方式探索“地形”的呢?如何划分阶段?划分阶段的依据是什么?再者,把DAG分成阶段有什么好处?Spark为什么要这么做?
DAGScheduler的核心职责
要回答这些问题,我们首先需要将DAG的“头”和“尾”定义如下:在一个DAG中,没有父RDD的节点称为头节点,而没有子RDD的节点称为尾节点。以土豆工坊为例,这里有两个之一节点,分别是potatosRDD和flavoursRDD,最后一个节点是flavouredBakedChipsRDD。
DAG中头部和尾部的定义
当DAGScheduler试图探索DAG“地形”时,它会以一种反向的方式从后往前走。具体来说,对于土豆工场的DAG,DAGScheduler将从尾节点即flavouredBakedChipsRDD开始,按照RDD依赖依次遍历所有父RDD节点,遍历过程中以Shuffle为边界划分阶段。Shuffle字面意思是“洗牌”。没错,就是扑克游戏中洗牌的意思。在大数据领域,shuffle扩展为“跨节点的数据分布”,指的是数据需要分布在集群中不同的计算节点上,才能实现某些计算逻辑。在大多数场景中,Shuffle是当之无愧的“性能瓶颈”。说白了,哪里有Shuffle,哪里就有空的性能优化空间。关于Spark Shuffle的原理和性能优化技巧,我们稍后会单独开篇讨论。在土豆工坊的DAG中,有两个地方发生了Shuffle,一个是从bakedChipsRDD到flavouredbekedchipsrdd的计算,另一个是从flavoursRDD到flavouredbekedchipsrdd的计算,如下图所示。
在土豆车间洗牌
读者不禁要问:DAGScheduler是如何判断rdd之间的转换是否会发生Shuffle的?这位读者说,“之前的文档说了很久,运算符是rdd之间转换的关键。是不是要根据运营商来判断是否会发生洗牌?”你真的猜错了。运算符和Shuffle之间没有对应关系。以连接操作符为例。大多数场景下,洗牌;将被引入join。但是,在协调连接中,当左表和右表的数据以相同的方式分布时,不会发生洗牌。所以,你看,DAGScheduler真的不能依靠运营商本身来判断是否发生洗牌。要回答这个问题,我们还是要回到上一本书《内存计算的起源——RDD》中RDD介绍中提到的五个属性。
名称成员类型属性含义依赖关系变量生成RDD所依赖的父RDD计算方法,生成RDD的计算接口分区变量,RDD的所有数据切片实体分区方法划分数据切片规则首选位置变量数据切片的物理位置首选项
RDD的5个属性及其含义
其中,之一种属性依赖可以细分为窄依赖和洗牌依赖。窄依赖(NarrowDependency)又称“窄依赖”,是指RDD所依赖的数据不需要分布,基于当前已有的数据片段就可以执行compute attribute封装的函数。ShuffleDependency不是这样。意味着RDD所依赖的数据碎片需要先分布在集群中,然后才能执行RDD的compute函数来完成计算。因此,rdd之间的转换是否发生Shuffle取决于子rdd的依赖类型。如果依赖类型是ShuffleDependency,那么DAGScheduler确定Shuffle将被引入rdd和rdd之间的转换。在回溯到DAG的过程中,一旦DAGScheduler发现RDD的依赖类型是ShuffleDependency,它将依次执行以下三个操作:
沿着 Shuffle 边界的子 RDD 方向创建新的 Stage 对象把新建的 Stage 注册到 DAGScheduler 的 stages 系列字典中,这些字典用于存储、记录与 Stage 有关的状态和元信息,以备后用沿着当前 RDD 的父 RDD 遵循广度优先搜索算法继续回溯 DAG以土豆工坊为例,其尾节点flavouredBakedChipsRDD依赖于两个父rdd bakedChipsRDD和flavoursRDD,依赖类型为ShuffleDependency。然后,根据DAGScheduler的执行逻辑,此时将执行以下三个具体操作:
回溯DAG过程中遇到ShuffleDependency时DAGScheduler的主要操作过程
DAGScheduler沿着尾节点回溯并划分stage0。
在之一阶段(stage0)的创建和注册完成后,DAGScheduler继续向bakedChipsRDD方向回溯。沿着这条路跑的时候,我们的DAGScheduler向导官惊喜地发现,“我去!一路到马平川,风景非常好,站与站之间没有障碍,交通非常顺畅。真是好地形!”——沿路遇到的所有RDD(bakedChipsRDD,ChipsRDD,cleanedPotatosRDD,PotatosRDD)的依赖类型都是窄依赖。
回溯结束时,DAGScheduler将重复上述三个步骤。根据DAGScheduler以Shuffle为界划分阶段的原理,将沿途所有rdd归为同一阶段,暂命名为stage1。值得一提的是,Stage对象的rdd属性对应的数据类型是RDD[],而不是列表[RDD[]]。对于逻辑上包含多个RDD的阶段,其RDD属性存储路径末端的RDD节点,在我们的示例中,具体为bakedChipsRDD。
DAGScheduler沿着bakedChipsRDD方向回溯,划分stage1。
勤奋的DAGScheduler,在成功建立stage1之后,依然在不忘初心,牢记自己的使命,继续奔向未探索的路线。从上图中我们可以清楚的看到,整个地形中仍然存在着flavoursRDD方向的路径,这些路径并不包含在DAGScheduler的视野中。我们的DAGScheduler向导记忆力很好。早在stage0划分的时候,他就在一个小本子(栈)上写下:“这个路口有个岔口。先沿着bakedChipsRDD走,再往回走,沿着flavoursRDD的方向探索。记住,记住!”此时,向导大人拿出之前的笔记本,用横线划掉了bakedChipsRDD方向的路径——表示这个方向的路径已经探索过了,然后沿着flavoursRDD方向大步走去。我走下来发现,“我去!结束了!”然后按照通常的“三动一定”流程——创建阶段、注册阶段、返回。随着DAGScheduler创建最后一个阶段:阶段2,地形上的所有路径都已被探索。
DAGScheduler创建最后一个阶段:阶段2
到目前为止,我们的向导差点把腿摔断了,按照首尾相连的顺序搜遍了整个地形,最后把地形分成了三个战略区域(阶段)。那么问题来了,指南大人划分的三个区域有什么用呢?DAGScheduler他老爸一直这样跑。他想要什么?如前所述,DAGScheduler的核心职责是将抽象的DAG计算图转化为可以并行计算的具体分布式任务。追溯到DAG和创建Stage只是这个核心职责的之一步。DAGScheduler以Stage(TaskSet)为粒度调度任务,与TaskScheduler、SchedulerBackend等领导协同作战,运筹帷幄,调兵遣将。不过,毕竟这篇文章的主题是DAG,距离星火调度系统的核心还有一段距离,所以暂且在这里挖个坑,再单独开个口(星火调度系统),讲讲几位大佬之间的趣闻轶事。填坑的路又长又修远Xi,所以我要上下挖掘。
我们来回顾一下巫师的心路历程。首先,DAGScheduler一直向上到达DAG的尾节点,并判断沿途每个RDD节点的依赖关系。此后,如果确定RDD的依赖属性是窄依赖,则DAGScheduler继续回溯;如果RDD的依赖是ShuffleDependency,DAGScheduler将启动“三动一集”移动,创建阶段,注册阶段并继续返回。因此,何时切割DAG并生成新阶段是由RDD的依赖类型决定的,并且只有当RDD的依赖是ShuffleDependency时,DAGScheduler才会创建新阶段。
如果你喜欢刨根问底,你肯定会问“DAGScheduler怎么知道RDD依赖哪种类型?他怎么知道RDD的依赖是狭义依赖还是洗牌依赖?”要回答这个问题,我们必须回到RDD的五个属性,但这次是分裂者。还记得这个属性吗?分区器是RDD的分区器,它定义了RDD数据切片的分区规则。它决定了RDD数据切片在分布式集群中的分布方式。这个属性很重要,后面介绍Shuffle的时候会提到。DAGScheduler通过划分器来确定每个RDD的依赖类型。具体地,如果子RDD的划分器与父RDD的划分器一致,DAGScheduler确定子RDD对父RDD的依赖属于狭义依赖;相反,如果两个partitioner不一致,即分区规则不同(不同的分区规则意味着必须有数据的“洗牌”,即Shuffle),那么DAGScheduler确定子对父的依赖是ShuffleDependency。至此,DAGScheduler对于DAG的划分逻辑暂时可以告一段落了。原理说了,例子也举了。少了什么?是啊!代码。
给我看看代码古人云,“光说不练”。我们用一个小例子来说明DAG和Stage的关系。或者用之前《内存计算的起源——RDD》中的WordCount来举例。文件内容如下。
样本文件内容
代码也没有改变:
单词样本代码
虽然文件的内容和代码没有变,但是我们观察问题的角度变了。这一次,我们关注的是DAG中阶段的划分以及阶段之间的关系。RDD的toDebugString函数让我们看到了DAG的构成和阶段的划分,如下图所示。
DAG组成和阶段划分
在上图中,从第三行开始,每一行代表一个RDD。显然,第三行的ShuffledRDD是DAG的尾节点,而第七行的HadoopRDD是首节点。我们来观察一下每行的串印特点。首先,最明显的是第4、5、6、7行前面有一个制表符,与第3行明显错位。这意味着第3行中的ShuffledRDD被划分为一个阶段(标记为阶段0),而第4、5、6和7行中的其他RDD被划分为另一个阶段(标记为阶段),假设第7行下的RDD字符串被打印有两个标签,也就是说,它与第7行不对齐,那么第7行下的RDD被分配给新的阶段,等等。
可以看出,通过RDD的toDebugString观察DAG的阶段划分时,tab是一个重要的指标。此外,我们看到在第3行和第4行的开头有一个括号。括号内是一个数字,它标志着RDD分区的大小。当然,还有更直观的方式来观察RDD、DAG和Stage,Spark的Web UI提供了更丰富的视觉信息。但是Spark的Web UI面板众多,很容易让新生之一眼就不知所措。也许以后,如果时间允许,我们会单独开一个关于Spark的web UI的讲座。
附言本文为Spark分布式计算科普专栏之二,作者学识浅薄,疏漏在所难免。如果你有什么问题,或者觉得文章中的描述有遗漏或者不恰当,请在评论区留言讨论。要掌握一项技术,书本上的知识往往只占20%,30%靠讨论,50%靠实践。更多的讨论可以激发更多的观点、视角和见解,只有这样,对一项技术的认知和理解才能更加深入和坚定。
在这篇博文中,我们从DAG-Spark RDD算子的边缘入手,介绍连接RDD的两种算子:变换和动作,对懒惰计算有一个初步的了解。然后以土豆工坊为例,介绍了DAGScheduler切割DAG并生成Stage的过程和步骤,特别注意DAGScheduler以Shuffle为边界划分Stage。
最后用上一篇文章的字数简单演示一下DAG和Stage的关系。细心的读者可能已经发现,“文档再次展开后”和“文档单独打开后”在本文中被多次提及。Spark是一个精致复杂的分布式计算引擎,在这篇博文中我们不得不提前引用Spark中的很多概念。换句话说,一些概念(比如懒人计算、Shuffle、TaskScheduler、TaskSet、Spark调度系统)在可以解释之前就已经引入到这篇博文中了。这种叙述方式可能会给你带来困惑。毕竟,用一个没有解释清楚的概念去解释另一个新概念,总感觉不太有安全感。
俗话说“杀人偿命,还债偿命。”后续专栏我们会继续讨论Spark的核心概念和原理,慢慢还欠大家的技术债,尽可能还原Spark分布式内存计算引擎的全貌。毕竟星火调度系统到底有什么神圣之处?DAGScheduler和TaskScheduler、SchedulerBackend、TaskSetManager等人一起玩权利的游戏,且听下回分解。
作者简介
Leo,Spark Summit China 2017讲师,World AI Conference 2020讲师,曾任职于IBM、联想研究院、新浪微博。他在数据库、数据仓库和大数据开发与调优方面有着丰富的经验,领导了基于海量数据的大规模机器学习框架的设计与实现。现为康卡斯特飞轮机器学习团队负责人,负责机器学习应用在计算广告业务中的实践、落地和推广。我热爱技术分享,热衷于从生活的角度解读技术。我在IBM developerWorks和程序员杂志上发表过很多技术文章。
延伸阅读:
Spark简介(一):内存计算的起源-InfoQ
本人转发此文,私信“获取信息”即可免费获得价值4999元的InfoQ迷你本。点击文末“了解更多”,可移步InfoQ官网获取最新资讯~