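An Elasticsearch- and Spark-based recommender system rests on two key steps:
- The ALS algorithm models the user-item interaction history and builds a shared latent-factor space (a user matrix and an item matrix);
- Elasticsearch turns the recommendation problem into a search problem.
1. Training the ALS model
1) Data preprocessing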
# `spark` is an active SparkSession with the elasticsearch-hadoop connector available
ratings_from_es = spark.read.format("es").load("ratings")
ratings_from_es.show(5)
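Here the ratings are read from Elasticsearch; in practice they can come from other sources (ClickHouse, CSV files, and so on). The data can also be split into train, validation, and test sets.
2) Training the ALS model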
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# VECTOR_DIM (the number of latent factors) must be defined beforehand and must
# match the dimension of the vector field in the Elasticsearch mapping.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=54)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)
3) Storing the ALS model's user and item factor vectors in Elasticsearch
from pyspark.sql.functions import lit, current_timestamp, unix_timestamp

# Tag every vector with the model version and a timestamp
ver = model.uid
ts = unix_timestamp(current_timestamp())

movie_vectors = model.itemFactors.select(
    "id",
    col("features").alias("model_factor"),
    lit(ver).alias("model_version"),
    ts.alias("model_timestamp"))
movie_vectors.show(5)

user_vectors = model.userFactors.select(
    "id",
    col("features").alias("model_factor"),
    lit(ver).alias("model_version"),
    ts.alias("model_timestamp"))
user_vectors.show(5)

# Movie documents already exist, so update them in place with the new vector
(movie_vectors.write.format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "update")
    .save("movies", mode="append"))

# User documents are written with the index operation
(user_vectors.write.format("es")
    .option("es.mapping.id", "id")
    .option("es.write.operation", "index")
    .save("users", mode="append"))
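For the vector functions used in the next section to work, the model_factor field has to be mapped as a dense_vector whose dims equals the ALS rank. The following is a minimal sketch of such a mapping, not the article's own setup: the helper name build_vector_mapping, the keyword and date field types, and the value 20 are assumptions chosen for illustration.

```python
def build_vector_mapping(dims):
    """Return an index body whose model_factor field is a dense_vector.

    `dims` must equal the ALS rank (VECTOR_DIM in the training code).
    The types chosen for the two metadata fields are assumptions.
    """
    return {
        "mappings": {
            "properties": {
                "model_factor": {"type": "dense_vector", "dims": dims},
                "model_version": {"type": "keyword"},
                # unix_timestamp() produces seconds since the epoch
                "model_timestamp": {"type": "date", "format": "epoch_second"},
            }
        }
    }


body = build_vector_mapping(20)  # 20 is illustrative, not a value from the article
print(body["mappings"]["properties"]["model_factor"])
```

Such a body would typically be supplied when the index is created, before any vectors are written to it.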
2. Recommending with Elasticsearch: the script score query
def vector_query(query_vec, vector_field, q="*", cosine=False):
    """Build a script_score query that ranks documents by vector similarity."""
    if cosine:
        # cosineSimilarity lies in [-1, 1]; adding 1.0 keeps the score non-negative
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        # squash the raw dot product into (0, 1)
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"
    score_fn = score_fn.format(v=vector_field)
    return {
        "query": {
            "script_score": {
                "query": {
                    "query_string": {
                        "query": q
                    }
                },
                "script": {
                    "source": score_fn,
                    "params": {
                        "vector": query_vec
                    }
                }
            }
        }
    }
# `es` is an elasticsearch.Elasticsearch client created elsewhere

def get_similar(the_id, q="*", num=10, index="movies", vector_field='model_factor'):
    """Return the source document and its `num` most similar items (cosine similarity)."""
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=True)
        # request one extra hit: the top hit is the query document itself
        results = es.search(index=index, body=q, size=num + 1)
        hits = results['hits']['hits']
        return src, hits[1:num + 1]

def get_user_recs(the_id, q="*", num=10, users="users", movies="movies", vector_field='model_factor'):
    """Return the user document and the top `num` movies by dot product with the user vector."""
    response = es.get(index=users, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=False)
        results = es.search(index=movies, body=q, size=num)
        hits = results['hits']['hits']
        return src, hits[:num]
def get_movies_for_user(the_id, num=10, ratings="ratings", movies="movies"):
    """Return tmdbId and title of the `num` movies the user rated highest."""
    # sort is passed in the "field:direction" query-string form
    response = es.search(index=ratings, q="userId:{}".format(the_id), size=num, sort="rating:desc")
    hits = response['hits']['hits']
    ids = [h['_source']['movieId'] for h in hits]
    docs = es.mget(body={"ids": ids}, index=movies, _source_includes=['tmdbId', 'title'])
    movies_hits = docs['docs']
    tmdbids = [h['_source'] for h in movies_hits]
    return tmdbids
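Recommendations are generated from the factor vectors with Elasticsearch's script score query and its vector functions, wrapped here in vector_query. Cosine similarity is used to find similar entities of the same kind (user to user, or item to item), while the dot product between a user vector and the item vectors is used to recommend items to a user.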
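To make the two scoring expressions concrete, the sketch below reproduces them locally in plain Python. It is an illustration of the arithmetic only, not Elasticsearch code: the function names cosine_score and dot_score are made up here, and the sigmoid rewrite relies on the script score definition sigmoid(value, k, a) = value^a / (k^a + value^a), which for value = 1 and k = e reduces to the logistic function.

```python
import math


def cosine_score(query_vec, doc_vec):
    """Mirror of: cosineSimilarity(params.vector, field) + 1.0"""
    dot = sum(q * d for q, d in zip(query_vec, doc_vec))
    norm_q = math.sqrt(sum(q * q for q in query_vec))
    norm_d = math.sqrt(sum(d * d for d in doc_vec))
    # cosine lies in [-1, 1]; the +1.0 shift keeps the score non-negative
    return dot / (norm_q * norm_d) + 1.0


def dot_score(query_vec, doc_vec):
    """Mirror of: sigmoid(1, Math.E, -dotProduct(params.vector, field))"""
    dot = sum(q * d for q, d in zip(query_vec, doc_vec))
    # 1^(-dot) / (e^(-dot) + 1^(-dot)) == 1 / (1 + e^(-dot))
    return 1.0 / (1.0 + math.exp(-dot))


print(cosine_score([1.0, 0.0], [0.0, 1.0]))  # orthogonal vectors
print(dot_score([0.0, 0.0], [1.0, 1.0]))     # zero dot product
```

Both transformations are monotonic, so they change the score values but not the ranking produced by the raw cosine similarity or dot product.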
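3. A closer look
1) Why not recommend directly with Spark ML?
First, it is a trade-off between engineering and academic concerns. At model-serving time, running the model over millions of candidates one by one is far too expensive, so retrieving nearest neighbours through Elasticsearch is much more efficient. The comparison cited is O(n log n) versus O(log n), which assumes an index-backed nearest-neighbour lookup; a script_score query still scores every document matched by its inner query.
Second, rich and flexible queries can be added to filter the candidate set along several dimensions at once. For example: hot pot restaurants (category) liked by users in their twenties (age) in Hangzhou (location).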
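A hedged sketch of such a filtered recommendation query follows. It swaps the query_string of vector_query for a bool filter, so only documents matching every term are scored. The helper name filtered_vector_query and the field names city and category are hypothetical and would have to exist in the index mapping.

```python
def filtered_vector_query(query_vec, vector_field, filters):
    """Score only the documents that match every exact-value filter.

    `filters` maps a field name to the exact value it must have
    (the field names used below are hypothetical).
    """
    source = (
        "doc['{v}'].size() == 0 ? 0 : "
        "sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"
    ).format(v=vector_field)
    return {
        "query": {
            "script_score": {
                "query": {
                    "bool": {
                        # filter clauses restrict candidates without affecting the score
                        "filter": [{"term": {f: v}} for f, v in filters.items()]
                    }
                },
                "script": {"source": source, "params": {"vector": query_vec}},
            }
        }
    }


query = filtered_vector_query(
    [0.1, 0.2, 0.3], "model_factor", {"city": "Hangzhou", "category": "hotpot"}
)
print(query["query"]["script_score"]["query"]["bool"]["filter"])
```

Because the filter runs before scoring, the vector function is evaluated only on the reduced candidate set.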
2) Implicit vs explicit feedback
Objective function for explicit feedback:
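In its standard textbook form, which is an assumption here rather than a formula taken from the article, the explicit-feedback objective is the regularized squared error over the observed ratings. The symbols are: r_{ui} the rating user u gave item i, x_u and y_i the user and item factor vectors, \mathcal{K} the set of observed (u, i) pairs, and \lambda the regularization weight (regParam).

```latex
\min_{X,\,Y} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
```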
Objective function for implicit feedback:
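In the commonly used formulation for implicit feedback, again an assumption rather than the article's own formula, the raw interaction strength r_{ui} is not predicted directly. It is split into a binary preference p_{ui} and a confidence c_{ui} that grows with r_{ui} at a rate set by \alpha (the alpha parameter). The sum runs over all user-item pairs, not only the observed ones; x_u and y_i are the user and item factor vectors and \lambda is the regularization weight.

```latex
\min_{X,\,Y} \sum_{u,i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right),
\qquad
p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & r_{ui} = 0 \end{cases},
\qquad
c_{ui} = 1 + \alpha \, r_{ui}
```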
Implicit-feedback data is far more common in practice than explicit feedback. pyspark.ml.recommendation.ALS supports both:
class pyspark.ml.recommendation.ALS(
    rank=10,
    maxIter=10,
    regParam=0.1,
    numUserBlocks=10,
    numItemBlocks=10,
    implicitPrefs=False,
    alpha=1.0,
    userCol='user',
    itemCol='item',
    seed=None,
    ratingCol='rating',
    nonnegative=False,
    checkpointInterval=10,
    intermediateStorageLevel='MEMORY_AND_DISK',
    finalStorageLevel='MEMORY_AND_DISK',
    coldStartStrategy='nan')
Key parameter choices
3) Evaluating implicit feedback: MPR and MRR
Implicit-feedback models are evaluated with a recall-oriented metric, MPR (mean percent ranking).
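A common way to write this metric, given here as an assumption since the article does not spell it out, weights the percentile rank of each item by how strongly the user interacted with it. Here r_{ui} is the observed interaction strength in the test period and \mathrm{rank}_{ui} is the percentile position of item i in user u's ranked recommendation list, with small values meaning the item appears near the top.

```latex
\mathrm{MPR} = \frac{\sum_{u,i} r_{ui} \, \mathrm{rank}_{ui}}{\sum_{u,i} r_{ui}}
```

Lower is better; a random ordering gives an expected value of about 50%.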
Another evaluation metric is MRR (mean reciprocal rank):
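In its usual definition, MRR averages over the users U the reciprocal of the position at which the first relevant item appears in each user's ranked list. The symbol \mathrm{rank}_u, denoting that 1-based position, is notation introduced here for illustration.

```latex
\mathrm{MRR} = \frac{1}{\lvert U \rvert} \sum_{u \in U} \frac{1}{\mathrm{rank}_u}
```

Higher is better, with a maximum of 1 when every user's top recommendation is relevant.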
The corresponding PySpark code:
from pyspark.sql import Window
from pyspark.sql import functions as F

# `predictions` (columns: userId, prediction, counts) and `n_genres` (the number
# of candidates ranked per user) are assumed to be defined earlier.
(
    predictions
    .withColumn('rank', F.row_number().over(Window.partitionBy('userId').orderBy(F.desc('prediction'))))
    .where(F.col('counts') > 0)  # Note: this excludes users with no actions at all
    .groupby('userId')
    .agg(
        F.count('*').alias('n'),
        F.sum(1 - F.col('prediction')).alias('sum_pred'),
        F.sum(F.col('rank') / n_genres).alias('sum_perc_rank'),
        F.min('rank').alias('min_rank')
    )
    .agg(
        (F.sum('sum_pred') / F.sum('n')).alias('avg 1-score'),
        (F.sum('sum_perc_rank') / F.sum('n')).alias('MPR'),  # the lower the better
        F.mean(1 / F.col('min_rank')).alias('MRR')  # the higher the better
    )
    .withColumn('MPR*k', F.col('MPR') * n_genres)
    .withColumn('1/MRR', 1 / F.col('MRR'))
).show()
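The same two aggregates can be checked by hand on a toy example. The sketch below is a plain-Python restatement of the aggregation above under simplifying assumptions: every user has a fully ranked candidate list, relevance is binary, and the percent rank is position divided by list length, as in the PySpark code. The function name mpr_and_mrr and the toy data are made up for illustration.

```python
def mpr_and_mrr(ranked_items_by_user, relevant_by_user):
    """Compute (MPR, MRR) from ranked lists and sets of relevant items.

    ranked_items_by_user: user -> list of items ordered best-first
    relevant_by_user:     user -> set of items the user interacted with
    """
    perc_rank_sum = 0.0
    n_relevant = 0
    reciprocal_ranks = []
    for user, ranked in ranked_items_by_user.items():
        relevant = relevant_by_user.get(user, set())
        # 1-based positions of the relevant items in this user's list
        ranks = [pos for pos, item in enumerate(ranked, start=1) if item in relevant]
        if not ranks:
            continue  # users with no actions are excluded, as in the PySpark code
        perc_rank_sum += sum(r / len(ranked) for r in ranks)
        n_relevant += len(ranks)
        reciprocal_ranks.append(1.0 / min(ranks))
    mpr = perc_rank_sum / n_relevant
    mrr = sum(reciprocal_ranks) / len(reciprocal_ranks)
    return mpr, mrr


ranked = {"u1": ["a", "b", "c", "d"], "u2": ["b", "a", "d", "c"]}
relevant = {"u1": {"a", "c"}, "u2": {"a"}}
print(mpr_and_mrr(ranked, relevant))  # (0.5, 0.75)
```

For u1 the relevant items sit at positions 1 and 3, and for u2 at position 2, so the percent ranks sum to 1.5 over 3 relevant items (MPR 0.5) and the reciprocal ranks 1 and 0.5 average to 0.75.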