- {"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
- {"name":"Andy", "age":30,"myScore":[{"score1":29,"score2":33},{"score1":38,"score2":52},{"score1":88,"score2":71}]}
- {"name":"Justin", "age":19,"myScore":[{"score1":39,"score2":43},{"score1":28,"score2":53}]}
- {"name":"Michael", "age":25,"myScore":[{"score1":19,"score2":23},{"score1":58,"score2":50}]}
```scala
import org.apache.spark.sql.SparkSession

object explodeTest {
  def main(args: Array[String]): Unit = {
    val sparks = SparkSession.builder.master("local[4]").appName("test1").getOrCreate
    val sc = sparks.sparkContext

    // Read the JSON data with Spark (the DataFrameReader method is json, not JSON)
    val df = sparks.read.json("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\josn")
    df.show()
    /**
     * +---+--------------------+-------+
     * |age|             myScore|   name|
     * +---+--------------------+-------+
     * | 25|  [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19|  [[39,43], [28,53]]| Justin|
     * | 25|  [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19|  [[39,43], [28,53]]| Justin|
     * | 25|  [[19,23], [58,50]]|Michael|
     * | 30|[[29,33], [38,52]...|   Andy|
     * | 19|  [[39,43], [28,53]]| Justin|
     * +---+--------------------+-------+
     */

    // Use explode from org.apache.spark.sql.functions to flatten the myScore
    // array column: each array element becomes its own row
    import org.apache.spark.sql.functions._
    val dfScore = df.select(df("name"), explode(df("myScore"))).toDF("name", "myScore")
    val dfMyScore = dfScore.select("name", "myScore.score1", "myScore.score2")
    dfScore.show()
    df.createOrReplaceTempView("df")
    /**
     * +-------+-------+
     * |   name|myScore|
     * +-------+-------+
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * | Justin|[28,53]|
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * | Justin|[28,53]|
     * |Michael|[19,23]|
     * |Michael|[58,50]|
     * |   Andy|[29,33]|
     * |   Andy|[38,52]|
     * |   Andy|[88,71]|
     * | Justin|[39,43]|
     * +-------+-------+
     * only showing top 20 rows
     */
  }
}
```
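The temp view `df` registered above is never actually queried in the original snippet. As a minimal sketch, the same flattening that produces `dfMyScore` can also be written in Spark SQL against that view, using `LATERAL VIEW explode`:

```scala
// Same flattening expressed in Spark SQL against the registered view "df":
// LATERAL VIEW explode produces one row per element of myScore, and the
// struct fields score1/score2 can then be selected directly.
val dfMyScoreSql = sparks.sql(
  """SELECT name, s.score1, s.score2
    |FROM df
    |LATERAL VIEW explode(myScore) t AS s""".stripMargin)
dfMyScoreSql.show()
```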
Data
```text
aa
bb
cc
dd
ee
ff
```
Adding an index (primary-key) column to a DataFrame
```scala
// First build an RDD[Map] and apply zipWithIndex to see the effect:
// each element is paired with its index, which serves as the id (primary key).
case class Log(map: scala.collection.mutable.Map[String, String], ID: Long)

import sparks.implicits._
val data2 = sc.parallelize(Seq(
    Map("uuid" -> "sxexx", "ip" -> "192.168"),
    Map("uuid" -> "man", "ip" -> "192.168.10.1")))
  .zipWithIndex()
data2.collect().foreach(println(_))
/**
 * (Map(uuid -> sxexx, ip -> 192.168),0)
 * (Map(uuid -> man, ip -> 192.168.10.1),1)
 */
```
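The `Log` case class above is declared but never used. Here is a minimal sketch of the step it suggests, turning the indexed RDD into a DataFrame; it assumes the case class is defined at the top level (Spark's encoders need a `TypeTag`, which method-local classes don't provide) and uses an immutable `Map`, which the built-in encoders support. `LogRow` is a hypothetical name, not from the original:

```scala
// Hypothetical variant of Log with an immutable Map, defined at top level
// so Spark can derive an encoder for it.
case class LogRow(map: Map[String, String], id: Long)

// Wrap each (map, index) pair in LogRow and convert to a DataFrame.
val logDf = data2.map { case (m, id) => LogRow(m, id) }.toDF()
logDf.show(false)
```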
```scala
// zipWithIndex on a text file pairs each line with its line index.
// Note that zipWithIndex yields (value, index), so the columns must be
// named in that order: the original's toDF("id", "value") had them swapped.
val data = sc.textFile("file:///C:\\Users\\imp\\Desktop\\bo-kong\\data\\data")
  .zipWithIndex().toDF("value", "id")
data.show()
/**
 * Result for the data above:
 * +-----+---+
 * |value| id|
 * +-----+---+
 * |   aa|  0|
 * |   bb|  1|
 * |   cc|  2|
 * |   dd|  3|
 * |   ee|  4|
 * |   ff|  5|
 * +-----+---+
 */
```
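`zipWithIndex` gives consecutive ids but triggers an extra Spark job (to compute partition sizes) and a round-trip through the RDD API. As an alternative sketch on the DataFrame API, `monotonically_increasing_id` adds a unique, though not consecutive, id column directly (the `uid` column name here is just for illustration):

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// Adds a unique 64-bit id per row; ids are increasing within a partition
// but NOT consecutive across partitions, unlike zipWithIndex.
val dataWithId = data.withColumn("uid", monotonically_increasing_id())
dataWithId.show()
```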
Source: http://www.bubuko.com/infodetail-3001276.html