- package movierating
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.{Row, SparkSession}
- import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
- object MovieRating {
/**
 * Entry point: loads `users.dat`, `movies.dat` and `ratings.dat`, runs every
 * analysis defined in this object and prints the results to stdout.
 *
 * @param args optional; when present, `args(0)` is the directory containing the
 *             three `.dat` files. Without it the original hard-coded directory
 *             is used.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_MOVIE_USERS_ANALYZER")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  // try/finally so the session is released even when one of the analyses throws.
  try {
    val sc = spark.sparkContext
    // Set the log level.
    sc.setLogLevel("WARN")

    // Resolve the data directory; a caller-supplied directory gets a trailing
    // separator appended when it lacks one, because file names are concatenated.
    val defaultDataPath = "E:\\sparkproject\\src\\main\\data\\"
    val dataPath = args.headOption
      .map(dir => if (dir.endsWith("/") || dir.endsWith("\\")) dir else dir + java.io.File.separator)
      .getOrElse(defaultDataPath)

    // Read the raw data.
    val usersRDD = sc.textFile(dataPath + "users.dat")
    val moviesRDD = sc.textFile(dataPath + "movies.dat")
    val ratingsRDD = sc.textFile(dataPath + "ratings.dat")

    // Ten best-rated movies overall.
    getTop10TitleAndAvgRating(spark, moviesRDD, ratingsRDD).foreach { case (movie, rating) =>
      println(s"${movie}评分为:${rating}")
    }

    // Ten best-rated movies among users whose gender field is "M".
    getFemailTop10TitleAndAvgRating(spark, moviesRDD, ratingsRDD, usersRDD, "M").foreach { case (movie, rating) =>
      println(s"${movie}评分为:${rating}")
    }

    // Titles ordered by (timestamp, rating) secondary sort.
    sortByTimeAndRating(moviesRDD, ratingsRDD).foreach(println)

    // Same gender/age count expressed with DataFrames and with Datasets.
    CntGroupByGenderAndAgeWithDF(spark, usersRDD, ratingsRDD, moviesRDD)
    CntGroupByGenderAndAgeWithDS(spark, usersRDD, ratingsRDD, moviesRDD)
  } finally {
    // Close the SparkSession.
    spark.close()
  }
}
/**
 * Returns the ten movies with the highest average rating.
 *
 * Every input line is split on "::". From `moviesRDD` the first two fields are
 * read as (movieId, title); from `ratingsRDD` the second and third fields are
 * read as (movieId, rating). Movies without any rating, and ratings whose movie
 * id has no title, are dropped by the inner join.
 *
 * @param spark      not used by the computation; kept so existing callers compile
 * @param moviesRDD  raw movie lines
 * @param ratingsRDD raw rating lines
 * @return at most ten (title, average rating) pairs, highest average first
 */
def getTop10TitleAndAvgRating(spark: SparkSession, moviesRDD: RDD[String], ratingsRDD: RDD[String]): Array[(String, Double)] = {
  // (movieId, title). Not cached: it is consumed exactly once below.
  val titleById = moviesRDD.map(_.split("::")).map(fields => (fields(0), fields(1)))

  // (movieId, (sum of ratings, number of ratings))
  val sumAndCountById = ratingsRDD
    .map(_.split("::"))
    .map(fields => (fields(1), (fields(2).toDouble, 1)))
    .reduceByKey { case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB) }

  // (movieId, average rating)
  val avgById = sumAndCountById.mapValues { case (sum, count) => sum / count }

  // top() keeps only ten candidates per partition instead of shuffling and
  // sorting the whole data set just to keep the first ten rows.
  avgById.join(titleById)
    .map { case (_, (avg, title)) => (title, avg) }
    .top(10)(Ordering.by[(String, Double), Double](_._2))
}
/**
 * Returns the ten movies with the highest average rating, counting only the
 * ratings given by users of one gender.
 *
 * Despite the method name, the users are selected by the `gender` argument:
 * a user is kept when the second "::"-separated field of its line equals
 * `gender`.
 *
 * @param spark      not used by the computation; kept so existing callers compile
 * @param moviesRDD  raw movie lines; fields 0 and 1 are read as (movieId, title)
 * @param ratingsRDD raw rating lines; fields 0..2 are read as (userId, movieId, rating)
 * @param usersRDD   raw user lines; fields 0 and 1 are read as (userId, gender)
 * @param gender     gender value to keep, compared for exact equality
 * @return at most ten (title, average rating) pairs, highest average first
 */
