Spark2.2-Task序列化源码解析

云计算 waitig 697℃ 百度已收录 0评论

源码版本:2.2

如有错误请指正

一、前言

    Spark在运行应用程序时,会根据RDD的操作,将数据处理流程划分为多个stage进行计算,其中划分stage的依据是数据是否产生shuffle。在同一个stage中,RDD之间的依赖都是窄依赖。一个stage下会有多个task,每个task针对不同的分区数据执行同样的运算逻辑。

    在Spark集群模式下,多个不同的task可能运行在不同的节点上。那么,是什么机制来确保Spark集群下不同节点都能运行同样的计算逻辑呢?

    在Spark中,针对RDD的操作都是一个一个闭包,Spark在进行Job提交时会将操作作为闭包进行序列化发送到执行具体task的节点上,这样就可以达到移动计算逻辑而不是数据的目的,提高计算效率。

二、具体流程

  1. SparkContext初始化时对DAGScheduler和TaskScheduler进行初始化,同时根据部署模式初始化对应的SchedulerBackend
  2. 任务提交时触发SparkContext中的runJob
  3. SparkContex向DAGScheduler提交runJob
  4. DAGScheduler向它自己的事件处理器DAGSchedulerEventProcessLoop发送JobSubmitted事件
  5. DAGScheduler创建ResultStage,并往前回溯,遇到shuffle则划分出新的stage
  6. DAGScheduler划分完成后提交stage,遍历回溯提交父stage,如果父stage都完成了,则提交task
  7. DAGScheduler序列化task,并进行广播
  8. DAGScheduler构建task集合,并向TaskScheduler提交任务集,任务集中包含了已经序列化的任务信息
  9. TaskScheduler创建任务集管理器TaskSetManager,向SchedulerBackend申请资源
  10. SchedulerBackend在申请到资源后,执行launchTasks方法,将TaskSet中的Task一个一个地发送到Executor去执行。

三、源码分析

3.1 SparkContext提交任务

    Spark应用程序中对Dataset执行一系列操作

    点击Dataset中的一个具体操作,例如collect函数中,可以发现Dataset在执行具体的transformation或是action操作时,实际上已经生成了一个执行计划。

    具体执行计划是如何生成的,涉及到Spark SQL中的实现细节,这里不做详细描述,之后单独对Spark SQL原理进行解读。

    在执行执行计划中的具体操作时,都会掉用SparkContext中的runJob方法,具体是如何调用的,在之后的Spark SQL原理解读中再进行详细描述。

    在调用runJob方法时,可以看到,当前的执行函数作为一个参数传给了SparkContext的runJob方法。

    SparkContext在获取到func时,首先会对它做一个闭包清理。在ClosureCleaner类中的clean方法验证闭包是否可以被序列化,是否可以直接对它进行转换清理。

