PySpark|ML(评估器)

2020-11-24 09:51:12 浏览数 (1)

PySpark ML(评估器)

在PySpark中包含了两种机器学习相关的包:MLlib和ML,二者的主要区别在于MLlib包的操作是基于RDD的,ML包的操作是基于DataFrame的。根据之前我们叙述过的DataFrame的性能要远远好于RDD,并且MLlib已经不再被维护了,所以在本专栏中我们将不会讲解MLlib。

数据集获取地址1:https://gitee.com/dtval/data.git

数据集获取地址2:公众号后台回复spark

01

评估器简介

ML中的评估器主要是对于机器学习算法的使用,包括预测、分类、聚类等,本文中会介绍多种模型的使用方式以及使用一些模型来实现简单的案例。

分类

  • LogisticRegression 逻辑回归(仅支持二分类问题)
  • DecisionTreeClassifier 决策树
  • GBTClassifier 提督提升决策树
  • RandomForestClassifier 随机森林
  • NaiveBayes 朴素贝叶斯
  • MultilayerPerceptronClassifier 多层感知器
  • OneVsRest 将多分类问题简化为二分类问题

回归

  • AFTSurvivalRegression 加速失效时间回归模型
  • DecisionTreeRegressor 决策树回归
  • GBTRegressor 梯度提升决策树回归
  • GeneralizedLinearRegression 广义线性回归
  • IsotonicRegression 拟合一个形式自由、非递减的行到数据中。
  • LinearRegression 线性回归
  • RandomForestRegressor 随机森林回归(预测)

聚类

  • BisectingKMeans 二分K均值算法
  • KMeans K均值算法
  • GaussianMixture 高斯混合模型
  • LDA LDA模型

02

评估器应用(分类)

代码语言:javascript复制
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master('local[1]').appName(
    'learn_ml').getOrCreate()

# 载入数据
df0 = spark.read.csv('mushrooms.csv',
                     header=True,
                     inferSchema=True,
                     encoding='utf-8')
# 查看是否有缺失值
df0.toPandas().isna().values.any()
# False 没有缺失值

# 先使用StringIndexer将字符转化为数值,然后将特征整合到一起
old_columns_names = df0.columns
new_columns_names = [name   '-new' for name in old_columns_names]
for i in range(len(old_columns_names)):
    indexer = StringIndexer(inputCol=old_columns_names[i],
                            outputCol=new_columns_names[i])
    df0 = indexer.fit(df0).transform(df0)
vecAss = VectorAssembler(inputCols=new_columns_names[1:], outputCol='features')
df0 = vecAss.transform(df0)
# 更换label列名
df0 = df0.withColumnRenamed(new_columns_names[0], 'label')

# 创建新的只有label和features的表
dfi = df0.select(['label', 'features'])

# 查看数据
# dfi.show(5, truncate=0)

# 将数据集分为训练集和测试集
train_data, test_data = dfi.randomSplit([4.0, 1.0], 100)

blor = LogisticRegression(regParam=0.01)
blorModel = blor.fit(train_data)
result = blorModel.transform(test_data)

# 计算准确率
result.filter(result.label == result.prediction).count() / result.count()
# 0.9661954517516902

03

评估器应用(预测/回归)

代码语言:javascript复制
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName('learn_regression').master(
    'local[1]').getOrCreate()
# 数据导入
df_train = spark.read.csv('boston/train.csv',
                          header=True,
                          inferSchema=True,
                          encoding='utf-8')
df_test = spark.read.csv('boston/test.csv',
                         header=True,
                         inferSchema=True,
                         encoding='utf-8')
# 表合并
from pyspark.sql.functions import lit
df_test = df_test.withColumn('medv', lit(22.77))
df0 = df_train.union(df_test).sort('ID')
# df0.show(3)



def feature_converter(df):
    vecAss = VectorAssembler(inputCols=df0.columns[1:-1], outputCol='features')
    df_va = vecAss.transform(df)
    return df_va


# 按照7:3的方式划分训练集和测试集
train_data, test_data = feature_converter(df0).select(
    ['features', 'medv']).randomSplit([7.0, 3.0], 101)

# 选择算法并训练数据
gbt = GBTRegressor(maxIter=10, labelCol='medv', maxDepth=3)
gbt_model = gbt.fit(train_data)
# 对数据进行预测
result = gbt_model.transform(test_data)
# 计算测试数据的均方根误差
gbt_evaluator = RegressionEvaluator(labelCol='medv',
                                    metricName="rmse",
                                    predictionCol='prediction')
rmse = gbt_evaluator.evaluate(result)
print('测试数据的均方根误差(rmse):{}'.format(rmse))
# 测试数据的均方根误差(rmse):5.624145397622545

04

评估器应用(聚类)

代码语言:javascript复制
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

spark = SparkSession.builder.master('local[1]').appName(
    'learn_cluster').getOrCreate()
# 导入数据
df = spark.read.csv('Mall_Customers.csv', header=True, inferSchema=True)
# 更换列名
df = df.withColumnRenamed('Annual Income (k$)',
                          'Income').withColumnRenamed('Spending Score (1-100)',
                                                      'Spend')
# 查看数据
# df.show(3)

# 查看是否有缺失值
df.toPandas().isna().sum()

#选取特征项,将特征项合并成向量
vecAss = VectorAssembler(inputCols=df.columns[3:], outputCol='features')
df_km = vecAss.transform(df).select('CustomerID', 'features')

# k=5 创建模型
kmeans = KMeans(k=5, seed=1)
km_model = kmeans.fit(df_km)
centers = km_model.clusterCenters()
# 集簇中心点
centers
[
    np.array([55.2962963, 49.51851852]),
    np.array([25.72727273, 79.36363636]),
    np.array([86.53846154, 82.12820513]),
    np.array([88.2, 17.11428571]),
    np.array([26.30434783, 20.91304348])
]

# 获取聚类预测结果
transformed = km_model.transform(df_km).select('CustomerID', 'prediction')

# 合并表格
df_pred = df.join(transformed, 'CustomerID')

# 转化pandas dataframe 然后可视化
pd_df = df_pred.toPandas()
trace = go.Scatter(x=pd_df.Income,
                   y=pd_df.Spend,
                   mode='markers',
                   marker={
                       'size': 10,
                       'color': pd_df.prediction,
                       'colorscale': 'Viridis'
                   })
iplot([trace])

聚类结果展示:

听说长得好看的人都拉到底点了在看

0 人点赞