PySpark |ML(转换器)

2020-11-12 17:51:49 浏览数 (1)

在PySpark中包含了两种机器学习相关的包:MLlib和ML,二者的主要区别在于MLlib包的操作是基于RDD的,ML包的操作是基于DataFrame的。根据之前我们叙述过的DataFrame的性能要远远好于RDD,并且MLlib已经不再被维护了,所以在本专栏中我们将不会讲解MLlib。

01

ML简介

在ML包中主要包含了三个主要的抽象类:转换器、评估器、管道,本文先来介绍第一种抽象类——转换器。

02

转换器

在PySpark中,我们通常通过将一个新列附加到DataFrame来转换数据。

  • Binarizer()
  • 用处:根据指定的阈值将连续变量转换为对应的二进制值。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import Binarizer
df = spark.createDataFrame([(0.5, ), (1.0, ), (1.5, )], ['values'])
binarizer = Binarizer(threshold=0.7, inputCol="values", outputCol="features")
binarizer.transform(df).show()

# 结果展示
 ------ -------- 
|values|features|
 ------ -------- 
|   0.5|     0.0|
|   1.0|     1.0|
|   1.5|     1.0|
 ------ -------- 
  • Bucketizer()
  • 用处:将连续变量离散化到指定的范围区间。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import Bucketizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
# splits 为分类区间
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4,
                                float("inf")],
                        inputCol="values",
                        outputCol="buckets")
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
bucketed.show()

# 结果展示
 ------ ------- 
|values|buckets|
 ------ ------- 
|   0.1|    0.0|
|   0.4|    0.0|
|   1.2|    1.0|
|   1.5|    2.0|
|   NaN|    3.0|
|   NaN|    3.0|
 ------ ------- 
  • ChiSqSelector()
  • 用处:使用卡方检验完成选择。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ChiSqSelector
df = spark.createDataFrame([(Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
                            (Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
                            (Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],
                           ["features", "label"])
selector = ChiSqSelector(numTopFeatures=2, outputCol="selectedFeatures")
model = selector.fit(df)
model.transform(df).show()

# 结果展示
 ------------------ ----- ---------------- 
|          features|label|selectedFeatures|
 ------------------ ----- ---------------- 
|[0.0,0.0,18.0,1.0]|  1.0|      [18.0,1.0]|
|[0.0,1.0,12.0,0.0]|  0.0|      [12.0,0.0]|
|[1.0,0.0,15.0,0.1]|  0.0|      [15.0,0.1]|
 ------------------ ----- ---------------- 
  • CountVectorizer()
  • 用处:从数据集中学习某种模式,对数据进行标记
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import CountVectorizer
df = spark.createDataFrame([(0, ["a", "b", "c"]),
                            (1, ["a", "b", "b", "c", "a"])], ["label", "raw"])
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)
model.transform(df).show(truncate=False)

# 结果展示
 ----- --------------- ------------------------- 
|label|raw            |vectors                  |
 ----- --------------- ------------------------- 
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
 ----- --------------- ------------------------- 
  • ElementwiseProduct()
  • 用处:返回传入向量和参数scalingVec的乘积
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]), )], ["values"])
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]),
                        inputCol="values",
                        outputCol="eprod")
ep.transform(df).show()
ep.setParams(scalingVec=Vectors.dense([2.0, 3.0, 5.0])).transform(df).show()

# 结果展示
 ------------- ------------- 
|       values|        eprod|
 ------------- ------------- 
|[2.0,1.0,3.0]|[2.0,2.0,9.0]|
 ------------- ------------- 

 ------------- -------------- 
|       values|         eprod|
 ------------- -------------- 
|[2.0,1.0,3.0]|[4.0,3.0,15.0]|
 ------------- -------------- 
  • MaxAbsScaler()
  • 用处:将数据调整到[-1,1]范围内(不会移动数据的中心)
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([1.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()

# 结果展示
 ----- ------ 
|    a|scaled|
 ----- ------ 
|[1.0]| [0.5]|
|[2.0]| [1.0]|
 ----- ------ 
  • MinMaxScaler()
  • 用处:将数据缩放到[0,1]范围内(最大最小归一化)。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
print(model.originalMin, model.originalMax)
model.transform(df).show()

# 结果展示
[0.0] [2.0]
 ----- ------ 
|    a|scaled|
 ----- ------ 
|[0.0]| [0.0]|
|[2.0]| [1.0]|
 ----- ------ 
  • NGram()
  • 用处:返回NGram算法后的结果。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import NGram
from pyspark.sql import Row
df = spark.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df).show()

# 结果展示
 --------------- -------------------- 
|    inputTokens|              nGrams|
 --------------- -------------------- 
|[a, b, c, d, e]|[a b, b c, c d, d e]|
 --------------- -------------------- 
  • Normalizer()
  • 用处:使用p范数将数据缩放为单位范数(默认为L2)。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)],
                           ["dense", "sparse"])
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).show()

# 结果展示
 ---------- ------------------- ---------- 
|     dense|             sparse|  features|
 ---------- ------------------- ---------- 