def getFemailTop10TitleAndAvgRating(spark: SparkSession, moviesRDD: RDD[String], ratingsRDD: RDD[String], usersRDD: RDD[String], gender: String): Array[(String, Double)] = {
  // (userId, gender) for the requested gender only.
  // None of the intermediate RDDs is cached: each one is consumed exactly once.
  val selectedUsers = usersRDD
    .map(_.split("::"))
    .map(fields => (fields(0), fields(1)))
    .filter { case (_, userGender) => userGender == gender }

  // (movieId, title)
  val titleById = moviesRDD.map(_.split("::")).map(fields => (fields(0), fields(1)))

  // (userId, (movieId, rating))
  val ratingsByUser: RDD[(String, (String, String))] =
    ratingsRDD.map(_.split("::")).map(fields => (fields(0), (fields(1), fields(2))))

  // (movieId, (sum of ratings, number of ratings)) restricted to the selected users
  val sumAndCountById = selectedUsers.join(ratingsByUser)
    .map { case (_, (_, (movieId, rating))) => (movieId, (rating.toDouble, 1)) }
    .reduceByKey { case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB) }

  // (movieId, average rating)
  val avgById = sumAndCountById.mapValues { case (sum, count) => sum / count }

  // top() avoids a full shuffle-sort when only ten rows are needed.
  avgById.join(titleById)
    .map { case (_, (avg, title)) => (title, avg) }
    .top(10)(Ordering.by[(String, Double), Double](_._2))
}
/**
 * Returns the titles belonging to the ten ratings that rank highest under a
 * secondary sort: fourth rating field (timestamp) first, third rating field
 * (rating) as tie-breaker, both descending.
 *
 * The titles are attached BEFORE the ten best rows are selected. Joining after
 * sorting (as this method used to do) re-shuffles the rows by movie id, which
 * discards the sort order and makes the subsequent `take(10)` arbitrary.
 *
 * @param moviesRDD  raw movie lines; fields 0 and 1 are read as (movieId, title)
 * @param ratingsRDD raw rating lines; fields 1, 2 and 3 are read as
 *                   movieId, rating and timestamp
 * @return at most ten titles, best-ranked rating first; a title appears once
 *         per qualifying rating, so duplicates are possible
 */
def sortByTimeAndRating(moviesRDD: RDD[String], ratingsRDD: RDD[String]): Array[String] = {
  // (movieId, title)
  val titleById = moviesRDD.map(_.split("::")).map(fields => (fields(0), fields(1)))

  // (movieId, sort key built from timestamp and rating)
  val sortKeyByMovieId = ratingsRDD.map { line =>
    val fields = line.split("::")
    (fields(1), new SecondarySortKey(fields(3).toDouble, fields(2).toDouble))
  }

  sortKeyByMovieId.join(titleById)
    .map { case (_, (sortKey, title)) => (sortKey, title) }
    // Largest keys first; selection happens after the join so order is preserved.
    .top(10)(Ordering.by[(SecondarySortKey, String), SecondarySortKey](_._1))
    .map(_._2)
}
/**
 * DataFrame variant of the analyses. Prints, in order:
 *
 *  1. the number of ratings of movie 1193 per (Gender, Age), via the DataFrame API;
 *  2. the same count via SQL over session-scoped temporary views;
 *  3. the same count via SQL over application-scoped global temporary views;
 *  4. the ten movie ids with the highest average rating.
 *
 * The views are registered with the `createOrReplace...` variants so that calling
 * this method more than once in the same session/application does not fail
 * with "view already exists".
 *
 * @param spark      session used to build the DataFrames and run SQL
 * @param usersRDD   raw user lines, five "::"-separated fields
 * @param ratingsRDD raw rating lines, four "::"-separated fields
 * @param moviesRDD  not used by any query here; kept so existing callers compile
 */
