ShuffleManager(一)
本篇, 我们来看一下 spark 内核中另一个重要的模块, Shuffle 管理器 ShuffleManager.shuffle 可以说是分布式计算中最重要的一个概念了, 数据的 join, 聚合去重等操作都需要这个步骤. 另一方面, spark 之所以比 mapReduce 的性能高其中一个主要的原因就是对 shuffle 过程的优化, 一方面 spark 的 shuffle 过程更好地利用内存(也就是我们前面在分析内存管理时所说的执行内存), 另一方面对于 shuffle 过程中溢写的磁盘文件归并排序和引入索引文件. 当然, spark 性能高的另一个主要原因还有对计算链的优化, 把多步 map 类型的计算 chain 在一起, 大大减少中间过程的落盘, 这也是 spark 显著区别于 mr 的地方.
spark 新版本的 Shuffle 管理器默认是 SortShuffleManager.
SparkEnv 初始化部分的代码:
- val shortShuffleMgrNames = Map(
- "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- ShuffleMapTask.runTask
看 shuffle 管理器的源码, 我们首先应该 ShuffleManager 的调用时机. 想一下 shuffle 的过程, 无非就是两个步骤, 写和读. 写是在 map 阶段, 将数据按照一定的分区规则归类到不同的分区中, 读是在 reduce 阶段, 每个分区从 map 阶段的输出中拉取属于自己的数据, 所以我们分析 ShuffleManager 源码基本也可以沿着这个思路. 我们先来分析写的过程, 因为对于一个完整的 shuffle 过程, 肯定是先写然后才读的.
回顾一下之前的对作业运行过程的分析, 我们应该还记得作业被切分成任务后是在 executor 端执行的, 而 Shuffle 阶段的的 stage 被切分成了 ShuffleMapTask,shuffle 的写过程正是在这个类中完成的, 我们看一下代码:
可以看到通过 ShuffleManager.getWriter 获取了一个 shuffle 写入器, 从而将 rdd 的计算数据写入磁盘.
- override def runTask(context: TaskContext): MapStatus = {
- // Deserialize the RDD using the broadcast variable.
- val threadMXBean = ManagementFactory.getThreadMXBean
- val deserializeStartTime = System.currentTimeMillis()
- val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
- threadMXBean.getCurrentThreadCpuTime
- } else 0L
- val ser = SparkEnv.get.closureSerializer.newInstance()
- // 反序列化 RDD 和 shuffle, 关键的步骤
- // 这里思考 rdd 和 shuffle 反序列化时, 内部的 SparkContext 对象是怎么反序列化的
- val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
- ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
- _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
- _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
- threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
- } else 0L
- var writer: ShuffleWriter[Any, Any] = null
- try {
- // shuffle 管理器
- val manager = SparkEnv.get.shuffleManager
- // 获取一个 shuffle 写入器
- writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
- // 这里可以看到 rdd 计算的核心方法就是 iterator 方法
- // SortShuffleWriter 的 write 方法可以分为几个步骤:
- // 将上游 rdd 计算出的数据 (通过调用 rdd.iterator 方法) 写入内存缓冲区,
- // 在写的过程中如果超过 内存阈值就会溢写磁盘文件, 可能会写多个文件
- // 最后将溢写的文件和内存中剩余的数据一起进行归并排序后写入到磁盘中形成一个大的数据文件
- // 这个排序是先按分区排序, 在按 key 排序
- // 在最后归并排序后写的过程中, 没写一个分区就会手动刷写一遍, 并记录下这个分区数据在文件中的位移
- // 所以实际上最后写完一个 task 的数据后, 磁盘上会有两个文件: 数据文件和记录每个 reduce 端 partition 数据位移的索引文件
- writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
- // 主要是删除中间过程的溢写文件, 向内存管理器释放申请的内存
- writer.stop(success = true).get
- } catch {
- case e: Exception =>
- try {
- if (writer != null) {
- writer.stop(success = false)
- }
- } catch {
- case e: Exception =>
- log.debug("Could not stop writer", e)
- }
- throw e
- }
- }
- SortShuffleManager.getWriter
这里根据 shuffle 类型获取不同的 ShuffleWriter 对象, 大多数情况下, 都是 SortShuffleWriter 类型, 所以我们直接看 SortShuffleWriter.write 方法.
- /** Get a writer for a given partition. Called on executors by map tasks. */
- // 获取一个 shuffle 存储器, 在 executor 端被调用, 在执行一个 map task 调用
- override def getWriter[K, V](
- handle: ShuffleHandle,
- mapId: Int,
- context: TaskContext): ShuffleWriter[K, V] = {
- numMapsForShuffle.putIfAbsent(
- handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
- val env = SparkEnv.get
- handle match {
- case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
- new UnsafeShuffleWriter(
- env.blockManager,
- shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
- context.taskMemoryManager(),
- unsafeShuffleHandle,
- mapId,
- context,
- env.conf)
- case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
- new BypassMergeSortShuffleWriter(
- env.blockManager,
- shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
- bypassMergeSortHandle,
- mapId,
- context,
- env.conf)
- case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
- new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
- }
- }
- SortShuffleWriter.write
总结一下这个方法的主要逻辑:
获取一个排序器, 根据是否需要 map 端聚合传递不同的参数
将数据插入排序器中, 这个过程或溢写出多个磁盘文件
根据 shuffleid 和分区 id 获取一个磁盘文件名,
将多个溢写的磁盘文件和内存中的排序数据进行归并排序, 并写到一个文件中, 同时返回每个 reduce 端分区的数据在这个文件中的位移
将索引写入一个索引文件, 并将数据文件的文件名由临时文件名改成正式的文件名.
最后封装一个 MapStatus 对象, 用于 ShuffleMapTask.runTask 的返回值.
在 stop 方法中还会做一些收尾工作, 统计磁盘 io 耗时, 删除中间溢写文件
- override def write(records: Iterator[Product2[K, V]]): Unit = {
- sorter = if (dep.mapSideCombine) {
- // map 端进行合并的情况, 此时用户应该提供聚合器和顺序
- require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
- new ExternalSorter[K, V, C](
- context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
- } else {
- // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
- // care whether the keys get sorted in each partition; that will be done on the reduce side
- // if the operation being run is sortByKey.
- new ExternalSorter[K, V, V](
- context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
- }
- // 将 map 数据全部写入排序器中,
- // 这个过程中可能会生成多个溢写文件
- sorter.insertAll(records)
- // Don't bother including the time to open the merged output file in the shuffle write time,
- // because it just opens a single file, so is typically too fast to measure accurately
- // (see SPARK-3570).
- // mapId 就是 shuffleMap 端 RDD 的 partitionId
- // 获取这个 map 分区的 shuffle 输出文件名
- val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
- // 加一个 uuid 后缀
- val tmp = Utils.tempFileWith(output)
- try {
- val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
- // 这一步将溢写到的磁盘的文件和内存中的数据进行归并排序,
- // 并溢写到一个文件中, 这一步写的文件是临时文件名
- val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
- // 这一步主要是写入索引文件, 使用 move 方法原子第将临时索引和临时数据文件重命名为正常的文件名
- shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
- // 返回一个状态对象, 包含 shuffle 服务 Id 和各个分区数据在文件中的位移
- mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
- } finally {
- if (tmp.exists() && !tmp.delete()) {
- logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
- }
- }
- }
- IndexShuffleBlockResolver
我们首先看一下获取 shuffle 输出文件名, 是通过 IndexShuffleBlockResolver 组件获取的, 而它的内部又是通过 BlockManager 内部的 DiskBlockManager 分配文件名的, 这个 DiskBlockManager 我在之前分析块管理器时提到过, 它的作用就是管理文件名的分配, 以及 spark 使用的目录, 子目录的创建删除等. 我们看到对于数据文件和索引文件的命名规则是不一样的, 他们的命名规则分别定义在 ShuffleDataBlockId 和 ShuffleIndexBlockId 中.
- def getDataFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
- }
- private def getIndexFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
- }
- ExternalSorter.insertAll
我们根据 SortShuffleWriter 中的调用顺序, 首先看一下 ExternalSorter.insertAll 方法:
首选根据是否在爱 map 端合并分为两种情况, 这两种情况使用的内存存储结构也不一样, 对于在 map 端合并的情况使用的是 PartitionedAppendOnlyMap 结构, 不在 map 合并则使用 PartitionedPairBuffer. 其中, PartitionedAppendOnlyMap 是用数组和线性探测法实现的 map 结构.
然后将数据一条一条地循环插入内存的存储结构中, 同时考虑到 map 端合并的情况
- def insertAll(records: Iterator[Product2[K, V]]): Unit = {
- // TODO: stop combining if we find that the reduction factor isn't high
- val shouldCombine = aggregator.isDefined
- // 在 map 端进行合并的情况
- if (shouldCombine) {
- // Combine values in-memory first using our AppendOnlyMap
- val mergeValue = aggregator.get.mergeValue
- val createCombiner = aggregator.get.createCombiner
- var kv: Product2[K, V] = null
- val update = (hadValue: Boolean, oldValue: C) => {
- if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
- }
- while (records.hasNext) {
- addElementsRead()
- kv = records.next()
- // 向内存缓冲中插入一条数据
- map.changeValue((getPartition(kv._1), kv._1), update)
- // 如果缓冲超过阈值, 就会溢写到磁盘生成一个文件
- // 每写入一条数据就检查一遍内存
- maybeSpillCollection(usingMap = true)
- }
- } else {// 不再 map 端合并的情况
- // Stick values into our buffer
- while (records.hasNext) {
- addElementsRead()
- val kv = records.next()
- buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
- maybeSpillCollection(usingMap = false)
- }
- }
- }
- AppendOnlyMap.changeValue
我们看一个稍微复杂一点的结构, AppendOnlyMap,
首先考虑空值的情况
计算 key 的 hash, 然后对容量取余. 注意, 这里由于容量是 2 的整数次幂, 所以对容量取余的操作等同于和容量 - 1 进行位与操作, java HashMap 中的操作.
如果, 不存在旧值, 那么直接插入,
如果存在旧值, 更新旧值
如果发生 hash 碰撞, 那么需要向后探测, 并且是跳跃性的探测,
可以看出, 这个结构设计还是很精良的, 这里面有个很重的方法, incrementSize 方法中会判断当前数据量的大小, 如果超过阈值就会扩容, 这个扩容的方法比较复杂, 就是一个重新 hash 再分布的过程, 不过有一点, 发不论是在插入新数据还是重新 hash 再分布的过程中, 对于 hash 碰撞的处理策略一定要相同, 否则可能造成不一致.
- // 向数组中插入一个 kv 对,
- def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
- assert(!destroyed, destructionMessage)
- val k = key.asInstanceOf[AnyRef]
- // 处理 key 为空的情况
- if (k.eq(null)) {
- // 如果是第一次插入空值, 那么需要将大小增加 1
- if (!haveNullValue) {
- incrementSize()
- }
- nullValue = updateFunc(haveNullValue, nullValue)
- haveNullValue = true
- return nullValue
- }
- var pos = rehash(k.hashCode) & mask
- // 线性探测法处理 hash 碰撞
- // 这里是一个加速的线性探测, 即第一次碰撞时走 1 步,
- // 第二次碰撞时走 2 步, 第三次碰撞时走 3 步
- var i = 1
- while (true) {
- val curKey = data(2 * pos)
- if (curKey.eq(null)) {// 如果旧值不存在, 直接插入
- val newValue = updateFunc(false, null.asInstanceOf[V])
- data(2 * pos) = k
- data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
- incrementSize()
- return newValue
- } else if (k.eq(curKey) || k.equals(curKey)) {// 如果旧值存在, 需要更新
- val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
- data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
- return newValue
- } else {// 发生 hash 碰撞, 向后探测, 跳跃性的探测
- val delta = i
- pos = (pos + delta) & mask
- i += 1
- }
- }
- null.asInstanceOf[V] // Never reached but needed to keep compiler happy
- }
- ExternalSorter.maybeSpillCollection
我们回到 ExternalSorter 的插入方法中, 没插入一条数据都要检查内存占用, 判断是否需要溢写到磁盘, 如果需要就溢写到磁盘.
这个方法里调用了 map.estimateSize 来估算当前插入的数据的内存占用大小, 对于内存占用的追踪和估算的功能是在 SizeTracker 特质中实现的, 这个特质我在之前分析 MemoryStore 时提到过, 在将对象类型的数据插入内存中时使用了一个中间态的数据结构 DeserializedValuesHolder, 它的内部有一个 SizeTrackingVector, 这个类就是通过继承 SizeTracker 特征从而实现对象大小的追踪和估算.
- private def maybeSpillCollection(usingMap: Boolean): Unit = {
- var estimatedSize = 0L
- if (usingMap) {
- estimatedSize = map.estimateSize()
- if (maybeSpill(map, estimatedSize)) {
- map = new PartitionedAppendOnlyMap[K, C]
- }
- } else {
- estimatedSize = buffer.estimateSize()
- if (maybeSpill(buffer, estimatedSize)) {
- buffer = new PartitionedPairBuffer[K, C]
- }
- }
- if (estimatedSize> _peakMemoryUsedBytes) {
- _peakMemoryUsedBytes = estimatedSize
- }
- }
- ExternalSorter.maybeSpill
首先检查当前内存占用是否超过阈值, 如果超过会申请一次执行内存, 如果没有申请到足够的执行内存, 那么依然需要溢写到磁盘
- protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
- var shouldSpill = false
- // 每写入 32 条数据检查一次
- if (elementsRead % 32 == 0 && currentMemory>= myMemoryThreshold) {
- // Claim up to double our current memory from the shuffle memory pool
- val amountToRequest = 2 * currentMemory - myMemoryThreshold
- // 向内存管理器申请执行内存
- val granted = acquireMemory(amountToRequest)
- myMemoryThreshold += granted
- // If we were granted too little memory to grow further (either tryToAcquire returned 0,
- // or we already had more memory than myMemoryThreshold), spill the current collection
- // 如果内存占用超过了阈值, 那么就需要溢写
- shouldSpill = currentMemory>= myMemoryThreshold
- }
- shouldSpill = shouldSpill || _elementsRead> numElementsForceSpillThreshold
- // Actually spill
- if (shouldSpill) {
- _spillCount += 1
- logSpillage(currentMemory)
- // 溢写到磁盘
- spill(collection)
- _elementsRead = 0
- _memoryBytesSpilled += currentMemory
- // 释放内存
- releaseMemory()
- }
- shouldSpill
- }
- ExternalSorter.spill
接着上面的方法,
- override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
- // 获取一个排序后的迭代器
- val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
- // 将数据写入磁盘文件中
- val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
- spills += spillFile
- }
- WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator
这个方法返回按照分区和 key 排序过的迭代器, 其具体的排序逻辑在 AppendOnlyMap.destructiveSortedIterator 中
AppendOnlyMap.destructiveSortedIterator
这段代码分为两块, 首先对数组进行压紧, 是的稀疏的数据全部转移到数组的头部;
然后对数组按照比较器进行排序, 比较器首先是按照分区进行比较, 如果分区相同才按照 key 进行比较;
然后返回一个迭代器, 这个迭代器仅仅是对数组的封装. 通过这个方法, 我们大概知道了 AppendonlyMap 的排序逻辑.
- def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
- destroyed = true
- // Pack KV pairs into the front of the underlying array
- // 这段代码将稀疏的数据全部转移到数组头部, 将数据压紧
- var keyIndex, newIndex = 0
- while (keyIndex < capacity) {
- if (data(2 * keyIndex) != null) {
- data(2 * newIndex) = data(2 * keyIndex)
- data(2 * newIndex + 1) = data(2 * keyIndex + 1)
- newIndex += 1
- }
- keyIndex += 1
- }
- assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
- // 根据比较器对数据进行排序
- new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
- new Iterator[(K, V)] {
- var i = 0
- var nullValueReady = haveNullValue
- def hasNext: Boolean = (i < newIndex || nullValueReady)
- def next(): (K, V) = {
- if (nullValueReady) {
- nullValueReady = false
- (null.asInstanceOf[K], nullValue)
- } else {
- val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
- i += 1
- item
- }
- }
- }
- }
- ExternalSorter.spillMemoryIteratorToDisk
回到 ExternalSorter.spill 方法中, 在获取了经过排序后 的迭代器之后, 我们就可以将数据溢写到磁盘上了.
这个方法的代码我不贴了, 总结一下主要步骤:
首先通过 DiskBlockManager 获取一个临时块的 BlockId 和临时文件名
通过 blockManager 获取一个磁盘写入器, 即 DiskBlockObjectWriter 对象, 内部封装了调用 java 流 API 写文件的逻辑
循环将每条数据写入磁盘, 并定期进行刷写(每隔一定的数据条数将内存中的数据刷写到磁盘上)
如果发生异常, 则会对之前写入的文件进行回滚
小结
总结一下数据通过 ExternalSorter 向磁盘溢写的全过程:
首先, 数据会被一条一条地向内部的 map 结构中插入
每插入一条数据都会检查内存占用情况, 如果内存占用超过阈值, 并且申请不到足够的执行内存, 就会将目前内存中的数据溢写到磁盘
对于溢写的过程: 首先会将数据按照分区和 key 进行排序, 相同分区的数据排在一起, 然后根据提供的排序器按照 key 的顺序排; 然后通过 DiskBlockManager 和 BlockManager 获取 DiskBlockWriter 将数据写入磁盘形成一个文件., 并将溢写的文件信息
在整个写入过程中, 会溢写多个文件
ExternalSorter.writePartitionedFile
总结一下主要的步骤:
仍然是通过 blockManager 获取一个磁盘写入器
将内部溢写的多个磁盘文件和滞留在内存的数据进行归并排序, 并分装成一个按照分区归类的迭代器
循环将数据写入磁盘, 每当一个分区的数据写完后, 进行一次刷写, 将数据从 os 的文件缓冲区同步到磁盘上, 然后获取此时的文件长度, 记录下每个分区在文件中的位移
- def writePartitionedFile(
- blockId: BlockId,
- outputFile: File): Array[Long] = {
- // Track location of each range in the output file
- val lengths = new Array[Long](numPartitions)
- val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
- context.taskMetrics().shuffleWriteMetrics)
- // 如果前面没有数据溢写到磁盘中,
- // 则只需要将内存中的数据溢写到磁盘
- if (spills.isEmpty) {
- // Case where we only have in-memory data
- val collection = if (aggregator.isDefined) map else buffer
- // 返回排序后的迭代器
- val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
- while (it.hasNext) {
- val partitionId = it.nextPartition()
- while (it.hasNext && it.nextPartition() == partitionId) {
- it.writeNext(writer)
- }
- // 写完一个分区刷写一次
- val segment = writer.commitAndGet()
- // 记录下分区的数据在文件中的位移
- lengths(partitionId) = segment.length
- }
- } else {// 有溢写到磁盘的文件
- // We must perform merge-sort; get an iterator by partition and write everything directly.
- // 封装一个用于归并各个溢写文件以及内存缓冲区数据的迭代器
- // TODO 这个封装的迭代器是实现归并排序的关键
- for ((id, elements) <- this.partitionedIterator) {
- if (elements.hasNext) {
- for (elem <- elements) {
- writer.write(elem._1, elem._2)
- }
- // 每写完一个分区, 主动刷写一次, 获取文件位移,
- // 这个位移就是写入的分区的位移,
- // reduce 端在拉取数据时就会根据这个位移直接找到应该拉取的数据的位置
- val segment = writer.commitAndGet()
- lengths(id) = segment.length
- }
- }
- }
- writer.close()
- // 写完后更新一些统计信息
- context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
- context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
- context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
- // 返回每个 reduce 端分区数据在文件中的位移信息
- lengths
- }
- IndexShuffleBlockResolver.writeIndexFileAndCommit
仍然回到 SortShuffleWriter.write 方法, 最后一步调用了 IndexShuffleBlockResolver.writeIndexFileAndCommit 方法,
这个方法的作用主要是将每个的分区的位移值写入到一个索引文件中, 并将临时的索引文件和临时的数据文件重命名为正常的文件名(重命名操作是一个原子操作)
总结
我总结 shuffle 写数据的过程, 可以分为两个主要的步骤:
一是在数据写入的过程中会由于内存不足从而溢写多个数据文件到磁盘中, 而所有的文件都是按照分区和 key 排序的, 这为第二部归并排序打下基础
第二部就是将这些溢写的小文件和最后内存中剩下的数据进行归并排序, 然后写入一个大文件中, 并且在写入的过程中记录每个分区数据在文件中的位移,
最后还要写入一个索引文件, 索引文件即记录了每个 reduce 端分区在数据文件中的位移, 这样 reduce 在拉取数据的时候才能很快定位到自己分区所需要的数据
来源: https://www.cnblogs.com/zhuge134/p/11026040.html