spark的机器学习库mllib

2024-01-19 09:18:40 浏览数 (1)

Apache Spark是大数据流行的开源平台。MMLib是Spark的开源学习库。MMLib提供了机器学习配置,统计,优化和线性代数等原语。在生态兼容性支持Spark API和Python等NumPy库,也可以使用Hadoop数据源。

在执行效率上性能也明显优于MapReduce。

一、核心功能:

ML提供的算法包括:

  • 分类:逻辑回归,原生Bayes算法
  • 回归:线性回归,生存回归
  • 决策树,随机森林,梯度提升决策树
  • 推荐:交替最小二乘法(ALS)
  • 聚类:K-means,高斯混合(GMMS)
  • 主题模型:隐含狄利克雷分布(英語:Latent Dirichlet allocation,简称LDA)
  • 频繁项集,关联规则,序列样式探测

工具包括:

  • 特征转化:标准化,归一化,hashing
  • ML pipeline并行处理
  • 模型评估和超参数调优
  • 模型持久化:保存和加载模型

二、mllib和其他竞品

2.1 mllib和Pandas/sklearn

你也许要问Spark提供这些机器学习的库和Python自己的sklearn/pandas有区别吗?实际上Spark的mmlib解决的是一种数据集很大的场景,这时候Spark提供了cluster模式来处理巨大数据集。这时候Pandas可能因为内存不足而无法胜任。

换句话说,如果Spark能将数据集缩减到pandas/sklearn能够处理的大小,他们也不是不可以。

Sparks和pandas/sklearn也可以互相配合。比如说Spark dataframes有个toPandas()方法返回pandas dataframe。只要这个dataframe在你要求的内存范围内,它就可以交付给pandas/sklearn处理。

2.2 mmlbi和spark.ml

Spark除了mmlib,还有一个叫spark.ml

mmlib专注于RDD和DataFrame的API

三、实战mmlib

我们来实战下mmlib如何使用

3.1 spark环境:

首先需要安装java和scala。因为我的开发机使用ubuntu,直接使用apt工具安装

代码语言:bash复制
apt install openjdk-17-jre-headless scala

接着安装spark,

代码语言:bash复制
wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

然后解压移动到/usr/local/spark

代码语言:bash复制
tar xvf spark-3.5.0-bin-hadoop3.tgz
mv spark-3.5.0-bin-hadoop3 /usr/local/spark 
#接着把spark的工作目录加入到PATH中
export PATH=$PATH:/usr/local/spark/bin

安装pyspark

代码语言:bash复制
pip install pyspark

3.2 mllib

我们先把spark的repo下载下来

代码语言:bash复制
git clone https://github.com/apache/spark.git

然后进入spark目录

代码语言:bash复制
 cd spark

然后使用spark-submit执行这个client脚本运行一个推荐系统的过程:训练模型和使用模型预测。

代码语言:bash复制
spark-submit --driver-memory 2g examples/src/main/python/mllib/recommendation_example.py

这段代码从数据加载开始,使用ALS训练模型,再使用训练数据集合评估模型的均方误差。最后把模型持久化保存下来。

代码语言:python代码运行次数:0复制

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = "   str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

0 人点赞