private def clean(
    func: AnyRef,
    checkSerializable: Boolean,
    cleanTransitively: Boolean,
    accessedFields: Map[Class[_], Set[String]]): Unit = {

  if (!isClosure(func.getClass)) {
    logWarning("Expected a closure; got " + func.getClass.getName)
    return
  }

  // TODO: clean all inner closures first. This requires us to find the inner objects.
  // TODO: cache outerClasses / innerClasses / accessedFields

  if (func == null) {
    return
  }

  logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")

  // A list of classes that represents closures enclosed in the given one
  val innerClasses = getInnerClosureClasses(func)

  // A list of enclosing objects and their respective classes, from innermost to outermost
  // An outer object at a given index is of type outer class at the same index
  val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)

  // For logging purposes only
  val declaredFields = func.getClass.getDeclaredFields
  val declaredMethods = func.getClass.getDeclaredMethods

  logDebug(" + declared fields: " + declaredFields.size)
  declaredFields.foreach { f => logDebug("     " + f) }
  logDebug(" + declared methods: " + declaredMethods.size)
  declaredMethods.foreach { m => logDebug("     " + m) }
  logDebug(" + inner classes: " + innerClasses.size)
  innerClasses.foreach { c => logDebug("     " + c.getName) }
  logDebug(" + outer classes: " + outerClasses.size)
  outerClasses.foreach { c => logDebug("     " + c.getName) }
  logDebug(" + outer objects: " + outerObjects.size)
  outerObjects.foreach { o => logDebug("     " + o) }

  // Fail fast if we detect return statements in closures
  getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)

  // If accessed fields is not populated yet, we assume that
  // the closure we are trying to clean is the starting one
  if (accessedFields.isEmpty) {
    logDebug(s" + populating accessed fields because this is the starting closure")
    // Initialize accessed fields with the outer classes first
    // This step is needed to associate the fields to the correct classes later
    for (cls <- outerClasses) {
      accessedFields(cls) = Set[String]()
    }
    // Populate accessed fields by visiting all fields and methods accessed by this and
    // all of its inner closures. If transitive cleaning is enabled, this may recursively
    // visits methods that belong to other classes in search of transitively referenced fields.
    for (cls <- func.getClass :: innerClasses) {
      getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
    }
  }

  logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
  accessedFields.foreach { f => logDebug("     " + f) }

  // List of outer (class, object) pairs, ordered from outermost to innermost
  // Note that all outer objects but the outermost one (first one in this list) must be closures
  var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
  var parent: AnyRef = null
  if (outerPairs.size > 0) {
    val (outermostClass, outermostObject) = outerPairs.head
    if (isClosure(outermostClass)) {
      logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
    } else if (outermostClass.getName.startsWith("$line")) {
      // SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
      // as it may carray a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.
      logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
    } else {
      // The closure is ultimately nested inside a class; keep the object of that
      // class without cloning it since we don't want to clone the user's objects.
      // Note that we still need to keep around the outermost object itself because
      // we need it to clone its child closure later (see below).
      logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
        outerPairs.head)
      parent = outermostObject // e.g. SparkContext
      outerPairs = outerPairs.tail
    }
  } else {
    logDebug(" + there are no enclosing objects!")
  }

  // Clone the closure objects themselves, nulling out any fields that are not
  // used in the closure we're working on or any of its inner closures.
  for ((cls, obj) <- outerPairs) {
    logDebug(s" + cloning the object $obj of class ${cls.getName}")
    // We null out these unused references by cloning each object and then filling in all
    // required fields from the original object. We need the parent here because the Java
    // language specification requires the first constructor parameter of any closure to be
    // its enclosing object.
    val clone = instantiateClass(cls, parent)
    for (fieldName <- accessedFields(cls)) {
      val field = cls.getDeclaredField(fieldName)
      field.setAccessible(true)
      val value = field.get(obj)
      field.set(clone, value)
    }
    // If transitive cleaning is enabled, we recursively clean any enclosing closure using
    // the already populated accessed fields map of the starting closure
    if (cleanTransitively && isClosure(clone.getClass)) {
      logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})")
      // No need to check serializable here for the outer closures because we're
      // only interested in the serializability of the starting closure
      clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
    }
    parent = clone
  }

  // Update the parent pointer ($outer) of this closure
  if (parent != null) {
    val field = func.getClass.getDeclaredField("$outer")
    field.setAccessible(true)
    // If the starting closure doesn't actually need our enclosing object, then just null it out
    if (accessedFields.contains(func.getClass) &&
      !accessedFields(func.getClass).contains("$outer")) {
      logDebug(s" + the starting closure doesn't actually need $parent, so we null it out")
      field.set(func, null)
    } else {
      // Update this closure's parent pointer to point to our enclosing object,
      // which could either be a cloned closure or the original user object
      field.set(func, parent)
    }
  }

  logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++")

  if (checkSerializable) {
    ensureSerializable(func)
  }
}

