详细代码我已上传到 GitHub:click me
一, 实验要求
在 Spark2.3 平台上实现 Apriori 频繁项集挖掘的并行化算法. 要求程序利用 Spark 进
行并行计算.
二, 算法设计
2.1 设计思路
变量定义
D 为数据集, 设 Lk 是 k 项频繁项集, Ck 是 k 项候选集, 每一行数据定义为一笔交易 (transaction), 交易中的每个商品为项 item.
支持度: support, 即该项集在数据集 D 中出现的次数
算法流程
单机 Apriori 算法的主要步骤如下:
获取输入数据, 产生频繁 1 项集, 以及和 I 作为候选集, 扫描数据集 D, 获取候选集 C1 的支持度, 并找出最小支持度 min_sup 的元素作为频繁 1 项集 L1.
扫描数据集 D, 获取候选集 Ck 的支持度, 并找出其中满足最小支持度的元素作为频繁 k 项集 Lk
通过频繁 k 项集 Lk 产生 k+1 候选集 Ck+1
通过迭代步骤 2 和 3, 直到找不到 k+1 项集结束
并行化设计的思路主要是考虑将对于支持度计数的过程使用 wordcount 来进行统计.
2.2 并行化算法设计
Apriori 算法产生频繁项集有两个特点: 第一, 它是逐层的, 即从频繁 1 - 项集到频繁 k - 项集; 第二, 它使用产生 - 测试的策略来发现频繁项, 每次迭代后都由前一次产生的频繁项来产生新的候选项, 然后对新产生的候选项集进行支持度计数得到新的频繁项集. 根据算法的特点, 我们将算法分为两个阶段:
如下图 1.1 算法的并行化框架图, 主节点每次迭代时需要将候选项集以广播的形式分发到每个从节点, 每个从节点收到之后进行一些列的操作得到新的频繁项集, 如此反复直至求得最大频繁项集.
图 1.1 并行化框架图
阶段 1: 从 HDFS 上获取原始的数据集 SparkRDD, 加载到分布式内存中. 扫描所有的 RDD 事务, 进行支持度计数, 产生频繁 1 - 项集; 如图 1.2 所示为 Ap 算法并行化第一阶段的 Lineage 图.
图 1.2 Apriori 算法并行化第一阶段的 Lineage 图
原始事务集由 flatMap 函数去读取事务, 并将所有的事务转化为 Spark RDD 并 cache 到分布式内存中. 接下来, 在每一个事务中执行 flatMap 函数来获取所有的 Items 项集, 之后执行 map 函数, 发射 < Item, 1 > 的 key/value 形式, 接下来执行 reduceByKey 函数统计每一个候选 1 - 项集的支持度, 最后并利用事先设好的支持度阈值进行剪枝, 所有超过支持度阈值的项集将会生成频繁 1 - 项集, 下面给出了第一阶段的算法伪代码
阶段 2: 在这个阶段, 不断迭代使用频繁 k - 项集去产生频繁 k+1 项集
图 1.3 Apriori 算法并行化第二阶段的 Lineage 图
如图 1.3 所示, 首先读取频繁 k - 项集并且以 < itemSet, Count > 的形式将其存储为 Spark RDD. 接下来, 从频繁 k - 项集得到候选 (k+1)- 项集. 为加速从候选项集中查找 (k+1)- 项集的过程, 将候选 (k+1)- 项集存放在哈希表中, 并将其 broadcast 到每个 worker 节点. 接下来, 通过 flatMap 函数获取每个候选项集在原始事务集中的支持度, 进一步对每个候选项使用 map 函数得到 < ItemSet, 1>, 之后通过 reduceBykey 函数搜集每个事务的最终的支持度计数, 最后利用实现设定好的支持度阈值剪枝, 支持度大于等于最小阈值的频繁项集将会以 key/value 的形式输出, 作为频繁 (k+1)- 项集, 下面给出了算法第二阶段的伪代码.
2.3 程序设计与性能分析
读取数据集, 按空格划分每行内容, 并用 HashSet 存储, 方便后期求子集以及一些集合操作
- // 将输入数据分区, 由于后面要频繁使用. 因此缓存起来
- val transations = sc.textFile(input, num)
- .map(x => {
- val content = x.split("\\s+")
- val tmp = new HashSet[String]
- for (i <- 0 until content.length) {
- tmp.add(content(i))
- }
- tmp
- }).cache()
根据支持度和数据总量计算频繁项阈值, 便于后期统计集合频度后直接对比
- // 计算频繁项阈值
- val numRecords = transations.count().toDouble
- val freqThre = numRecords * support
- println("frequency threshold:" + freqThre)
计算频繁 1 项集用于后续的循环迭代计算
- // 计算频繁 1 项集
- val oneFreqSet = transations
- .flatMap(_.seq)
- .map(x => (x, 1))
- .reduceByKey(_ + _)
- .filter(x => x._2>= freqThre)
- .map(x => (HashSet(x._1), x._2 / numRecords))
利用上一轮迭代计算生成的频繁 k 项集来构造候选 k+1 项集, 然后通过比频繁项阈值比对筛选出频繁 k+1 项集. 这里有一点要注意的, 由于从文件读入的源数据 transaction 被划分在各个 partition 上, 而候选集 candidates 要与 transaction 中每条记录比对来统计频度, 因此需要 spark 调用 broadcast 将候选集广播到每个 partition 上
- // 生成频繁项的候选集
- val candidates = generateCandidates(preFreSets, round)
- // 将候选项集广播到各个分区
- val broadcastCandidates = sc.broadcast(candidates)
- // 复杂度: len(transactions) * len(candidates) * round * transaction 项的平均长度
- // 这里的 len(transaction) 是指各个 partition 上 transaction 的平均长度
- val curFreqSet = transations
- .flatMap(x => verifyCandidates(x, broadcastCandidates.value))
- .reduceByKey(_ + _)
- .filter(x => x._2>= freqThre)
- // 写入频繁 round 项集结果到 hdfs
- curFreqSet.map(a => {
- val out = a._1.mkString(",") + ":" + (a._2 / numRecords).toString
- out
- }).saveAsTextFile(output + "/" + infileName + "freqset-" + round)
- // 生成频繁 round-Itemsets, 用于下一轮迭代生成候选集
- preFreSets = curFreqSet.collect().map(x => x._1)
第 round 轮迭代, 由候选项集生成频繁项集的复杂度: len(transactions) * len(candidates) * round * transaction 项的平均长度, 这里的 len(transaction) 是指各个 partition 上 transaction 的平均长度, 尽管我们通过提高并发度的方式将复杂度的稍微将了一些, 可是算法的整体复杂度还是很高, 特别是当源数据集很大时, 这样查表式地验证候选集很费时, 有考虑将项集索引, 但是如果全部项集都存那这个存储开销太大了, 目前没有很好的优化思路, 时间有限也没有进一步深入怎么优化这一步了.
对于候选集生成方法 generateCandidates 的具体实现, 我们首先拆分上一轮频繁项集 preFreSets 中的每个项再合并成一个元素集, 相当于一个词汇表, 然后遍历 preFreSets 中每个项, 如果该项中不包含元素表中的某个元素, 则将该元素与该项合并成一个候选项. 具体实现如下:
- def generateCandidates(preFreSets : Array[HashSet[String]], curRound: Int): Array[HashSet[String]] = {
- // 复杂度: len(elements) * len(preFrestats)^2 * curRound^2
- val elements = preFreSets.reduce((a,b) => a.union(b))
- val canSets = preFreSets.flatMap( t => for (ele <- elements if(!t.contains(ele))) yield t.+(ele) ).distinct
- canSets.filter( set => {
- val iter = set.subsets(curRound - 1)
- var flag = true
- while (iter.hasNext && flag){
- flag = preFreSets.contains(iter.next())
- }
- flag
- })
- }
但是这个过程复杂度太高: len(elements) * len(preFrestats)^2 * curRound^2, 当数据源中元素过多, 迭代更深以后, 这个复杂度将变得让人难以接受, 花了大量的时间再前一轮的候选项集中验证候选项, 需要想一个办法来避免顺序式的查表, 但限于时间有限, 这个地方没有深入展开研究怎么优化.
2.4 关联规则的实现
利用当前这一轮迭代生成的频繁项集 curFreqSet 来计算关联规则, 利用 curFreqSet 建立频繁项索引 freqSetIndex, 同统计候选项频度的原因一样, 我们需要将 freqSetIndex 广播到各个 partition 以统计规则 A->B 左项 A 的频度, 再利用 freqSetIndex 索引 AB 频繁项的频度即可计算规则 A->B 的置信度, 然后与设定的置信度对比即可筛选出需要的关联规则, 代码实现如下:
- // 生成关联规则
- val asst1 = System.nanoTime()
- // 建立频繁 round-Itemsets 的索引 Map
- val freqSetIndex = HashMap[HashSet[String], Int]()
- curFreqSet.collect().foreach(fs => freqSetIndex.put(fs._1, fs._2))
- // 将频繁 round-Itemsets 的索引 Map 广播到各个 partition
- val broadcastCurFreqSet = sc.broadcast(freqSetIndex)
- // 生成所有可能的关联规则, 然后筛选出置信度 >=confidence 的关联规则
- val associationRules = transations
- .flatMap(x => verifyRules(x, broadcastCurFreqSet.value.keys.toArray, round))
- .reduceByKey(_ + _)
- .map(x => ((x._1._1, x._1._2), broadcastCurFreqSet.value.get(x._1._1.union(x._1._2)).getOrElse(0) * 1.0 / x._2))
- .filter(x => x._2>= confidence)
对于规则构造 verifyRules 的具体实现, 我们通过遍历规则左项的长度来构造, 具体实现如下:
- def verifyRules(transaction: HashSet[String], candidates: Array[HashSet[String]], curRound: Int): Array[((HashSet[String], HashSet[String]), Int)] = {
- // yield 会根据第一个循环类型返回对应的类型, 这里的 candidates 是 Array, 因此返回的也是 Array 类型
- for {
- set <- candidates
- i <- 1 until curRound
- iter = set.subsets(i)
- l <- iter
- if (l.subsetOf(transaction))
- r = set.diff(l)
- } yield ((l, r), 1)
- }
三, 实验环境, 运行方式及结果
3.1 环境
spark 分布式环境的安装
在本地配置好 java,scala,hadoop(Spark 会用到 hadoop 的 hdfs)
版本: jdk 1.8.0_161, scala 2.11.8, hadoop 2.7.5
在 spark 官网下载 spark-2.3.0-bin-hadoop2.7, 解压安装
tar -zxvf spark-2.3.0-bin-hadoop2.7 -C ~/bigdata/spark
配置环境变量, 并使环境变量生效
- $ VIM ~/.bashrc
- # Spark Environment Variables
- export JAVA_HOME=~/bigdata/java/jdk1.8.0_161
- export JRE_HOME=${
- JAVA_HOME
- }/jre
- export SCALA_HOME=~/bigdata/scala/scala-2.11.8
- export HADOOP_HOME=~/bigdata/hadoop/hadoop-2.7.5
- export SPARK_HOME=~/bigdata/spark/spark-2.3.0-bin-hadoop2.7
- export CLASS_PATH=.:${
- JAVA_HOME
- }/lib:${
- JRE_HOME
- }/lib
- export PATH=${
- JAVA_HOME
- }/bin:${
- SCALA_HOME
- }/bin:${
- HADOOP_HOME
- }/bin:${
- HADOOP_HOME
- }/sbin:${
- SPARK_HOME
- }/bin:${
- SPARK_HOME
- }/sbin:$PATH
- $ source /etc/profile
配置, spark 的配置文件位于 $SPARK_HOME/conf 目录下, 需要修改的文件有 spark-env.sh, spark-defaluts.conf 和 slaves.
- $ cd ~/bigdata/spark/spark-2.3.0-bin-hadoop2.7/conf
- $ cp spark-env.sh.template spark-env.sh
- $ VIM spark-env,sh
- # spark-env.sh configuration
- export JAVA_HOME=~/bigdata/java/jdk1.8.0_161
- export SCALA_HOME=~/bigdata/scala/scala-2.11.8
- export SPARK_HOME=~/bigdata/spark/spark-2.3.0-bin-hadoop2.7
- export HADOOP_HOME=~/bigdata/hadoop/hadoop-2.7.5
- export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
- export PATH=$PATH:$JAVA_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin
- export SPARK_MASTER_IP=slave103
- $ cp spark-defaluts.conf.template spark-defaults.conf
- $ VIM saprk-defaults.conf
- # spark-defaults.conf configuration
- spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
- spark.eventLog.enabled true
- spark.eventLog.dir hdfs://slave103:9000/spark_event
- spark.yarn.historyServer.address slave103:18080
- spark.history.fs.logDirectory hdfs://slave103:9000/history_log
- spark.serializer org.apache.spark.serializer.KryoSerializer
- spark.executor.memory 2g
- spark.executor.cores 2
- spark.driver.memory 2g
- spark.driver.cores 2
- spark.yarn.am.memory 2g
- spark.yarn.am.cores 2
- spark.local.dir /tmp/sparklocaldir
- spark.yarn.jars hdfs://slave103:9000/spark/jars/*
- $ cp slaves.template slaves
- $ VIM slaves
- # slaves configuration(主机名在 / etc/hosts 中配置)
- slave101
- slave103
启动和停止
- # 启动
- $ bash $SPARK_HOME/sbin/start-all.sh
- # 停止
- $ bash $SPARK_HOME/sbin/stop-all.sh
启动 hadoop 和 spark 执行 jps 命令, 显示的进程如下图 3.1 和 3.2 所示:
图 3.1 主节点 jvm 进程
图 3.2 从节点 jvm 进程
3.2 jar 包运行方式
假设输入数据文件为 chess.dat,shell 下运行方式如下:
- spark-submit --class main.scala.Apriori.distributed.Apriori --master spark://slave103:7077 --conf spark.driver.memory=4g --conf spark.executor.cores=2 original-MapReduce-1.0.jar input/apriori/chess.dat output 0.8 20 24 0.9
- # jar 后面的参数说明: 输入文件 输出目录 支持度 迭代轮数 并发度即 partition 数目 置信度
3.3 结果
测试 connect.dat 数据集生成频繁项集的运行时间, 图 3.3 是单机版的, 图 3.4 是并行版的:
图 3.3 单机版运行时间
图 3.4 并行版运行时间
在 chess.dat 数据集上测试并行版本的频繁项集生成和关联规则挖掘的运行时间如下:
chess.dat 频繁项集生成结果
chess.dat 关联规则挖掘结果
四, webUI 执行报告
来源: https://www.cnblogs.com/brooksj/p/spark-Apriori.html