[关闭]
@yanglfyangl 2018-07-12T03:37:28.000000Z 字数 4299 阅读 1396

PredictionIO 分析及使用建议

一些参考资料

Scala 常用语法
Scala基础之注解(annotation
SBT 基础学习

Spark各个知识点总结
比较两个生产级NLP库:训练Spark-NLP和spaCy的管道
干货:基于Spark Mllib的SparkNLP库。

如何配置Idea进行自定义Engine的开发

为什么选择PredictionIO

如果我们做一个大数据平台,我们事实上是在做这些东西
1. 数据读取,准备和清洗流程。
2. 数据离线训练的算法。
3. 各种算法的组合。
4. 预测的接口。
5. 支持源数据可视化分析(目前支持IPython notebook, Zeppelin, Tableau
5. 。。。
6. 同时,上面的过程尽量在自己的控制范围内,而不是

什么是PredictionIO

此处输入图片的描述

此处输入图片的描述

从图中可以看到,Predictionb包含了

  1. >* 实时处理部分
  2. >* 离线计算部分

主要包含了这样几大部分:

  1. 服务组件:
  2. >* EventServer:主要负责接收外部数据
  3. >* Traning:主要用于离线数据处理或模型训练。
  4. >* PredictionServer:主要用于结果获取或预测(是不是预测要看你的模型。)
  5. 基础组件:
  6. >* HBase:主要用于存Event数据。
  7. >* ES: 可以用于存处理后的数据或模型,同时也存了model version, engine version, ip mapping, evaluation results等等。。。
  8. >* HDFS: 批量导入时使用,根据模版的不同,也可能存储数据或模型.
  9. >* Spark:主要用于模型训练和数据处理。
  10. Train的结果主要包括两部分:a model and its meta-data

从数据流的角度来说,主要有这样几种

  • DataSource
  • Preparator
  • Alogrithm
  • Serving
  • Evaluation

DataSource -- 自定义数据

  1. val eventsRDD: RDD[Event] = PEventStore.find(
  2. appName = dsp.appName,
  3. entityType = Some("customer"), // MODIFIED
  4. eventNames = Some(List("like", "dislike")), // MODIFIED
  5. // targetEntityType is optional field of an event.
  6. targetEntityType = Some(Some("product")))(sc) // MODIFIED
  7. val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
  8. val rating = try {
  9. val ratingValue: Double = event.event match {
  10. // MODIFIED
  11. case "like" => 4.0 // map a like event to a rating of 4.0
  12. case "dislike" => 1.0 // map a like event to a rating of 1.0
  13. case _ => throw new Exception(s"Unexpected event ${event} is read.")
  14. }
  15. // entityId and targetEntityId is String
  16. Rating(event.entityId,
  17. event.targetEntityId.get,
  18. ratingValue)
  19. } catch {
  20. case e: Exception => {
  21. logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
  22. throw e
  23. }
  24. }
  25. rating
  26. }.cache()

Preparator -- 数据清洗

下面的例子是将需要train的数据中,将提前定义好的不参与train的部分去除掉。

  1. import scala.io.Source // ADDED
  2. class Preparator
  3. extends PPreparator[TrainingData, PreparedData] {
  4. def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
  5. // MODIFIED HERE
  6. val noTrainItems = Source.fromFile("./data/sample_not_train_data.txt")
  7. .getLines.toSet
  8. // exclude noTrainItems from original trainingData
  9. val ratings = trainingData.ratings.filter( r =>
  10. !noTrainItems.contains(r.item)
  11. )
  12. new PreparedData(ratings)
  13. }
  14. }

