对 SerializerManager 的说明:
它是为各种 Spark 组件配置序列化, 压缩和加密的组件, 包括自动选择用于 shuffle 的 Serializer.spark 中的数据在 network IO 或 local disk IO 传输过程中. 都需要序列化. 其默认的 Serializer 是 org.apache.spark.serializer.JavaSerializer, 在一定条件下, 可以使用 kryo, 即 org.apache.spark.serializer.KryoSerializer.
支持的两种序列化方式
即值的类型是八种基本类型中一种或 null 或 String, 都会使用 kryo, 否则使用默认序列化方式, 即 java 序列化方式.
它还负责读写 Block 流是否使用压缩:
数据流是否支持压缩
默认情况下:
其中, 如果使用压缩, 默认的压缩是 lz4, 可以通过参数 spark.io.compression.codec 来配置. 它支持的所有压缩类型如下:
读写数据流如何支持压缩
其中, 支持压缩的 InputStream 和 OutputStream 是对原来的 InputStream 和 OutputStream 做了包装. 我们以 LZ4BlockOutputStream 为例说明.
调用如下函数返回支持压缩的 OutputStream:
首先, LZ4BlockOutputStream 的继承关系如下:
被包装的类被放到了 FilterOutputStream 类的 out 字段中, 如下:
outputStream 核心方法就是 write. 直接来看 LZ4BlockOutputStream 的 write 方法:
其中 buffer 是一个 byte 数组, 默认是 32k, 可以通过 spark.io.compression.lz4.blockSize 参数来指定, 在 LZ4BlockOutputStream 类中用 blockSize 保存.
重点看 flushBufferedData 方法:
方法内部实现思路如下:
外部写入到 buffer 中的数据经过 compressor 压缩到 compressorBuffer 中, 然后再写入一些 magic, 最终将压缩的 buffer 写入到 out 中, write 操作结束.
可见, 数据的压缩是由 LZ4BlockOutputStream 负责的, 压缩之后的数据被写入到目标 outputStream 中.
来源: https://www.cnblogs.com/johnny666888/p/11190380.html