Spark源码系列(七)Spark on yarn具体实现

Spark源码系列(七)Spark on yarn具体实现

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码。

在第一章《spark-submit提交作业过程》的时候,我们讲过Spark on yarn的在cluster模式下它的main class是org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。

提交作业

找到main函数,里面调用了run方法,我们直接看run方法。

    val appId = runApp()
    monitorApplication(appId)
    System.exit(0)

运行App,跟踪App,最后退出。我们先看runApp吧。

Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(七)Spark on yarn具体实现

  def runApp(): ApplicationId = {
    // 校验参数,内存不能小于384Mb,Executor的数量不能少于1个。
    validateArgs()
    // 这两个是父类的方法,初始化并且启动Client
    init(yarnConf)
    start()

    // 记录集群的信息(e.g, NodeManagers的数量,队列的信息).
    logClusterResourceDetails()

    // 准备提交请求到ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application.
    val newApp = super.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    val appId = newAppResponse.getApplicationId()
    // 检查集群的内存是否满足当前的作业需求
    verifyClusterResources(newAppResponse)

    // 准备资源和环境变量.
    //1.获得工作目录的具体地址: /.sparkStaging/appId/
    val appStagingDir = getAppStagingDir(appId)
  //2.创建工作目录,设置工作目录权限,上传运行时所需要的jar包
    val localResources = prepareLocalResources(appStagingDir)
    //3.设置运行时需要的环境变量
    val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  //4.设置运行时JVM参数,设置SPARK_USE_CONC_INCR_GC为true的话,就使用CMS的垃圾回收机制
    val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

    // 设置application submission context. 
    val appContext = newApp.getApplicationSubmissionContext()
    appContext.setApplicationName(args.appName)
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(amContainer)
    appContext.setApplicationType("SPARK")

    // 设置ApplicationMaster的内存,Resource是表示资源的类,目前有CPU和内存两种.
    val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
    memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    appContext.setResource(memoryResource)

    // 提交Application.
    submitApp(appContext)
    appId
  }

View Code

monitorApplication就不说了,不停的调用getApplicationReport方法获得最新的Report,然后调用getYarnApplicationState获取当前状态,如果状态为FINISHED、FAILED、KILLED就退出。

