使用Elasticsearch、Spark构建推荐系统 #1:概述及环境构建

2022-03-18 18:22:50 浏览数 (2)

推荐系统是机器学习当前最著名、最广泛使用,且已经证明价值的落地案例。尽管有许多资源可用作训练推荐模型的基础,但解释如何实际部署这些模型来创建大型推荐系统的资源仍然相对较少。

笔者找到个IBM的Code Pattern演示使用 Apache Spark 和 Elasticsearch 创建这样一个系统的关键要素。但是,该案例是5年前的2017年,对应的ES(Elasticsearch) 5.3.0,spark2.2.0;到如今很多软件已经不匹配,特别当时使用矢量评分插件进行模型向量相似度计算,现在这个功能在新版本ES中原生支持。为此,在follow其原理精髓的实践过程中,因地制宜做了扩展和修改,自以为对同道者有些许参考价值,同时也记录自己学习思考过程。

1. 方案架构流程

  1. 加载MovieLens数据集到spark中,清理数据集;
  2. ElasticSearch构建index mapping,并将Spark Dataframe数据加载;
  3. 使用Spark MLlib 库的ALS模型,训练一个协同过滤推荐模型,更新模型数据到Elasticsearch;
  4. 使用Elasticsearch查询,生成示例推荐,使用Movie Database API显示所推荐电影的海报图像。

Demo展示的数据逻辑处理流程,基于开源的数据集的操作;而实际部署是流式处理,引入Kafa做数据接入和分发(根据搜索的资料),详见下图

Machine Learning workflow for recommenderMachine Learning workflow for recommender

1) Why Spark

  • DataFrame:
    • 实际推荐使用场景,如用户行为(点击、收藏、购买等)描述为Event、metadata,是一种轻量结构数据(如json)
    • 适合于DataFrames的表达
    • Spark有丰富的插件访问外部数据源;
  • Spark ML:
    • pipeline包含可用于协同过滤的可伸缩的ASL模型;
    • ALS支持隐式反馈和NMF;支持交叉验证;
    • 自定义的数据转换和算法;

2)Why Elasticsearch

  • Storage
    • 支持原始json;
    • 可伸缩;
    • 支持时间序列/事件数据;
    • Kibana数据可视化;
    • 与Spark Dataframes集成
  • Scoring
    • 支持全文本搜索;
    • 支持多维度过滤;
    • 聚合计算
    • Search ~== recommendation

3) 个人实践的扩展(包含计划)

  • 匹配当前主流版本的环境构建;
  • 原始倾向于是独立部署对应环境(spark、Elasticsearch),用带参数命令启动jupter;本文使用既有环境,代码构建构建对应的环境;
  • 丰富推荐的应用API;
  • 更多的数据集以及真实业务数据。

2. 环境构建

原文发表于2017年,Elasticsearch版本比较古老用的时5.3.0,而到现在主流7.x,改动很大;使用矢量评分插件进行打分计算相似,现在版本原生的Dense Vector就支持该功能。

版本对比

软件

原版本(中文)版本

原Demo(英文)版本

我的版本

Elasticsearch

5.3.0

7.6.2

7.15.1

elasticsearch-hadoop

elasticsearch-spark-20_2.11-5.3.0.jar

elasticsearch-spark-20_2.11-7.6.2.jar

elasticsearch-spark-20_2.12-7.15.1.jar

spark

spark-2.2.0-bin-hadoop2.7

spark-2.4.5-bin-hadoop2.7

spark-3.1.2-bin-hadoop3.2

注意事项

由于spark 3 使用scala 2.12编译,所以用的elastic-hadoop连接器的scala版本也应该是scala 2.12,这个在当前elasticsearch官网上没找到,用maven去下载。

3. 启动方式

1) 带参数启动jupyter

代码语言:shell复制
PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook" ../spark-2.4.5-bin-hadoop2.7/bin/pyspark --driver-memory 4g --driver-class-path /FULL_PATH/elasticsearch-hadoop-7.6.2/dist/elasticsearch-spark-20_2.11-7.6.2.jar

2) 在jupyter启动后配置

代码语言:python代码运行次数:0复制
import os
import sys
# os.environ
print(os.environ["SPARK_HOME"])
os.environ["PYLIB"] = os.environ["SPARK_HOME"]   "/python/lib"
sys.path.insert(0, os.environ["PYLIB"]  "/py4j-0.10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"]  "/pyspark.zip")
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.driver.extraClassPath', '/usr/local/elasticsearch-hadoop-7.15.1/dist/elasticsearch-spark-30_2.12-7.15.1.jar').appName("spark").getOrCreate()

4. 扩展阅读

  1. Spark MLlib Collaborative Filtering
  2. Alternating Least Squares and collaborative filtering
  3. Quora question on Alternating Least Squares
  4. How do you build a “People who bought this also bought that”-style recommendation engine
  5. The Movie Database (TMdb) API

0 人点赞