通常写 spark 的程序用 scala 比较方便, 毕竟 spark 的源码就是用 scala 写的. 然而, 目前 java 开发者特别多, 尤其进行数据对接, 上线服务的时候, 这时候, 就需要掌握一些 spark 在 java 中的使用方法了
一, map
map 在进行数据处理, 转换的时候, 不能更常用了
在使用 map 之前 首先要定义一个转换的函数 格式如下:
- Function<String, LabeledPoint> transForm = new Function<String, LabeledPoint>() {//String 是某一行的输入类型 LabeledPoint 是转换后的输出类型
- @Override
- public LabeledPoint call(String row) throws Exception {// 重写 call 方法
- String[] rowArr = row.split(",");
- int rowSize = rowArr.length;
- double[] doubleArr = new double[rowSize-1];
- // 除了第一位的 lable 外 其余的部分解析成 double 然后放到数组中
- for (int i = 1; i <rowSize; i++) {
- String each = rowArr[i];
- doubleArr[i] = Double.parseDouble(each);
- }
- // 用刚才得到的数据 转成向量
- Vector feature = Vectors.dense(doubleArr);
- double label = Double.parseDouble(rowArr[0]);
- // 构造用于分类训练的数据格式 LabelPoint
- LabeledPoint point = new LabeledPoint(label, feature);
- return point;
- }
- };
需要特别注意的是:
1,call 方法的输入应该是转换之前的数据行的类型 返回值应是处理之后的数据行类型
2, 如果转换方法中调用了自定义的类, 注意该类名必须实现序列化 比如
- public class TreeEnsemble implements Serializable {
- }
3, 转换函数中如果调用了某些类的对象, 比如该方法需要调用外部的一个参数, 或者数值处理模型 (标准化, 归一化等), 则该对象需要声明是 final
然后就是在合适的时候调用该转换函数了
JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().map(transForm);
这种方式是需要将普通的 rdd 转成 javaRDD 才能使用的, 转成 javaRDD 的这一步操作不耗时, 不用担心
二, filter
在避免数据出现空值, 0 等场景中也非常常用, 可以满足 sql 中 where 的功能
这里首先也是要定义一个函数, 该函数给定数据行 返回布尔值 实际效果是将返回为 true 的数据保留
- Function<String, Boolean> boolFilter = new Function<String, Boolean>() {//String 是某一行的输入类型 Boolean 是对应的输出类型 用于判断数据是否保留
- @Override
- public Boolean call(String row) throws Exception {// 重写 call 方法
- boolean flag = row!=null;
- return flag;
- }
- };
通常该函数实际使用中需要修改的仅仅是 row 的类型 也就是数据行的输入类型, 和上面的转换函数不同, 此 call 方法的返回值应是固定为 Boolean
然后是调用方式
JavaRDD<LabeledPoint> rdd = oriData.toJavaRDD().filter(boolFilter);
三, mapToPair
该方法和 map 方法有一些类似, 也是对数据进行一些转换. 不过此函数输入一行 输出的是一个元组, 最常用的方法是用来做交叉验证 或者统计错误率 召回率 计算 AUC 等等
同样, 需要先定义一个转换函数
- Function<String, Boolean> transformer = new PairFunction<LabeledPoint, Object, Object>() {//LabeledPoint 是输入类型 后面的两个 Object 不要改动
- @Override
- public Tuple2 call(LabeledPoint row) throws Exception {// 重写 call 方法 通常只改动输入参数 输出不要改动
- double predicton = thismodel.predict(row.features());
- double label = row.label();
- return new Tuple2(predicton, label);
- }
- });
关于调用的类, 类的对象, 要求和之前的一致, 类需要实现序列化, 类的对象需要声明成 final 类型
相应的调用如下:
JavaPairRDD<Object, Object> predictionsAndLabels = oriData.mapToPair(transformer);
然后对该 predictionsAndLabels 的使用, 计算准确率, 召回率, 精准率, AUC, 接下来的博客中会有, 敬请期待
如有补充, 或者质疑, 或者有相关问题, 请发邮件给我, 或者直接回复 邮箱: 326543991@qq.com
来源: https://www.cnblogs.com/starwater/p/9195764.html