In the previous post we saw that handleJobSubmitted finishes by calling submitStage to submit the finalStage. The code lives in core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala in the spark-master source tree:

```scala
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Find the parent stages this stage depends on that have not been submitted yet
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // All parent stages are done, so this stage itself can be submitted
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Some parent stages are unfinished: recursively submit them first,
        // and park this stage in waitingStages until they complete
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```

In the normal case the recursion bottoms out and submitMissingTasks is called:

```scala
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // Partitions of this stage that still have to be computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage

  // A stage in the DAG is either a ShuffleMapStage or a ResultStage,
  // and the last stage of a job is always a ResultStage
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // Map each partition to its preferred locations (data locality)
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      // Error handling was elided in the original excerpt; the full source
      // aborts the stage and returns here
      abortStage(stage, s"Task creation failed: $e", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // Build one task per missing partition: ShuffleMapTasks for a ShuffleMapStage,
  // ResultTasks for a ResultStage. taskBinary (a Broadcast of the serialized
  // stage closure) is constructed just before this point in the full source.
  val partitions = stage.rdd.partitions
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          // Create a ShuffleMapTask for this partition
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          // Create a ResultTask for this partition
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.nonEmpty) {
    // There are tasks to run: wrap them in a TaskSet and hand it to the taskScheduler
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    // Nothing left to run: mark the stage as finished, distinguishing
    // ShuffleMapStage from ResultStage
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        // Mark any map-stage jobs waiting on this stage as finished
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        // For a ResultStage there is nothing more to do but log
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
```
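To make the parent-first submission order concrete, here is a toy, self-contained mirror of the recursion in submitStage. Node, submit, and done are made-up names for illustration, not Spark's API; the point is only the depth-first, parents-before-children traversal:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stage: just an id and its parent stages.
final case class Node(id: Int, parents: List[Node] = Nil)

// Depth-first, parents-first traversal: the same order submitStage
// enforces via getMissingParentStages plus recursion.
def submit(node: Node, done: mutable.Set[Int] = mutable.Set.empty): Unit = {
  if (!done(node.id)) {
    node.parents.sortBy(_.id).foreach(submit(_, done)) // submit missing parents first
    println(s"submitting tasks for stage ${node.id}")  // stands in for submitMissingTasks
    done += node.id
  }
}

// A diamond DAG: stage 3 depends on 1 and 2, which both depend on 0.
val s0 = Node(0)
val s3 = Node(3, List(Node(1, List(s0)), Node(2, List(s0))))
submit(s3) // prints stages in order 0, 1, 2, 3; stage 0 is visited only once
```

The real scheduler is asynchronous rather than purely recursive: a child with missing parents is parked in waitingStages, and submitWaitingChildStages resubmits it once the parent's last task completes.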
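Finally, a minimal job that exercises this whole path end to end (a sketch; StageDemo and the local[2] master are illustrative). reduceByKey introduces a shuffle dependency, so the DAG splits into one ShuffleMapStage plus the final ResultStage, and collect() triggers runJob, handleJobSubmitted, and the submitStage recursion shown above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("stage-demo").setMaster("local[2]"))
    val counts = sc.parallelize(Seq("a", "b", "a"), numSlices = 2)
      .map(w => (w, 1))
      .reduceByKey(_ + _) // shuffle boundary => parent ShuffleMapStage
    counts.collect()      // action => runJob => handleJobSubmitted => submitStage
    sc.stop()
  }
}
```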