MLlib

2021-03-02 15:59:31 浏览数 (1)

Spark MLlib

简介

MapReduce对机器学习的算法编写的缺点:

  • 反复读写磁盘
  • 磁盘IO开销大

机器学习算法中具有大量的迭代计算,导致了MapReduce不太适合。

Spark是基于内存的计算框架,使得数据尽量不存放在磁盘上,直接在内存上进行数据的操作。 MLlib只包含能够在集群上运行良好的并行算法。

特征化工具
  • 特征提取
  • 转化
  • 降维
  • 选择工具

实现算法

MLlib实现的算法包含:

  • 分类
  • 回归
  • 聚类
  • 协同过滤

流水线

使用Spark SQL中的DF作为数据集,可以容纳各种数据类型。DFML Pinline用来存储源数据。DF中的列可以是:

  • 文本
  • 特征向量
  • 真实和预测标签等

转换器transformer能将一个DF转换成另一个DF,增加一个标签列。

评估器estimator指的是学习算法或在训练数据上的训练方法的抽象概念,本质上就是一个算法。

参数parameter用来进行参数的设置。

流水线构建
  1. 定义pipeline中的各个流水线阶段PipelineStage,包含转换器和评估器
  2. 转换器和评估器有序的组织起来构建PipeLine
  3. 流水线本身也是估计器。在流水线的.fit()方法运行之后,产生一个PipelineModel,变成了一个Transformer
代码语言:javascript复制
# pyspark.ml依赖numpy:sudo pip3 install numpy

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF.Tokenizer

# 构建训练数据
training = spark.createDataFrame([
  (0,"a b c d e spark", 1.0),
  (1,"b d", 0.0),
  (2,"spark b d e", 1.0),
  (3,"hadoop mapreduce",0.0)
],["id","text","label"])

# 定义每个阶段:阶段都是评估器或者转换器
tokenizer = Tokenizer(inputCol="text",outputCol="words")   # 分解器
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol="features")
Lr = LogisticRegression(maxIter=10,regParam=0.001)

# 合并到流水线
pipeline = Pipeline(stages=[tokenizer,hashingTF,Ir])   # 本质上是一个评估器
model = pipeline.fit(training)  # 变成了一个PipelineModel,是一个转换器

# 构建测试数据
test = spark.createDataFrame([
  (4," b d e spark"),
  (5,"spark     d"),
  (6,"spark u d "),
  (7,"hadoop spark")
],["id","text"])

prediction = model.transform(test)
selected = prediction.select("id","text","probability","prediction")
for row in selected.collect():
  rid, text, prob, prediction = row
  print(rid,text,str(prob),predi  ction)

特征提取和转换

特征提取

TF-IDF;词频-逆向文件频率

  • TF:HashingTF是一个转换器;统计各个词条的词频
  • IDF:是一个评估器,在数据集上应用IDFfit方法,会产生一个IDFmodel
代码语言:javascript复制
from pyspark.ml.feature import HashingTF,IDF,Tokenizer

sentenceData = spark.createDataFrame([(0,"..."),(1,"..."),(0,"..."),(1,"..."),(1,"...")]).toDF("label","sentence")   # ...代表是一个个句子

tokenizer = Tokenizer(inputCol="sentence",outputCol="words")  # 指定分解器的两个属性
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

hashingTF = HashingTF(inputCol="words",outputCol="rawFeatures",numFeatures=2000)  # 把句子转成哈希特征向量
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words","rawFeatures").show(truncate=False)

# 调节权重:评估器
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)

# 训练过程:调节权重
rescaleData = idfModel.transform(featurizedData)
rescaleData.select("features", "label").show(truncate=False)
转换

将字符串转换成整数索引,或者在完成计算之后将证书索引还原成字符串标签。相关的转换器

0 人点赞