Building a Recommender System with Elasticsearch and Spark #2: A Deeper Look

2022-04-08 15:24:41

The Elasticsearch-plus-Spark recommender approach has two key steps:

  1. ALS models the user-item interaction history to build a shared latent-factor space (a user matrix and an item matrix);
  2. Elasticsearch turns the recommendation problem into a search problem.
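The two steps above can be sketched in miniature: ALS produces a low-dimensional vector per user and per item, a predicted score is just their dot product, so recommending reduces to ranking items by vector similarity, which is exactly what a search engine can do. A toy illustration (the vectors are made up, not real ALS output):

```python
# Toy sketch: assume ALS produced these 3-dimensional latent vectors
# for one user and a tiny item catalogue.
user_vec = [0.9, 0.1, 0.3]
item_vecs = {
    "movie_a": [0.8, 0.0, 0.4],
    "movie_b": [0.1, 0.9, 0.2],
    "movie_c": [0.7, 0.2, 0.5],
}

def dot(u, v):
    return sum(a * b for a, b in zip(u, v))

# "Recommend" = rank items by the predicted score user_vec . item_vec;
# Elasticsearch's script_score query performs this ranking server-side.
ranked = sorted(item_vecs, key=lambda m: dot(user_vec, item_vecs[m]), reverse=True)
print(ranked)  # ['movie_a', 'movie_c', 'movie_b']
```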

1. Training the ALS model

1) Data preprocessing

# Reading with format("es") requires the elasticsearch-hadoop (elasticsearch-spark)
# connector on the Spark classpath.
ratings_from_es = spark.read.format("es").load("ratings")
ratings_from_es.show(5)

Here the data is read from ES; in practice it can come from other sources (ClickHouse, CSV, etc.), and it can also be split into train, validation, and test sets.
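In Spark the split is one line, e.g. `ratings_from_es.randomSplit([0.8, 0.1, 0.1], seed=54)`. A minimal pure-Python analogue of the same per-row weighted split, for illustration only:

```python
import random

def random_split(rows, weights=(0.8, 0.1, 0.1), seed=54):
    # Per-row bucket assignment, mirroring DataFrame.randomSplit's behaviour:
    # each row lands in bucket i with probability weights[i] / sum(weights).
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    buckets = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, b in enumerate(bounds):
            if r <= b:
                buckets[i].append(row)
                break
    return buckets

train, valid, test = random_split(range(1000))
print(len(train), len(valid), len(test))
```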

2) Training the ALS model

from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

VECTOR_DIM = 20  # the ALS rank (latent dimension); 20 is an illustrative choice
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=54)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)

3) Storing the ALS model's user and item factor vectors in Elasticsearch

from pyspark.sql.functions import lit, current_timestamp, unix_timestamp
ver = model.uid
ts = unix_timestamp(current_timestamp())
movie_vectors = model.itemFactors.select("id",
                                         col("features").alias("model_factor"),
                                         lit(ver).alias("model_version"),
                                         ts.alias("model_timestamp"))
movie_vectors.show(5)
user_vectors = model.userFactors.select("id",
                                        col("features").alias("model_factor"),
                                        lit(ver).alias("model_version"),
                                        ts.alias("model_timestamp"))
user_vectors.show(5)

movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("movies", mode="append")
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("users", mode="append")
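For the script-score vector functions used later (`cosineSimilarity`, `dotProduct`) to work, `model_factor` must be mapped as a `dense_vector` field before the Spark job writes; dynamic mapping would otherwise index it as plain floats. A minimal mapping sketch (the `dims` value is an assumption and must equal the ALS rank):

```python
# Mapping sketch for the "movies" index: "model_factor" as dense_vector.
# dims must equal the ALS rank (VECTOR_DIM, assumed to be 20 here).
VECTOR_DIM = 20

movies_mapping = {
    "mappings": {
        "properties": {
            "model_factor": {"type": "dense_vector", "dims": VECTOR_DIM},
            "model_version": {"type": "keyword"},
            "model_timestamp": {"type": "date"},
        }
    }
}

# Applied once before the Spark write, e.g.:
# es.indices.create(index="movies", body=movies_mapping)
```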

2. Recommending with Elasticsearch: the script score query

def vector_query(query_vec, vector_field, q="*", cosine=False):
    # Build a script_score query: filter candidates with q, then score them
    # by vector similarity against query_vec.
    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"

    score_fn = score_fn.format(v=vector_field)
    return {
    "query": {
        "script_score": {
            "query" : { 
                "query_string": {
                    "query": q
                }
            },
            "script": {
                "source": score_fn,
                "params": {
                    "vector": query_vec
                }
            }
        }
    }
}

def get_similar(the_id, q="*", num=10, index="movies", vector_field='model_factor'):
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=True)
        results = es.search(index=index, body=q)
        hits = results['hits']['hits']
        return src, hits[1:num + 1]  # hits[0] is the query document itself
    
def get_user_recs(the_id, q="*", num=10, users="users", movies="movies", vector_field='model_factor'):
    response = es.get(index=users, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=False)
        results = es.search(index=movies, body=q)
        hits = results['hits']['hits']
        return src, hits[:num]

def get_movies_for_user(the_id, num=10, ratings="ratings", movies="movies"):
    response = es.search(index=ratings, q="userId:{}".format(the_id), size=num, sort=[{"rating":"desc"}])
    hits = response['hits']['hits']
    ids = [h['_source']['movieId'] for h in hits]
    movies = es.mget(body={"ids": ids}, index=movies, _source_includes=['tmdbId', 'title'])
    movies_hits = movies['docs']
    tmdbids = [h['_source'] for h in movies_hits]
    return tmdbids

Recommendations are generated from the factor vectors via Elasticsearch's script score query and its vector functions, wrapped by vector_query: cosine similarity measures similarity between entities of the same kind (user-user or item-item), while the dot product scores candidate items for a user.
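The two Painless score expressions can be reproduced in plain Python to see why each is used: `cosineSimilarity(...) + 1.0` shifts cosine similarity into [0, 2] so scores stay non-negative, and since Painless defines `sigmoid(value, k, a) = value^a / (k^a + value^a)`, the expression `sigmoid(1, Math.E, -dotProduct(...))` is exactly the logistic function of the dot product, squashing it into (0, 1). A toy check with made-up vectors:

```python
import math

def cosine_plus_one(u, v):
    # Matches "cosineSimilarity(params.vector, field) + 1.0": range [0, 2],
    # used for same-type similarity (user-user, item-item).
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm + 1.0

def logistic_dot(u, v):
    # Matches "sigmoid(1, Math.E, -dotProduct(...))":
    # sigmoid(1, e, -d) = 1 / (1 + e^(-d)), the logistic of the dot product.
    d = sum(a * b for a, b in zip(u, v))
    return 1.0 / (1.0 + math.exp(-d))

u = [1.0, 0.0]
print(cosine_plus_one(u, u))        # identical vectors -> 2.0
print(logistic_dot(u, [2.0, 0.0]))  # dot product 2 -> ~0.88
```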

3. A deeper look

1) Why not recommend directly with Spark ML?

First, it is the result of an engineering vs. accuracy trade-off: at model-serving time, running the model over millions of candidates one by one is far too slow. Nearest-neighbour search in Elasticsearch is much more efficient: brute-force scoring is linear in the candidate count, while an index-based nearest-neighbour search is roughly logarithmic.

Second, rich and flexible queries can be attached to filter the candidate set along several dimensions at once. For example: hot-pot restaurants (category) liked by users in their twenties (age) in Hangzhou (location).
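With the vector_query helper above, that kind of filtering is just a query string passed as its q argument, since the script score only re-ranks whatever the inner query matches. A sketch of the resulting request body, built directly for illustration (the field names genres and release_year are hypothetical):

```python
# Hypothetical filter: score only Comedy movies released after 1995.
query_vec = [0.1, 0.2, 0.3]  # a user's ALS factor vector (made up)
q = "genres:Comedy AND release_year:>1995"

body = {
    "query": {
        "script_score": {
            # The inner query narrows the candidate set; only matching
            # documents are scored by the vector function.
            "query": {"query_string": {"query": q}},
            "script": {
                "source": "sigmoid(1, Math.E, -dotProduct(params.vector, 'model_factor'))",
                "params": {"vector": query_vec},
            },
        }
    }
}
```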

2) implicit vs explicit

The objective for explicit feedback:

\min_{x,y} \sum_{(u,i)\in\mathcal{K}} \left(r_{ui} - x_u^{\top} y_i\right)^2 + \lambda\left(\sum_u \lVert x_u\rVert^2 + \sum_i \lVert y_i\rVert^2\right)

The objective for implicit feedback (Hu, Koren & Volinsky), with preference p_{ui} = 1 if r_{ui} > 0 and 0 otherwise, and confidence c_{ui} = 1 + \alpha r_{ui}:

\min_{x,y} \sum_{u,i} c_{ui}\left(p_{ui} - x_u^{\top} y_i\right)^2 + \lambda\left(\sum_u \lVert x_u\rVert^2 + \sum_i \lVert y_i\rVert^2\right)

Implicit-feedback scenarios are far more common than explicit feedback, and pyspark.ml.recommendation.ALS supports both:
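The implicit-feedback model (enabled with implicitPrefs=True) does not fit the raw counts directly: each observation r_ui becomes a binary preference p_ui and a confidence weight c_ui = 1 + alpha * r_ui. A toy version of that transform (alpha=40 is the value suggested in the Hu, Koren & Volinsky paper; Spark's default is 1.0):

```python
# Implicit-feedback transform behind implicitPrefs=True: raw interaction
# counts become a binary preference plus a confidence weight.
ALPHA = 40.0  # paper's suggested value; Spark's ALS default alpha is 1.0

def implicit_transform(r_ui, alpha=ALPHA):
    p_ui = 1.0 if r_ui > 0 else 0.0   # did the user interact at all?
    c_ui = 1.0 + alpha * r_ui         # more interactions -> higher confidence
    return p_ui, c_ui

print(implicit_transform(0))  # never interacted -> (0.0, 1.0)
print(implicit_transform(5))  # 5 plays/clicks   -> (1.0, 201.0)
```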

class pyspark.ml.recommendation.ALS(
  rank=10, 
  maxIter=10, 
  regParam=0.1, 
  numUserBlocks=10, 
  numItemBlocks=10, 
  implicitPrefs=False, 
  alpha=1.0, 
  userCol='user', 
  itemCol='item', 
  seed=None, 
  ratingCol='rating', 
  nonnegative=False, 
  checkpointInterval=10, 
  intermediateStorageLevel='MEMORY_AND_DISK', 
  finalStorageLevel='MEMORY_AND_DISK', 
  coldStartStrategy='nan')

Choosing the key parameters: rank sets the latent dimension (larger fits more structure but costs more and can overfit), regParam controls the L2 regularization, implicitPrefs switches to the implicit-feedback objective above, and alpha scales the confidence weight on implicit observations.

3) Evaluating implicit feedback: MPR and MRR

Implicit feedback is evaluated with the recall-based MPR (mean percentile ranking):

MPR = \frac{\sum_{u,i} r_{ui}\,\overline{\mathrm{rank}}_{ui}}{\sum_{u,i} r_{ui}}

where \overline{\mathrm{rank}}_{ui} \in [0, 1] is the percentile position of item i in the ranked recommendation list for user u (0 = top of the list). Lower is better; a random ranking gives an expected MPR of 0.5.

Another evaluation metric is MRR (Mean Reciprocal Rank):

MRR = \frac{1}{|U|} \sum_{u \in U} \frac{1}{\mathrm{rank}_u}

where \mathrm{rank}_u is the position of the first relevant item in user u's list. Higher is better.

The corresponding PySpark computation:

# Imports assumed by this snippet; note that sum/min here are the Spark SQL
# functions (shadowing the Python builtins). n_genres, the size of each user's
# candidate list, must be defined beforehand.
from pyspark.sql import Window
from pyspark.sql.functions import col, count, desc, mean, min, row_number, sum

(
    predictions
    .withColumn('rank', row_number().over(Window.partitionBy('userId').orderBy(desc('prediction'))))
    .where(col('counts') > 0) # Notice: this excludes users with no actions at all
    .groupby('userId')
    .agg(
        count('*').alias('n'),
        sum(1 - col('prediction')).alias('sum_pred'),
        sum(col('rank') / n_genres).alias('sum_perc_rank'),
        min('rank').alias('min_rank')
    )
    .agg(
        (sum('sum_pred') / sum('n')).alias('avg 1-score'),
        (sum('sum_perc_rank') / sum('n')).alias('MPR'), # the lower the better
        mean(1 / col('min_rank')).alias('MRR')          # the higher the better
    )
    .withColumn('MPR*k', col('MPR') * n_genres)
    .withColumn('1/MRR', 1/col('MRR'))
).show()
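As a sanity check, the same two metrics can be computed in plain Python on a hypothetical set of ranked positives:

```python
# Toy MPR/MRR check. For each user we have the 1-based rank of each held-out
# positive item within a candidate list of size k (all values made up).
k = 100
positive_ranks = {
    "u1": [1, 5],   # two held-out items, ranked 1st and 5th
    "u2": [50],
    "u3": [100],
}

# MPR: mean percentile rank of the positives (0 = always top, ~0.5 = random).
all_ranks = [r for ranks in positive_ranks.values() for r in ranks]
mpr = sum(r / k for r in all_ranks) / len(all_ranks)

# MRR: mean reciprocal rank of each user's best-ranked positive.
mrr = sum(1.0 / min(ranks) for ranks in positive_ranks.values()) / len(positive_ranks)

print(round(mpr, 3), round(mrr, 3))  # 0.39 0.343
```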
