[关闭]
@ZSCDumin 2018-04-23T13:41:46.000000Z 字数 4447 阅读 698

Spark MLlib 电影推荐系统


1.数据来源

https://github.com/ZSCDumin/SparkMLLIBMoviesRecommendation

2.代码实现

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
  3. import org.apache.spark.rdd._
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. import scala.io.Source
  /**
   * Movie recommender built on Spark MLlib's ALS collaborative filtering.
   *
   * Reads the rating files under `/input/ml-1m`, trains several ALS models over a
   * small hyper-parameter grid, keeps the one with the lowest validation RMSE,
   * compares it against a mean-rating baseline and prints ten recommended movies
   * for the personal user.
   */
  object MoviesRecommendation {

    /** One trained ALS model together with its hyper-parameters and validation RMSE. */
    private final case class Candidate(
      model: MatrixFactorizationModel,
      rmse: Double,
      rank: Int,
      lambda: Double,
      numIter: Int
    )

    /**
     * Program entry point: configures logging and a local SparkContext, runs the
     * pipeline and always stops the context afterwards, even when a step fails.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // Runtime configuration.
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      val conf = new SparkConf().setAppName("MoviesRecommendation").setMaster("local[*]")
      val sc = new SparkContext(conf)
      try {
        run(sc)
      } finally {
        // Shut the application down regardless of success or failure.
        sc.stop()
      }
    }

    /** Runs the whole pipeline: load, split, train, evaluate and recommend. */
    private def run(sc: SparkContext): Unit = {
      // Step 1: load the data.
      // Personal ratings of the user we want recommendations for.
      val myRatings = loadRatings("/input/ml-1m/personalRatings.txt")
      val myRatingsRDD = sc.parallelize(myRatings, 1)

      // Movie id -> title, collected to the driver.
      val movies = sc.textFile("/input/ml-1m/movies.dat").map { line =>
        val fields = line.split("::")
        (fields(0).toInt, fields(1))
      }.collect().toMap

      // All ratings, keyed by the last decimal digit of their timestamp; that digit
      // is used below to split the data set.
      val ratings = sc.textFile("/input/ml-1m/ratings.dat").map { line =>
        val fields = line.split("::")
        val rating = Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
        val lastDigit = fields(3).toLong % 10
        (lastDigit, rating)
      }

      // Basic statistics of the data set.
      val numRatings = ratings.count
      val numUsers = ratings.map(_._2.user).distinct.count
      val numMovies = ratings.map(_._2.product).distinct.count
      println(s"总共获得${numRatings}条评分,来自于${numUsers}个用户,共${numMovies}部电影。")

      // Step 2: split by the timestamp digit into training (digit < 6, plus the
      // personal ratings), validation (6 <= digit < 8) and test (digit >= 8).
      val training = ratings.filter(_._1 < 6).values.union(myRatingsRDD).repartition(4).cache()
      val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(4).cache()
      val test = ratings.filter(_._1 >= 8).values.repartition(4).cache()
      val numTraining = training.count
      val numValidation = validation.count
      val numTest = test.count
      println(s"训练: $numTraining 验证: $numValidation 测试: $numTest")

      // Step 4: train over the parameter grid and keep the model with the lowest
      // validation RMSE. (Step 3 is the computeRmse helper.)
      val best = selectBestModel(training, validation)

      // Score the best model on the held-out test set.
      val testRmse = computeRmse(best.model, test)
      println(s"The best model was trained with rank = ${best.rank} and lambda = ${best.lambda}, and numIter = ${best.numIter}, and its RMSE on the test set is $testRmse.")

      // Step 5: compare against a baseline that always predicts the mean rating
      // of the training and validation sets.
      val meanRating = training.union(validation).map(_.rating).mean
      val baselineRmse = math.sqrt(test.map { x =>
        val diff = meanRating - x.rating
        diff * diff
      }.mean)
      val improvement = (baselineRmse - testRmse) / baselineRmse * 100
      val improvementText = "%1.2f".format(improvement)
      println(s"The best model improves the baseline by $improvementText%.")

      // Step 6: recommend the ten highest-predicted movies, excluding the movies
      // that already appear in the personal ratings.
      // NOTE(review): predictions are requested for user id 0; this presumably
      // matches the user id used in personalRatings.txt - confirm.
      val personalUserId = 0
      val myRatedMovieIds = myRatings.map(_.product).toSet
      val unseenMovies = sc.parallelize(movies.keys.filterNot(myRatedMovieIds).toSeq)
      val recommendations = best.model
        .predict(unseenMovies.map((personalUserId, _)))
        .collect
        .sortBy(-_.rating)
        .take(10)

      println("#############################################################")
      println("親,为您推荐的10部电影如下:")
      recommendations.zipWithIndex.foreach { case (r, idx) =>
        val position = idx + 1
        val title = movies(r.product)
        println(f"$position%2d: $title")
      }
      println("#############################################################")
    }

    /**
     * Trains one ALS model per combination of rank (8, 12), lambda (0.1, 10.0) and
     * iteration count (10, 20) - eight models in total - and returns the one with
     * the lowest RMSE on the validation set. On ties the earliest combination wins.
     *
     * Fails with an explicit error if no model yields a validation RMSE below
     * `Double.MaxValue` (for example when every RMSE is NaN).
     */
    private def selectBestModel(training: RDD[Rating], validation: RDD[Rating]): Candidate = {
      val ranks = List(8, 12)
      val lambdas = List(0.1, 10.0)
      val numIters = List(10, 20)
      val paramGrid = for (rank <- ranks; lambda <- lambdas; numIter <- numIters)
        yield (rank, lambda, numIter)

      val best = paramGrid.foldLeft(Option.empty[Candidate]) {
        case (bestSoFar, (rank, lambda, numIter)) =>
          val model = ALS.train(training, rank, numIter, lambda)
          val validationRmse = computeRmse(model, validation)
          val rmseToBeat = bestSoFar.fold(Double.MaxValue)(_.rmse)
          if (validationRmse < rmseToBeat) {
            Some(Candidate(model, validationRmse, rank, lambda, numIter))
          } else {
            bestSoFar
          }
      }
      best.getOrElse(sys.error("No ALS model produced a usable validation RMSE."))
    }

    /**
     * Step 3: root-mean-square error (RMSE) of a model on a set of ratings.
     *
     * Predicts a rating for every (user, product) pair in `data`, joins the
     * predictions with the actual ratings on that pair and returns the square
     * root of the mean squared difference.
     *
     * @param model the trained matrix-factorization model to evaluate
     * @param data  the ratings to evaluate against
     * @return the RMSE between predicted and actual ratings
     */
    def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
      val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
      val predicted = predictions.map(x => ((x.user, x.product), x.rating))
      val actual = data.map(x => ((x.user, x.product), x.rating))
      val squaredErrors = predicted.join(actual).values.map { case (p, a) => (p - a) * (p - a) }
      math.sqrt(squaredErrors.mean())
    }

    /**
     * Loads the personal ratings from a local text file.
     *
     * Each line is split on "::" into user id, movie id and rating; entries whose
     * rating is not greater than 0 are dropped. The file is read completely and
     * closed before this method returns.
     *
     * @param path path of the ratings file on the local file system
     * @return the ratings with a rating value greater than 0
     */
    def loadRatings(path: String): Seq[Rating] = {
      val source = Source.fromFile(path)
      val ratings =
        try {
          source.getLines().map { line =>
            val fields = line.split("::")
            Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
          }.filter(_.rating > 0.0).toList // materialize before the file is closed
        } finally {
          source.close()
        }
      if (ratings.isEmpty) {
        sys.error("沒有評分數據!")
      } else {
        ratings
      }
    }
  }

3. 运行效果

  1. #############################################################
  2. 親,为您推荐的10部电影如下:
  3. 1: Chushingura (1962)
  4. 2: Love Serenade (1996)
  5. 3: Inferno (1980)
  6. 4: Very Thought of You, The (1998)
  7. 5: Hard Core Logo (1996)
  8. 6: Ayn Rand: A Sense of Life (1997)
  9. 7: Visitors, The (Les Visiteurs) (1993)
  10. 8: Leather Jacket Love Story (1997)
  11. 9: Big Trees, The (1952)
  12. 10: Best Man, The (1999)
  13. #############################################################
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注