一文搞懂Spark的Task调度器(TaskScheduler)[通俗易懂]

一文搞懂Spark的Task调度器(TaskScheduler)[通俗易懂]TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性及错误信息。遇到Straggle任务会放到其他结点进行重试。向DAGScheduler汇报执行情况,包括在Shuffle输出丢失时报告fetchfailed错误等信息。TaskScheduler底层调度器1.TaskScheduler原理剖析2.TaskScheduler源代码解析2.1TaskScheduler实例化源代码.

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。

  1. 为TaskSet创建和维护一个TaskSetManager, 并追踪任务的本地性及错误信息。
  2. 遇到Straggle任务会放到其他结点进行重试。
  3. 向DAGScheduler汇报执行情况, 包括在Shuffle输出丢失时报告fetch failed错误等信息。

1. TaskScheduler原理剖析

通过之前 DAGScheduler的介绍可以 知道, DAGScheduler 将划分的一系列 Stage (每个Stage封装一个TaskSet) , 按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。 下面来分析TaskScheduler接收到DAGScheduler的Stage任务 后, 是如何管理Stage (TaskSet) 的生命周期的。

TaskSchedulerlmpl在createTaskScheduler方法中实例化后, 就立即调用自己的initialize 方法把StandaloneSchedulerBackend的实例对象传进来 , 从而赋值给TaskSchedulerlmpl的backend。 在TaskSchedulerlmpl的血tialize方法中, 根据调度模式的配置创建 实现了 Schedul­erBuilder接口的相应实例对象, 并且创建的对象会立即调用buildPools创建 相应数量的Pool 存放和管理TaskSetManager的实例对象。 实现SchedulerBuilder接口的具体类都是Scheduler­Builder的内部类。

(1)FIFOSchedulableBuilder: 调度模式是SchedulingMode.FIFO , 使用先进先出策略调度。这是默认模式,在该模式下,只有一个TaskSetManager池。
(2)FairSchedulableBuilder: 调度模式是SchedulingMode.FAIR, 使用公平策略调度。

在createTaskSched uler方法返回后,TaskSchedulerlmpl通过DAGScheduler的实例化过程设置 DAGScheduler的实例对象, 然后调用自己的start方法。 在 TaskSchedulerlmpl 调用start 方法时, 会调用StandaloneSchedulerBackend的start方法 , 在StandaloneSchedulerBackend的start方法中会最终注册应用程序AppClient。 TaskSchedulerlmpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。
TaskSchedulerlmpl启动后, 就可以接收 DAGScheduler的submi tMissingTasks方法提交过来的TaskSet 进行进一步处理。 TaskSchedulerlmpl在submitTasks 中初始化 一个TaskSetManag­er 对其生命周期 进行管理, 当TaskSchedulerlmpl得到Worker结点上的Executor 计算资源时, 会通过TaskSetManager来发送具体的Task到Executor上执行计算。
如果Task执行过程中有错误导致失败 , 会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。 TaskSetManager 会将失败的Task再次添加到待执行的Task队列中。

Spark Task允许失败的次数默认是4次,在TaskSchedulerlmpl初始化时通过spark. task. maxFailures 设置该默认值。

如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler。DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。

2. TaskScheduler源代码解析

下面通过源代码解析来看一下 TaskScheduler 是如何调度和管理 TaskSet 的任务。

2.1 TaskScheduler 实例化源代码

TaskScheduler 和 DAGScheduler 都在 SparkContext 实例化的时候一同实例化。 Spark Context 源代码中与 TaskScheduler 实例化相关的代码如下。

  private var _taskScheduler: TaskScheduler = _//任务调度器
  .......
    private[spark] def taskScheduler: TaskScheduler = _taskScheduler
  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = { 
   
    _taskScheduler = ts
  }
  .......
  // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    .......
     //启动任务调度器
    _taskScheduler.start()

本博客仅介绍Spark的Standalone部署模式,Spark Context的createTaskScheduler方法中与Standalone部署模式相关的代码如下。

private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = { 
   
    import SparkMasterRegex._

    //在本地运行时,请勿尝试在失败时重新执行任务
    val MAX_LOCAL_TASK_FAILURES = 1
    /** * 确保默认执行者的资源满足一项或多项任务要求。 * 此功能适用于未设置执行程序核心配置的集群管理器,其他功能则在ResourceProfile中检查。*/
    def checkResourcesPerTask(executorCores: Int): Unit = { 
   
      val taskCores = sc.conf.get(CPUS_PER_TASK)//配置中每个任务分配的核数
      /** * SKIP_VALIDATE_CORES_TESTING: * 此配置用于单元测试,以允许跳过任务cpus到内核验证,以允许在本地模式下运行时模拟独立模式的行为。 * 默认情况下,独立模式不指定执行者内核的数量,它仅使用主机上可用的所有内核。 * */
      if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) { 
   
        //检查每个executor的核数至少满足一个任务的需求
        validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
      }
      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
      ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, Some(executorCores))
    }
//根据不同的运行模式,进行不同的初始化
    master match { 
   
	........
	//Spark Standalone部署模式下TaskScheduler和SchedulerBackend分别由各自对应的实现类TaskSchedulerImpl和StandaloneSchedulerBackend,来实例化对象
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
    ............
        } catch { 
   
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }

2.2 TashScheduler初始化源代码

TaskScheduler在创建之后会调用其初始化方法进行初始化。TaskScheduler (实际上在实现类TaskSchedulerlmpl的initialize方法中)在初始化的过程中设置对SchedulerBackend 对象的引用, 实例化SchedulerBuilder具体实现类的对象用来创建和管理TaskSetManager池。
TaskSchedulerlmpl源代码中的相关代码如下:

