大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE稳定放心使用
TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。
- 为TaskSet创建和维护一个TaskSetManager, 并追踪任务的本地性及错误信息。
- 遇到Straggle任务会放到其他结点进行重试。
- 向DAGScheduler汇报执行情况, 包括在Shuffle输出丢失时报告fetch failed错误等信息。
TaskScheduler底层调度器
1. TaskScheduler原理剖析
通过之前 DAGScheduler的介绍可以 知道, DAGScheduler 将划分的一系列 Stage (每个Stage封装一个TaskSet) , 按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。 下面来分析TaskScheduler接收到DAGScheduler的Stage任务 后, 是如何管理Stage (TaskSet) 的生命周期的。
TaskSchedulerlmpl在createTaskScheduler方法中实例化后, 就立即调用自己的initialize 方法把StandaloneSchedulerBackend的实例对象传进来 , 从而赋值给TaskSchedulerlmpl的backend。 在TaskSchedulerlmpl的血tialize方法中, 根据调度模式的配置创建 实现了 SchedulerBuilder接口的相应实例对象, 并且创建的对象会立即调用buildPools创建 相应数量的Pool 存放和管理TaskSetManager的实例对象。 实现SchedulerBuilder接口的具体类都是SchedulerBuilder的内部类。
(1)FIFOSchedulableBuilder: 调度模式是SchedulingMode.FIFO , 使用先进先出策略调度。这是默认模式,在该模式下,只有一个TaskSetManager池。
(2)FairSchedulableBuilder: 调度模式是SchedulingMode.FAIR, 使用公平策略调度。
在createTaskSched uler方法返回后,TaskSchedulerlmpl通过DAGScheduler的实例化过程设置 DAGScheduler的实例对象, 然后调用自己的start方法。 在 TaskSchedulerlmpl 调用start 方法时, 会调用StandaloneSchedulerBackend的start方法 , 在StandaloneSchedulerBackend的start方法中会最终注册应用程序AppClient。 TaskSchedulerlmpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。
TaskSchedulerlmpl启动后, 就可以接收 DAGScheduler的submi tMissingTasks方法提交过来的TaskSet 进行进一步处理。 TaskSchedulerlmpl在submitTasks 中初始化 一个TaskSetManager 对其生命周期 进行管理, 当TaskSchedulerlmpl得到Worker结点上的Executor 计算资源时, 会通过TaskSetManager来发送具体的Task到Executor上执行计算。
如果Task执行过程中有错误导致失败 , 会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。 TaskSetManager 会将失败的Task再次添加到待执行的Task队列中。
Spark Task允许失败的次数默认是4次,在TaskSchedulerlmpl初始化时通过spark. task. maxFailures 设置该默认值。
如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler。DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。
2. TaskScheduler源代码解析
下面通过源代码解析来看一下 TaskScheduler 是如何调度和管理 TaskSet 的任务。
2.1 TaskScheduler 实例化源代码
TaskScheduler 和 DAGScheduler 都在 SparkContext 实例化的时候一同实例化。 Spark Context 源代码中与 TaskScheduler 实例化相关的代码如下。
private var _taskScheduler: TaskScheduler = _//任务调度器
.......
private[spark] def taskScheduler: TaskScheduler = _taskScheduler
private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
_taskScheduler = ts
}
.......
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
.......
//启动任务调度器
_taskScheduler.start()
本博客仅介绍Spark的Standalone部署模式,Spark Context的createTaskScheduler方法中与Standalone部署模式相关的代码如下。
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
//在本地运行时,请勿尝试在失败时重新执行任务
val MAX_LOCAL_TASK_FAILURES = 1
/** * 确保默认执行者的资源满足一项或多项任务要求。 * 此功能适用于未设置执行程序核心配置的集群管理器,其他功能则在ResourceProfile中检查。*/
def checkResourcesPerTask(executorCores: Int): Unit = {
val taskCores = sc.conf.get(CPUS_PER_TASK)//配置中每个任务分配的核数
/** * SKIP_VALIDATE_CORES_TESTING: * 此配置用于单元测试,以允许跳过任务cpus到内核验证,以允许在本地模式下运行时模拟独立模式的行为。 * 默认情况下,独立模式不指定执行者内核的数量,它仅使用主机上可用的所有内核。 * */
if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) {
//检查每个executor的核数至少满足一个任务的需求
validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
}
val defaultProf = sc.resourceProfileManager.defaultResourceProfile
ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
}
//根据不同的运行模式,进行不同的初始化
master match {
........
//Spark Standalone部署模式下TaskScheduler和SchedulerBackend分别由各自对应的实现类TaskSchedulerImpl和StandaloneSchedulerBackend,来实例化对象
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
............
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
2.2 TashScheduler初始化源代码
TaskScheduler在创建之后会调用其初始化方法进行初始化。TaskScheduler (实际上在实现类TaskSchedulerlmpl的initialize方法中)在初始化的过程中设置对SchedulerBackend 对象的引用, 实例化SchedulerBuilder具体实现类的对象用来创建和管理TaskSetManager池。
TaskSchedulerlmpl源代码中的相关代码如下:
....
// default scheduler is FIFO
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
try {
SchedulingMode.withName(schedulingModeConf)
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}
//root的名字暂时设置为空值
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
.....
def initialize(backend: SchedulerBackend): Unit = {
//设置对 SchepulerBackend 对象的引用
this.backend = backend
schedulableBuilder = {
schedulingMode match {
//调度模式是 FIFO
case SchedulingMode.FIFO =>
//FairSchedulableBuilder的rootPool里面直接添加TaskManager
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
//FaiiSchedulahleBuilder的rootPool根据配置文件可以挂若干个子pool, 每个 pool里面都添加TaskManager
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
//构建TaskManager池
schedulableBuilder.buildPools()
}
2.3 TaskScheduler启动源代码
TaskScheduler 实例对象在 DAGScheduler 实例化之后启动, 并且 TaskScheduler 启动的过程由 TaskSchedulerlmpl 具体实现。 在启动过程中, 主要是调用 SchedulerBackend 的启动方法, 然后对不是本地部署模式并且开启任务的推测执行(设置 spark. speculation 为 true)情况, 根据配置判断是否周期性地调用 TaskSetManager 的 checkSpeculatableTasks 方法检查任务 的推测执行。 StandaloneSchedulerBackend的 start 方法中会最终注册应用程序 AppClient。
TaskScheduler 的启动源代码如下所示:
override def start(): Unit = {
//调用SchedulerBackend的start方法启动
backend.start()
//不是本地模式,并且开启了推测执行
if (!isLocal && conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(
() => Utils.tryOrStopSparkContext(sc) {
// 最终会调用调度池中的TaskSetManager 中的checkSpeculatableTasks来检查推测执行的任务
checkSpeculatableTasks() },
SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
2.4 TaskScheduler提交任务源代码
TaskSchedulerlmpl启动后, 就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理了。 对千ShuffleMapStage类型的Stage, DAGScheduler初始化一 组ShuffleMapTask实例对象;对于ResultStage类型的Stage, DAGScheduler初始化一组ResultTask实例对象。 最后, DAGScheduler将这组ResultTask实例对象封装成TaskSet实例对象 提交给TaskSchedulerlmpl。
注意,ShuffleMapTask是根据Stage所依赖的RDD的partition分布产生跟partition数量相等的Task, 这些Task根据partition的本地性分布在不同的集群结点;ResultTask负责输出整个Job的结果。
DAGScheduler的submitMissingTasks方法的部分关键代码如下。
//创建任务列表
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map {
id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map {
id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
......
}
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//调用taskScheduler中的submitTasks方法提交任务集合
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
stage.resourceProfileId))
} else {
.......
}
TaskSchedulerlmpl 在 submitTasks 中初始化一个 TaskSetManager, 并通过SchedulerBuilder 对其生命周期进行管理, 最后调用 SchedulerBackend 的 reviveOffers 方法进行 TaskSet 所需资源的分配。 在 TaskSet 得到足够的资源后, 在 SchedulerBackend 的 launchTasks 方法中将 TaskSet 中的 Task 一个一个地发送到 Executor去执行。submitTasks 源代码如下所示:
override def submitTasks(taskSet: TaskSet): Unit = {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
+ "resource profile " + taskSet.resourceProfileId)
this.synchronized {
//初始化创建一个 TaskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
// 在此阶段,将所有现有TaskSetManager标记为僵尸,因为我们要添加一个新的。
//这对于处理corner的情况是必要的。假设一个阶段具有10个分区,并具有2个TaskSetManager:TSM1(僵尸)和TSM2(活动)。 TSM1有一个正在运行的分区10任务,它已完成。 TSM2完成了分区1-9的任务,并认为他仍处于活动状态,因为分区10尚未完成。 但是,DAGScheduler获取所有10个分区的任务完成事件,并认为阶段已完成。 如果是shuffle阶段,并且由于某种原因缺少map outputs,则DAGScheduler将重新提交它并为其创建TSM3。 由于一个阶段不能有多个活动任务集管理器,因此必须将TSM2标记为僵尸(实际上是)。
stageTaskSets.foreach {
case (_, ts) =>
ts.isZombie = true
}
stageTaskSets(taskSet.stageAttemptId) = manager
//schedulableBuilder将新建的TaskSetManager实例对象添加到关联的Pool中
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
//对于非本地部署模试,如果没有接收到Task,就周期性地警告或者取消Task
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run(): Unit = {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需资源的分配,在得到足够的资源后,将TaskSet中的Task一个一个地发送到Executor去执行
backend.reviveOffers()
}
关于本片博客中涉及的StandaloneSchedulerBackend相关方法,将在下一篇博客中详细介绍。
如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。
希望热爱技术的小伙伴私聊,一起学习进步
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/183598.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...