场景
现在有如下数据格式
图书分类, 图书名, 数量
现在想统计全部分类中数量最多的书名以及数量
场景解析
如果不基于 spark, 我们来思考这个问题, 数据量大内存是放不下, 分类也不确定有多少类, 图书名可能有重复, 还需要合并计算. 这种情况只能是分治, 首先分类, 把文件首先按照分类拆分成多个文件, 每个文件中的数据都是图书名数量, 然后根据图书名对数量进行合并, 最后进行排序.
spark 思维转化
上面的思路单独写这个程序没问题, 但是如果基于 spark 就有点问题了, 首先是分区的事情, 想把数据准确落在不同的分区, 且不重复, 必须要先知道到底有多少分区. 所以首先要统计分类种类, 帮助以后分区.
分区器
//data 是已经读取进来的图书分类的集合 data.distinct().collect()
有了数据就要应用分区器
- classMyPartionerextendsPartitioner{
- private Map<String, Integer> part = new HashMap<>();
- publicMyPartioner(List<String> data){
- int count = 0;
- for (String s : data) {
- part.put(s, count++);
- }
- }
- @Override publicintnumPartitions(){
- return part.size();
- }
- @Override publicintgetPartition(Object o){
- Keys info = (Keys) o;
- return part.get(((Keys) o).type);
- }
- }
直接根据已经生成好的数据来进行分区. 保证 1 个分类 1 个分区, 这样就可以以后的部分就只关注排序即可.
数据合并
分区, 分区器都准备好了, 按照以前的思路, 是不是应该把数据分散在不同的分区了. 想法挺好, 但是在分布式存储中, 数据移动的成本很高, 所以都是先对本地数据进行处理合并, 减小数据量然后才进行数据的 shuffle 等分区操作, 所以这里我们要做的其实是合并同类数据.
textFile.mapToPair(lines ->newTuple2(name,count)) .reduceByKey((x, y)->x + y);
这里是一个典型的单词计数的案例.
接下来就是想着分区, 然后排序, 如果你查查 API 的话, 你会发现并没用按照 value 排序的算子. 如果要排序的话, 一定是 key. 这里发生了一个冲突点, 就是你是按照 type 分区, 次数的 type 就是 key, 接下来排序, 其实就是按照 type 来排. 发现了我们要依赖 key 完成两件事, 一个是分区, 一个是排序. 分区靠 type, 排序靠 count. 这里的解决方案就是用对象. 计数之后, 得到的结果会是一个 < bookname,totalcount > 的 tuple. 这个明显无法继续下去了, 你连分区的条件都没了. bookname 和 type 是一一对应的, 所以这里合并统计的是 bookname+type 的结构体. 这样就满足了分区的条件了. 为了把排序的因子给加上, 我们做个 map 操作, 把 type 和 count 组织成一个对象.
classKeysimplementsSerializable{ String type; Integer count;publicKeys(String type, Integer count){this.type = type;this.count = count; }}
这样就给了我们很大的空间, 在分区器里, 取出 key 来进行操作, 在排序的时候, 写一个比较器, 按照 count 来进行排序.
.repartitionAndSortWithinPartitions(newMyPartioner(collect),newKeyCompare());
直接使用分区并且排序的算子帮我解决这个问题.
小结
在大数据环境下, 数据的 shuffle 操作的代价很大, 所以优先考虑合并数据, 然后再进行分区等等. spark 的算子大部分都是对 key 进行生效的, 例如排序等等, 对 value 的操作大部分是合并和迭代, 并没有单独的排序出来. 所以要合理利用 java 对象来组合 key 值, 完成功能.
大家喜欢多多关注, 你的关注是我最大的动力.
大家喜欢的可以关注我的微信公号号: 首席数据师 你的关注是我最大的动力
来源: http://www.jianshu.com/p/f1966eea14f5