本案例旨在综合使用 Spark Core 和 Spark Sql 完成业务需求, 具有一定的参考价值.
案例需求
筛选出符合查询条件的数据
统计出每天搜索 uv 排名前 3 的搜索词
按照每天的 top3 搜索词的 uv 搜索总次数, 倒序排序
将统计结果输出
案例数据
日期 | 搜索词 | 用户 | 城市 | 平台 | 版本 |
---|---|---|---|---|---|
2017-05-17 | Hadoop | user1 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user3 | 北京 | Android | 1.2 |
2017-05-17 | Hadoop | user4 | 北京 | Android | 1.2 |
2017-05-17 | Scala | user1 | 天津 | Android | 1.2 |
2017-05-17 | Hadoop | user3 | 天津 | ISO | 1.2 |
2017-05-17 | Scala | user4 | 天津 | ISO | 1.2 |
2017-05-17 | Scala | user6 | 南京 | Android | 1.2 |
2017-05-18 | Scala | user1 | 天津 | Android | 1.2 |
2017-05-18 | Scala | user3 | 天津 | ISO | 1.2 |
2017-05-18 | Scala | user4 | 天津 | ISO | 1.2 |
2017-05-18 | Scala | user6 | 南京 | Android | 1.2 |
2017-05-18 | Spark | user7 | 天津 | Android | 1.2 |
2017-05-18 | Spark | user9 | 天津 | ISO | 1.2 |
2017-05-18 | Spark | user4 | 天津 | ISO | 1.2 |
2017-05-18 | Spark | user6 | 南京 | Android | 1.2 |
2017-05-18 | Spark | user6 | Android | 1.2 | |
2017-05-18 | Hadoop | user1 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user2 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user5 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user3 | 北京 | Android | 1.2 |
2017-05-18 | Hadoop | user4 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user1 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user2 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user5 | 北京 | Android | 1.2 |
2017-05-18 | Hive | user3 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user4 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user6 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user7 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user1 | 北京 | Android | 1.2 |
2017-05-18 | kafka | user2 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user5 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user3 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user4 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user2 | 北京 | Android | 1.2 |
2017-05-18 | SQL | user5 | 北京 | Android | 1.2 |
2017-05-18 | Mongodb | user3 | 北京 | Android | 1.2 |
2017-05-18 | redis | user4 | 北京 | Android | 1.2 |
原始数据存储在 hdfs 中, 数据项之间使用 \ t 进行分割, 部分数据项可能会有缺失.
实现思路
读取 hdfs 上的原始数据并转换为 RDD
使用 filter 算子顾虑有效的数据
从系统日志中获取有用的数据项并封装成 Row 对象
将 RDD<Row > 转换为 Dataset<Row>
按日期分组统计搜索词的搜索次数
使用窗口函数 row_number 获取每日 top 3 热词
将统计结果输出
示例代码
以本地环境为例, 生成环境只需把 master 和文件路径变更一下即可.
Spark API 实现方式
- // 创建 SparkSession 对象
- SparkSession session =SparkSession.builder()
- .appName("DailyTop3Keyword")
- .master("local[*]")
- .getOrCreate();
- // 创建 JavaSparkContext 对象
- JavaSparkContext jsc = JavaSparkContext.fromSparkContext(session.sparkContext());
- // 过滤条件
- List<String> list = Arrays.asList(new String[]{"北京","天津","南京"});
- // 使用广播变量进行性能优化
- final Broadcast<List<String>> cities = jsc.broadcast(list);
- // 加载系统日志
- JavaRDD<Row> rdd = jsc.textFile("D:/keywords.txt")
- // 过滤掉无效的日志信息
- .filter(new Function<String, Boolean>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Boolean call(String line) throws Exception {
- boolean flg = false;
- for(String city : cities.value()){
- if(line.contains(city)){
- flg =true;
- break;
- }
- }
- return flg;
- }
- }).cache()
- // 从每行日志中获取有用信息并封装成 Row 对象
- .map(new Function<String, Row>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Row call(String line) throws Exception {
- String[] values = line.split("\t");
- return RowFactory.create(values[0],values[1],values[2]);
- }
- }
- );
- // 将 JavaRDD<Row > 转换成 Dataset<Row>
- Dataset<Row> rows = session.createDataFrame(rdd, new StructType(new StructField[]{
- DataTypes.createStructField("date", DataTypes.StringType, true),
- DataTypes.createStructField("keyword", DataTypes.StringType, true),
- DataTypes.createStructField("user", DataTypes.StringType, true)
- }));
- // 按日期统计关键词的搜索次数
- Dataset<Row> kv = rows
- .select(new Column("date"),new Column("keyword"),new Column("user"))
- .groupBy("date","keyword")
- .agg(countDistinct("user").alias("kv"))
- .orderBy(new Column("date").asc(),new Column("kv").desc());
- // 使用窗口函数 row_number 获取每日排名前三的搜索关键词
- kv.select(new Column("date")
- ,new Column("keyword")
- ,new Column("kv")
- ,row_number().over(Windows.partitionBy(new Column("date")).orderBy(new Column("kv").desc())).alias("rank"))
- .where("rank <=3")
- .show();
SQL 脚本实现方式
最后两步也可以使用 SQL 脚本的方式进行实现.
- // 将倒数第三步的结果注册成一个临时表 rows
- rows.createOrReplaceTempView("rows");
- // 按日期统计关键词的搜索次数并将统计结果注册成临时表 kv
- session.sql("select date,keyword,count(distinct user) kv from rows group by date,keyword order by date asc,kv desc")
- .createOrReplaceTempView("kv");
- // 使用窗口函数 row_number 获取每日排名前三的搜索关键词
- session.sql("select * from (select date,keyword,kv,row_number() over(partition by date order by kv desc) rank from kv) tmp where rank <= 3")
- .show();
示例数据统计结果
date | keyword | kv | rank |
---|---|---|---|
2017-05-18 | Hadoop | 6 | 1 |
2017-05-18 | kafka | 5 | 2 |
2017-05-18 | Scala | 4 | 3 |
2017-05-17 | Hadoop | 4 | 1 |
2017-05-17 | Scala | 3 | 2 |
示例中需要引入的 class
- import static org.apache.spark.sql.functions.countDistinct;
- import static org.apache.spark.sql.functions.row_number;
- import java.util.Arrays;
- import java.util.List;
- import org.apache.spark.API.java.JavaRDD;
- import org.apache.spark.API.java.JavaSparkContext;
- import org.apache.spark.API.java.function.Function;
- import org.apache.spark.broadcast.Broadcast;
- import org.apache.spark.sql.Column;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.RowFactory;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.expressions.Windows;
- import org.apache.spark.sql.types.DataTypes;
- import org.apache.spark.sql.types.StructField;
- import org.apache.spark.sql.types.StructType;
来源: http://www.jianshu.com/p/ef7d99f4beb1