def CntGroupByGenderAndAgeWithDF(spark: SparkSession, usersRDD: RDD[String], ratingsRDD: RDD[String], moviesRDD: RDD[String]): Unit = {
  // Users: every column is a nullable string.
  val schemaForUsers = StructType("UserID::Gender::Age::OccupationID::Zip-code".split("::").map(column => StructField(column, StringType, nullable = true)))
  val usersRDDRows = usersRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim, line(2).trim, line(3).trim, line(4).trim))
  val usersDataFrame = spark.createDataFrame(usersRDDRows, schemaForUsers)

  // Ratings: ids and timestamp are strings, the rating itself is a double.
  val schemaforratings = StructType("UserID::MovieID".split("::").map(column => StructField(column, StringType, nullable = true)))
    .add("Rating", DoubleType, nullable = true)
    .add("Timestamp", StringType, nullable = true)
  val ratingsRDDRows = ratingsRDD.map(_.split("::")).map(line => Row(line(0).trim, line(1).trim, line(2).trim.toDouble, line(3).trim))
  val ratingsDataFrame = spark.createDataFrame(ratingsRDDRows, schemaforratings)

  // 1. DataFrame API.
  ratingsDataFrame.filter("MovieID=1193").join(usersDataFrame, "UserID")
    .select("Gender", "Age")
    .groupBy("Gender", "Age")
    .count().show(10)

  // 2. Session-scoped views.
  ratingsDataFrame.createOrReplaceTempView("ratings")
  usersDataFrame.createOrReplaceTempView("users")
  val sql_local = "select gender,age,count(*) from users u join ratings r on u.userid = r.userid where movieid =1193 group by gender,age"
  spark.sql(sql_local).show()

  // 3. Application-scoped views, addressed through the global_temp database.
  ratingsDataFrame.createOrReplaceGlobalTempView("ratings")
  usersDataFrame.createOrReplaceGlobalTempView("users")
  val sql = "select gender,age,count(*) from global_temp.users u join global_temp.ratings r on u.userid = r.userid where movieid =1193 group by gender,age"
  spark.sql(sql).show()

  // 4. Ten best average ratings. Typed getters replace the Any -> String -> Double
  //    round trip, and top() returns only ten rows to the driver instead of
  //    collecting every movie first.
  ratingsDataFrame.select("movieid", "rating")
    .groupBy("movieid").avg("rating")
    .rdd.map(row => (row.getString(0), row.getDouble(1)))
    .top(10)(Ordering.by[(String, Double), Double](_._2))
    .foreach(println)
}
/**
 * One user record for the Dataset-based analysis. Every field is kept as the
 * raw string taken from the input line.
 *
 * @param userID       first field of a user line
 * @param gender       second field of a user line
 * @param Age          third field of a user line
 * @param OccupationID fourth field of a user line
 * @param Zip_Code     fifth field of a user line
 */
case class User(
  userID: String,
  gender: String,
  Age: String,
  OccupationID: String,
  Zip_Code: String
)

/**
 * One rating record for the Dataset-based analysis.
 *
 * @param userID    first field of a rating line
 * @param movieID   second field of a rating line
 * @param rating    third field of a rating line, parsed as a double
 * @param Timestamp fourth field of a rating line, kept as a string
 */
case class Rating(
  userID: String,
  movieID: String,
  rating: Double,
  Timestamp: String
)
/**
 * Dataset variant of the gender/age count: shows the user Dataset, then shows
 * how many ratings movie 1193 received per (gender, age), ordered by gender
 * descending and age ascending.
 *
 * @param spark      session providing the encoders and Dataset factory
 * @param usersRDD   raw user lines, five "::"-separated fields
 * @param ratingsRDD raw rating lines, four "::"-separated fields
 * @param moviesRDD  not read by this method
 */
def CntGroupByGenderAndAgeWithDS(spark: SparkSession, usersRDD: RDD[String], ratingsRDD: RDD[String], moviesRDD: RDD[String]): Unit = {
  import spark.implicits._

  // Parse each user line into a typed record.
  val parsedUsers = usersRDD.map { line =>
    val fields = line.split("::")
    User(fields(0).trim, fields(1).trim, fields(2).trim, fields(3).trim, fields(4).trim)
  }
  val userDataSet = spark.createDataset[User](parsedUsers)
  userDataSet.show()

  // Parse each rating line into a typed record.
  val parsedRatings = ratingsRDD.map { line =>
    val fields = line.split("::")
    Rating(fields(0).trim, fields(1).trim, fields(2).trim.toDouble, fields(3).trim)
  }
  val ratingDataSet = spark.createDataset[Rating](parsedRatings)

  // Ratings of movie 1193 joined with the users who gave them.
  val ratersOfMovie = ratingDataSet.filter("movieid=1193").join(userDataSet, "userid")
  val countPerGroup = ratersOfMovie
    .select("gender", "age")
    .groupBy("gender", "age")
    .count()
  countPerGroup.orderBy($"gender".desc, $"age").show()
}
- }
- package movierating
/**
 * Composite key for secondary sorting: keys are ordered by `first`, and keys
 * with the same `first` are ordered by `second`.
 *
 * @param first  primary sort component
 * @param second tie-breaking sort component
 */
class SecondarySortKey(val first: Double, val second: Double) extends Ordered[SecondarySortKey] with Serializable {
  /**
   * Compares two keys component-wise.
   *
   * The sign is computed with `java.lang.Double.compare` rather than by
   * truncating the difference to an `Int`: truncation turned any difference
   * smaller than 1 in `first` into 0, so unequal keys compared as equal.
   *
   * @param that key to compare with
   * @return a negative value, zero, or a positive value when this key is
   *         smaller than, equal to, or greater than `that`
   */
  override def compare(that: SecondarySortKey): Int = {
    val byFirst = java.lang.Double.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else java.lang.Double.compare(this.second, that.second)
  }
}
// Source: http://www.bubuko.com/infodetail-3333716.html