需求
客户希望通过 spark 来分析二进制文件中 0 和 1 的数量以及占比如果要分析的是目录, 则针对目录下的每个文件单独进行分析分析后的结果保存与被分析文件同名的日志文件中, 内容包括 0 和 1 字符的数量与占比
要求: 如果值换算为二进制不足八位, 则需要在左侧填充 0
可以在 linux 下查看二进制文件的内容命令:
xxd b c 1 filename
-c 1 是显示 1 列 1 个字符,-b 是显示二进制
Python 版本
代码
- # This Python file uses the following encoding: utf-8
- from __future__ import division
- import os
- import time
- import sys
- from pyspark import SparkConf, SparkContext
- APP_NAME = "Load Bin Files"
- def main(spark_context, path):
- file_paths = fetch_files(path)
- for file_path in file_paths:
- outputs = analysis_file_content(spark_context, path + "/" + file_path)
- print_outputs(outputs)
- save_outputs(file_path, outputs)
- def fetch_files(path):
- if os.path.isfile(path):
- return [path]
- return os.listdir(path)
- def analysis_file_content(spark_context, file_path):
- data = spark_context.binaryRecords(file_path, 1)
- records = data.flatMap(lambda d: list(bin(ord(d)).replace('0b', '').zfill(8)))
- mapped_with_key = records.map(lambda d: ('0', 1) if d == '0' else ('1', 1))
- result = mapped_with_key.reduceByKey(lambda x, y: x + y)
- total = result.map(lambda r: r[1]).sum()
- return result.map(lambda r: format_outputs(r, total)).collect()
- def format_outputs(value_with_key, total):
- tu = (value_with_key[0], value_with_key[1], value_with_key[1] / total * 100)
- return "字符 {0} 的数量为{1}, 占比为{2:.2f}%".format(*tu)
- def print_outputs(outputs):
- for output in outputs:
- print output
- def save_outputs(file_path, outputs):
- result_dir = "result"
- if not os.path.exists(result_dir):
- os.mkdir(result_dir)
- output_file_name = "result/" + file_name_with_extension(file_path) + ".output"
- with open(output_file_name, "a") as result_file:
- for output in outputs:
- result_file.write(output + "\n")
- result_file.write("统计于{0}\n\n".format(format_logging_time()))
- def format_logging_time():
- return time.strftime('%Y-%m-%d %H:%m:%s', time.localtime(time.time()))
- def file_name_with_extension(path):
- last_index = path.rfind("/") + 1
- length = len(path)
- return path[last_index:length]
- if __name__ == "__main__":
- conf = SparkConf().setMaster("local[*]")
- conf = conf.setAppName(APP_NAME)
- sc = SparkContext(conf=conf)
- if len(sys.argv) != 2:
- print("请输入正确的文件或目录路径")
- else:
- main(sc, sys.argv[1])
核心逻辑都在
analysis_file_content
方法中
运行
python 是脚本文件, 无需编译不过运行的前提是要安装好 pyspark 运行命令为:
./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"
遇到的坑
开发环境的问题
要在 spark 下使用 python, 需要事先使用 pip 安装 pyspark 结果安装总是失败 python 的第三方库地址是
https://pypi.python.org/simple/
, 在国内访问很慢通过搜索问题, 许多文章提到了国内的镜像库, 例如豆瓣的库, 结果安装时都提示找不到 pyspark
查看安装错误原因, 并非不能访问该库, 仅仅是访问较慢, 下载了不到 8% 的时候就提示下载失败这实际上是连接超时的原因因而可以修改连接超时值可以在~/.pip/pip.conf 下增加:
- [global]
- timeout = 6000
虽然安装依然缓慢, 但至少能保证 pyspark 安装完毕但是在安装 py4j 时, 又提示如下错误信息(安装环境为 mac):
OSError: [Errno 1] Operation not permitted: '/System/Library/Frameworks/Python.framework/Versions/2.7/share'
即使这个安装方式是采用 sudo, 且在管理员身份下安装, 仍然提示该错误解决办法是执行如下安装:
- pip install --upgrade pip
- sudo pip install numpy --upgrade --ignore-installed
sudo pip install scipy --upgrade --ignore-installed
sudo pip install scikit-learn --upgrade --ignore-installed
然后再重新执行
sudo pip install pyspark
, 安装正确
字符编码的坑
在提示信息以及最后分析的结果中都包含了中文运行代码时, 会提示如下错误信息:
SyntaxError: Non-ASCII character '\xe5' in file /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py on line 36, but no encoding declared; see http://python.org/dev/peps/pep-0263/ for details
需要在代码文件的首行添加如下编码声明:
# This Python file uses the following encoding: utf-8
SparkConf 的坑
初始化 SparkContext 的代码如下所示:
- conf = SparkConf().setMaster("local[*]")
- conf = conf.setAppName(APP_NAME)
- sc = SparkContext(conf)
结果报告运行错误:
- Error initializing SparkContext.
- org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x106666390>'
根据错误提示, 以为是 Master 的设置有问题, 实际上是实例化 SparkContext 有问题阅读代码, 发现它的构造函数声明如下所示:
- def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
- environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
- gateway=None, jsc=None, profiler_cls=BasicProfiler):
而前面的代码仅仅是简单的将 conf 传递给 SparkContext 构造函数, 这就会导致 Spark 会将 conf 看做是 master 参数的值, 即默认为第一个参数所以这里要带名参数:
sc = SparkContext(conf = conf)
sys.argv 的坑
我需要在使用 spark-submit 命令执行 python 脚本文件时, 传入我需要分析的文件路径与 scala 和 java 不同 scala 的 main 函数参数 argv 实际上可以接受命令行传来的参数 python 不能这样, 只能使用 sys 模块来接收命令行参数, 即 sys.argv
argv 是一个 list 类型, 当我们通过 sys.argv 获取传递进来的参数值时, 一定要明白它会默认将 spark-submit 后要执行的 python 脚本文件路径作为第一个参数, 而之后的参数则放在第二个例如命令如下:
./bin/spark-submit /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py "files"
则:
- argv[0]: /Users/zhangyi/PycharmProjects/spark_binary_files_demo/parse_files_demo.py
- argv[1]: files
因此, 我需要获得 files 文件夹名, 就应该通过 argv[1]来获得
此外, 由于 argv 是一个 list, 没有 size 属性, 而应该通过 len()方法来获得它的长度, 且期待的长度为 2
整数参与除法的坑
在 python 2.7 中, 如果直接对整数执行除法, 结果为去掉小数因此 4 / 5 得到的结果却是 0 在 python 3 中, 这种运算会自动转型为浮点型
要解决这个问题, 最简单的办法是导入一个现成的模块:
from __future__ import division
注意: 这个 import 的声明应该放在所有 import 声明前面
Scala 版本
代码
- package bigdata.demo
- import java.io.File
- import java.text.SimpleDateFormat
- import java.util.Calendar
- import com.google.common.io.{Files => GoogleFiles}
- import org.apache.commons.io.Charsets
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- object Main {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setAppName("Binary Files").setMaster("local[*]")
- val sc = new SparkContext(conf)
- if (args.size != 1) {
- println("请输入正确的文件或目录路径")
- return
- }
- def analyseFileContent(filePath: String): RDD[String] = {
- val data = sc.binaryRecords(filePath, 1)
- val records = data.flatMap(x => x.flatMap(x => toBinaryStr(byteToShort(x)).toCharArray))
- val mappedWithKey = records.map(i => if (i == '0') ('0', 1L) else ('1', 1L))
- val result = mappedWithKey.reduceByKey(_ + _)
- val sum = result.map(_._2).sum()
- result.map { case (key, count) => formatOutput(key, count, sum)}
- }
- val path = args.head
- val filePaths = fetchFiles(path)
- filePaths.par.foreach { filePath =>
- val outputs = analyseFileContent(filePath)
- printOutputs(outputs)
- saveOutputs(filePath, outputs)
- }
- }
- private def byteToShort(b: Byte): Short =
- if (b < 0) (b + 256).toShort else b.toShort
- private def toBinaryStr(i: Short, digits: Int = 8): String =
- String.format("%" + digits + "s", i.toBinaryString).replace('','0')
- private def printOutputs(outputs: RDD[String]): Unit = {
- outputs.foreach(println)
- }
- private def saveOutputs(filePath: String, outputs: RDD[String]): Unit = {
- val resultDir = new File("result")
- if (!resultDir.exists()) resultDir.mkdir()
- val resultFile = new File("result/" + getFileNameWithExtension(filePath) + ".output")
- outputs.foreach(line => GoogleFiles.append(line + "\n", resultFile, Charsets.UTF_8))
- GoogleFiles.append(s"统计于:${formatLoggingTime()}\n\n", resultFile, Charsets.UTF_8)
- }
- private def formatLoggingTime(): String = {
- val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
- formatter.format(Calendar.getInstance().getTime)
- }
- private def getFileNameWithExtension(filePath: String): String = {
- filePath.substring(filePath.lastIndexOf("/") + 1)
- }
- private def fetchFiles(path: String): List[String] = {
- val fileOrDirectory = new File(path)
- fileOrDirectory.isFile match {
- case true => List(path)
- case false => fileOrDirectory.listFiles().filter(_.isFile).map(_.getPath).toList
- }
- }
- private def formatPercent(number: Double): String = {
- val percent = "%1.2f" format number * 100
- s"${percent}%"
- }
- private def formatOutput(key: Char, count: Long, sum: Double): String = {
- s"字符 ${key}的数量为 ${count}, 占比为 ${formatPercent(count/sum)}"
- }
- }
运行
通过 sbt 对代码进行编译打包后, 生成 jar 文件然后在 spark 主目录下运行:
$SPARK_HOME/bin/spark-submit --class bigdata.demo.Main --master spark://<ip> $SPARK_HOME/jars/binaryfilesstastistics_2.11-1.0.jar file:///share/spark-2.2.0-bin-hadoop2.7/derby.log
最后的参数 file:///share/spark-2.2.0-bin-hadoop2.7/derby.log 就是 main 函数接收的参数, 即要分析的文件目录如果为本地目录, 需要指定文件协议 file://, 如果为 HDFS 目录, 则指定协议 hdfs://
遇到的坑
byte 类型的值
在 Scala 中, Byte 类型为 8 位有符号补码整数数值区间为 -128 到 127 倘若二进制值为
11111111
, 通过 SparkContext 的 binaryRecords()方法读进 Byte 数据后, 其值为 - 1, 而非 255 原因就是补码的缘故如果十进制为 128, 转换为 Byte 类型后, 值为 - 128
而对于 - 1, 如果执行 toBinaryString(), 则得到的字符串为 11111111111111111111111111111111, 而非我们期待的 11111111 如下图所示:
针对八位的二进制数值, 可以编写一个方法, 将 Byte 类型转为 Short 类型, 然后再调用 toBinaryString()方法转换为对应的二进制字符串
- private def byteToShort(b: Byte): Short =
- if (b < 0) (b + 256).toShort else b.toShort
而对于不足八位的二进制数值, 如果直接调用 toBinaryString()方法, 则二进制字符串将不到八位可以利用 String 的 format 进行格式化:
- private def toBinaryStr(i: Short, digits: Int = 8): String =
- String.format("%" + digits + "s", i.toBinaryString).replace('','0')
当然, 可以将这两个方法定义为 Byte 与 Short 的隐式方法
来源: https://juejin.im/entry/5aa1f0476fb9a028be3596b1