[TOC]
在 Spark 的使用中,性能的调优配置过程中,查阅了很多资料,之前自己总结过两篇小博文 Spark 踩坑记——初试和 Spark 踩坑记——数据库(Hbase+Mysql),第一篇概况的归纳了自己对 spark 的初步尝试,第二篇更多是局部在 spark 对于数据库的操作,而本文的思路是从 spark 最细节的本质,即核心的数据结构 RDD 出发,到整个 Spark 集群宏观的调度过程做一个整理归纳,从微观到宏观两方面总结,方便自己在调优过程中找寻问题,理清思路,也加深自己对于分布式程序开发的理解。(有任何问题和纰漏还请各位大牛指出啦,我会第一时间改正)
在 Spark 开山之作 "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" 的这篇 paper 中(以下简称 RDD Paper),Matei 等提出了 RDD 这种数据结构,文中开头对 RDD 定义是: A distributed memory abstraction that lets programmers perform in-memory computations on large clusters in a fault-tolerant manner. 也就是说 RDD 设计的核心点为:
文中提到了对于 RDD 设计的最大挑战便是在提供有效的容错机制 (fault tolerance efficiently),之前存在的基于内存存储的集群抽象,例如分布式共享内存、键值存储、数据库等,更多是细粒度的(fine-grained)更新一个可变状态表,而其容错方式通常为在机器间进行数据复制或者日志更新,而这些方式很明显会造成机器负载加大以及大量的网络传输开销。 而 RDD 则使用了粗粒度的(coarse-grained)转换,即对于很多相同的数据项使用同一种操作(如 map/filter/join),这种方式能够通过记录 RDD 之间的转换从而刻画 RDD 的继承关系(lineage),而不是真实的数据,最终构成一个 DAG(有向无环图),而如果发生 RDD 丢失,RDD 会有充足的信息来得知怎么从其他 RDDs 重新计算得到。 这也是 RDD 设计的核心理念,接下来围绕这一理念我们来剖析,看 RDD 是怎么实现这种高效的容错机制的。
RDD 实现的数据结构核心是一个五元组,如下表:
属性 | 说明 |
---|---|
分区列表 - partitions | 每个分区为 RDD 的一部分数据 |
依赖列表 - dependencies | table 存储其父 RDD 即依赖 RDD |
计算函数 - compute | 利用父分区计算 RDD 各分区的值 |
分区器 - partitioner | 指明 RDD 的分区方式 (hash/range) |
分区位置列表 - preferredLocations | 指明分区优先存放的结点位置 |
其中每个属性的代码如下:
- // RDD中的依赖关系由一个Seq数据集来记录,这里使用Seq的原因是经常取第一个元素或者遍历
- private
- var dependencies_: Seq[Dependency[_]] = null
- // 分区列表定义在一个数组中,这里使用Array的原因是随时使用下标来访问分区内容
- // @transient分区列表不需要被序列化
- @transient private
- var partitions_: Array[Partition] = null
- // 接口定义,具体由子类实现,对输入的RDD分区进行计算
- def compute(split: Partition, context: TaskContext) : Iterator[T]
- // 分区器
- // 可选,子类可以重写以指定新的分区方式,Spark支持Hash和Range两种分区方式
- @transient val partitioner: Option[Partitioner] = None
- // 可选,子类可以指定分区的位置,如HadoopRDD可以重写此方法,让分区尽可能与数据在相同的节点上
- protected def getPreferredLocations(split: Partition) : Seq[String] = Nil
在 RDD Paper 中,作者提到在抽象 RDD 时,一个很重要的点便是如何使得 RDD 能够记录 RDD 之间的继承依赖关系(lineage),这种继承关系来自丰富的转移(Transformation)操作。所以作者提出了一种基于图的表示方式来实现这个目标,这也正是上面 RDD 五种属性的核心作用。 这五种属性从 spark 诞生到新的版本迭代,一直在使用,没有增加也没有减少,所以可以说 Spark 的核心就是 RDD,而 RDD 的核心就是这五种属性。
在 Spark 踩坑记——初试中对 RDD 的操作也进行了简单说明,在 Spark 中,对 RDD 的操作可以分为 Transformation 和 Action 两种,我们分别进行整理说明:
对于 Transformation 操作是指由一个 RDD 生成新 RDD 的过程,其代表了是计算的中间过程,其并不会触发真实的计算。
不同于 Transformation 操作,Action 代表一次计算的结束,不再产生新的 RDD,将结果返回到 Driver 程序。所以 Transformation 只是建立计算关系,而 Action 才是实际的执行者。每个 Action 都会调用 SparkContext 的 runJob 方法向集群正式提交请求,所以每个 Action 对应一个 Job。
RDD 的容错机制是通过记录更新来实现的,且记录的是粗粒度的转换操作。在外部,我们将记录的信息称为血统(Lineage)关系,而到了源码级别,Apache Spark 记录的则是 RDD 之间的依赖(Dependency)关系。在一次转换操作中,创建得到的新 RDD 称为子 RDD,提供数据的 RDD 称为父 RDD,父 RDD 可能会存在多个,我们把子 RDD 与父 RDD 之间的关系称为依赖关系,或者可以说是子 RDD 依赖于父 RDD。 依赖只保存父 RDD 信息,转换操作的其他信息,如数据处理函数,会在创建 RDD 时候,保存在新的 RDD 内。依赖在 Apache Spark 源码中的对应实现是 Dependency 抽象类。 Apache Spark 将依赖进一步分为两类,分别是窄依赖(Narrow Dependency)和 Shuffle 依赖(Shuffle Dependency,在部分文献中也被称为 Wide Dependency,即宽依赖)。
窄依赖中,父 RDD 中的一个分区最多只会被子 RDD 中的一个分区使用,换句话说,父 RDD 中,一个分区内的数据是不能被分割的,必须整个交付给子 RDD 中的一个分区。下图展示了几类常见的窄依赖及其对应的转换操作。
Shuffle 依赖中,父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的所有分区,因此 Shuffle 依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。下图展示了几类常见的 Shuffle 依赖及其对应的转换操作。
需要说明的是,依赖关系时 RDD 到 RDD 之间的一种映射关系,是两个 RDD 之间的依赖,那么如果在一次操作中涉及到多个父 RDD,也有可能同时包含窄依赖和 Shuffle 依赖,如 join 操作:
说到 Spark 集群的部署,我们先来讨论一下 Spark 中一些关键的组件,在我的博文 Spark 踩坑记——初试中,我对 Master/Worker/Driver/Executor 几个关键概念做了阐述。首先,先上官方文档中的一张图: 官方文档对其中的术语进行了总结,如下表: 从官方文档摘抄了这么多东东,对 Spark 中基本的集群结构,以及一个程序提交到 Spark 后的调度情况我们有了了解。
对于集群的部署方式,Spark 提供了多种集群部署方式,如下:
由于在我平时的使用中,是直接采用的 Standalone 的部署方式,我这里将部署的框架做一个简单的介绍,其他部署方式其实可以做一些参考来进行搭配部署: 假设我们的网段为 10.214.55.x,其中 1、2、3 机器我们用作集群节点,4 和 5 位 master 节点,这里我们用到了 zookeeper,关于 zookeeper 的介绍大家可以在网上搜搜,我们这里加入 zk 的目的就是 master 节点如果崩溃后进行一个主备切换,保证集群能够继续正常运行。如果我们在 1 提交我们的应用,那么 2 和 3 就将作为我们的 worker 节点参与运算。而关于配置文件中需要的具体配置项可以参考官方文档:Spark Standalone Mode
上文我们从微观和宏观两个角度对 Spark 进行了总结,RDD 以及 RDD 的依赖,Spark 集群以及部署,那么当我们在提交了一个任务或者说 Application 到 Spark 集群时,它是怎么运作的呢?
针对这个过程,我们可以从微观和宏观两个角度把控,将 RDD 的操作依赖关系,以及 task 在集群间的分配情况综合起来看,如下图:
在提交 Spark 任务时,我们可以在提交命令中加入一项参数 --conf spark.ui.port=xxxx,其中 "xxxx" 为你需要的端口号,这样在浏览器中我们就可以利用 Spark 提供的 UI 界面对 Application 的运行情况进行监控如下图:
在 spark 平时的使用过程当中,由于程序在整个集群当中奔跑,经常会遇到很多莫名其妙的错误,有时候通过日志给定的错误很难真的定位到真正的原因,那叫一个忧伤阿 T^T
出现这类错误,往往日志中会提到 JVM。在 Spark 中大多数操作会分担到各个结点的 worker 进行计算,但是对于 shuffle 类操作,如我们经常会用的 reduceByKey 或者 collect 等,都会使得 spark 将所有结点的数据汇总到 driver 进行计算,这样就会导致 driver 需要远大于正常 worker 的内存,所以遇到这类问题,最先可以考虑的便是增加 driver 结点的内存,增加方式如下:
- --driver-memory 15g
在利用 spark streaming 的 python 版本,消费 kafka 数据的时候,遇到类似下面的问题:
- UnicodeDecodeError: 'utf8' codec can't decode byte 0x85 in position 87: invalid start byte
我们知道 python2 中的字符串形式有两种即 unicode 形式和普通 str 形式,通过反复分析日志和查看 kafka.py 的源码找到了问题所在。首先在 pyspark 的 kafka API 中,找到 createStream 函数的如下说明: 图中红框内清楚的说明了,在解析 kafka 传来的数据的时候,默认使用了 utf8_decoder 函数,那这个东东是个什么玩意呢,找到 kafka.py 的源码,其定义如下:
- # 默认解码器
- def utf8_decoder(s):
- """ Decode the unicode as UTF-8 """
- if s is None:
- return None
- return s.decode('utf-8')
- class KafkaUtils(object):
- @staticmethod
- def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
- storageLevel=StorageLevel.MEMORY_AND_DISK_2,
- keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
- """
- Create an input stream that pulls messages from a Kafka Broker.
- :param ssc: StreamingContext object
- :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
- :param groupId: The group id for this consumer.
- :param topics: Dict of (topic_name -> numPartitions) to consume.
- Each partition is consumed in its own thread.
- :param kafkaParams: Additional params for Kafka
- :param storageLevel: RDD storage level.
- :param keyDecoder: A function used to decode key (default is utf8_decoder)
- :param valueDecoder: A function used to decode value (default is utf8_decoder)
- :return: A DStream object
- """
- if kafkaParams is None:
- kafkaParams = dict()
- kafkaParams.update({
- "zookeeper.connect": zkQuorum,
- "group.id": groupId,
- "zookeeper.connection.timeout.ms": "10000",
- })
- if not isinstance(topics, dict):
- raise TypeError("topics should be dict")
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
- helper = KafkaUtils._get_helper(ssc._sc)
- jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
- ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
- stream = DStream(jstream, ssc, ser)
- return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
- ...
我们看到默认的解码器直接调用了 s.decode,那么当 kafka 传来的数据中有非 utf8 编码的字符时,整个 stage 就会挂掉,所以修改如下:
- def my_uft8_decoder(s):
- if s is None:
- return None
- try:
- return s.decode('utf-8', 'replace')
- except Exception, e:
- print e;
- return None
- # 创建stream时传入
- kafkaStream = KafkaUtils.createStream(ssc, \
- conf.kafka_quorum, conf.kafka_consumer_group, {conf.kafka_topic:conf.spark_streaming_topic_parallelism}, {
- "auto.commit.interval.ms":"50000",
- "auto.offset.reset":"smallest",
- },
- StorageLevel.MEMORY_AND_DISK_SER,
- valueDecoder=my_uft8_decoder
- )
如果采用 createDirectStream 来创建 context 与此类似,不再赘述。所以在 pyspark 的 kafka 消费中遇到解码问题可以关注一下这里。
挺长的一篇整理,前后拖了很久。本篇博文我的构思主要就是,当我们提交了一个应用到 Spark 时,我们需要大致了解 Spark 做了什么,这里我并没有分析源码(因为我木有看哈哈)。从最微观的 RDD 的操作,到宏观的整个集群的调度运算,这样从 RDD 看集群调度就有了一个整体的认识,当遇到问题的时候就更容易排查,遇到性能拼瓶颈也容易查找。OK,这就是这篇博文的全部整理哈,其中末尾部分阐述了在实际项目中遇到的一些问题和坑,如果有相似的问题的朋友可以参考下。
做个小广告,项目是 WeTest 舆情,企鹅风讯,感兴趣的欢迎大家来踩踩: http://wetest.qq.com/bee/
参考文献:
来源: http://www.cnblogs.com/xlturing/p/6914064.html