任务调度器有哪些_本地计算机上的task scheduler

任务调度器有哪些_本地计算机上的task schedulerTaskScheduler可以看做任务调度的客户端,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler的类UML图如下,针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合。TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Driver、Executor通信收集Executor上分配给该应用的资…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

TaskScheduler可以看做任务调度的客户端,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler的类UML图如下,针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合。TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Driver、Executor通信收集Executor上分配给该应用的资源使用情况。常见的任务调度模式有以下四种:

  • Local模式:TaskSchedulerImpl + LocalBackend
  • Standalone模式:TaskSchedulerImpl + StandaloneSchedulerBackend
  • Yarn-Cluster模式:YarnClusterScheduler + YarnClusterSchedulerBackend
  • Yarn-Client模式:YarnScheduler + YarnClientSchedulerBackend

TaskScheduler思维导图

下面以最常用的Yarn-Cluster模式为例,从以下四个步骤来分析源码实现方式:

  1. TaskScheduler的创建;
  2. Task的提交;

TaskScheduler的创建

TaskScheduler的创建

  1. TaskScheduler是在SparkContext中定义并启动的:

     // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
     // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
     // 需要在createTaskScheduler调用前注册HeartbeatReceiver,因为Executor在构造时就要检索HeartbeatReceiver消息
     _heartbeatReceiver = env.rpcEnv.setupEndpoint(
       HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
     // master是"spark.master"参数的值,deployMode是"spark.submit.deployMode"参数的值
     // Create and start the scheduler
     // 创建task scheduler,返回(backend, scheduler)的Tuple
     val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
     _schedulerBackend = sched
     _taskScheduler = ts
     // DAGScheduler中保存有taskScheduler的引用,同样构造DAGScheduler时也将自身引用设置到taskScheduler中
     _dagScheduler = new DAGScheduler(this)
     // 向HeartbeatReceiver发送一条SparkContext.taskScheduler已经创建好的消息
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
     // constructor
     // 在DAGScheduler的构造器中将自身的引用设置到taskScheduler里之后,启动TaskScheduler,
     // 方法中同时也会调用backend.start方法启动backend
     _taskScheduler.start()
    
  2. TaskScheduler的构建

    createTaskScheduler方法会根据master参数匹配部署模式,创建TaskSchedulerImpl,并生成不同的SchedulerBackend(Yarn-Cluster模式:YarnClusterScheduler + YarnClusterSchedulerBackend)。

     master match {
       case masterUrl =>
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
         }
         try {
     	  // 利用集群管理器创建TaskScheduler
           val scheduler = cm.createTaskScheduler(sc, masterUrl)
     	  // 利用集群管理器创建SchedulerBackend,并且将scheduler的引用传入
           val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
     	  // 内部调用TaskSchedulerImpl.initialize方法将backend引用设置到scheduler中,
     	  // 并根据schedulingMode创建调度管理器FIFOScheduler或者FairScheduler
           cm.initialize(scheduler, backend)
           (backend, scheduler)
         } catch {
           case se: SparkException => throw se
           case NonFatal(e) =>
             throw new SparkException("External scheduler cannot be instantiated", e)
         }
       ...
     }
    

    YarnClusterScheduler和YarnScheduler类构造过程为空,TaskScheduler的构造过程全部在TaskSchedulerImpl中。

       // Listener object to pass upcalls into
       // DAGScheduler的引用,在DAGSchedulers构造过程中会将自身引用设置到这里
       var dagScheduler: DAGScheduler = null
       // SchedulerBackend的引用,在initialize方法时会进行设置
       var backend: SchedulerBackend = null
       // 保留MapOutputTrackerMaster的引用,driver中存储map输出结果位置的结构
       val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
       // 调度器,在initialize方法中根据schedulingMode创建FIFO或者FAIR调度器,默认为FIFO
       private var schedulableBuilder: SchedulableBuilder = null
       // default scheduler is FIFO
       private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
       val schedulingMode: SchedulingMode =
         try {
           SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
         } catch {
           case e: java.util.NoSuchElementException =>
             throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
         }
       // 表示资源池或者任务集管理器的可调度实体
       val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
     
       // This is a var so that we can reset it for testing purposes.
       // Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
       // 创建TaskResultGetter,利用线程池远程接收并反序列化Worker上的Executor发送的Task的执行结果
       // 线程池利用Executors.newFixedThreadPool创建的,默认4个线程,线程名字以task-result-getter开头,
       // 可通过spark.resultGetter.threads参数修改
       private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
    
  3. TaskScheduler的初始化

    创建完TaskScheduler和SchedulerBackend后,调用YarnClusterManager的initialize方法对其进行初始化,而其实际是调用TaskSchedulerImpl的initialize方法。以默认的FIFO调度为例,TaskSchedulerImpl的初始化过程如下:

  • 设置SchedulerBackend引用。

  • 根据schedulingMode配置来创建FIFOSchedulableBuilder或FairSchedulableBuilder,用来操作Pool中的调度队列。

  • 创建Pool,Pool中缓存了调度队列、调度算法及TaskSetManager集合等信息。

      	  def initialize(backend: SchedulerBackend) {
      	    this.backend = backend
      	    schedulableBuilder = {
      	      schedulingMode match {
      	        case SchedulingMode.FIFO =>
      	          new FIFOSchedulableBuilder(rootPool)
      	        case SchedulingMode.FAIR =>
      	          new FairSchedulableBuilder(rootPool, conf)
      	        case _ =>
      	          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
      	          s"$schedulingMode")
      	      }
      	    }
      	    schedulableBuilder.buildPools()
      	  }
    
  1. TaskScheduler的启动

    调用TaskScheduler#start方法来启动scheduler,实际调用TaskSchedulerImpl#start方法。

       override def start() {
     	// 启动SchedulerBackend,
         backend.start()
     	// 如果不是本地模式且任务并发执行开关打开,则启动一个指定延时后周期调度执行的线程来执行并发任务
     	// 后台启动一个线程检查符合speculation条件的task
         if (!isLocal && conf.getBoolean("spark.speculation", false)) {
           logInfo("Starting speculative execution thread")
           speculationScheduler.scheduleWithFixedDelay(new Runnable {
             override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
               checkSpeculatableTasks()
             }
           }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
         }
       }
    
  2. TaskScheduler与SchedulerBackend的相互引用,TaskScheduler与DAGScheduler相互引用,具体实现的过程如下:

任务调度器有哪些_本地计算机上的task scheduler

Task的提交

spark任务的操作分为两大类:transformation和action,而只有action操作才会触发真正的job执行。查看源码可以发现所有action操作实际是调用SparkContext.runJob来进行任务的提交,下面是以rdd的collect操作为例展示任务提交的整个调用过程:

job提交函数调用过程

DAGScheduler将Stage打包成TaskSet交给TaskScheduler,TaskScheduler会将其封装为TaskSetManager加入到调度队列中(TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务)。TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态。SchedulerBackend在启动后会定期地询问TaskScheduler有没有任务要运行,TaskScheduler会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,Task提交流程如下图所示。

Task提交流程

  1. DAGScheduler将Stage打包成TaskSet交给TaskScheduler,TaskScheduler调用submitTasks方法进行任务调度。

     // TaskSchedulerImpl.scala
     override def submitTasks(taskSet: TaskSet) {
         val tasks = taskSet.tasks
     	// 同步代码块
         this.synchronized {
     	  // 创建TaskSet管理器,对同一个TaskSet中的任务进行调度,跟踪每个task的状态,
     	  // 如果失败则重试(最大重试次数maxTaskFailures可通过spark.task.maxFailures设置,默认为4)
     	  // 通过延迟调度的方式为该TaskSet处理位置感知的调度
           val manager = createTaskSetManager(taskSet, maxTaskFailures)
           val stage = taskSet.stageId
     	  // 获取stageId对应<stageAttemptId, TaskSetManager>
           val stageTaskSets =
             taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
     	  // 更新stageAttemptId对应的TaskSetManager关系
           stageTaskSets(taskSet.stageAttemptId) = manager
     	  // 检测冲突,如果同一stageAttemptId对应多个taskSet且当前这个TaskSetManager是僵尸进程,则抛出异常
           val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
             ts.taskSet != taskSet && !ts.isZombie
           }
           if (conflictingTaskSet) {
             throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
               s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
           }
     	  // 将当前TaskSetManager提交到调度器的调度池Pool中
           schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
     
           if (!isLocal && !hasReceivedTask) {
             starvationTimer.scheduleAtFixedRate(new TimerTask() {
               override def run() {
                 if (!hasLaunchedTask) {
                   logWarning("Initial job has not accepted any resources; " +
                     "check your cluster UI to ensure that workers are registered " +
                     "and have sufficient resources")
                 } else {
                   this.cancel()
                 }
               }
             }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
           }
           hasReceivedTask = true
         }
     	// 向SchedulerBackend申请资源
         backend.reviveOffers()
     }
    
  2. 通过RpcEnv发送一个请求资源的消息后,CoarseGrainedSchedulerBackend的receive方法则会接收分配到的资源。在该方法中,由于接收到的是ReviveOffers,会调用makeOffers方法开始分配资源。

     override def receive: PartialFunction[Any, Unit] = {
       case ReviveOffers =>
     	// 分配资源
         makeOffers()
    
       ...
     }
    
  3. 为所有executors提供资源分配:CoarseGrainedSchedulerBackend#makeOffers。

     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
       // 使用同步块,保证在executor上分配任务时executor不会被kill掉
       val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
         // Filter out executors under killing
     	// 过滤掉正在被kill的executor,得到所有可用的executors
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map { case (id, executorData) =>
           new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
         }.toIndexedSeq
     	// 根据优先级,为task分配Executor上的资源
         scheduler.resourceOffers(workOffers)
       }
       if (!taskDescs.isEmpty) {
     	// 将分配的任务发送到相应的Executor上去执行
         launchTasks(taskDescs)
       }
     }	
    
  4. 根据优先级,为task分配Executor上的资源:TaskSchedulerImpl#resourceOffers。

       /**
        * Called by cluster manager to offer resources on slaves. We respond by asking our active task
        * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
        * that tasks are balanced across the cluster.
        * 由cluster manager来调用,为task分配slave节点上的资源
        * 根据优先级为task分配资源
        * 采用round-robin方式使task均匀分布到集群的各个节点上
        */
       def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
         // Mark each slave as alive and remember its hostname
         // Also track if new executor is added
     	// 标记每个salve为活跃,并保存它的hostname
     	// 如果有新executor加入同样也进行跟踪标记
         var newExecAvail = false
         for (o <- offers) {
           if (!hostToExecutors.contains(o.host)) {
             hostToExecutors(o.host) = new HashSet[String]()
           }
           if (!executorIdToRunningTaskIds.contains(o.executorId)) {
             hostToExecutors(o.host) += o.executorId
             executorAdded(o.executorId, o.host)
             executorIdToHost(o.executorId) = o.host
             executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
             newExecAvail = true
           }
           for (rack <- getRackForHost(o.host)) {
             hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
           }
         }
     
         // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
         // this here to avoid a separate thread and added synchronization overhead, and also because
         // updating the blacklist is only relevant when task offers are being made.
     	// 在做任何分配之前,请从已过期的黑名单中删除黑名单的任何节点
     	// 此操作是为了避免单独的线程和增加的同步开销,还因为只有在提出任务时更新黑名单才有意义
         blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
     
         val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
           offers.filter { offer =>
             !blacklistTracker.isNodeBlacklisted(offer.host) &&
               !blacklistTracker.isExecutorBlacklisted(offer.executorId)
           }
         }.getOrElse(offers)
     	// 为避免多个Task集中分配到某些机器上,对这些Task进行随机打散
         val shuffledOffers = shuffleOffers(filteredOffers)
         // Build a list of tasks to assign to each worker.
     	// 构建要分配给每个Worker的Task列表
         val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
         val availableCpus = shuffledOffers.map(o => o.cores).toArray
     	// 从调度池中获取排好序的TaskSetManager,由调度池确定TaskSet的执行优先级顺序
         val sortedTaskSets = rootPool.getSortedTaskSetQueue
         for (taskSet <- sortedTaskSets) {
           logDebug("parentName: %s, name: %s, runningTasks: %s".format(
             taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     	  // 如果该executor是新分配来的,则重新计算TaskSetManager的就近原则
           if (newExecAvail) {
             taskSet.executorAdded()
           }
         }
     
         // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
         // of locality levels so that it gets a chance to launch local tasks on all of them.
         // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     	// 以我们的调度顺序执行每个TaskSet,然后按照升序的本地性级别为每个节点分配资源,
     	// 以便有机会在所有节点上启动本地任务
     	// 本地性优先级顺序:PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
         for (taskSet <- sortedTaskSets) {
           var launchedAnyTask = false
           var launchedTaskAtCurrentMaxLocality = false
           for (currentMaxLocality <- taskSet.myLocalityLevels) {
             do {
     		  // 在分配的executor资源上,执行TaskSet中包含的所有task
               launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
                 taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
               launchedAnyTask |= launchedTaskAtCurrentMaxLocality
             } while (launchedTaskAtCurrentMaxLocality)
           }
           if (!launchedAnyTask) {
             taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
           }
         }
     
         if (tasks.size > 0) {
           hasLaunchedTask = true
         }
         return tasks
       }
    

参考文章

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/183135.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 【20】进大厂必须掌握的面试题-50个Hadoop面试

    【20】进大厂必须掌握的面试题-50个Hadoop面试

    2020年11月13日
  • idm和百度云怎么组合 idm下载百度云大文件教程

    idm和百度云怎么组合 idm下载百度云大文件教程如今百度云限速想必是很多人都难以接受的,下载的速度慢如狗。那么如何配合上述的软件达到超速的下载的呢?工具/原料https://pan.baidu.com/s/1LYsnJDJGhz0zS4eTCJWFvQ方法/步骤把我们给出的链接地址软件下载,下载后解压此文件,得到一个crx文件,把它拖入浏览器上进行安装。非ie浏览器才可以使用的,电脑浏览器用不了。此时打开浏览器,搜索图二的脚本网页。进入网页后点…

  • 网络协议 概念

    网络协议 概念

    2021年10月10日
  • 接口400错误解析

    接口400错误解析   今天我遇上一个让我很痛心的错误400。对程序员来说,这可能是一个最简单的错误码。因为这个相应并没有进拦截器,更没有进到Controller层。可我在解决这个问题时,质询了很多朋友不解,百度各种方案无效。苦苦耗费我大半天时间。        首先,遇到400问题,最大几率是出现了数据类型不一致的问题,简单来说是Controller层不用正确读取你发送请求附带的参数。该例是我前端传送JSON格…

  • Java之StringUtils的常用方法

    Java之StringUtils的常用方法StringUtils方法的操作对象是 Java.lang.String类型的对象,是JDK提供的String类型操作方法的补充,并且是null安全的(即如果输入参数String为null则不会抛出NullPointerException,而是做了相应处理,例如,如果输入为null则返回也是null等,具体可以查看源代码)。除了构造器,StringUtils…

  • JavaScript高级程序设计学习总结一

    JavaScript高级程序设计学习总结一一,JavaScript的实现JavaScript和ECMAScript通常都被人认为是相同的含义,JavaScript的含义比ECMAScript规定要多的多。一个完整的的JavaScript实现应该由三个不同的部分组成。核心(ECMAScript的) 文档对象模型(DOM) 浏览器对象模型(BOM)二,ECMAScript中是由ECMA-262定义的,ECMASc…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号