....
// default scheduler is FIFO
  private val schedulingModeConf = conf.get(SCHEDULER_MODE)
  val schedulingMode: SchedulingMode =
    try { 
   
      SchedulingMode.withName(schedulingModeConf)
    } catch { 
   
      case e: java.util.NoSuchElementException =>
        throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
    }
//root的名字暂时设置为空值
  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
  .....
  def initialize(backend: SchedulerBackend): Unit = { 
   
  //设置对 SchepulerBackend 对象的引用
    this.backend = backend
    schedulableBuilder = { 
   
      schedulingMode match { 
   
      //调度模式是 FIFO
        case SchedulingMode.FIFO =>
        //FairSchedulableBuilder的rootPool里面直接添加TaskManager
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
        //FaiiSchedulahleBuilder的rootPool根据配置文件可以挂若干个子pool, 每个 pool里面都添加TaskManager
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    //构建TaskManager池
    schedulableBuilder.buildPools()
  }

2.3 TaskScheduler启动源代码

TaskScheduler 实例对象在 DAGScheduler 实例化之后启动, 并且 TaskScheduler 启动的过程由 TaskSchedulerlmpl 具体实现。 在启动过程中, 主要是调用 SchedulerBackend 的启动方法, 然后对不是本地部署模式并且开启任务的推测执行(设置 spark. speculation 为 true)情况, 根据配置判断是否周期性地调用 TaskSetManager 的 checkSpeculatableTasks 方法检查任务 的推测执行。 StandaloneSchedulerBackend的 start 方法中会最终注册应用程序 AppClient。
TaskScheduler 的启动源代码如下所示:

 override def start(): Unit = { 
   
 //调用SchedulerBackend的start方法启动
    backend.start()
//不是本地模式,并且开启了推测执行
    if (!isLocal && conf.get(SPECULATION_ENABLED)) { 
   
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(
        () => Utils.tryOrStopSparkContext(sc) { 
    
       // 最终会调用调度池中的TaskSetManager 中的checkSpeculatableTasks来检查推测执行的任务
        checkSpeculatableTasks() },
        SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

2.4 TaskScheduler提交任务源代码

TaskSchedulerlmpl启动后, 就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理了。 对千ShuffleMapStage类型的Stage, DAGScheduler初始化一 组ShuffleMapTask实例对象;对于ResultStage类型的Stage, DAGScheduler初始化一组Re­sultTask实例对象。 最后, DAGScheduler将这组ResultTask实例对象封装成TaskSet实例对象 提交给TaskSchedulerlmpl。

注意,ShuffleMapTask是根据Stage所依赖的RDD的partition分布产生跟partition数量相等的Task, 这些Task根据partition的本地性分布在不同的集群结点;ResultTask负责输出整个Job的结果。

DAGScheduler的submitMissingTasks方法的部分关键代码如下。

//创建任务列表
val tasks: Seq[Task[_]] = try { 

val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match { 

case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { 
 id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { 
 id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch { 

......
}
if (tasks.nonEmpty) { 

logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//调用taskScheduler中的submitTasks方法提交任务集合
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
stage.resourceProfileId))
} else { 

.......
}

TaskSchedulerlmpl 在 submitTasks 中初始化一个 TaskSetManager, 并通过SchedulerBuilder 对其生命周期进行管理, 最后调用 SchedulerBackend 的 reviveOffers 方法进行 TaskSet 所需资源的分配。 在 TaskSet 得到足够的资源后, 在 SchedulerBackend 的 launchTasks 方法中将 Task­Set 中的 Task 一个一个地发送到 Executor去执行。submitTasks 源代码如下所示:

 override def submitTasks(taskSet: TaskSet): Unit = { 

val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
+ "resource profile " + taskSet.resourceProfileId)
this.synchronized { 

//初始化创建一个 TaskSetManager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
// 在此阶段,将所有现有TaskSetManager标记为僵尸,因为我们要添加一个新的。
//这对于处理corner的情况是必要的。假设一个阶段具有10个分区,并具有2个TaskSetManager:TSM1(僵尸)和TSM2(活动)。 TSM1有一个正在运行的分区10任务,它已完成。 TSM2完成了分区1-9的任务,并认为他仍处于活动状态,因为分区10尚未完成。 但是,DAGScheduler获取所有10个分区的任务完成事件,并认为阶段已完成。 如果是shuffle阶段,并且由于某种原因缺少map outputs,则DAGScheduler将重新提交它并为其创建TSM3。 由于一个阶段不能有多个活动任务集管理器,因此必须将TSM2标记为僵尸(实际上是)。
stageTaskSets.foreach { 
 case (_, ts) =>
ts.isZombie = true
}
stageTaskSets(taskSet.stageAttemptId) = manager
//schedulableBuilder将新建的TaskSetManager实例对象添加到关联的Pool中
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
//对于非本地部署模试,如果没有接收到Task,就周期性地警告或者取消Task
if (!isLocal && !hasReceivedTask) { 

starvationTimer.scheduleAtFixedRate(new TimerTask() { 

override def run(): Unit = { 

if (!hasLaunchedTask) { 

logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else { 

this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需资源的分配,在得到足够的资源后,将TaskSet中的Task一个一个地发送到Executor去执行
backend.reviveOffers()
}

关于本片博客中涉及的StandaloneSchedulerBackend相关方法,将在下一篇博客中详细介绍。

如果喜欢的话希望点赞收藏,关注我,将不间断更新博客。

希望热爱技术的小伙伴私聊,一起学习进步

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/183598.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号