Apache Spark是大数据流行的开源平台。MMLib是Spark的开源学习库。MMLib提供了机器学习配置,统计,优化和线性代数等原语。在生态兼容性支持Spark API和Python等NumPy库,也可以使用Hadoop数据源。
在执行效率上性能也明显优于MapReduce。
一、核心功能:
ML提供的算法包括:
- 分类:逻辑回归,原生Bayes算法
- 回归:线性回归,生存回归
- 决策树,随机森林,梯度提升决策树
- 推荐:交替最小二乘法(ALS)
- 聚类:K-means,高斯混合(GMMS)
- 主题模型:隐含狄利克雷分布(英語:Latent Dirichlet allocation,简称LDA)
- 频繁项集,关联规则,序列样式探测
工具包括:
- 特征转化:标准化,归一化,hashing
- ML pipeline并行处理
- 模型评估和超参数调优
- 模型持久化:保存和加载模型
二、mllib和其他竞品
2.1 mllib和Pandas/sklearn
你也许要问Spark提供这些机器学习的库和Python自己的sklearn/pandas有区别吗?实际上Spark的mmlib解决的是一种数据集很大的场景,这时候Spark提供了cluster模式来处理巨大数据集。这时候Pandas可能因为内存不足而无法胜任。
换句话说,如果Spark能将数据集缩减到pandas/sklearn能够处理的大小,他们也不是不可以。
Sparks和pandas/sklearn也可以互相配合。比如说Spark dataframes有个toPandas()方法返回pandas dataframe。只要这个dataframe在你要求的内存范围内,它就可以交付给pandas/sklearn处理。
2.2 mmlbi和spark.ml
Spark除了mmlib,还有一个叫spark.ml
mmlib专注于RDD和DataFrame的API
三、实战mmlib
我们来实战下mmlib如何使用
3.1 spark环境:
首先需要安装java和scala。因为我的开发机使用ubuntu,直接使用apt工具安装
代码语言:bash复制apt install openjdk-17-jre-headless scala
接着安装spark,
代码语言:bash复制wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
然后解压移动到/usr/local/spark
代码语言:bash复制tar xvf spark-3.5.0-bin-hadoop3.tgz
mv spark-3.5.0-bin-hadoop3 /usr/local/spark
#接着把spark的工作目录加入到PATH中
export PATH=$PATH:/usr/local/spark/bin
安装pyspark
代码语言:bash复制pip install pyspark
3.2 mllib
我们先把spark的repo下载下来
代码语言:bash复制git clone https://github.com/apache/spark.git
然后进入spark目录
代码语言:bash复制 cd spark
然后使用spark-submit执行这个client脚本运行一个推荐系统的过程:训练模型和使用模型预测。
代码语言:bash复制spark-submit --driver-memory 2g examples/src/main/python/mllib/recommendation_example.py
这段代码从数据加载开始,使用ALS训练模型,再使用训练数据集合评估模型的均方误差。最后把模型持久化保存下来。
代码语言:python代码运行次数:0复制
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " str(MSE))
# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")