大家好,又见面了,我是全栈君。
Spark中的Scheduler
scheduler分成两个类型。一个是TaskScheduler与事实上现,一个是DAGScheduler。
TaskScheduler:主要负责各stage中传入的task的运行与调度。
DAGScheduler:主要负责对JOB中的各种依赖进行解析,依据RDD的依赖生成stage并通知TaskScheduler运行。
实例生成
TaskScheduler实例生成:
scheduler实例生成,我眼下主要是针对onyarn的spark进行的相关分析,
在appmaster启动后,通过调用startUserClass()启动线程来调用用户定义的spark分析程序。
传入的第一个參数为appmastername(master),可传入的如:yarn-cluster等。
在用户定义的spark分析程序中。生成SparkContext实例。
通过SparkContext.createTaskScheduler函数。假设是yarn-cluster,生成YarnClusterScheduler实例。
此部分生成的scheduler为TaskScheduler实例。
defthis(sc:SparkContext) = this(sc,newConfiguration())
同一时候YarnClusterSchduler实现TaskSchedulerImpl。
defthis(sc:SparkContext) = this(sc,sc.conf.getInt(“spark.task.maxFailures”,4))
生成TaskScheduler中的SchedulerBackend属性引用,yarn-cluster为CoarseGrainedSchedulerBackend
valbackend =newCoarseGrainedSchedulerBackend(scheduler,sc.env.actorSystem)
scheduler.initialize(backend)
DAGScheduler实例生成:
classDAGScheduler(
taskSched: TaskScheduler,
mapOutputTracker:MapOutputTrackerMaster,
blockManagerMaster:BlockManagerMaster,
env: SparkEnv)
extendsLogging {
defthis(taskSched:TaskScheduler){
this(taskSched,SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
SparkEnv.get.blockManager.master,SparkEnv.get)
}
taskSched.setDAGScheduler(this)
scheduler调度过程分析
1.rdd运行action操作。如saveAsHadoopFile
2.调用SparkContext.runJob
3.调用DAGScheduler.runJob–>此函数调用submitJob,并等job运行完毕。
Waiter.awaitResult()中通过_jobFinished检查job执行是否完毕,假设完毕,此传为true,否则为false.
_jobFinished的值通过resultHandler函数,每调用一次finishedTasks的值加一,
假设finishedTasks的个数等于totalTasks的个数时,表示完毕。
或者出现exception.
defrunJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T])=> U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) =>Unit,
properties: Properties = null)
{
valwaiter =submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler,properties)
waiter.awaitResult()match{
caseJobSucceeded => {}
caseJobFailed(exception:Exception, _) =>
logInfo(“Failedto run “ + callSite)
throwexception
}
}
4.调用DAGScheduler.submitJob函数,
部分代码:生成JobWaiter实例,并传入此实例。发送消息。调用JobSubmitted事件。并返回waiter实例。
JobWaiter是JobListener的实现。
valwaiter =newJobWaiter(this,jobId,partitions.size, resultHandler)
eventProcessActor! JobSubmitted(
jobId,rdd, func2,partitions.toArray, allowLocal, callSite, waiter,properties)
waiter
5.处理DAGScheduler的JobSubmitted事件消息,通过processEvent处理消息接收的事件。
defreceive = {
caseevent:DAGSchedulerEvent =>
logTrace(“Gotevent of type “ +event.getClass.getName)
if(!processEvent(event)){
submitWaitingStages()
} else{
resubmissionTask.cancel()
context.stop(self)
}
}
}))
6.processEvent函数中处理JobSubmitted事件部分代码:
caseJobSubmitted(jobId,rdd, func,partitions,allowLocal,callSite,listener,properties)=>
varfinalStage:Stage = null
try{
生成stage实例,stage的id通过nextStageId的值加一得到,task的个数就是partitions的分区个数,
依据job相应的rdd,得到假设parentrdd是shuffle的rdd时生成ShuffleMapStage,通过getParentStages函数,
此处去拿到parentrdd时,假设currentrdd的parentrdd不是shuffle,递归调用parentrdd,
假设parendrdd中没有shuffle的rdd,不生成新的stage,否则有多少个,生成多少个。
此处是处理DAG类的依赖
finalStage= newStage(rdd,partitions.size,None, jobId,Some(callSite))
} catch{
casee:Exception =>
logWarning(“Creatingnew stage failed due to exception – job: “+ jobId, e)
listener.jobFailed(e)
returnfalse
}
生成ActiveJob实例。
设置numFinished的值为0,表示job中有0个完毕的task.
设置全部task个数的arrayfinished.并把全部元素的值设置为false.把JobWaiter当listener传入ActiveJob.
valjob = newActiveJob(jobId,finalStage,func,partitions,callSite,listener,properties)
对已经cache过的TaskLocation进行清理。
clearCacheLocs()
logInfo(“Gotjob “ + job.jobId+ ” (“+ callSite+ “) with “+ partitions.length+
“output partitions (allowLocal=” +allowLocal+ “)”)
logInfo(“Finalstage: “ + finalStage+ ” (“+ finalStage.name+ “)”)
logInfo(“Parentsof final stage: “ +finalStage.parents)
logInfo(“Missingparents: “ +getMissingParentStages(finalStage))
假设runJob时传入的allowLocal的值为true,同一时候没有须要shuffle的rdd,同一时候partitions的长度为1,
也就是task仅仅有一个,直接在local执行此job..通过runLocallyWithinThread生成一个线程来执行。
if(allowLocal&& finalStage.parents.size== 0 &&partitions.length== 1) {
//Compute very short actions like first() or take() with no parentstages locally.
listenerBus.post(SparkListenerJobStart(job,Array(), properties))
通过ActiveJob中的func函数来执行job的执行,此函数在rdd的action调用时生成定义。
如saveAsHadoopFile(saveAsHadoopDataset)中的定义的内部func,writeToFile函数。
完毕函数运行后,调用上面提到的生成的JobWaiter.taskSucceeded函数。
runLocally(job)
} else{
否则有多个partition也就是有多个task,或者有shuffle的情况,
idToActiveJob(jobId)= job
activeJobs+= job
resultStageToJob(finalStage)= job
listenerBus.post(SparkListenerJobStart(job,jobIdToStageIds(jobId).toArray,properties))
调用DAGScheduler.submitStage函数。
submitStage(finalStage)
}
7.DAGScheduler.submitStage函数:递归函数调用,
假设stage包括parentstage(shuffle的情况)把stage设置为waiting状态。等待parentstage运行完毕才进行运行。
privatedefsubmitStage(stage: Stage) {
valjobId =activeJobForStage(stage)
if(jobId.isDefined){
logDebug(“submitStage(“+ stage + “)”)
假设RDD的Dependency的RDD还没有运行完毕,等待Dependency运行完毕后当前的RDD才干进行运行操作。
if(!waiting(stage)&& !running(stage)&& !failed(stage)){
依据stage中rdd的Dependency,检查是否须要生成新的stage,假设是ShuffleDependency,会生成新的ShuffleMapStage
此处去拿到parentrdd时,假设currentrdd的parentrdd不是shuffle,递归调用parentrdd,
假设parendrdd中没有shuffle的rdd,不生成新的stage,否则有多少个,生成多少个。此处是处理DAG类的依赖
valmissing =getMissingParentStages(stage).sortBy(_.id)
logDebug(“missing:” + missing)
假设没有RDD中的shuffle的Dependency,也就是RDD之间都是NarrowDependency的Dependency
表示全部的Dependency都在map端本地运行。
if(missing ==Nil) {
logInfo(“Submitting” + stage + “(“ + stage.rdd+ “), which has no missingparents”)
submitMissingTasks(stage,jobId.get)
running+= stage
} else{
假设RDD有Dependency,先运行parentrdd的stage操作。此处是递归函数调用
for(parent <-missing) {
submitStage(parent)
}
waiting+= stage
}
}
}else{
abortStage(stage, “Noactive job for stage “ + stage.id)
}
}
8.DAGScheduler.submitMissingTask的运行流程:
privatedefsubmitMissingTasks(stage: Stage, jobId: Int) {
logDebug(“submitMissingTasks(“+ stage + “)”)
//Get our pending tasks and remember them in our pendingTasks entry
valmyPending =pendingTasks.getOrElseUpdate(stage,newHashSet)
myPending.clear()
vartasks =ArrayBuffer[Task[_]]()
假设stage是shuffle的rdd,迭代stage下的的全部partition,依据partition与相应的TaskLocation
生成ShuffleMapTask.加入到task列表中。
if(stage.isShuffleMap){
for(p <- 0until stage.numPartitionsifstage.outputLocs(p)== Nil) {
vallocs =getPreferredLocs(stage.rdd,p)
tasks+= newShuffleMapTask(stage.id,stage.rdd,stage.shuffleDep.get,p, locs)
}
}else{
否则表示stage是非shuffle的rdd,此是是运行完毕后直接返回结果的stage,生成ResultTask实例。
因为是ResultTask,因此须要传入定义的func,也就是怎样处理结果返回
//This is a final stage; figure out its job’s missing partitions
valjob =resultStageToJob(stage)
for(id <- 0until job.numPartitionsif!job.finished(id)){
valpartition =job.partitions(id)
vallocs =getPreferredLocs(stage.rdd,partition)
tasks+= newResultTask(stage.id,stage.rdd,job.func,partition,locs, id)
}
}
valproperties= if(idToActiveJob.contains(jobId)){
idToActiveJob(stage.jobId).properties
}else{
//thisstage will be assigned to “default” pool
null
}
//must be run listener before possible NotSerializableException
//should be “StageSubmitted” first and then “JobEnded”
listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage),properties))
if(tasks.size> 0) {
//Preemptively serialize a task to make sure it can be serialized. Weare catching this
//exception here because it would be fairly hard to catch thenon-serializableexception
//down the road, where we have several different implementations forlocal scheduler and
//cluster schedulers.
try{
SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
} catch{
casee:NotSerializableException =>
abortStage(stage, “Tasknot serializable: “ + e.toString)
running-= stage
return
}
logInfo(“Submitting” + tasks.size+ ” missing tasks from “+ stage + ” (“+ stage.rdd+ “)”)
myPending++= tasks
logDebug(“Newpending tasks: “ + myPending)
生成TaskSet实例。把stage中要运行的Task列表传入。同一时候把stage相应的ActiveJob也传入。
通过TaskScheduler的实现。调用submitTasks函数,YarnClusterScheduler(TaskSchedulerImpl)
taskSched.submitTasks(
newTaskSet(tasks.toArray,stage.id,stage.newAttemptId(), stage.jobId,properties))
stageToInfos(stage).submissionTime= Some(System.currentTimeMillis())
}else{
logDebug(“Stage” + stage + “is actually done; %b %d %d”.format(
stage.isAvailable,stage.numAvailableOutputs,stage.numPartitions))
running-= stage
}
}
9.TaskSchedulerImpl.submitTasks函数流程分析:
通过传入的TaskSet,得到要运行的tasks列表。并生成TaskSetmanager实例。
同一时候把实例加入到的schedulableBuilder(FIFOSchedulableBuilder/FairSchedulableBuilder)队列中。
关于TaskSetManager实例可參见后面的分析。
overridedefsubmitTasks(taskSet: TaskSet) {
valtasks =taskSet.tasks
logInfo(“Addingtask set “ + taskSet.id+ ” with “+ tasks.length+ ” tasks”)
this.synchronized{
valmanager =newTaskSetManager(this,taskSet, maxTaskFailures)
activeTaskSets(taskSet.id)= manager
schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)
taskSetTaskIds(taskSet.id)= newHashSet[Long]()
定期检查task的运行消息是否被生成运行。假设task被分配运行,关闭此线程。
否则一直给出提示.
if(!isLocal && !hasReceivedTask){
starvationTimer.scheduleAtFixedRate(newTimerTask() {
overridedefrun() {
if(!hasLaunchedTask){
logWarning(“Initialjob has not accepted any resources; “+
“checkyour cluster UI to ensure that workers are registered “+
“andhave sufficient memory”)
} else{
this.cancel()
}
}
}, STARVATION_TIMEOUT,STARVATION_TIMEOUT)
}
hasReceivedTask= true
}
通过SchedulerBackend的实现CoarseGrainedSchedulerBackend.reviceOffers发起运行处理操作。
backend.reviveOffers()
}
9.1TaskSetManager的实例生成:
private[spark]classTaskSetManager(
sched: TaskSchedulerImpl,
valtaskSet:TaskSet,
valmaxTaskFailures:Int,
clock: Clock = SystemClock)
extendsSchedulablewithLogging
………………………
for(i <- (0until numTasks).reverse){
addPendingTask(i)
}
关于addPendingTask的定义:此睦传入的readding的值为false.
privatedefaddPendingTask(index: Int, readding: Boolean = false){
//Utility method that adds `index` to a list only if readding=falseor it’s not already there
内部定义的addTo方法。
defaddTo(list:ArrayBuffer[Int]) {
if(!readding || !list.contains(index)) {
list += index
}
}
varhadAliveLocations= false
迭代全部的要运行的task,并通过task的TaskLocation检查运行的节点级别。加入到对应的pendingTask容器中
for(loc <-tasks(index).preferredLocations){
for(execId <-loc.executorId){
检查TaskSchedulerImpl.activeExecutorIds的活动的worker的executor是否存在,
假设是第一个运行的RDD时,此时activeExecutorIds容器的的值为空,当第一个RDD中有TASK在此executor中运行过后。
会把executor的id加入到activeExecutorIds容器中。
第一个RDD的stage运行时,此部分不运行。但第二个stage运行时,可最大可能的保证task在PROCESS_LOCAL的运行。
if(sched.isExecutorAlive(execId)){
addTo(pendingTasksForExecutor.getOrElseUpdate(execId,newArrayBuffer))
hadAliveLocations= true
}
}
if(sched.hasExecutorsAliveOnHost(loc.host)){
假设在TaskSchedulerImpl的executorsByHost容器中包括此host,在pendingTasksForHost中加入相应的task.
TaskSchedulerImpl.executorsByHost容器的值在每个worker注冊时
通过向CoarseGrainedSchedulerBackend.DriverActor发送RegisterExecutor事件消息。
通过makeOffers()–>TaskSchedulerImpl.resourceOffers把host加入到executorsByHost容器中。
addTo(pendingTasksForHost.getOrElseUpdate(loc.host,newArrayBuffer))
通过调用YarnClusterScheduler.getRackForHost得到host相应的rack,
并在rack的pending容器中加入相应的task个数和。
for(rack <-sched.getRackForHost(loc.host)){
addTo(pendingTasksForRack.getOrElseUpdate(rack,newArrayBuffer))
}
hadAliveLocations= true
}
}
假设上面两种情况都没有加入到容器中pendingTasksWithNoPrefs。
if(!hadAliveLocations){
//Even though the task might’ve had preferred locations, all of thosehosts or executors
//are dead; put it in the no-prefslist so we can schedule it elsewhere right away.
addTo(pendingTasksWithNoPrefs)
}
在TaskSetManager实例生成是,把全部task的个数都加入到allPendingTasks容器中
if(!readding) {
allPendingTasks+= index // No point scanning thiswhole list to find the old task there
}
}
………………………..
得到可选择的LocalityLevel级别。
valmyLocalityLevels= computeValidLocalityLevels()
vallocalityWaits= myLocalityLevels.map(getLocalityWait)// Time to wait at each level
下面代码是computeValidLocalityLevels的定义,主要依据各种locality中pending的容器中是否有值。
生成当前stage中的task运行可选择的Locality级别。
privatedefcomputeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
importTaskLocality.{PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL,ANY}
vallevels =newArrayBuffer[TaskLocality.TaskLocality]
if(!pendingTasksForExecutor.isEmpty&& getLocalityWait(PROCESS_LOCAL)!= 0) {
levels+= PROCESS_LOCAL
}
if(!pendingTasksForHost.isEmpty&& getLocalityWait(NODE_LOCAL)!= 0) {
levels+= NODE_LOCAL
}
if(!pendingTasksForRack.isEmpty&& getLocalityWait(RACK_LOCAL)!= 0) {
levels+= RACK_LOCAL
}
levels+= ANY
logDebug(“Validlocality levels for “ + taskSet+ “: “+ levels.mkString(“,”))
levels.toArray
}
}
下面代码是getLocalityWait的定义代码:此函数主要是定义每个Task在此Locality级别中运行的等待时间。
也就是scheduler调度在传入的Locality级别时所花的时间是否超过指定的等待时间,
假设超过表示须要放大Locality的查找级别。
privatedefgetLocalityWait(level: TaskLocality.TaskLocality): Long = {
valdefaultWait= conf.get(“spark.locality.wait”,“3000”)
level match{
caseTaskLocality.PROCESS_LOCAL=>
conf.get(“spark.locality.wait.process”,defaultWait).toLong
caseTaskLocality.NODE_LOCAL=>
conf.get(“spark.locality.wait.node”,defaultWait).toLong
caseTaskLocality.RACK_LOCAL=>
conf.get(“spark.locality.wait.rack”,defaultWait).toLong
caseTaskLocality.ANY=>
0L
}
}
10.SchedulerBackend.reviveOffers()的调度处理流程:
SchedulerBackend的实现为CoarseGrainedSchedulerBackend。
overridedefreviveOffers() {
driverActor! ReviveOffers
}
以上代码发CoarseGrainedSchedulerBackend内部的DriverActor发送消息,处理ReviveOffers事件。
caseReviveOffers =>
makeOffers()
…………….
defmakeOffers() {
见以下的launchTasks与resourceOffers函数
launchTasks(scheduler.resourceOffers(
executorHost.toArray.map{case(id, host)=> newWorkerOffer(id,host,freeCores(id))}))
}
调用TaskSchedulerImpl.resourceOffers并传入注冊的worker中executorid与host的kvarray.
defresourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =synchronized {
SparkEnv.set(sc.env)
//Mark each slave as alive and remember its hostname
for(o <-offers) {
executorIdToHost(o.executorId)= o.host
此部分主要是在worker注冊时executorsByHost中还不存在时会运行。
if(!executorsByHost.contains(o.host)){
executorsByHost(o.host)= newHashSet[String]()
executorGained(o.executorId,o.host)
}
}
offers表示有多少个注冊的worker的executor,依据每个worker中可能的cpucore个数生成可运行的task个数。
//Build a list of tasks to assign to each worker
valtasks =offers.map(o => newArrayBuffer[TaskDescription](o.cores))
可分配的cpu个数,由此处能够看出每个任务分配时最好按每个worker能分配的最大cpucore个数来分配。
valavailableCpus= offers.map(o => o.cores).toArray
得到队列中的全部的TaskSetManager列表。
valsortedTaskSets= rootPool.getSortedTaskSetQueue()
for(taskSet <-sortedTaskSets){
logDebug(“parentName:%s, name: %s, runningTasks: %s”.format(
taskSet.parent.name,taskSet.name,taskSet.runningTasks))
}
计算task的Locality级别,launchedTask=false表示须要放大Locality的级别。
//Take each TaskSet in our scheduling order, and then offer it eachnode in increasing order
//of locality levels so that it gets a chance to launch local tasks onall of them.
varlaunchedTask= false
计算task的Locality,此处是一个for的迭代调用。先从taskset列表中拿出一个tasetset,
子迭代是从PROCESS_LOCAL開始迭代locality的级别。
for(taskSet <-sortedTaskSets;maxLocality<- TaskLocality.values) {
do{
launchedTask= false
迭代调用每个worker的值,从每个worker中在taskset中选择task的运行级别,生成TaskDescription
for(i <- 0until offers.size) {
得到迭代出的worker的executorid与host
valexecId =offers(i).executorId
valhost =offers(i).host
通过TaskSetManager.resourceOffer选择一个运行级别,通过此函数选择Locality级别时。
不能超过传入的maxLocality,每次生成一个task,
for(task <-taskSet.resourceOffer(execId,host,availableCpus(i),maxLocality)){
每次生成一个task,把生成的task加入到上面的tasks列表中。
tasks(i)+= task
valtid =task.taskId
taskIdToTaskSetId(tid)= taskSet.taskSet.id
taskSetTaskIds(taskSet.taskSet.id)+= tid
taskIdToExecutorId(tid)= execId
设置当前executorid设置到activeExecutorIds列表中。当有多个依赖的stage运行时。
第二个stage在submitTasks时,生成TaskSetManager时,会依据的activeExecutorIds值,
在pendingTasksForExecutor中生成等运行的PROCESS_LOCAL的pendingtasks.
activeExecutorIds+= execId
把executor相应的host记录到executorsByHost容器中。
executorsByHost(host)+= execId
当前worker中可用的cpucore的值须要减去一,这样能充分保证一个cpucore运行一个task
availableCpus(i) -= 1
这个值用来检查是否在当前的Locality级别中接着运行其他的task的分配,
假设这个值为true,不放大maxLocality的级别,从下一个worker中接着分配剩余的task
launchedTask= true
}
}
} while(launchedTask)
}
if(tasks.size> 0) {
设置hasLaunchedTask的值为true,表示task的运行分配完毕。在上面提到过的检查线程中对线程运行停止操作。
hasLaunchedTask= true
}
returntasks
}
10.1TaskSetManager.resourceOffer流程分析
defresourceOffer(
execId: String,
host:String,
availableCpus: Int,
maxLocality:TaskLocality.TaskLocality)
:Option[TaskDescription] =
{
假设完毕的task个数小于要生成的总task个数,同一时候当前cpu可用的core个数和大于或等于一个配置的。默认1
if(tasksSuccessful< numTasks&& availableCpus >= CPUS_PER_TASK){
valcurTime =clock.getTime()
通过如今运行task分配的时间减去上一次并从currentLocalityIndex的下标開始,
取出locality相应的task分配等待时间,假设时间超过了此配置,把下标值加一,
找到下一个locality的配置时间,按这方式找,直到找到ANY的值,详细可见以下的此方法说明
varallowedLocality= getAllowedLocalityLevel(curTime)
假设通过的locality的级别超过了传入的最大locality级别。把级别设置为传入的最大级别
if(allowedLocality> maxLocality) {
allowedLocality= maxLocality // We’re not allowed tosearch for farther-away tasks
}
findTask主要是从相应的pending的列表中依据相应的Locality拿到相应的task的下标,在TaskSet.tasks中的下标。
findTask(execId, host,allowedLocality)match{
caseSome((index,taskLocality))=> {
//Found a task; do some bookkeeping and return a task description
valtask =tasks(index)
valtaskId =sched.newTaskId()
//Figure out whether this should count as a preferred launch
logInfo(“Startingtask %s:%d as TID %s on executor %s: %s (%s)”.format(
taskSet.id,index,taskId,execId, host, taskLocality))
//Do various bookkeeping
copiesRunning(index) += 1
valinfo = newTaskInfo(taskId,index,curTime,execId, host, taskLocality)
taskInfos(taskId)= info
taskAttempts(index)= info ::taskAttempts(index)
把分配此task的locality级别拿到相应的下标,并又一次设置下标的值。
//Update our locality level for delay scheduling
currentLocalityIndex= getLocalityIndex(taskLocality)
把这次的task的分配时间设置成最后一次分配时间。
lastLaunchTime= curTime
//Serialize and return the task
valstartTime =clock.getTime()
//We rely on the DAGScheduler to catch non-serializableclosures and RDDs, so in here
//we assume the task can be serialized without exceptions.
valserializedTask= Task.serializeWithDependencies(
task,sched.sc.addedFiles,sched.sc.addedJars,ser)
valtimeTaken =clock.getTime() – startTime
addRunningTask(taskId)
logInfo(“Serializedtask %s:%d as %d bytes in %d ms”.format(
taskSet.id,index,serializedTask.limit,timeTaken))
valtaskName =“task %s:%d”.format(taskSet.id,index)
假设是第一次运行。通过DAGScheduler.taskStarted发送BeginEvent事件。
if(taskAttempts(index).size== 1)
taskStarted(task,info)
returnSome(newTaskDescription(taskId,execId, taskName,index,serializedTask))
}
case_ =>
}
}
None
}
依据超时时间配置,假设这次分配task的时间减去上次task分配的时间超过了locality分配等待的配置时间,
把locality的级别向上移动一级。并又一次比对时间,拿到不超时的locality级别或ANY的级别。
privatedefgetAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
while(curTime – lastLaunchTime>= localityWaits(currentLocalityIndex)&&
currentLocalityIndex< myLocalityLevels.length- 1)
{
下标值加一,也就是把当前的Locality的级别向上放大一级。
//Jump to the next locality level, and remove our waiting time for thecurrent one since
//we don’t want to count it again on the next one
lastLaunchTime+= localityWaits(currentLocalityIndex)
currentLocalityIndex+= 1
}
myLocalityLevels(currentLocalityIndex)
}
DAGScheduler中处理BeginEvent事件:
caseBeginEvent(task,taskInfo)=>
for(
job<- idToActiveJob.get(task.stageId);
stage<- stageIdToStage.get(task.stageId);
stageInfo<- stageToInfos.get(stage)
) {
if(taskInfo.serializedSize> TASK_SIZE_TO_WARN* 1024 &&
!stageInfo.emittedTaskSizeWarning){
stageInfo.emittedTaskSizeWarning= true
logWarning((“Stage%d (%s) contains a task of very large “+
“size(%d KB). The maximum recommended task size is %d KB.”).format(
task.stageId,stageInfo.name,taskInfo.serializedSize/ 1024,TASK_SIZE_TO_WARN))
}
}
listenerBus.post(SparkListenerTaskStart(task,taskInfo))
11.CoarseGrainedSchedulerBackend.launchTasks流程
运行task的运行。发送LaunchTask事件处理消息
deflaunchTasks(tasks: Seq[Seq[TaskDescription]]) {
for(task <-tasks.flatten) {
freeCores(task.executorId) -= 1
依据worker注冊时的actor,向此actor发送LaunchTask事件。
executorActor(task.executorId)! LaunchTask(task)
}
}
12.启动task,因为是onyarn的模式,worker的actor在CoarseGrainedExecutorBackend.
处理代码例如以下:
caseLaunchTask(taskDesc)=>
logInfo(“Gotassigned task “ + taskDesc.taskId)
if(executor== null){
logError(“ReceivedLaunchTask command but executor was null”)
System.exit(1)
} else{
executor.launchTask(this,taskDesc.taskId,taskDesc.serializedTask)
}
………………………..
通过Executor启动task的运行。
其他actor的消息处理与task的详细运行与shuffle后面分析。这里先不做细的说明。
吐槽一把scala,这玩意编写代码是方便,但看起来有点麻烦呀。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/116335.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...