Spark Notes: The Core of SparkContext Initialization (Source Walkthrough)


TaskSchedulerImpl

SparkContext creates the task scheduler in createTaskScheduler() (SparkContext itself runs to over 2,000 lines):
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      // ... the remaining cases (standalone, mesos, yarn, etc.) are elided ...
    }
  }
  * 1. Under the hood it schedules tasks through a SchedulerBackend, with different backends for different kinds of cluster (standalone, local, mesos).
  * 2. It can also run in local mode by using a LocalBackend and setting the isLocal flag to true.
  * 3. It handles common logic, such as the execution order across multiple jobs and launching speculative execution of tasks.
  * 4. Clients should first call its initialize() and start() methods, then submit task sets through the submitTasks() method.
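The master-string dispatch above can be sketched in isolation. This is a hypothetical, simplified mirror of how createTaskScheduler decides the local thread count: LOCAL_N_REGEX imitates the pattern from SparkMasterRegex, and availableCores stands in for Runtime.getRuntime.availableProcessors(); the object and method names are illustrative, not Spark's.

```scala
// Hypothetical sketch of createTaskScheduler's master-string parsing.
object MasterStringSketch {
  // Matches "local[N]" or "local[*]", capturing the thread spec,
  // in the spirit of SparkMasterRegex.LOCAL_N_REGEX.
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r

  def threadCount(master: String, availableCores: Int): Int = master match {
    case "local" => 1
    case LOCAL_N_REGEX(threads) =>
      // "*" means "use every core the JVM can see"; otherwise take N literally.
      val n = if (threads == "*") availableCores else threads.toInt
      require(n > 0, s"Asked to run locally with $n threads")
      n
    case other =>
      throw new IllegalArgumentException(s"Unrecognized master URL: $other")
  }
}
```

For example, `threadCount("local[*]", 8)` yields 8 on a machine reporting 8 cores, while plain `"local"` always yields a single thread.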
The TaskSchedulerImpl initialization code:
def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
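To make the FIFO branch above concrete, here is a hypothetical sketch of the ordering that a FIFO root pool applies: Spark's FIFOSchedulingAlgorithm orders task sets first by job priority, then by stage id. SketchTaskSet and fifoOrdering are illustrative names, not Spark's types.

```scala
// Hypothetical sketch of FIFO pool ordering (priority first, then stage id).
case class SketchTaskSet(priority: Int, stageId: Int)

object FifoSketch {
  // Lower priority value runs first; ties are broken by stage id,
  // mirroring the comparison in Spark's FIFOSchedulingAlgorithm.
  val fifoOrdering: Ordering[SketchTaskSet] =
    Ordering.by(ts => (ts.priority, ts.stageId))

  def schedule(queue: Seq[SketchTaskSet]): Seq[SketchTaskSet] =
    queue.sorted(fifoOrdering)
}
```

Under FAIR scheduling the builder instead reads pool definitions from a fairscheduler.xml-style config, which is why that branch takes the conf argument.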

DAGScheduler

SparkContext creates the DAGScheduler:

The DAGScheduler is the highest-level scheduling layer, implementing stage-oriented scheduling. It computes a DAG (directed acyclic graph) of stages for each job, tracks which RDDs and stage outputs have been materialized (written to disk or memory), and finds a minimum-cost schedule for running the job. It submits stages as task sets to the underlying TaskSchedulerImpl, which runs them on the cluster. Beyond building the stage DAG, it also determines the preferred locations for running each task, based on the current cache state, and hands those locations to the underlying TaskSchedulerImpl. In addition, it handles failures caused by lost shuffle output files, in which case old stages may need to be resubmitted; failures inside a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which retries each task several times and only cancels the whole stage as a last resort.
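The stage-splitting idea in the paragraph above can be sketched with a toy lineage: walking an RDD's dependency chain backwards, each shuffle dependency starts a new stage, while narrow dependencies stay inside the current stage. SketchRDD and countStages are illustrative stand-ins, not Spark's types.

```scala
// Hypothetical sketch: count stages by counting shuffle boundaries on a
// linear RDD lineage. A real DAG has branching; this keeps only the idea.
case class SketchRDD(id: Int, parent: Option[SketchRDD], isShuffle: Boolean)

object StageSketch {
  // Number of stages = shuffle boundaries on the lineage chain + 1.
  def countStages(last: SketchRDD): Int = {
    def walk(rdd: SketchRDD, acc: Int): Int = rdd.parent match {
      case Some(p) => walk(p, if (rdd.isShuffle) acc + 1 else acc)
      case None    => acc
    }
    walk(last, 1)
  }
}
```

A map-only chain thus stays a single stage, while each reduceByKey-style shuffle adds one more.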

In standalone mode, SparkDeploySchedulerBackend constructs an ApplicationDescription:
    // The ApplicationDescription is important: it captures the resource
    // needs of the current application, such as CPU cores and memory.
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
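The role of ApplicationDescription can be illustrated with a stripped-down mirror of its fields: it is the record the master uses to decide how many executors to grant. SketchAppDescription and maxExecutors are hypothetical names for this sketch; the field names follow the snippet above.

```scala
// Hypothetical, simplified mirror of the resource demands carried by
// ApplicationDescription.
case class SketchAppDescription(
  appName: String,
  maxCores: Option[Int],        // None = take as many cores as the cluster offers
  memoryPerExecutorMB: Int,
  coresPerExecutor: Option[Int])

object AppDescSketch {
  // How many executors can the requested core budget fund?
  def maxExecutors(desc: SketchAppDescription): Option[Int] =
    for {
      total <- desc.maxCores
      per   <- desc.coresPerExecutor
    } yield total / per
}
```

When either bound is unset, the master falls back to its own defaults, which is why both fields are optional here.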

AppClient

  * This is the interface through which an application communicates with the Spark cluster.
  * It takes a Spark master URL, an ApplicationDescription, and a listener for cluster events,
  * and invokes the listener's callbacks when the various events occur.
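The callback contract described above can be sketched as a small trait plus a recording implementation; the cluster side simply invokes the callbacks as events arrive. SketchClusterListener, RecordingListener, and simulateRegistration are illustrative, not Spark's AppClientListener API.

```scala
// Hypothetical sketch of the listener contract AppClient is built around.
trait SketchClusterListener {
  def connected(appId: String): Unit
  def executorAdded(execId: String, cores: Int): Unit
}

// A listener that just records what it was told, for inspection.
class RecordingListener extends SketchClusterListener {
  val events = scala.collection.mutable.Buffer.empty[String]
  def connected(appId: String): Unit = events += s"connected:$appId"
  def executorAdded(execId: String, cores: Int): Unit =
    events += s"executor:$execId:$cores"
}

object ClientSketch {
  // Simulate the master registering the app and granting one executor.
  def simulateRegistration(l: SketchClusterListener): Unit = {
    l.connected("app-0001")
    l.executorAdded("exec-0", 2)
  }
}
```

In the real AppClient the same shape appears: the scheduler backend implements the listener, and the client thread fires the callbacks as master messages come in.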

SparkUI

// Creates the Spark web UI
private def create(
      sc: Option[SparkContext],
      conf: SparkConf,
      listenerBus: SparkListenerBus,
      securityManager: SecurityManager,
      appName: String,
      basePath: String = "",
      jobProgressListener: Option[JobProgressListener] = None,
      startTime: Long): SparkUI = {

    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
      val listener = new JobProgressListener(conf)
      listenerBus.addListener(listener)
      listener
    }

    val environmentListener = new EnvironmentListener
    val storageStatusListener = new StorageStatusListener
    val executorsListener = new ExecutorsListener(storageStatusListener)
    val storageListener = new StorageListener(storageStatusListener)
    val operationGraphListener = new RDDOperationGraphListener(conf)

    listenerBus.addListener(environmentListener)
    listenerBus.addListener(storageStatusListener)
    listenerBus.addListener(executorsListener)
    listenerBus.addListener(storageListener)
    listenerBus.addListener(operationGraphListener)

    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
      executorsListener, _jobProgressListener, storageListener, operationGraphListener,
      appName, basePath, startTime)
  }
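Everything in create() hinges on one pattern: listeners register on a bus once, and every posted event then fans out to all of them. Here is a hypothetical minimal bus illustrating that pattern; SketchBus and SketchEvent are illustrative names, not Spark's ListenerBus types.

```scala
// Hypothetical sketch of the listener-bus pattern SparkUI.create relies on.
case class SketchEvent(name: String)

class SketchBus {
  private val listeners =
    scala.collection.mutable.Buffer.empty[SketchEvent => Unit]

  // Register a listener; mirrors listenerBus.addListener(...) above.
  def addListener(l: SketchEvent => Unit): Unit = listeners += l

  // Deliver one event to every registered listener.
  def post(e: SketchEvent): Unit = listeners.foreach(_(e))
}
```

This is why create() adds five listeners up front: each UI tab is backed by one listener that keeps its own view of the event stream.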


