[关闭]
@runzhliu 2018-05-16T15:43:29.000000Z 字数 1637 阅读 2096

推荐系统中 Spark 和 Elasticsearch 的交互

spark elasticsearch


参考资料
Apache Spark support

1 在何处交互

Spark 和 Elasticsearch 是90%大数据工程师的基础技术栈了,前者不用多数了,就是业界的大数据计算框架,后者则是优秀的搜索框架。

业务上这两个框架经常需要进行交互,常见的场景就是通过 Spark 批处理把应用日志写入 ES 集群来构架,然后通过 Kibana 进行展示。特定场景下 ES 除了可以做搜索引擎以外,还可以作为存储方案。

本人在两者的交互上使用较多,其中一个有意思的场景是用他们来构建推荐系统的召回层。所谓的召回层,可以理解成根据用户属性,在内容池子中,根据特定策略来挑选符合用户特征的内容(在进行推荐之前,一般还会有一个重排系统,来对召回内容进行打分,来判定推荐的顺序)。

在召回系统中使用 ES 的好处是可以利用其搜索引擎的天然属性,来构建复杂的查询语句,假设用户属性很多,召回策略非常复杂的情况下,如果使用 sql 对内容进行查询,可能需要写出很复杂的语句,并且检索效率不会比使用 ES 更好。

在推荐系统中,Spark 和 ES 的交互可以发生在这样的场景。批处理情况下,可以通过 Spark Sql 来计算用户的新增属性,例如通过算法,来为用户画像增加不同维度的特征,例如计算用户对某种内容的偏好程度,并将计算完的特征写入 ES 集群。流处理情况下,Spark Streaming 来对用户实现秒级的画像更新,并更新存储在 ES 中的数据。最后根据召回策略来组合查询的 query 来实现复杂的查询。

下面是我们业务中一个查询示例(特征名和敏感数据进行了处理):

  1. {
  2. "query": {
  3. "bool": {
  4. "filter": [
  5. {
  6. "terms": {
  7. "pr": [
  8. "-1",
  9. "-2",
  10. "3",
  11. "6",
  12. "7"
  13. ]
  14. }
  15. },
  16. {
  17. "bool": {
  18. "should": [
  19. {
  20. "term": {
  21. "vt": {
  22. "value": "2"
  23. }
  24. }
  25. }
  26. ]
  27. }
  28. }
  29. ]
  30. }
  31. },
  32. "size": 1000,
  33. "sort": [
  34. {
  35. "ok": {
  36. "missing": "_last",
  37. "order": "desc"
  38. }
  39. }
  40. ]
  41. }

通过以上 query 可以把符合用户特征的内容召回出来。

2 如何交互

为了把内容写入 ES 集群,我们可以在本地简单的跑起来一个 Demo。

首先,你需要一个本地的 ES 集群,brew install elasticsearch 安装 ES,然后运行 elasticsearch 命令来运行一个默认配置的集群。

image_1cdkphg7ckbc1het1ju91epq1jk39.png-444.8kB

然后可以通过官方文档的一个 Demo 来尝试一下把数据通过 Spark 写入到集群当中。

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.SparkContext
  3. import org.elasticsearch.spark._
  4. object Write2Es {
  5. def main(args: Array[String]): Unit = {
  6. val conf = new SparkConf().setAppName("test").setMaster("local")
  7. conf.set("es.nodes", "localhost")
  8. conf.set("es.port", "9200")
  9. conf.set("es.index.auto.create", "true")
  10. conf.set("es.mapping.id", "one")
  11. conf.set("es.write.operation", "upsert")
  12. val sc = new SparkContext(conf)
  13. val numbers = Map("one" -> 1, "five" -> 8)
  14. val numbers1 = Map("one" -> 3)
  15. sc.makeRDD(Seq(numbers, numbers1)).saveToEs("spark/docsid")
  16. }
  17. }

写入数据之后通过 RESTful 接口查看数据如下:

image_1cdkppi7gufgi58idsam5knem.png-96.7kB

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注