TaskScheduler Explained with Source Code Walkthrough


The TaskScheduler is responsible for submitting tasks and for asking the cluster manager to schedule them.

  1. Submitting tasks
  2. Asking the cluster manager to schedule tasks
    Spark supports three cluster managers: the standalone cluster manager, Hadoop YARN, and Apache Mesos. See "Spark集群管理器介绍" (cnblogs) for an introduction; a small sketch of how the chosen master URL maps to a cluster manager follows this list.
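As a minimal, illustrative sketch (the object name, app name, and URLs below are only examples, not taken from this article), the cluster manager is selected by the master URL the application puts into its SparkConf; SparkContext then hands that URL to createTaskScheduler:

// Illustrative sketch: the master URL decides which SchedulerBackend Spark builds.
import org.apache.spark.{SparkConf, SparkContext}

object MasterUrlDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("master-url-demo")
      // "local[4]"          -> LocalSchedulerBackend with 4 threads
      // "spark://host:7077" -> StandaloneSchedulerBackend (standalone cluster manager)
      // "yarn", "mesos://…" -> an external cluster manager resolved via getClusterManager
      .setMaster("local[4]")

    // createTaskScheduler runs inside this constructor and returns the
    // (SchedulerBackend, TaskScheduler) pair used for the whole application.
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum()) // 55.0
    } finally {
      sc.stop()
    }
  }
}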

1 Creating the TaskScheduler: createTaskScheduler

The TaskScheduler is created in SparkContext.createTaskScheduler, shown below. The method matches the deploy mode against the master setting, and every deploy mode instantiates two classes: TaskSchedulerImpl and SchedulerBackend. The TaskSchedulerImpl is the same in every mode; only the SchedulerBackend differs.

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }
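For reference, the LOCAL_N_REGEX, LOCAL_N_FAILURES_REGEX, LOCAL_CLUSTER_REGEX, and SPARK_REGEX patterns matched above come from the SparkMasterRegex object in SparkContext.scala. In Spark 2.x sources they look roughly like this (quoted from memory, so check your own Spark version):

private object SparkMasterRegex {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
}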

2 The TaskScheduler implementation class: TaskSchedulerImpl

The (abridged) source of TaskSchedulerImpl is:

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging {
  ...
  }

TaskSchedulerImpl's construction does the following:

  1. Read configuration from SparkConf, including the number of CPUs allocated to each task and the scheduling mode (there are two scheduling modes, FAIR and FIFO; the default is FIFO, and it can be changed via the spark.scheduler.mode property), among other things. A short illustrative snippet follows the excerpt below. The source is:
  val conf = sc.conf

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

  // Duplicate copies of a task will only be launched if the original copy has been running for
  // at least this amount of time. This is to avoid the overhead of launching speculative copies
  // of tasks that are very short.
  val MIN_TIME_TO_SPECULATION = 100

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

  // Threshold above which we warn user initial TaskSet may be starved
  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

  // CPUs to request per task
  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

  // default scheduler is FIFO
  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)

  private lazy val barrierSyncTimeout = conf.get(config.BARRIER_SYNC_TIMEOUT)
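As an illustrative aside (the property names come straight from the fields above; everything else is just an example), the same knobs can be set on the application side before the SparkContext is created:

// Illustrative only: setting the properties that TaskSchedulerImpl reads above.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")        // schedulingModeConf; the default is FIFO
  .set("spark.task.cpus", "2")                // CPUS_PER_TASK; the default is 1
  .set("spark.speculation.interval", "200ms") // SPECULATION_INTERVAL_MS; the default is 100ms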
  2. Create a TaskResultGetter, whose job is to use a thread pool to process the task execution results that the executors on the workers send back.
  //Source: excerpt from TaskSchedulerImpl
  // This is a var so that we can reset it for testing purposes.
  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

The thread pool is created by Executors.newFixedThreadPool with 4 threads by default; the thread names start with task-result-getter, and the backing thread factory defaults to Executors.defaultThreadFactory. The two source excerpts below explain each point:

  • 4 threads by default
    The THREADS constant in the TaskResultGetter class gets its value, 4, from the property "spark.resultGetter.threads". THREADS is then passed into ThreadUtils.newDaemonFixedThreadPool, from there into Executors.newFixedThreadPool, and so on down the call chain until it reaches the ThreadPoolExecutor constructor as the corePoolSize parameter, where this.corePoolSize = corePoolSize; sets the pool's default thread count to 4.
  • Thread names start with task-result-getter
    The comment on newDaemonFixedThreadPool states it plainly: "Thread names are formatted as prefix-ID, where ID is a unique, sequentially assigned integer." The prefix here is the string "task-result-getter" that TaskResultGetter passes in via ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter"), and the ID is a unique, sequentially assigned integer.
  • The thread factory defaults to Executors.defaultThreadFactory
    This one takes a little digging. Following the call chain:
    TaskResultGetter
    protected val getTaskResultExecutor: ExecutorService =
      ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
    —> ThreadUtils
    def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
      val threadFactory = namedThreadFactory(prefix) // the thread factory that is used
      ...
    }
    —> ThreadUtils
    def namedThreadFactory(prefix: String): ThreadFactory = {
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
    }
    —> ThreadFactoryBuilder
    public ThreadFactory build() { return build(this); }
    —> ThreadFactoryBuilder
    private static ThreadFactory build(ThreadFactoryBuilder builder) {
      ...
      final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null
          ? builder.backingThreadFactory
          : Executors.defaultThreadFactory();
      Thread thread = backingThreadFactory.newThread(runnable);
      ...
    }
    —> Executors.java
    static class DefaultThreadFactory implements ThreadFactory { ... }
    Tracing the chain back to its source, the underlying thread factory instance is backingThreadFactory, which, since no backing factory is set explicitly, comes from Executors.defaultThreadFactory().
//Source: excerpt from TaskResultGetter
/**
 * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
 */
private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
  extends Logging {
  // Creates 4 threads by default.
  private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)

  // Exposed for testing.
  protected val getTaskResultExecutor: ExecutorService =
    ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
  ...
}
//Source: newDaemonFixedThreadPool, from ThreadUtils.scala
  /**
   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
   * unique, sequentially assigned integer.
   */
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
  }
//Source: the ThreadPoolExecutor constructor (java.util.concurrent)
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
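Putting the pieces above together, here is a small standalone sketch (not Spark code; it assumes Guava's ThreadFactoryBuilder is on the classpath, as it is for Spark itself) that reproduces the same pattern: a fixed pool of daemon threads whose names follow the prefix-ID format.

import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object NamedDaemonPoolDemo {
  def main(args: Array[String]): Unit = {
    // Same recipe as ThreadUtils.namedThreadFactory: daemon threads named "prefix-%d".
    val threadFactory = new ThreadFactoryBuilder()
      .setDaemon(true)
      .setNameFormat("task-result-getter-%d")
      .build()

    // Same recipe as ThreadUtils.newDaemonFixedThreadPool: a fixed pool of 4 core threads.
    val pool = Executors.newFixedThreadPool(4, threadFactory).asInstanceOf[ThreadPoolExecutor]

    pool.execute(new Runnable {
      override def run(): Unit =
        println(s"running on ${Thread.currentThread().getName}") // e.g. task-result-getter-0
    })

    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}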

Spark has two task scheduling modes, FAIR (fair scheduling) and FIFO (first in, first out), but the actual scheduling of tasks is ultimately carried out by the concrete implementation of the SchedulerBackend interface.
How exactly does the SchedulerBackend do that scheduling? The articles I have read so far are too brief, so I need to study this further and will fill it in later. One good article on the topic is Spark理论学习笔记(一)-一笑之奈何 (cnblogs).
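One relevant piece is already visible in createTaskScheduler above: the scheduler.initialize(backend) call is where the FIFO/FAIR choice takes effect, because it picks the builder that organizes TaskSetManagers under the root scheduling pool. A paraphrased outline of TaskSchedulerImpl.initialize (not the verbatim source; details vary by version) looks roughly like this:

// Paraphrased outline of TaskSchedulerImpl.initialize: the scheduling mode selects the
// schedulable builder used for the root pool.
def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ => throw new IllegalArgumentException(s"Unsupported scheduling mode: $schedulingMode")
  }
  schedulableBuilder.buildPools() // the FAIR builder also reads the fair-scheduler pool file here
}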

3 Starting the TaskScheduler

The code in SparkContext.scala that starts the TaskScheduler:

    //Source: SparkContext.scala
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

_taskScheduler is an instance of TaskSchedulerImpl, which implements TaskScheduler and overrides its start() method, so _taskScheduler.start() actually runs TaskSchedulerImpl's start(). As the source below shows, when the TaskScheduler starts it in turn calls the backend's start() method.

//Source: TaskSchedulerImpl.scala (simplified)
  override def start() {
    backend.start()
    // (In the full source, start() also schedules the periodic speculative-execution check here,
    // using the speculationScheduler shown earlier, when spark.speculation is enabled and the
    // scheduler is not running in local mode.)
  }

**So what does the backend do once it has started?** Read on in the sections below.

4 Submitting tasks through the TaskScheduler

In the source below, the call backend.reviveOffers() is what submits the tasks for scheduling. Calling the taskScheduler's submitTasks method eventually ends up invoking the backend's reviveOffers method, so what looks like the taskScheduler scheduling tasks is in fact the schedulerBackend doing the scheduling.

  //Source: TaskSchedulerImpl.scala
  override def submitTasks(taskSet: TaskSet) {
    ...
  }
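Since the body is elided above, here is a paraphrased outline of the flow the paragraph describes (not the verbatim Spark source; details vary by version):

// Paraphrased outline: one TaskSetManager per TaskSet, registered with the FIFO/FAIR pool,
// then the backend is asked for resources.
override def submitTasks(taskSet: TaskSet): Unit = {
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    schedulableBuilder.addTaskSetManager(manager, taskSet.properties)
  }
  backend.reviveOffers() // the SchedulerBackend takes over from here
}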

5 SchedulerBackend

/**
 * Used when running a local version of Spark where the executor, backend, and master all run in
 * the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
 * Executor (created by the [[LocalSchedulerBackend]]) running locally.
 */
private[spark] class LocalSchedulerBackend(
    conf: SparkConf,
    scheduler: TaskSchedulerImpl,
    val totalCores: Int)
  extends SchedulerBackend with ExecutorBackend with Logging {
  ...
  override def start() {
    val rpcEnv = SparkEnv.get.rpcEnv
    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
    localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
    listenerBus.post(SparkListenerExecutorAdded(
      System.currentTimeMillis,
      executorEndpoint.localExecutorId,
      new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
    launcherBackend.setAppId(appId)
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
  ...
  }
/**
 * A backend interface for scheduling systems that allows plugging in different ones under
 * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
 * machines become available and can launch tasks on them.
 */
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int
  ...
}
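To close the loop on the earlier question of how the backend actually schedules tasks: in local mode, a paraphrased outline of what reviveOffers triggers looks roughly like this (assumed from the general shape of LocalSchedulerBackend and LocalEndpoint, not copied verbatim; details vary by Spark version):

// Paraphrased outline: reviveOffers sends a message to the local RPC endpoint, which offers
// the local executor's free cores to the scheduler and launches whatever tasks come back.
override def reviveOffers(): Unit = {
  localEndpoint.send(ReviveOffers)
}

// Inside LocalEndpoint:
def reviveOffers(): Unit = {
  val offers = IndexedSeq(WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  for (task <- scheduler.resourceOffers(offers).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK       // reserve cores for the task
    executor.launchTask(executorBackend, task) // run it on the in-process executor
  }
}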