博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
DAGScheduler 是什么?有什么作用?
阅读量:4072 次
发布时间:2019-05-25

本文共 3527 字,大约阅读时间需要 11 分钟。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见

正文

RDD DAG 构建了基于数据流之上的操作算子流,即 RDD 的各个分区的数据总共会经过哪些 Transformation 和 Action 这两种类型的一系列操作的调度运行,从而 RDD 先被 Transformation 操作转换为新的 RDD ,然后被 Action 操作将结果反馈到 Driver Program 或存储到外部存储系统上。

可以结合我的这篇博客来理解——

上面提到的一系列操作的调度运行其实是 DAG 提交给 DAGScheduler 来解析完成的。

DAGScheduler 是面向 Stage 的高层级的调度器。

DAGScheduler 把 DAG 拆分成很多的 Tasks ,每组的 Tasks 都是一个 Sage ,解析时是以 Shuffle 为边界反向解析构建 Stage ,每当遇到 Shuffle ,就会产生新的 Stage ,然后以一个个 TaskSet (每个 Stage 封装一个 TaskSet )的形式提交给底层调度器 TaskScheduler 。

DAGScheduler 需要记录哪些 RDD 被存入磁盘等物化动作,同时要寻求 Task 的最优化调度,如在 Stage 内部数据的本地性等。

DAGScheduler 还需要监视因为 Shuffle 跨节点输出可能导致的失败,如果发现这个 Stage 失败,可能就要重新提交该 Stage 。

DAGScheduler 源码类注释 (3.2.0-SNAPSHOT)

