Java version:
- public class Demo {
- private static SparkConf conf = new SparkConf().setAppName("demo").setMaster("local");
- private static JavaSparkContext jsc = new JavaSparkContext(conf);
- private static SparkSession session = new SparkSession(jsc.sc());
- public static void main(String[] args) {
- // 加载 students.JSON name,score
- Dataset<Row> score = session.read().JSON("./src/main/java/cn/tele/spark_sql/json/students.json");
- score.createOrReplaceTempView("scoreView");
- // name,score
- JavaRDD<Row> scoreRDD = session.sql("select * from scoreView where score> 80").javaRDD();
- // 创建信息 JSON name,age
- JavaRDD<String> infoRDD = jsc.parallelize(Arrays.asList("{\"name\":\"Leo\",\"age\":18}",
- "{\"name\":\"Marry\",\"age\":19}", "{\"name\":\"Jack\",\"age\":20}"));
- Dataset<Row> info = session.read().JSON(infoRDD);
- info.createOrReplaceTempView("infoView");
- // 拼接 sql
- List<Row> scoreList = scoreRDD.collect();
- String sql = "select * from infoView where name in (";
- for (int i = 0; i <scoreList.size(); i++) {
- sql += "'"+ scoreList.get(i).getAs("name") +"'";
- if (i < scoreList.size() - 1) {
- sql += ",";
- }
- }
- sql += ")";
- // 查询 分数 > 80 的学生的 name,age
- // 转换
- JavaPairRDD<String, Integer> tempRDD = session.sql(sql).javaRDD()
- .mapToPair(new PairFunction<Row, String, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<String, Integer> call(Row t) throws Exception {
- return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("age").toString()));
- }
- });
- JavaPairRDD<String, Integer> scoreRDD2 = scoreRDD.mapToPair(new PairFunction<Row, String, Integer>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Tuple2<String, Integer> call(Row t) throws Exception {
- return new Tuple2<String, Integer>(t.getAs("name"), Integer.valueOf(t.getAs("score").toString()));
- }
- });
- // join
- JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = tempRDD.join(scoreRDD2);
- // 遍历
- resultRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
- private static final long serialVersionUID = 1L;
- @Override
- public void call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
- System.out.println("name:" + t._1 + "," + "age:" + t._2._1 + ",score:" + t._2._2);
- }
- });
- // 保存为 JSON 格式
- StructType schema = DataTypes
- .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
- DataTypes.createStructField("age", DataTypes.IntegerType, false),
- DataTypes.createStructField("score", DataTypes.IntegerType, false)));
- JavaRDD<Row> rowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
- private static final long serialVersionUID = 1L;
- @Override
- public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
- return RowFactory.create(v1._1, Integer.valueOf(v1._2._1), Integer.valueOf(v1._2._2));
- }
- });
- Dataset<Row> resultDS = session.createDataFrame(rowRDD, schema);
- resultDS.write().format("json").mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/json/result");
- session.stop();
- jsc.close();
- }
- }
Scala version:
object Demo {

  /**
   * Loads student scores from students.json, keeps students with score > 80,
   * joins them with an in-memory (name, age) dataset, prints the joined rows
   * and appends the result as JSON files.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load score info (fields: name, score).
    // FIX: DataFrameReader's JSON method is json(), not JSON().
    val scoreDF = sqlContext.read.json("./src/main/scala/cn/tele/spark_sql/json/students.json")
    scoreDF.createOrReplaceTempView("scoreView")
    val highScorers = sqlContext.sql("select * from scoreView where score> 80").rdd.collect()

    // In-memory student info (fields: name, age), split over two partitions.
    val infoRDD = sc.parallelize(Array(
      "{\"name\":\"Leo\",\"age\":20}",
      "{\"name\":\"Marry\",\"age\":30}",
      "{\"name\":\"Jack\",\"age\":21}"), 2)
    val infoDS = sqlContext.read.json(infoRDD)
    infoDS.createOrReplaceTempView("infoView")

    // FIX: an empty result would otherwise produce the invalid SQL "... in ()".
    if (highScorers.isEmpty) {
      println("no students with score > 80")
    } else {
      // Build "name in ('a','b',...)" with mkString instead of a mutable var + index loop.
      // NOTE(review): names are embedded unescaped; acceptable for demo data only.
      val sql = highScorers
        .map(row => "'" + row.getAs[String]("name") + "'")
        .mkString("select * from infoView where name in (", ",", ")")

      // (name -> age) for students who scored > 80.
      // Spark's JSON reader infers numeric fields as LongType, hence getAs[Long].
      val ageByName = sqlContext.sql(sql).rdd.map(row =>
        (row.getAs[String]("name"), row.getAs[Long]("age").toInt))

      // (name -> score) over all students; the inner join below keeps only
      // the names present on both sides, i.e. the high scorers.
      val scoreByName = scoreDF.rdd.map(row =>
        (row.getAs[String]("name"), row.getAs[Long]("score").toInt))

      val resultRDD = ageByName.join(scoreByName)

      // Print the joined rows (output format unchanged from the original).
      resultRDD.foreach(t =>
        println("name:" + t._1 + "age:" + t._2._1 + "score:" + t._2._2))

      // Persist the joined result as JSON files.
      val rowRDD = resultRDD.map(t => Row(t._1, t._2._1, t._2._2))
      val schema = DataTypes.createStructType(Array(
        StructField("name", DataTypes.StringType, false),
        StructField("age", DataTypes.IntegerType, false),
        StructField("score", DataTypes.IntegerType, false)))
      val df = sqlContext.createDataFrame(rowRDD, schema)
      df.write.format("json").mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/json/result")
    }

    // FIX: release the SparkContext on exit (the original never stopped it).
    sc.stop()
  }
}
Source: http://www.bubuko.com/infodetail-2958118.html