3.2 DAGScheduler序列化并提交任务

    SparkContext在runJob方法中调用了它所拥有的DAGScheduler的runJob方法来运行Job。

    DAGScheduler在它的runJob方法中,通过submitJob方法提交任务,获取一个一直阻塞等待Job执行完毕的对象JobWaiter。

    在submitJob方法中,DAGScheduler首先会对func的类型进行处理,(为什么会做这样的类型转换???)向它的事件处理器发送JobSubmitted

    在DAGScheduler实际对JobSubmitted事件进行处理时,func函数的类型已经从(TaskContext, Iterator[_]) => U 转换成了(TaskContext, Iterator[_]) => _

    在经过一系列的stage划分等操作,最终提交任务是在DAGScheduler中的submitMissingTasks方法进行处理,我们可以看到抽象类Stage有两个具体实现类ShuffleMapStage和ResultStage,其中只有ResultStage中才有具体的执行操作func。 

  1. ShuffleMapStage是在DAG中为shuffle生成数据的中间stage。它们在每一个shffle操作前发生,并且可能包含多个pipelined操作(eg map和filter)。在执行时,ShuffleMapStage会将后面reduce task将会使用到的数据保存为输出文件。’ShfuuleDep’字段描述了每个阶段的shuffle,’outputLocs’和’numAvailableOutputs’变量跟踪了map输出的就绪情况。
  2. ResultStage将一个函数运行在RDD的某些分区,来计算一个action操作的结果。ResultStage对象捕获要执行的函数,’func’,它将会在每一个分区上运行,’partitions’变量存放的是分区的ID集合。一些stage可能不会在RDD的所有分区上运行,例如first和lookup action

    由于只有ResultStage中才会包含func信息,DAGScheduler在submitMissingTasks方法中提交task时会区分stage类型类进行序列化。提交task之前首先会进行一些状态更新和获取数据Loc的操作。

    更新状态后,DAGScheduler会根据stage的类型来讲stage序列化为二进制的task。针对ShuffleMapStage,会将它的rdd和shuffleDep进行序列化。针对ResultStage,如前面所说,ResultStage对象捕获要执行的函数,’func’,它将会在每一个分区上运行,序列化时会将rdd和func进行序列化。注意,虽然Spark有多种序列化的实现,但在序列化任务信息时,只会采用JavaSerializer。即DAGScheduler中的closureSerializer的固定默认实现是JavaSerializerInstance。

    我们再跳转到JavaSerializerInstance的serialize和deserialize方法中可以发现,Spark的JavaSerializerInstance在对闭包进行序列化时,并没有涉及到闭包的独有信息。闭包在进入序列化之前需要先做自己的校验和清理工作,这部分代码是在前面讲的SparkContext中调用ClosureCleaner的clean方法实现的,对闭包的innerClass、declaredField和declaredMethod等方法进行了校验(具体的校验逻辑和原理暂时还未理清)。

    Spark的JavaSerializer中的序列化和反序列化方法中基于java.io的ByteArrayOutputStream和InputStream封装了自己的字节输入输出流ByteBufferOutputStream和ByteBufferInputStream。序列化完成后的结果是一个Byte数组Array[Byte],DAGScheduler会委托SparkContext将这些字节码广播给每一个工作节点。当遇到闭包中存在无法序列化的对象和引用时,会直接触发stage的失败。

    完成RDD和func/ shuffleDep的序列化之后,DAGScheduler会根据分区的id、数据本地性结合上一步序列化后的二进制码结果,构造出一系列的Task,这里的Task的运行逻辑一致,但分区id和数据本地性信息是根据要计算的数据的分区信息来进行包装的。

    TaskSet构造完成后,DAGScheduler向TaskScheduler提交任务

 

3.3 TaskScheduler提交具体任务到Executor 

    TaskScheduler的submitTasks后会构造任务集管理器TaskSetManager来跟踪任务的运行状况,并向SchedulerBackend申请资源。

    SchedulerBackend在申请到资源后,会调用Executor中的launchTask来执行具体任务,在执行具体任务是,构造了一个TaskRunner的Runnable对象,TaskRunner对象中存放了具体的Task信息:

TaskDescription,里面包括了Task的描述信息、依赖的jar文件和序列化的task信息。

3.4 Exectutor反序列化任务并执行

  Task的具体反序列化过程在Executor中的TaskRunner中的run方法中,即任务实际执行时进行反序列化,我们可以重点看一下TaskRunner中的run方法:

    TaskRunner中的反序列化方法是通过获取env中的闭包反序列化实例获得的,这里默认只能使用JavaSerializerInstance进行反序列化,但注意,这里的反序列化之后真正执行的函数还是binary数据。

    第一步updateDependencies下载SparkContext广播的JAR和文件,添加jar到classpath中

    第二步 操作中对task进行真正的反序列化,使用更新后的当前线程类加载器

task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

    反序列化后真正执行任务是通过调用抽象类Task的run方法来完成的

    抽象类Task的run方法是一个final方法,抽象类有两种不同的实现ShuffleMapTask和ResultTask。抽象类Task中的run方法再调用实际Task实现类的runTask方法进行。在这里才会将真正的函数binary数据反序列化为真正的RDD和func。

    反序列化出func后,实际运行func,完成数据计算工作。

四、问题

    通过上面的源码分析,可以理解Task序列化的整体流程和大概的序列化细节。但具体针对闭包的序列化还是有一些需要仔细研究的点

  1. 闭包的序列化清理原理
  2. 闭包的JAR依赖
  3. ClassLoader隔离机制

 



本文由【waitig】发表在等英博客
本文固定链接:Spark2.2-Task序列化源码解析
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)