/** * 实现面向 stage 调度的高层次调度层。 *  * 它为每个作业计算 stage 的 DAG,追踪那些被具象化的 rdd 和 stage 输出,并找到最小化的调度方式来运行作业。 *  * 然后,它将 stage 作为 TaskSets 提交给在集群上的 TaskSchedulerImpl 来运行。 *  * TaskSets 包含完全独立的任务,这些任务可以根据集群中已有的数据(例如,来自前一个 stage 的 map 输出文件)立即运行,但如果这些数据不可用,TaskSets 可能会失败。  *  * Spark 的 stage 是通过在shuffle边界打破 RDD 的 graph 来创建的。 *  * 具有“窄”依赖关系的 RDD 操作,如 map() 和 filter() ,在每个 stage 都通过 pipeline 连接到一组任务中,但是具有 Shuffle 依赖关系的操作需要多个 stage *  * (一个 stage 用于写一组 map 输出文件,另一个 stage 用于在 Shuffle 之后读取这些文件)。 *  * 最后,每个 stage 将只对其他 stage 具有 Shuffle 依赖关系,并且可以在其中计算多个操作。 *  * 这些操作的实际 pipelining 操作发生在各种 RDD 的 RDD.compute() 函数中。 *  * DAGScheduler 除了提供 stage 的 DAG 外,还根据当前缓存状态确定运行每个任务的首选位置,并将这些位置传递给低层次的调度器 TaskScheduler。 *  * 此外,它还处理由于 Shuffle 输出文件丢失而导致的失败问题,在这种情况下,可能需要重新提交旧 stage。 *  * stage 内部的不是由 Shuffle 文件丢失引起的失败问题,由 TaskScheduler 处理,它将在取消整个 stage 之前每个任务重试几次。  *  * 在浏览此代码时,有几个关键概念:  *  * 1. Jobs(由 ActiveJob 表示)是提交给调度程序的高层次工作项。 *  *      例如,当用户调用像 count() 这样的操作时,job 将通过 submitJob 提交。 *  *      每个 job 可能需要执行多个 stage 来构建中间数据。  *       * 2. Stage 是在 job 中计算中间结果的任务集,其中每个 Task 在相同 RDD 的分区上计算相同的函数。 *  *      stage 在 Shuffle 边界处分开,这引入了一个屏障(在这里我们必须等待前一 stage 完成以获取输出)。 *  *      有两种类型的阶段:ResultStage(执行操作的最后阶段)和 ShuffleMapStage(为 shuffle 写入 map 输出文件)。 *       *      如果多个 job 重用相同的 RDD,则 stage 通常在多个 job 之间共享。  *       * 3. Task 是单独的工作单元,每个工作单元被发送到一台机器上。  *       * 4. Cache tracking:DAGScheduler 会找出缓存了哪些 rdd 以避免重新计算它们,并且同样会记住哪些 shuffle map stage 已经生成了输出文件以避免重新跑一次 shuffle 的 map 端。  *  * 5. Preferred locations:DAGScheduler 还基于底层 rdd 的首选位置,缓存 或者 Shuffle 数据的位置,计算在 stage 中运行每个 Task 的位置。  *  * 6. Cleanup:当依赖于数据结构的正在运行的 job 完成时,所有数据结构都被清除,以防止长时间运行的应用程序中出现内存泄漏。  *  * 要从失败中恢复,同一阶段可能需要多次运行,这称为“attempts”。 *  * 如果 TaskScheduler 报告某个 Task 由于前一 stage 的 map 输出文件丢失而失败,则 DAGScheduler 将重新提交丢失的 stage。 *  * 这是通过 FetchFailed 的 CompletionEvent 或 ExecutorLost 事件检测到的。 *  * DAGScheduler 将等待一小段时间以查看其他节点或 Task 是否失败,然后重新提交 TaskSets 用于计算丢失 Task 的所有丢失的 stage 。 *  * 作为此过程的一部分,我们可能还必须为以前清理 Stage 对象的旧(已完成)Stage 创建 Stage 对象。 *  * 由于来自旧 stage 尝试的任务可能仍在运行,因此必须小心应对正确 stage 对象中接收的任何事件。  *  * 对这个类进行修改或审核时应当注意:  *  * - 所有的数据结构都应该在涉及它们的 job 结束时清除,以避免在长时间运行的程序中状态的无限累积。  *  * - 添加新数据结构时,请更新 `DAGSchedulerSuite.assertDataStructuresEmpty` 以包含新结构。这将有助于捕捉内存泄漏。 *  * @param sc Spark 上下文对象 * @param taskScheduler Spark 底层针对 Task 的调度器 * @param listenerBus 事件 bus,基于 reactor 思想 * @param mapOutputTracker 用来追踪 map 输出结果的,Shuffle 时会用到 * @param blockManagerMaster master 的全局块管理器 * @param env Spark 环境对象 * @param clock 由“System”API报告的操作系统的实际时间钟 */private[spark] class DAGScheduler(    private[scheduler] val sc: SparkContext,    private[scheduler] val taskScheduler: TaskScheduler,    listenerBus: LiveListenerBus,    mapOutputTracker: MapOutputTrackerMaster,    blockManagerMaster: BlockManagerMaster,    env: SparkEnv,    clock: Clock = new SystemClock())  extends Logging

转载地址:http://lkgji.baihongyu.com/

你可能感兴趣的文章
CentOS操作系统下安装yum的方法
查看>>
ping 报name or service not known
查看>>
FTP 常见问题
查看>>
zookeeper单机集群安装
查看>>
do_generic_file_read()函数
查看>>
Python学习笔记之数据类型
查看>>
Python学习笔记之特点
查看>>
Python学习笔记之安装
查看>>
shell 快捷键
查看>>
VIM滚屏操作
查看>>
EMC 2014存储布局及十大新技术要点
查看>>
linux内核内存管理(zone_dma zone_normal zone_highmem)
查看>>
将file文件内容转成字符串
查看>>
循环队列---数据结构和算法
查看>>
优先级队列-数据结构和算法
查看>>
链接点--数据结构和算法
查看>>
servlet中请求转发(forword)与重定向(sendredirect)的区别
查看>>
Spring4的IoC和DI的区别
查看>>
springcloud 的eureka服务注册demo
查看>>
eureka-client.properties文件配置
查看>>