Introduction
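PySpark ships two machine-learning packages: MLlib and ML. The main difference is that MLlib operates on RDDs, while ML operates on DataFrames. As discussed earlier in this series, DataFrames perform far better than RDDs, and the RDD-based MLlib API is in maintenance mode (it receives bug fixes but no new features), so this series does not cover MLlib.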
01
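Overview of ML
The ML package is built around three main abstractions: Transformer, Estimator, and Pipeline. This article covers the first of these, the Transformer.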
02
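Transformers
In PySpark, we usually transform data by appending a new column to a DataFrame. All of the classes below live in pyspark.ml.feature, and the examples assume an active SparkSession named spark. Some of the classes (the ones whose example calls fit() first) learn parameters from the data and return a fitted model; it is that model which performs the transformation.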
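The difference between a class that transforms directly and one that must be fitted first can be sketched in plain Python. This is only an analogy, not the PySpark API: the "DataFrame" is a list of dict rows, and the names ToySquarer, ToyMeanCenterer, and ToyMeanCentererModel are hypothetical.

```python
# Plain-Python analogy, not the PySpark API: a "DataFrame" here is a list of dict rows.

class ToySquarer:
    """Transformer-like: needs no training, appends a derived column."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Build new rows so the input is left untouched, as with DataFrames.
        return [{**row, self.output_col: row[self.input_col] ** 2} for row in rows]


class ToyMeanCentererModel:
    """Fitted model: holds the learned mean and does the actual transforming."""

    def __init__(self, mean, input_col, output_col):
        self.mean = mean
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        return [{**row, self.output_col: row[self.input_col] - self.mean} for row in rows]


class ToyMeanCenterer:
    """Estimator-like: fit() learns a parameter and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        mean = sum(row[self.input_col] for row in rows) / len(rows)
        return ToyMeanCentererModel(mean, self.input_col, self.output_col)


rows = [{"x": 1.0}, {"x": 3.0}]
print(ToySquarer("x", "x_squared").transform(rows))
print(ToyMeanCenterer("x", "x_centered").fit(rows).transform(rows))
```

In both cases the result is the input plus one new column, which mirrors how the classes below are used.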
- Binarizer()
- Purpose: converts a continuous variable to a binary value (0.0 or 1.0) according to a given threshold.
- Usage example:
from pyspark.ml.feature import Binarizer
df = spark.createDataFrame([(0.5, ), (1.0, ), (1.5, )], ['values'])
binarizer = Binarizer(threshold=0.7, inputCol="values", outputCol="features")
binarizer.transform(df).show()
# Output
+------+--------+
|values|features|
+------+--------+
|   0.5|     0.0|
|   1.0|     1.0|
|   1.5|     1.0|
+------+--------+
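The thresholding rule itself is small enough to sketch in plain Python. The helper name binarize is hypothetical, and the sketch only mirrors the rule (values strictly greater than the threshold become 1.0, everything else 0.0), not Spark's implementation.

```python
def binarize(values, threshold=0.0):
    """Map each value to 1.0 if it is strictly greater than threshold, else 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


print(binarize([0.5, 1.0, 1.5], threshold=0.7))  # [0.0, 1.0, 1.0]
```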
- Bucketizer()
- Purpose: discretizes a continuous variable into the buckets defined by the given split points.
- Usage example:
from pyspark.ml.feature import Bucketizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
# splits defines the bucket boundaries
bucketizer = Bucketizer(splits=[-float("inf"), 0.5, 1.4,
                                float("inf")],
                        inputCol="values",
                        outputCol="buckets")
# "keep" places NaN values in an extra bucket
bucketed = bucketizer.setHandleInvalid("keep").transform(df)
bucketed.show()
# Output
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    0.0|
|   1.2|    1.0|
|   1.5|    2.0|
|   NaN|    3.0|
|   NaN|    3.0|
+------+-------+
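How a value is mapped to a bucket index can be sketched with a binary search. This is a minimal plain-Python sketch, assuming half-open buckets [lower, upper) with the last bucket also including its upper bound, and NaN sent to one extra bucket as with handleInvalid="keep"; the helper name bucketize is hypothetical.

```python
import math
from bisect import bisect_right


def bucketize(value, splits):
    """Return the bucket index of value; NaN goes to an extra bucket after the last one."""
    if math.isnan(value):
        return float(len(splits) - 1)
    if value == splits[-1]:
        # The last bucket is closed on the right.
        return float(len(splits) - 2)
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the split range")
    return float(bisect_right(splits, value) - 1)


splits = [-float("inf"), 0.5, 1.4, float("inf")]
print([bucketize(v, splits) for v in [0.1, 0.4, 1.2, 1.5, float("nan")]])
# [0.0, 0.0, 1.0, 2.0, 3.0]
```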
- ChiSqSelector()
- Purpose: performs feature selection with the chi-squared test, keeping the numTopFeatures features that depend most strongly on the label.
- Usage example:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ChiSqSelector
df = spark.createDataFrame([(Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
                            (Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
                            (Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],
                           ["features", "label"])
# featuresCol and labelCol default to "features" and "label"
selector = ChiSqSelector(numTopFeatures=2, outputCol="selectedFeatures")
model = selector.fit(df)
model.transform(df).show()
# Output
+------------------+-----+----------------+
|          features|label|selectedFeatures|
+------------------+-----+----------------+
|[0.0,0.0,18.0,1.0]|  1.0|      [18.0,1.0]|
|[0.0,1.0,12.0,0.0]|  0.0|      [12.0,0.0]|
|[1.0,0.0,15.0,0.1]|  0.0|      [15.0,0.1]|
+------------------+-----+----------------+
- CountVectorizer()
- Purpose: learns a vocabulary from a collection of tokenized documents and converts each document into a vector of token counts.
- Usage example:
from pyspark.ml.feature import CountVectorizer
df = spark.createDataFrame([(0, ["a", "b", "c"]),
                            (1, ["a", "b", "b", "c", "a"])], ["label", "raw"])
cv = CountVectorizer(inputCol="raw", outputCol="vectors")
model = cv.fit(df)
model.transform(df).show(truncate=False)
# Output
+-----+---------------+-------------------------+
|label|raw            |vectors                  |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+
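The two steps (learn a vocabulary, then count tokens against it) can be sketched in plain Python. This sketch assumes the vocabulary is ordered by total frequency, with ties broken alphabetically; the tie-breaking rule and the helper names fit_vocabulary and count_vector are assumptions for illustration. It returns dense lists, whereas the output above shows sparse vectors.

```python
from collections import Counter


def fit_vocabulary(docs):
    """Order terms by total count over all documents, most frequent first."""
    totals = Counter(token for doc in docs for token in doc)
    # Alphabetical tie-breaking is an assumption made to keep the sketch deterministic.
    return sorted(totals, key=lambda term: (-totals[term], term))


def count_vector(doc, vocabulary):
    """Count how often each vocabulary term occurs in one document."""
    counts = Counter(doc)
    return [float(counts[term]) for term in vocabulary]


docs = [["a", "b", "c"], ["a", "b", "b", "c", "a"]]
vocabulary = fit_vocabulary(docs)
print(vocabulary)                              # ['a', 'b', 'c']
print([count_vector(d, vocabulary) for d in docs])
# [[1.0, 1.0, 1.0], [2.0, 2.0, 1.0]]
```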
- ElementwiseProduct()
- Purpose: returns the element-wise (Hadamard) product of the input vector and the scalingVec parameter.
- Usage example:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([2.0, 1.0, 3.0]), )], ["values"])
ep = ElementwiseProduct(scalingVec=Vectors.dense([1.0, 2.0, 3.0]),
                        inputCol="values",
                        outputCol="eprod")
ep.transform(df).show()
# Change the scaling vector and transform again
ep.setParams(scalingVec=Vectors.dense([2.0, 3.0, 5.0])).transform(df).show()
# Output
+-------------+-------------+
|       values|        eprod|
+-------------+-------------+
|[2.0,1.0,3.0]|[2.0,2.0,9.0]|
+-------------+-------------+

+-------------+--------------+
|       values|         eprod|
+-------------+--------------+
|[2.0,1.0,3.0]|[4.0,3.0,15.0]|
+-------------+--------------+
- MaxAbsScaler()
- Purpose: rescales each feature to the range [-1, 1] by dividing by its maximum absolute value (it does not shift or center the data).
- Usage example:
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([1.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
maScaler = MaxAbsScaler(inputCol="a", outputCol="scaled")
model = maScaler.fit(df)
model.transform(df).show()
# Output
+-----+------+
|    a|scaled|
+-----+------+
|[1.0]| [0.5]|
|[2.0]| [1.0]|
+-----+------+
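The scaling rule can be sketched in plain Python: find the largest absolute value of each feature, then divide by it. The helper names fit_max_abs and scale_max_abs are hypothetical, and rows are plain lists rather than Spark vectors.

```python
def fit_max_abs(rows):
    """Return the maximum absolute value of each feature (column)."""
    return [max(abs(v) for v in column) for column in zip(*rows)]


def scale_max_abs(rows, max_abs):
    """Divide each feature by its maximum absolute value; all-zero features stay 0.0."""
    return [[v / m if m != 0 else 0.0 for v, m in zip(row, max_abs)] for row in rows]


rows = [[1.0], [2.0]]
print(scale_max_abs(rows, fit_max_abs(rows)))  # [[0.5], [1.0]]
```

Because the data is only divided and never shifted, zeros stay zeros, which is why this scaling suits sparse features.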
- MinMaxScaler()
- Purpose: rescales each feature to the range [0, 1] by default (min-max normalization).
- Usage example:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
mmScaler = MinMaxScaler(inputCol="a", outputCol="scaled")
model = mmScaler.fit(df)
# Minimum and maximum learned from the data
print(model.originalMin, model.originalMax)
model.transform(df).show()
# Output
[0.0] [2.0]
+-----+------+
|    a|scaled|
+-----+------+
|[0.0]| [0.0]|
|[2.0]| [1.0]|
+-----+------+
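Min-max normalization follows the formula (x - min) / (max - min) * (out_max - out_min) + out_min. A minimal plain-Python sketch, with hypothetical helper names and the assumption that a constant feature (max equal to min) is mapped to the midpoint of the output range:

```python
def fit_min_max(rows):
    """Return the per-feature minimum and maximum."""
    columns = list(zip(*rows))
    return [min(c) for c in columns], [max(c) for c in columns]


def scale_min_max(rows, e_min, e_max, out_min=0.0, out_max=1.0):
    """Linearly rescale every feature from [e_min, e_max] to [out_min, out_max]."""
    scaled = []
    for row in rows:
        new_row = []
        for v, lo, hi in zip(row, e_min, e_max):
            if hi == lo:
                # Constant feature: assumed to map to the middle of the output range.
                new_row.append(0.5 * (out_max + out_min))
            else:
                new_row.append((v - lo) / (hi - lo) * (out_max - out_min) + out_min)
        scaled.append(new_row)
    return scaled


rows = [[0.0], [2.0]]
e_min, e_max = fit_min_max(rows)
print(e_min, e_max)                        # [0.0] [2.0]
print(scale_min_max(rows, e_min, e_max))   # [[0.0], [1.0]]
```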
- NGram()
- Purpose: converts an array of tokens into an array of n-grams, each made of n consecutive tokens joined by a space.
- Usage example:
from pyspark.ml.feature import NGram
from pyspark.sql import Row
df = spark.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df).show()
# Output
+---------------+--------------------+
|    inputTokens|              nGrams|
+---------------+--------------------+
|[a, b, c, d, e]|[a b, b c, c d, d e]|
+---------------+--------------------+
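The n-gram construction is a sliding window over the token list. A minimal plain-Python sketch (the helper name ngrams is hypothetical):

```python
def ngrams(tokens, n=2):
    """Join every run of n consecutive tokens with a space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


print(ngrams(["a", "b", "c", "d", "e"], n=2))  # ['a b', 'b c', 'c d', 'd e']
```

When the input has fewer than n tokens, the sketch returns an empty list.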
- Normalizer()
- Purpose: scales each vector to unit norm using the p-norm (p=2, the L2 norm, by default).
- Usage example:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
svec = Vectors.sparse(4, {1: 4.0, 3: 3.0})
df = spark.createDataFrame([(Vectors.dense([3.0, -4.0]), svec)],
                           ["dense", "sparse"])
normalizer = Normalizer(p=2.0, inputCol="dense", outputCol="features")
normalizer.transform(df).show()
# Output
+----------+-------------------+----------+
|     dense|             sparse|  features|
+----------+-------------------+----------+
|[3.0,-4.0]|(4,[1,3],[4.0,3.0])|[0.6,-0.8]|
+----------+-------------------+----------+
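The arithmetic behind the result above: the L2 norm of [3.0, -4.0] is sqrt(9 + 16) = 5, so each component is divided by 5. A minimal plain-Python sketch for a single vector, with a hypothetical helper name and the assumption that a zero vector is returned unchanged:

```python
import math


def normalize(vector, p=2.0):
    """Divide a vector by its p-norm so that the result has unit p-norm."""
    if math.isinf(p):
        norm = max(abs(v) for v in vector)
    else:
        norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0:
        # Assumption for this sketch: leave an all-zero vector as it is.
        return list(vector)
    return [v / norm for v in vector]


print(normalize([3.0, -4.0], p=2.0))
print(normalize([3.0, -4.0], p=1.0))
```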
- OneHotEncoderEstimator()
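- Purpose: encodes a column of category indices as a column of binary vectors (one-hot encoding). The last category is dropped by default, which is why 2.0 below is encoded as an all-zero vector. In Spark 3.0 and later this class is named OneHotEncoder.
- Usage example:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0.0, ), (1.0, ), (2.0, )], ["input"])
ohe = OneHotEncoderEstimator(inputCols=["input"], outputCols=["output"])
model = ohe.fit(df)
model.transform(df).show()
# Output
+-----+-------------+
|input|       output|
+-----+-------------+
|  0.0|(2,[0],[1.0])|
|  1.0|(2,[1],[1.0])|
|  2.0|    (2,[],[])|
+-----+-------------+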
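The all-zero vector for category 2.0 follows from dropping the last category: with three categories the output vector has only two slots. A minimal plain-Python sketch of that rule, using a hypothetical helper one_hot and dense lists instead of sparse vectors:

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a binary vector, optionally dropping the last category."""
    if not 0 <= index < num_categories:
        raise ValueError(f"index {index} is out of range")
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


print([one_hot(i, 3) for i in range(3)])
# [[1.0, 0.0], [0.0, 1.0], [0.0, 0.0]]
```

Dropping one category keeps the encoded columns from summing to a constant, which matters for models such as linear regression.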
- PCA()
- Purpose: reduces the dimensionality of the data using principal component analysis (PCA).
- Usage example:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]), ),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]), ),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]), )]
df = spark.createDataFrame(data, ["features"])
# k is the number of principal components to keep
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca.fit(df)
model.transform(df).show(truncate=False)
# Output
+---------------------+----------------------------------------+
|features             |pca_features                            |
+---------------------+----------------------------------------+
|(5,[1,3],[1.0,7.0])  |[1.6485728230883807,-4.013282700516296] |
|[2.0,0.0,3.0,4.0,5.0]|[-4.645104331781534,-1.1167972663619026]|
|[4.0,0.0,0.0,6.0,7.0]|[-6.428880535676489,-5.337951427775355] |
+---------------------+----------------------------------------+
- QuantileDiscretizer()
- Purpose: takes a numBuckets parameter and determines the split points by computing approximate quantiles of the data.
- Usage example:
from pyspark.ml.feature import QuantileDiscretizer
values = [(0.1, ), (0.4, ), (1.2, ), (1.5, ), (float("nan"), ),
          (float("nan"), )]
df = spark.createDataFrame(values, ["values"])
qds = QuantileDiscretizer(numBuckets=2,
                          inputCol="values",
                          outputCol="buckets",
                          relativeError=0.01,
                          handleInvalid="error")
bucketizer = qds.fit(df)
# "keep" places NaN values in an extra bucket
qds.setHandleInvalid("keep").fit(df).transform(df).show()
# Output
+------+-------+
|values|buckets|
+------+-------+
|   0.1|    0.0|
|   0.4|    1.0|
|   1.2|    1.0|
|   1.5|    1.0|
|   NaN|    2.0|
|   NaN|    2.0|
+------+-------+
- RegexTokenizer()
- Purpose: a string tokenizer that splits text using a regular expression.
- Usage example:
from pyspark.ml.feature import RegexTokenizer
df = spark.createDataFrame([("A B  c", )], ["text"])
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words")
reTokenizer.transform(df).show()
# Output
+------+---------+
|  text|    words|
+------+---------+
|A B  c|[a, b, c]|
+------+---------+
- StandardScaler()
- Purpose: standardizes the data. With the default settings (withStd=True, withMean=False), each feature is divided by its sample standard deviation and is not centered.
- Usage example:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]), ),
                            (Vectors.dense([2.0]), )], ["a"])
standardScaler = StandardScaler(inputCol="a", outputCol="scaled")
model = standardScaler.fit(df)
# Mean and standard deviation learned from the data
print(model.mean, model.std)
model.transform(df).show()
# Output
[1.0] [1.4142135623730951]
+-----+-------------------+
|    a|             scaled|
+-----+-------------------+
|[0.0]|              [0.0]|
|[2.0]|[1.414213562373095]|
+-----+-------------------+
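The numbers above can be checked by hand: the mean of 0.0 and 2.0 is 1.0, the sample standard deviation (dividing by n - 1) is sqrt(2), and without centering 2.0 becomes 2.0 / sqrt(2). A minimal plain-Python sketch for a single feature, using a hypothetical helper standard_scale:

```python
import statistics


def standard_scale(values, with_mean=False, with_std=True):
    """Return (mean, std, scaled values) for one feature, using the sample std (n - 1)."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    scaled = []
    for v in values:
        if with_mean:
            v = v - mean
        if with_std and std != 0:
            v = v / std
        scaled.append(v)
    return mean, std, scaled


mean, std, scaled = standard_scale([0.0, 2.0])
print(mean, std)
print(scaled)
```

Passing with_mean=True additionally centers the feature, which gives zero mean but turns sparse data dense.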
- StopWordsRemover()
- Purpose: removes stop words from tokenized text.
- Usage example:
from pyspark.ml.feature import StopWordsRemover
df = spark.createDataFrame([(["a", "b", "c"], )], ["text"])
remover = StopWordsRemover(inputCol="text", outputCol="words", stopWords=["b"])
remover.transform(df).show()
# Output
+---------+------+
|     text| words|
+---------+------+
|[a, b, c]|[a, c]|
+---------+------+
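Stop-word removal is a filter over the token list. A minimal plain-Python sketch, assuming case-insensitive matching unless case_sensitive is set; the helper name remove_stop_words is hypothetical.

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Drop every token that appears in stop_words."""
    if case_sensitive:
        blocked = set(stop_words)
        return [t for t in tokens if t not in blocked]
    blocked = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in blocked]


print(remove_stop_words(["a", "b", "c"], stop_words=["b"]))  # ['a', 'c']
```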
- Tokenizer()
- Purpose: converts a string to lowercase and then splits it on whitespace.
- Usage example:
from pyspark.ml.feature import Tokenizer
df = spark.createDataFrame([("ASD VA c", )], ["text"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df).show()
# Output
+--------+------------+
|    text|       words|
+--------+------------+
|ASD VA c|[asd, va, c]|
+--------+------------+
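The same lowercase-then-split behaviour can be sketched in plain Python. The helper name tokenize is hypothetical, and the sketch splits on every single whitespace character, so consecutive spaces would produce empty tokens here.

```python
import re


def tokenize(text):
    """Lowercase the text, then split it on each whitespace character."""
    return re.split(r"\s", text.lower())


print(tokenize("ASD VA c"))  # ['asd', 'va', 'c']
```

When the input may contain repeated or mixed separators, a regular-expression tokenizer with a pattern such as `\s+` is the safer choice.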
- VectorSlicer()
- Purpose: given a list of indices, extracts the corresponding values from a feature vector (works on both dense and sparse vectors).
- Usage example:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([-2.0, 2.3, 0.0, 0.0, 1.0]), ),
                            (Vectors.dense([0.0, 0.0, 0.0, 0.0, 0.0]), ),
                            (Vectors.dense([0.6, -1.1, -3.0, 4.5, 3.3]), )],
                           ["features"])
# Keep only the values at positions 1 and 4
vs = VectorSlicer(inputCol="features", outputCol="sliced", indices=[1, 4])
vs.transform(df).show(truncate=False)
# Output
+-----------------------+----------+
|features               |sliced    |
+-----------------------+----------+
|[-2.0,2.3,0.0,0.0,1.0] |[2.3,1.0] |
|[0.0,0.0,0.0,0.0,0.0]  |[0.0,0.0] |
|[0.6,-1.1,-3.0,4.5,3.3]|[-1.1,3.3]|
+-----------------------+----------+
- VectorAssembler()
- Purpose: merges multiple numeric columns (including vector columns) into a single vector column.
- Usage example:
from pyspark.ml.feature import VectorAssembler
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
vecAssembler.transform(df).show()
# Output
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|  1|  0|  3|[1.0,0.0,3.0]|
+---+---+---+-------------+
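Assembling amounts to concatenating the input columns in order and flattening any vector-valued ones. A minimal plain-Python sketch, where a row is a dict, vectors are lists or tuples, and the helper name assemble is hypothetical:

```python
def assemble(row, input_cols):
    """Concatenate the named columns of one row into a single list of floats."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            # Vector-valued column: append all of its elements in order.
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


print(assemble({"a": 1, "b": 0, "c": 3}, ["a", "b", "c"]))  # [1.0, 0.0, 3.0]
```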
- Word2Vec()
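- Purpose: trains a model on sequences of words (each sentence is an array of strings) that maps every word to a fixed-size vector; the fitted model exposes this {string, vector} mapping through getVectors().
- Usage example:
from pyspark.ml.feature import Word2Vec
# Build one long "sentence" of tokens: 100 repetitions of "a b" followed by 10 of "a c"
sent = ("a b " * 100 + "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent, ), (sent, )], ["sentence"])
word2Vec = Word2Vec(vectorSize=5,
                    seed=42,
                    inputCol="sentence",
                    outputCol="model")
model = word2Vec.fit(doc)
model.getVectors().show()
# Output
+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09461779892444...|
|   b|[1.15474212169647...|
|   c|[-0.3794820010662...|
+----+--------------------+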