|[3.0,-4.0]|(4,[1,3],[4.0,3.0])|[0.6,-0.8]|
 ---------- ------------------- ---------- 
  • OneHotEncoderEstimator()
  • 用处:将分类列编码为二进制向量列(独热编码)。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0.0, ), (1.0, ), (2.0, )], ["input"])
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df).show()

# 结果展示
 ----- ------------- 
|input|       output|
 ----- ------------- 
|  0.0|(2,[0],[1.0])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
 ----- ------------- 
  • PCA()
  • 用处:使用主成分分析执行数据降维(PCA算法)。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]), ),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]), ),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]), )]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df).show(truncate=0)

# 结果展示
 --------------------- ---------------------------------------- 
|features             |pca_features                            |
 --------------------- ---------------------------------------- 
|(5,[1,3],[1.0,7.0])  |[1.6485728230883807,-4.013282700516296] |
|[2.0,0.0,3.0,4.0,5.0]|[-4.645104331781534,-1.1167972663619026]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.428880535676489,-5.337951427775355] |
 --------------------- ---------------------------------------- 
  • QuantileDiscretizer()
  • 用处:传入一个numBuckets参数,该方法通过计算数据的近似分位数来决定分隔应该是什么。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import QuantileDiscretizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
qds = QuantileDiscretizer(numBuckets=2,
                          inputCol="values",
                          outputCol="buckets",
                          relativeError=0.01,
                          handleInvalid="error")
bucketizer = qds.fit(df)
qds.setHandleInvalid("keep").fit(df).transform(df).show()

# 结果展示
 ------ ------- 
|values|buckets|
 ------ ------- 
|   0.1|    0.0|
|   0.4|    1.0|
|   1.2|    1.0|
|   1.5|    1.0|
|   NaN|    2.0|
|   NaN|    2.0|
 ------ ------- 
  • RegexTokenizer()
  • 用处:使用正则表达式的字符串分词器。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import RegexTokenizer
df = spark.createDataFrame([("A B  c", )], ["text"])
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df).show()

# 结果展示
 ------ --------- 
|  text|    words|
 ------ --------- 
|A B  c|[a, b, c]|
 ------ --------- 
  • StandardScaler()
  • 用处:数据标准化。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
print(model.mean, model.std)
model.transform(df).show()

# 结果展示
[1.0] [1.4142135623730951]
 ----- ------------------- 
|    a|             scaled|
 ----- ------------------- 
|[0.0]|              [0.0]|
|[2.0]|[1.414213562373095]|
 ----- ------------------- 
  • StopWordsRemover()
  • 用处:从标记文本中删除停用词。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import StopWordsRemover
df = spark.createDataFrame([(["a", "b", "c"], )], ["text"])
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df).show()

# 结果展示
 --------- ------ 
|     text| words|
 --------- ------ 
|[a, b, c]|[a, c]|
 --------- ------ 
  • Tokenizer()
  • 用处:将字符串转成小写,然后以空格为分隔符分词。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import Tokenizer
df = spark.createDataFrame([("ASD VA c", )], ["text"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df).show()

# 结果展示
 -------- ------------ 
|    text|       words|
 -------- ------------ 
|ASD VA c|[asd, va, c]|
 -------- ------------ 
  • VectorSlicer()
  • 用处:给定一个索引列表,从特征向量中提取值(作用于特征向量,不管是密集的还是稀疏的)。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([-2.0, 2.3, 0.0, 0.0, 1.0]), ),
                            (Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0]), ),
                            (Vectors.dense([0.6, -1.1, -3.0, 4.5, 3.3]), )],
                           ["features"])
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).show(truncate=0)

# 结果展示
 ----------------------- ---------- 
|features               |sliced    |
 ----------------------- ---------- 
|[-2.0,2.3,0.0,0.0,1.0] |[2.3,1.0] |
|[0.0,0.0,0.0,0.0,0.0]  |[0.0,0.0] |
|[0.6,-1.1,-3.0,4.5,3.3]|[-1.1,3.3]|
 ----------------------- ---------- 
  • VectorAssembler()
  • 用处:将多个数字(包括向量)列合并为一列向量。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df).show()

# 结果展示
 --- --- --- ------------- 
|  a|  b|  c|     features|
 --- --- --- ------------- 
|  1|  0|  3|[1.0,0.0,3.0]|
 --- --- --- ------------- 
  • Word2Vec()
  • 用处:将一个句子(字符串)作为输入,将其转换为{string, vector}格式的映射。
  • 使用方法示例:
代码语言:javascript复制
from pyspark.ml.feature import Word2Vec
sent = ("a b " * 100   "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent, ), (sent, )], ["sentence"])
word2Vec = Word2Vec(vectorSize=5,
                    seed=42,
                    inputCol="sentence",
                    outputCol="model")
model = word2Vec.fit(doc)
model.getVectors().show()

# 结果展示
 ---- -------------------- 
|word|              vector|
 ---- -------------------- 
|   a|[0.09461779892444...|
|   b|[1.15474212169647...|
|   c|[-0.3794820010662...|
 ---- -------------------- 

0 人点赞