本篇文章主要剖析 broadcast 的实现机制.
BroadcastManager 初始化
BroadcastManager 初始化方法源码如下:
TorrentBroadcastFactory 的继承关系如下:
- BroadcastFactory
- An interface for all the broadcast implementations in Spark (to allow multiple broadcast implementations). SparkContext uses a BroadcastFactory implementation to instantiate a particular broadcast for the entire Spark job.
即它是 Spark 中 broadcast 中所有实现的接口. SparkContext 使用 BroadcastFactory 实现来为整个 Spark job 实例化特定的 broadcast. 它有唯一子类 -- TorrentBroadcastFactory.
它有两个比较重要的方法:
newBroadcast 方法负责创建一个 broadcast 变量.
TorrentBroadcastFactory
其主要方法如下:
newBroadcast 其实例化 TorrentBroadcast 类.
unbroadcast 方法调用了 TorrentBroadcast 类的 unpersist 方法.
TorrentBroadcast 父类 Broadcast
官方说明如下:
- A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
- They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. Broadcast variables are created from a variable v by calling org.apache.spark.SparkContext.broadcast. The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
- The interpreter session below shows this:
- scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
- broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
- scala> broadcastVar.value
- res0: Array[Int] = Array(1, 2, 3)
- After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
即广播变量允许编程者将一个只读变量缓存到每一个机器上, 而不是随任务一起发送它的副本. 它们可以被用来用一种高效的方式拷贝输入的大数据集. Spark 也尝试使用高效的广播算法来减少交互代价. 它通过调用 SparkContext 的 broadcast 方法创建, broadcast 变量是对真实变量的包装, 它可以通过 broadcast 对象的 value 方法返回真实对象. 一旦真实对象被广播了, 要确保对象不会被改变, 以确保该数据在所有节点上都是一致的.
TorrentBroadcast 继承关系如下:
TorrentBroadcast 是 Broadcast 的唯一子类.
TorrentBroadcast
其说明如下:
- A BitTorrent-like implementation of org.apache.spark.broadcast.Broadcast.
- The mechanism is as follows:
- The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.
- On each executor, the executor first attempts to fetch the object from its BlockManager.
- If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.
- Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.
- This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
- When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
实现机制:
driver 将数据拆分成多个小的 chunk 并将这些小的 chunk 保存在 driver 的 BlockManager 中. 在每一个 executor 节点上, executor 首先先从它自己的 blockmanager 获取数据, 如果不存在, 它使用远程抓取, 从 driver 或者是其他的 executor 中抓取数据. 一旦它获取到 chunk, 就将其放入到自己的 BlockManager 中, 准备被其他的节点请求获取. 这使得 driver 发送多个副本到多个 executor 节点的瓶颈不复存在.
driver 端写数据
广播数据的保存有两种形式:
1. 数据保存在 memstore 中一份, 需要反序列化后存入; 保存在磁盘中一份, 磁盘中的那一份先使用 SerializerManager 序列化为字节数组, 然后保存到磁盘中.
2. 将对象根据 blockSize(默认为 4m, 可以通过 spark.broadcast.blockSize 参数指定),compressCodec(默认是启用的, 可以通过 spark.broadcast.compress 参数禁用. 压缩算法默认是 lz4, 可以通过 spark.io.compression.codec 参数指定) 将数据写入到 outputStream 中, 进而拆分为几个小的 chunk, 最终将数据持久化到 blockManager 中, 也是 memstore 一份, 不需要反序列化; 磁盘一份.
其中, TorrentBroadcast 的 blockifyObject 方法如下:
压缩的 Outputstream 对 ChunkedByteBufferOutputStream 做了装饰.
driver 或 executor 读数据
broadcast 方法调用 value 方法时, 会调用 TorrentBroadcast 的 getValue 方法, 如下:
_value 字段声明如下:
private lazy val _value: T = readBroadcastBlock()
接下来看一下 readBroadcastBlock 这个方法:
- private def readBroadcastBlock(): T = Utils.tryOrIOException {
- TorrentBroadcast.synchronized {
- val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
- Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
- setConf(SparkEnv.get.conf)
- val blockManager = SparkEnv.get.blockManager
- blockManager.getLocalValues(broadcastId) match {
- case Some(blockResult) =>
- if (blockResult.data.hasNext) {
- val x = blockResult.data.next().asInstanceOf[T]
- releaseLock(broadcastId)
- if (x != null) {
- broadcastCache.put(broadcastId, x)
- }
- x
- } else {
- throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
- }
- case None =>
- logInfo("Started reading broadcast variable" + id)
- val startTimeMs = System.currentTimeMillis()
- val blocks = readBlocks()
- logInfo("Reading broadcast variable" + id + "took" + Utils.getUsedTimeMs(startTimeMs))
- try {
- val obj = TorrentBroadcast.unBlockifyObject[T](
- blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
- // Store the merged copy in BlockManager so other tasks on this executor don't
- // need to re-fetch it.
- val storageLevel = StorageLevel.MEMORY_AND_DISK
- if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
- throw new SparkException(s"Failed to store $broadcastId in BlockManager")
- }
- if (obj != null) {
- broadcastCache.put(broadcastId, obj)
- }
- obj
- } finally {
- blocks.foreach(_.dispose())
- }
- }
- }
- }
- }
对源码作如下解释:
第 3 行: broadcastManager.cachedValues 保存着所有的 broadcast 的值, 它是一个 Map 结构的, key 是强引用, value 是虚引用 (在垃圾回收时会被清理掉).
第 4 行: 根据 broadcastId 从 cachedValues 中取数据. 如果没有, 则执行 getOrElse 里的 default 方法.
第 8 行: 从 BlockManager 的本地获取 broadcast 的值 (从 memstore 或 diskstore 中, 获取的数据是完整的数据, 不是切分之后的小 chunk), 若有, 则释放 BlockManager 的锁, 并将获取的值存入 cachedValues 中; 若没有, 则调用 readBlocks 将 chunk 数据读取到并将数据转换为 broadcast 的 value 对象, 并将该对象放入 cachedValues 中.
其中, readBlocks 方法如下:
- /** Fetch torrent blocks from the driver and/or other executors. */
- private def readBlocks(): Array[BlockData] = {
- // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
- // to the driver, so other executors can pull these chunks from this executor as well.
- val blocks = new Array[BlockData](numBlocks)
- val bm = SparkEnv.get.blockManager
- for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
- val pieceId = BroadcastBlockId(id, "piece" + pid)
- logDebug(s"Reading piece $pieceId of $broadcastId")
- // First try getLocalBytes because there is a chance that previous attempts to fetch the
- // broadcast blocks have already fetched some of the blocks. In that case, some blocks
- // would be available locally (on this executor).
- bm.getLocalBytes(pieceId) match {
- case Some(block) =>
- blocks(pid) = block
- releaseLock(pieceId)
- case None =>
- bm.getRemoteBytes(pieceId) match {
- case Some(b) =>
- if (checksumEnabled) {
- val sum = calcChecksum(b.chunks(0))
- if (sum != checksums(pid)) {
- throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
- s"$sum != ${checksums(pid)}")
- }
- }
- // We found the block from remote executors/driver's BlockManager, so put the block
- // in this executor's BlockManager.
- if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
- throw new SparkException(
- s"Failed to store $pieceId of $broadcastId in local BlockManager")
- }
- blocks(pid) = new ByteBufferBlockData(b, true)
- case None =>
- throw new SparkException(s"Failed to get $pieceId of $broadcastId")
- }
- }
- }
- blocks
- }
源码解释如下:
第 14 行: 根据 pieceid 从本地 BlockManager 中获取到 chunk
第 15 行: 如果获取到了 chunk, 则释放锁.
第 18 行: 如果没有获取到 chunk, 则从远程根据 pieceid 获取远程获取 chunk, 获取到 chunk 后做 checksum 校验, 之后将 chunk 存入到本地 BlockManager 中.
注: 本篇文章没有对 BroadcastManager 中关于 BlockManager 的操作做进一步更详细的说明, 下一篇文章会专门剖析 Spark 的存储体系.
来源: https://www.cnblogs.com/johnny666888/p/11190397.html