博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark job提交3
阅读量:4215 次
发布时间:2019-05-26

本文共 4132 字,大约阅读时间需要 13 分钟。

在上一篇博文中有说到最后调用 handleJobSubmitted 中的 submitStage 来提交 finalStage。
源码位置:core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // Find ancestor stages whose output is still missing, i.e. not yet submitted.
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          // All parent stages are available, so this stage's tasks can be submitted.
          submitMissingTasks(stage, jobId.get)
        } else {
          // Some parents are unfinished: recursively submit them first, and park
          // this stage in waitingStages until they complete.
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

正常情况下应该会调用 submitMissingTasks:

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    val properties = jobIdToActiveJob(jobId).properties
    runningStages += stage
    // A DAG stage is either a ShuffleMapStage or a ResultStage; the final stage
    // of a job is always a ResultStage.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    // Compute the preferred locations for each partition that still needs computing.
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      // NOTE(review): this excerpt elides the handler body; in the full Spark source
      // this case aborts the stage and returns — an empty case would not type-check.
      case NonFatal(e) =>
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    // Build one task per missing partition: ShuffleMapTasks for a ShuffleMapStage,
    // ResultTasks for a ResultStage.
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            // NOTE(review): `partitions` here is `stage.rdd.partitions`, bound in a
            // `val` that this excerpt omits — see the full DAGScheduler source.
            val part = partitions(id)
            stage.pendingPartitions += id
            // Create a ShuffleMapTask for this partition.
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }
        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            // Create a ResultTask for this partition.
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      // NOTE(review): handler body elided by the excerpt, as above.
      case NonFatal(e) =>
    }
    // A non-empty task set is handed to the TaskScheduler for actual scheduling.
    if (tasks.size > 0) {
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // No tasks to run: mark the stage as finished. ShuffleMapStage and
      // ResultStage are handled differently below.
      markStageAsFinished(stage, None)
      stage match {
        // A finished ShuffleMapStage may complete waiting map-stage jobs.
        case stage: ShuffleMapStage =>
          markMapStageJobsAsFinished(stage)
        // A finished ResultStage only logs.
        case stage: ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
  }

 

转载地址:http://usnmi.baihongyu.com/

你可能感兴趣的文章
S3C2440上LCD驱动 (FrameBuffer)实例开发讲解
查看>>
Linux音频编程指南
查看>>
usb-otg-调试心得
查看>>
USB规范浏览--设备和主机规范
查看>>
男人的品位--我们自己的最求
查看>>
Android (Linux) Suspend流程
查看>>
LINUX时间管理
查看>>
定时器的使用
查看>>
为Android加入busybox工具
查看>>
使用技巧busybox
查看>>
如何查看与/dev/input目录下的event对应的设备
查看>>
bootloader-bootable解析
查看>>
bootloader (LK)&&android lk bootloader中相关修改指南
查看>>
SD卡驱动分析--基于高通平台
查看>>
SD Card 驱动流程分析
查看>>
Linux之debugfs介绍
查看>>
关于sd卡中一些概念的理解
查看>>
sd卡驱动分析之相关硬件操作和总结
查看>>
好的播文
查看>>
linux dd命令解析
查看>>