Spark MLlib
简介
MapReduce
对机器学习的算法编写的缺点:
- 反复读写磁盘
- 磁盘
IO
开销大
机器学习算法中具有大量的迭代计算,导致了MapReduce不太适合。
Spark
是基于内存的计算框架,使得数据尽量不存放在磁盘上,直接在内存上进行数据的操作。 MLlib只包含能够在集群上运行良好的并行算法。
特征化工具
- 特征提取
- 转化
- 降维
- 选择工具
实现算法
MLlib
实现的算法包含:
- 分类
- 回归
- 聚类
- 协同过滤
流水线
使用Spark SQL
中的DF
作为数据集,可以容纳各种数据类型。DF
被ML Pinline
用来存储源数据。DF中的列可以是:
- 文本
- 特征向量
- 真实和预测标签等
转换器transformer
能将一个DF转换成另一个DF,增加一个标签列。
评估器estimator
指的是学习算法或在训练数据上的训练方法的抽象概念,本质上就是一个算法。
参数parameter
用来进行参数的设置。
流水线构建
- 定义pipeline中的各个流水线阶段
PipelineStage
,包含转换器和评估器 - 转换器和评估器有序的组织起来构建
PipeLine
- 流水线本身也是估计器。在流水线的
.fit()
方法运行之后,产生一个PipelineModel
,变成了一个Transformer
# pyspark.ml依赖numpy:sudo pip3 install numpy
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF.Tokenizer
# 构建训练数据
training = spark.createDataFrame([
(0,"a b c d e spark", 1.0),
(1,"b d", 0.0),
(2,"spark b d e", 1.0),
(3,"hadoop mapreduce",0.0)
],["id","text","label"])
# 定义每个阶段:阶段都是评估器或者转换器
tokenizer = Tokenizer(inputCol="text",outputCol="words") # 分解器
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="features")
Lr = LogisticRegression(maxIter=10,regParam=0.001)
# 合并到流水线
pipeline = Pipeline(stages=[tokenizer,hashingTF,Ir]) # 本质上是一个评估器
model = pipeline.fit(training) # 变成了一个PipelineModel,是一个转换器
# 构建测试数据
test = spark.createDataFrame([
(4," b d e spark"),
(5,"spark d"),
(6,"spark u d "),
(7,"hadoop spark")
],["id","text"])
prediction = model.transform(test)
selected = prediction.select("id","text","probability","prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(rid,text,str(prob),predi ction)
特征提取和转换
特征提取
TF-IDF;词频-逆向文件频率
- TF:
HashingTF
是一个转换器;统计各个词条的词频 - IDF:是一个评估器,在数据集上应用
IDF
的fit
方法,会产生一个IDFmodel
from pyspark.ml.feature import HashingTF,IDF,Tokenizer
sentenceData = spark.createDataFrame([(0,"..."),(1,"..."),(0,"..."),(1,"..."),(1,"...")]).toDF("label","sentence") # ...代表是一个个句子
tokenizer = Tokenizer(inputCol="sentence",outputCol="words") # 指定分解器的两个属性
wordsData = tokenizer.transform(sentenceData)
wordsData.show()
hashingTF = HashingTF(inputCol="words",outputCol="rawFeatures",numFeatures=2000) # 把句子转成哈希特征向量
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words","rawFeatures").show(truncate=False)
# 调节权重:评估器
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
# 训练过程:调节权重
rescaleData = idfModel.transform(featurizedData)
rescaleData.select("features", "label").show(truncate=False)
转换
将字符串转换成整数索引,或者在完成计算之后将证书索引还原成字符串标签。相关的转换器