说到这里,顺便把跟yarn相关的参数也贴出来一下,大家一看就清楚了。

Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(七)Spark on yarn具体实现

 while (!args.isEmpty) { args match { case ("--jar") :: value :: tail => userJar = value args = tail case ("--class") :: value :: tail => userClass = value args = tail case ("--args" | "--arg") :: value :: tail => if (args(0) == "--args") { println("--args is deprecated. Use --arg instead.") } userArgsBuffer += value args = tail case ("--master-class" | "--am-class") :: value :: tail => if (args(0) == "--master-class") { println("--master-class is deprecated. Use --am-class instead.") } amClass = value args = tail case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => if (args(0) == "--master-memory") { println("--master-memory is deprecated. Use --driver-memory instead.") } amMemory = value args = tail case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") } numExecutors = value args = tail case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => if (args(0) == "--worker-memory") { println("--worker-memory is deprecated. Use --executor-memory instead.") } executorMemory = value args = tail case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => if (args(0) == "--worker-cores") { println("--worker-cores is deprecated. Use --executor-cores instead.") } executorCores = value args = tail case ("--queue") :: value :: tail => amQueue = value args = tail case ("--name") :: value :: tail => appName = value args = tail case ("--addJars") :: value :: tail => addJars = value args = tail case ("--files") :: value :: tail => files = value args = tail case ("--archives") :: value :: tail => archives = value args = tail case Nil => if (userClass == null) { printUsageAndExit(1) } case _ => printUsageAndExit(1, args) } }

View Code

ApplicationMaster

直接看run方法就可以了,main函数就干了那么一件事…

Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(七)Spark on yarn具体实现

 def run() { // 设置本地目录,默认是先使用yarn的YARN_LOCAL_DIRS目录,再到LOCAL_DIRS System.setProperty("spark.local.dir", getLocalDirs()) // set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty("spark.ui.port", "0") // when running the AM, the Spark master is always "yarn-cluster" System.setProperty("spark.master", "yarn-cluster")   // 设置优先级为30,和mapreduce的优先级一样。它比HDFS的优先级高,因为它的操作是清理该作业在hdfs上面的Staging目录 ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId()   // 通过yarn.resourcemanager.am.max-attempts来设置,默认是2   // 目前发现它只在清理Staging目录的时候用 isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() // setup AmIpFilter for the SparkUI - do this before we start the UI   // 方法的介绍说是yarn用来保护ui界面的,我感觉是设置ip代理的  addAmIpFilter()   // 注册ApplicationMaster到内部的列表里 ApplicationMaster.register(this) // 安全认证相关的东西,默认是不开启的,省得给自己找事 val securityMgr = new SecurityManager(sparkConf) // 启动driver程序  userThread = startUserClass() // 等待SparkContext被实例化,主要是等待spark.driver.port property被使用   // 等待结束之后,实例化一个YarnAllocationHandler  waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url.   // 向yarn注册当前的ApplicationMaster, 这个时候isFinished不能为true,是true就说明程序失败了  synchronized { if (!isFinished) { registerApplicationMaster() registered = true } } // 申请Container来启动Executor  allocateExecutors() // 等待程序运行结束  userThread.join() System.exit(0) }

View Code

run方法里面主要干了5项工作:

1、初始化工作

2、启动driver程序

3、注册ApplicationMaster

4、分配Executors

5、等待程序运行结束

我们重点看分配Executor方法。 

Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(七)Spark on yarn具体实现

 private def allocateExecutors() { try { logInfo("Allocating " + args.numExecutors + " executors.") // 分host、rack、任意机器三种类型向ResourceManager提交ContainerRequest     // 请求的Container数量可能大于需要的数量  yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached") }      // 把请求回来的资源进行分配,并释放掉多余的资源  yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.  ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } logInfo("All executors have launched.") // 启动一个线程来状态报告 if (userThread.isAlive) { // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } }

View Code

这里面我们只需要看addResourceRequests和allocateResources方法即可。

先说addResourceRequests方法,代码就不贴了。

Client向ResourceManager提交Container的请求,分三种类型:优先选择机器、同一个rack的机器、任意机器。

优先选择机器是在RDD里面的getPreferredLocations获得的机器位置,如果没有优先选择机器,也就没有同一个rack之说了,可以是任意机器。

下面我们接着看allocateResources方法。

Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(七)Spark on yarn具体实现

 def allocateResources() { // We have already set the container request. Poll the ResourceManager for a response. // This doubles as a heartbeat if there are no pending container requests.   // 之前已经提交过Container请求了,现在只需要获取response即可  val progressIndicator = 0.1f val allocateResponse = amClient.allocate(progressIndicator) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size) if (numPendingAllocateNow < 0) { numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow) } val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() for (container <- allocatedContainers) {      // 内存 > Executor所需内存 + 384 if (isResourceConstraintSatisfied(container)) { // 把container收入名册当中,等待发落 val host = container.getNodeId.getHost val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]()) containersForHost += container } else { // 内存不够,释放掉它  releaseContainer(container) } } // 找到合适的container来使用. val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()     // 遍历所有的host for (candidateHost <- hostToContainers.keySet) { val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0) val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost) val remainingContainersOpt = hostToContainers.get(candidateHost) var remainingContainers = remainingContainersOpt.get        if (requiredHostCount >= remainingContainers.size) { // 需要的比现有的多,把符合数据本地性的添加到dataLocalContainers映射关系里  dataLocalContainers.put(candidateHost, remainingContainers) // 没有containner剩下的. remainingContainers = null } else if (requiredHostCount > 0) { // 获得的container比所需要的多,把多余的释放掉 val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount) dataLocalContainers.put(candidateHost, dataLocal) for (container <- remaining) releaseContainer(container) remainingContainers = null } // 数据所在机器已经分配满任务了,只能在同一个rack里面挑选了 if (remainingContainers != null) { val rack = YarnAllocationHandler.lookupRack(conf, candidateHost) if (rack != null) { val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - rackLocalContainers.getOrElse(rack, List()).size if (requiredRackCount >= remainingContainers.size) { // Add all remaining containers to to `dataLocalContainers`.  dataLocalContainers.put(rack, remainingContainers) remainingContainers = null } else if (requiredRackCount > 0) { // Container list has more containers that we need for data locality. val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount) val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]()) existingRackLocal ++= rackLocal remainingContainers = remaining } } } if (remainingContainers != null) { // 还是不够,只能放到别的rack的机器上运行了  offRackContainers.put(candidateHost, remainingContainers) } } // 按照数据所在机器、同一个rack、任意机器来排序 val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // 遍历选择了的Container,为每个Container启动一个ExecutorRunnable线程专门负责给它发送命令 for (container <- allocatedContainersToProcess) { val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() val executorHostname = container.getNodeId.getHost val containerId = container.getId      // 内存需要大于Executor的内存 + 384 val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) if (numExecutorsRunningNow > maxExecutors) { // 正在运行的比需要的多了,释放掉多余的Container  releaseContainer(container) numExecutorsRunning.decrementAndGet() } else { val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) // To be safe, remove the container from `pendingReleaseContainers`.  pendingReleaseContainers.remove(containerId) // 把container记录到已分配的rack的映射关系当中 val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } }       // 启动一个线程给它进行跟踪服务,给它发送运行Executor的命令 val executorRunnable = new ExecutorRunnable( container, conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores) new Thread(executorRunnable).start() } } }

View Code

1、把从ResourceManager中获得的Container进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个rack的机器 > 任意机器。

2、选择了Container之后,给每一个Container都启动一个ExecutorRunner一对一贴身服务,给它发送运行CoarseGrainedExecutorBackend的命令。

3、ExecutorRunner通过NMClient来向NodeManager发送请求。

 

总结:

把作业发布到yarn上面去执行这块涉及到的类不多,主要是涉及到Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner这四个类。

1、Client作为Yarn的客户端,负责向Yarn发送启动ApplicationMaster的命令。

2、ApplicationMaster就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动Driver和Executor,Executor启动失败的错误处理。

3、ApplicationMaster的请求、分配资源是通过YarnAllocationHandler来进行的。

4、Container选择的顺序是:优先选择机器 > 同一个rack的机器 > 任意机器。

5、ExecutorRunner只负责向Container发送启动CoarseGrainedExecutorBackend的命令。

6、Executor的错误处理是在ApplicationMaster的launchReporterThread方法里面,它启动的线程除了报告运行状态,还会监控Executor的运行,一旦发现有丢失的Executor就重新请求。

7、在yarn目录下看到的名称里面带有YarnClient的是属于yarn-client模式的类,实现和前面的也差不多。

其它的内容更多是Yarn的客户端api使用,我也不太会,只是看到了能懂个意思,哈哈。

 

 

岑玉海

转载请注明出处,谢谢!

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/109775.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 一个经典例子让你彻彻底底理解java回调机制是什么_java实现回调函数

    一个经典例子让你彻彻底底理解java回调机制是什么_java实现回调函数以前不理解什么叫回调,天天听人家说加一个回调方法啥的,心里想我草,什么叫回调方法啊?然后自己就在网上找啊找啊找,找了很多也不是很明白,现在知道了,所谓回调:就是A类中调用B类中的某个方法C,然后B类中反过来调用A类中的方法D,D这个方法就叫回调方法,这样子说你是不是有点晕晕的,其实我刚开始也是这样不理解,看了人家说比较经典的回调方式:ClassA实现接口CallBackcallback

  • pyqt5获取textedit内容_java点击按钮获取文本框内容

    pyqt5获取textedit内容_java点击按钮获取文本框内容我想从PyQt5.qtwidgestQinputDialog中的用户获取多个输入文本。。。在这段代码中,我可以只得到一个输入文本框,当我被单击按钮时,我想得到更多的输入文本框。更多信息请参见图片。。。在fromPyQt5.QtWidgetsimport(QApplication,QWidget,QPushButton,QLineEdit,QInputDialog,QHBoxLayout)im…

  • java byte数组转json对象

    java byte数组转json对象Stringmessage=newString(“byte数组”);//去掉多余的引号和转义字符Stringsubstring=message.substring(1,message.length()-1).replace(“\\\””,”‘”);//转化为json对象JSONObjectjsonObject=newJSONObject(substring);

  • IOS学习随笔三

    IOS学习随笔三IOS学习随笔三

  • 国二C语言:VC++2010学习版「建议收藏」

    国二C语言:VC++2010学习版「建议收藏」全国计算机等级考试二级C语言程序设计考试大纲(2019年版)指出:开发环境:MicrosoftVisualC++2010学习版。也就是说,不管你在学习过程中用的是什么样的开发环境…

  • tomcat启动startup.bat一闪而过

    遇到很多次运行startup.bat后,一个窗口一闪而过的问题,但是从来没去纠正怎样修改配置才是正确的,现在从网上查阅的资料整理如下:tomcat在启动时,会读取环境变量的信息,需要一个CATALINA_HOME与JAVA_HOME的信息,CATALINA_HOME即tomcat的主目录,JAVA_HOME即java安装的主目录,jdk的主目录。首先,要在环境变量处,配置JAVA_HOM

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号