Algorithm 之 train -- 训练

  1. def train(sc: SparkContext, data: PreparedData): ECommModel = {
  2. require(!data.rateEvents.take(1).isEmpty, // MODIFIED
  3. s"rateEvents in PreparedData cannot be empty." + // MODIFIED
  4. " Please check if DataSource generates TrainingData" +
  5. " and Preprator generates PreparedData correctly.")
  6. ...
  7. val mllibRatings = data.rateEvents // MODIFIED
  8. .map { r =>
  9. ...
  10. ((uindex, iindex), (r.rating,r.t)) //MODIFIED
  11. }.filter { case ((u, i), v) => //过滤异常值
  12. // keep events with valid user and item index
  13. (u != -1) && (i != -1)
  14. }
  15. .reduceByKey { case (v1, v2) => // MODIFIED //去重
  16. // if a user may rate same item with different value at different times,
  17. // use the latest value for this case.
  18. // Can remove this reduceByKey() if no need to support this case.
  19. val (rating1, t1) = v1
  20. val (rating2, t2) = v2
  21. // keep the latest value
  22. if (t1 > t2) v1 else v2
  23. }
  24. .map { case ((u, i), (rating, t)) => // MODIFIED
  25. // MLlibRating requires integer index for user and item
  26. MLlibRating(u, i, rating) // MODIFIED
  27. }
  28. .cache()
  29. ...
  30. }

Algorithm 之 predict -- 预测

  1. def predict(
  2. model: RandomForestModel, // CHANGED
  3. query: Query): PredictedResult = {
  4. val label = model.predict(Vectors.dense(
  5. Array(query.attr0, query.attr1, query.attr2)
  6. ))
  7. PredictedResult(label)
  8. }

Serving -- 对外服务

如果预测结果可以直接返回的话,不需要做什么处理。
但比如说需要加黑名单什么的,可以在这里进行处理。
下面的例子是将被禁用的商品过滤掉的一个操作。

  1. import scala.io.Source // ADDED
  2. class Serving
  3. extends LServing[Query, PredictedResult] {
  4. override
  5. def serve(query: Query,
  6. predictedResults: Seq[PredictedResult]): PredictedResult = {
  7. // MODIFIED HERE
  8. // Read the disabled item from file.
  9. val disabledProducts: Set[String] = Source
  10. .fromFile("./data/sample_disabled_items.txt")
  11. .getLines
  12. .toSet
  13. val itemScores = predictedResults.head.itemScores
  14. // Remove items from the original predictedResult
  15. PredictedResult(itemScores.filter(ps => !disabledProducts(ps.item)))
  16. }
  17. }

上生产环境的几大Q/A

Q: 这个系统稳定吗
A:不能说100%稳定,目前的版本是0.12,但Star数为11k,Fork数为1.8K,从经验上来说,Apache旗下开源的项目,这个数字是可以开始考虑线上使用了,但不排除有坑。

Q:高可用怎么做
A:我们可以部署多台,前端通过Nginx或LVS进行负载。同时,负责training的节点与负责predict的节点分开(参考mysql的读写分离策略)

Q:我们有些服务并不是预测,而是统计,怎么做?
A:即使这样,我们也可以利用现有的框架,在训练的时候,进行统计计算。计算结果存入HBase。在统计的时候,从HBase中读出数据返回。同时,用定时器定期来跑Pio train就可以了。

Q:会不会有资源上的上限
A:PredictionIO本身是个框架,存储资源是依赖Hbase或ES,计算资源是依赖Spark。可以通过Nginx或Lvs进行负载均衡,所以理论上来说是可以横向扩展的。

Q:怎么与云上资源进行配合呢
A:可以考虑这样

  • 框架由我们自己控制。
  • 使用云上的spark和es资源。
  • 算法中基础部分,使用spark的。
  • 如果需要使用云上一些算法,可以在Serve层调用。

“统计型”使用方式在我们系统中建议的调用流程

Created with Raphaël 2.1.2Services(s)Services(s)kafkakafka统计引擎统计引擎PIOPIO定时器定时器新事件统计今日放入eventtrain(24点)更新上日数据取数据将日期组合算结果返回

“预测型”使用方式在我们系统中建议的调用流程

Created with Raphaël 2.1.2Services(s)Services(s)kafkakafkaPIOPIO开发人员开发人员新事件放入event进行训练,存为“训练模型”训练调优存储为“预测模型”使用“预测模型”进行预测返回结果
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注