Spark数据工程|专题(1)——引入,安装,数据填充,异常处理等

2021-08-10 11:28:35 浏览数 (1)

大家好!

写累了数学方面的笔记,今天写一点编程相关的,我们换换口味。

本节主要是对最近使用Spark完成的一些工作做一些抽象和整理。Spark是一个大数据框架(不是一门新的计算机编程语言,而是一个系统,一个框架。如果拿Python实现,就是pyspark,拿scala实现,就是spark-scala等),是大数据开发的一项必备技能,因其分布式系统(distributed system)的实现而被广泛应用。运算速度快的特点让其成为了算法与数据工程任务中的必备技能之一,在大厂的面试中也经常出现对Spark的考察。

不过Spark本身其实是具有一定的学习门槛的。对分布式准确性与速度的要求使其在很多设计上使用了一些精巧的办法,这也使得完成Spark的任务需要动一些脑筋,对其涉及到的特殊的数据结构也需要有一些了解。不过区别于数学与统计系列的笔记,编程我们不会做成数学方面的系列笔记,而更希望以练代讲,面向需求和实际任务,穿插介绍编程中涉及到的原理,并尽全力说明白这些设计的思考与目的。

好啦,废话够多了,我们开始吧。

目录

  • 安装Intellij IDEA与Spark
  • Spark启动与读取数据
  • Spark写入数据
  • Spark实现空值填充
  • Spark使用UDF处理异常值
  • Spark的执行UI展示

涉及关键词

  • SQL
  • SparkSession
  • SparkConf
  • json/csv
  • DataFrame
  • show
  • spark.implicits
  • Seq
  • selectExpr
  • collect
  • first
  • na.fill
  • Row
  • Array
  • Any
  • Column
  • withColumn
  • withColumnRenamed
  • groupBy
  • count
  • desc
  • sort
  • col
  • udf

Source

  • Bilibili,黑马程序员Spark视频

1. 安装Intellij IDEA与Spark

安装Intellij IDEA的原因是我们使用的是Scala来进行编程。而编写Scala最好的IDE自然就是Intellij IDEA(后面简写为IDEA

Note 1: scala是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。Scala 运行在 Java 虚拟机上,并兼容现有的 Java 程序。

所以简单来说scala的语法接近Python,但是速度接近Java。不过不要觉得这个是一件大好事,实际上scala的应用还是有些复杂的,坑埋在了其他地方……不过这里我们不详谈。

当然了,之后的所有代码我们都会使用Scala来书写。至于为什么不用万金油Python,最大的原因就是速度慢,也就是说即使是pyspark,在实际的数据工程操作中也很少会被采用。当然如果是要写pyspark,那就需要使用PyCharm了

IDEA的安装非常简单,百度搜索一下很容易就能找到。我们可以先建一个Scala的项目。如果是一开始安装IDEA的话,一开始要确认自己的Spark是否有安装(因为IDEA一般还是基于Java来编程的,自然不可能一开始就装好这个)。如果没有的话,可以走下面的步骤

注意要搜索的是Azure Toolkit for Intellij。蓝色的框说明了这是我们需要的插件。

Remark 1: Plugins板块有很多支持IDEA的编程外部插件。

同样的我们也可以在IDEA安装Scala,如果你想学的话。

我想已经够清楚了,就不配文字了233。

那我们新建一个项目(project)好了。按照图的流程走一遍

Remark 2: Maven是一个包依赖管理工具。简单来说Java/Scala很多时候都会依赖到非常多外部的包(就和Python要写机器学习,肯定要导入sklearn包一样),不可能每一个包都下载下来(和Python很不一样,如果Python没找到包,肯定都会pip install ...,这就会把包下载到本地)。所以创建maven项目的时候,会有一个pom.xml文件,用来标记本项目所需要的外部包,maven会解析它们并下载作为本项目使用,不会永久存到本地电脑中。

然后随便起个名字,起个项目的地址就可以了。这里SDK选择的是jdk-8,也是它相对来说比较稳定的缘故,Spark的版本选择了2.4.0,则是考虑到公司的需求。

所以现在你就创建好了一个项目,这个项目具有一个统一的层级架构。如果是初学者一开始会比较不适应,对于习惯面向过程的算法工程师来说更是如此。我们来简单熟悉一下这个架构。

Remark 3: 一般来说我们会在/src/main/scala下写主功能代码,而/src/test/scala下则写对应的测试代码。编写对应的测试代码是开发的一个比较重要的习惯,具体的部分可以参考单元测试,文档测试相关的内容。

然后我们可以创建一个scala的文件。

这里要注意蓝色的框,如果不点scala文件夹,是看不到上面我选择的Scala Class这个选项的。这个原因在于IDEA认为你没有在正确的地方写代码,而不是因为你配置错了。

创建scala文件的时候,我们选择object,这一点和Java不太一样,但在这里,可以简单的认为它就是一种class。

这些都准备好了,我们就可以开始写代码了!

2. Spark启动与读取数据

Spark读取的数据是基于分布式的,因此读取方法是专门设计的。

Request 1: 读取并以Python中DataFrame的形式展示数据文件

现在我们假设我的项目的文件夹内有一个json文件,我们希望去读取它并展示。那么我们可以写出这么一段代码

代码语言:javascript复制
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.set("spark.driver.host", "127.0.0.1")
    val spark = SparkSession.builder().appName("HandleExample").config(conf).getOrCreate()
    val df = spark.read.json("src/main/resources/people.json")
    df.show()
    df.printSchema()
    spark.stop()
  }
}

Remark 4: 这里我们创建了一个名叫Test的object。这个名字必须要和之前创建scala文件的文件名是一致的,Java也有这个规范。

这里的文件people.json是这样的,它放在了src/main/resources这里

代码语言:javascript复制
{"name":"Bob","age": 40}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"P", "age":  20.0}
{"name":"Q"}
{"name":"R", "age": 20}
{"name":"S", "age": 35}
{"name": "T", "age": 100}

这是很多条的映射关系,那么跑出来的结果是这样的

代码语言:javascript复制
# The first request
 ----- ------ 
|  age|  name|
 ----- ------ 
| 40.0|   Bob|
| 30.0|  Andy|
| 19.0|Justin|
| 20.0|     P|
| null|     Q|
| 20.0|     R|
| 35.0|     S|
|100.0|     T|
 ----- ------ 

# The second request
root
 |-- age: double (nullable = true)
 |-- name: string (nullable = true)

可以看出空值的部分被展示为了null。对于这样的dataframe,我们可以将行看作一条一条的数据,列看作一个一个的特征。比方说第一行的意思就是“Bob年龄是40.0“,这也是对应的json想表达的意思。

当然很多人肯定会觉得开幕雷击,这究竟是个啥?不要着急,我们一段一段切分。

我们可以先看这一段

代码语言:javascript复制
val conf = new SparkConf()
    conf.setMaster("local")
    conf.set("spark.driver.host", "127.0.0.1")
    val spark = SparkSession.builder().appName("HandleExample").config(conf).getOrCreate()

还是那句话,Spark是一个分布式的框架。所以在使用它之前,我们自然需要启动它。启动Spark的方法就是这一段。

Note 2: conf是一个SparkConf对象,它相当于对于Spark的启动做了一些配置setMaster方法指定了master的位置,local[*]表示在本地运行,*可以填一个数,表示分布式的线程数,也可以不填,由系统自主决定。也可以直接填成*,表示是cpu的最大核数。set方法做了一些其它的配置,这里设置了host127.0.0.1。除此之外,还设置了一个name(appName)标记这个Spark的运行进程。这些都标注好之后,通过SparkSession对象启动一个Spark的运行进程。

一大堆专有名词我看着都晕,我们再努力拆解一下。

下面这一张图简单描述了一下Spark的分布式究竟“分布”在哪里。

所以说Spark需要做一个分配,把集群(Cluster,可以理解为一大批电脑或服务器)的资源合理的调度,这就涉及到Yarn等各种集群调度框架,我们这里不详谈。感兴趣的可以看一下这篇文章

https://blog.csdn.net/zpf336/article/details/82152286

对于local方法来说,可以简单理解为以本地的一条线程就是一台电脑。这也是因为线程可以并行执行的特性

下面的这一张图则描述了master的含义。

所以master这个词其实来源于分布式系统中主从复制的概念,是为了保证数据的准确性而考虑的设计,其他的内容我们这里不详谈。

host一般理解为地址。直译为”东道主“,那你作为一个东道主自然需要提供一个场地嘛,这就是ip地址。这里我们填的127.0.0.1便是本地常用的ip地址。

最后,注意导入两个包,也就是

代码语言:javascript复制
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

不过如果你使用intellij IDEA的话,它会自动帮助你识别并导入它们的

当然如果你连“对象”这个词都完全不了解的话,这一段解释多半也看不太明白。但如果你恰好需要完成Spark相关的任务,那么原封不动的拷贝运行即可……

启动好了,终于到了读取文件的时候,读取文件对应了这一段

代码语言:javascript复制
val df = spark.read.json("src/main/resources/people.json")
    df.show()
    df.printSchema()
    spark.stop()

所以show方法就是展示数据的,里面也可以填一个数,表示展示的行数。printSchema则是展示数据的范式。读取json自然使用的就是spark.read.json方法,这里的spark就是我们之前创建的SparkSession对象。运行完之后,自然需要停止它,需要使用到stop方法。但在IDEA中,不加它也会自动帮你停止

Remark 5: 范式(Schema)是SQL中的概念,简单来说描述的是对于数据的规范。对于固定的一列,其数据必须为什么格式,是否允许有空值,是否为主键等等。如果对SQL不了解或不感兴趣,可以不关心这个知识点。 Remark 6: SQL是Structured Query Language(结构化查询语言)的缩写,经常会被用于查询数据和组合特征等前期的数据处理和分析工作,因为比较易读而被采用。

当然了,我们除了读json数据,也可以读csv数据(或者说更加常见的是csv数据)。但csv数据一般都会有一列特征名(也就是header),因此在读取的时候,要额外处理一下,核心代码为

代码语言:javascript复制
val df = spark.read.option("header", true).csv("src/main/resources/mushrooms.csv")
df.show()

这里的mushrooms.csv是kaggle上的一个公开数据集,大家可以进去下载并自己尝试跑通这个例子。如果不加header的限制的话,第一行的特征名也会被当成数据本身,对表的Schema的推断也会产生影响。

https://www.kaggle.com/uciml/mushroom-classification

除此之外,Seq格式也是经常会被使用到的。我们可以用它来人工构造一些测试数据。核心代码如下

代码语言:javascript复制
import spark.implicits._

val personList = Seq(("zhangsan", 15), ("lisi", 20)).toDF("a", "b")
personList.show()

最终的结果如下

代码语言:javascript复制
 -------- --- 
|       a|  b|
 -------- --- 
|zhangsan| 15|
|    lisi| 20|
 -------- --- 

这里要注意的是,Seq不是Spark的特有结构,而是scala的。因此如果希望把它转为Spark中的对象DataFrame,就需要导入spark.implicits._,并且要在SparkSession创建之后。换句话说这个导入是在main函数内部发生的,一开始写程序的话可能会感觉有些不可思议,但是在实际开发中这种灵活的操作非常常见。

那么到此为止,对于Spark的读数据,我们已经介绍的足够的多了。

3. Spark写入数据

Spark是分布式计算的框架,所以它的写入数据的方式也有所不同。我们不如看一下,假如说我们希望写入我们之前读入的people.json,那么可以这么做

代码语言:javascript复制
df.write.format("json").mode("Overwrite").save("src/main/resources/result2")
val df2 = spark.read.json("src/main/resources/result2")
df2.show()

这一段代码会将之前的内容完整的输入和输出出来。

这个地方比较让人迷惑的是读入的数据有点让人看不懂。它会成为这样的数据

这是因为spark的写入是分布式写入的,所以正常情况下,它会写成多个文件,每一个文件是一个part,而所有文件在一起就是之前完整的数据集。换句话说我们的写入路径其实规定了文件保存的一个文件夹。我们也可以点开每一个part去看具体的文件内容,但一般情况下没人这么干……

同样的,因为这里以json方式写入了,所以读的时候就要以json方式读。完整的按照这个文件夹的地址读入即可。不过有的时候因为要对数据做一些处理,可能会存在csv存不下,或读入读出结果不一样的情况。这个情况我们到后面会简单讨论。

接下来我们讨论如何处理数据处理的相关问题。

4. Spark实现空值填充

空值填充是一个非常常见的数据处理方式,核心含义就是把原来缺失的数据给重新填上。因为数据各式各样,因为处理问题导致各种未填补的数据出现也是家常便饭。不同的数据自然要有不同的处理方式,因此我们这里也会介绍使用不同的方式进行填充时,对应的不同的代码。在这一部分,我们会介绍以平均数,中位数,众数自己手动处理方式进行空值填充的方式。

现在我们考虑people.json,这个文件中,age这一列是存在一个空值的。

Request 2: 对某一列中空值的部分填成这一列已有数据的平均数

可以这么做

代码语言:javascript复制
val meanResult = df.selectExpr("mean(age) AS age_mean").collect()(0)(0).toString().toDouble
val df2 = df.na.fill(meanResult, Array("age"))
df2.show()

最终输出的表是这样的

代码语言:javascript复制
 ------------------ ------ 
|               age|  name|
 ------------------ ------ 
|              40.0|   Bob|
|              30.0|  Andy|
|              19.0|Justin|
|              20.0|     P|
|37.714285714285715|     Q|
|              20.0|     R|
|              35.0|     S|
|             100.0|     T|
 ------------------ ------ 

可以看得出来,我们确实以37.714...这样的一串数填了进去。

这段代码的丑陋可瞬间暴露了自己代码写得烂的本质呢……我们分析一下。df.na.fill方法就是填充的方法,这个方法会对空的部分填上我们传入的参数。第二个参数Array("age")其实就表示了填充所对应的列。

Note 3: 这里要注意使用的是Scala中的Array数据结构,比较类似Java中的ArrayList。C中的链表或者数组。从设计的角度来说,因为填充的方法自然不可能只能对一列填充,所以这里表示可以填充多列,也就因此需要传入Array格式。

因此在这种情况下,我们可以先计算出这一行的平均值meanResult,再填入。

计算平均值的方法中,写SQL是最方便的(不同系统中的SQL语法有可能不一样,这里统一是Hive SQL),所以我们使用了df.selectExpr方法,最大程度的还原SQL的习惯。比方说上面的代码如果改成SQL,其实就是

代码语言:javascript复制
SELECT mean(age) AS age_mean FROM df 

也就是提取出平均值的含义,并且可以绕过null,只计算已有的数据的。

但是这样并不能得到平均值,而是得到一个包裹着平均值的DataFrame,这就是后面的操作的作用。

collect方法会将这个DataFrame做一个处理,把它变成一个列表,列表内的每一个元素都是一个列表,表示的是每一条数据。这是因为如果我们直接执行上面的SQL,会得到

代码语言:javascript复制
 ------------------ 
|          age_mean|
 ------------------ 
|37.714285714285715|
 ------------------ 

所以这个列表只有一个元素,这个元素是一个列表,为[37.714285714285715]。然后我们再提取出这一个元素,就是这个数本身了。

最后便是最为让人难以理解的地方,为什么要先把它转为String,再转为Double呢?这是因为按照我们上面的处理,最终得到的其实是一个Any格式的内容。

Remark 7: Any是Scala中的一种格式,类似Java中的Object,是所有数据格式的父类。因此能够直接使用的方法非常少。

因此如果要得到对应的正确的格式并填入,只能这么“曲线救国”了。可以看出这是一个效率很低的方法,而出现这种情况的原因也是我们在取数的时候,原始的关于数据格式的相关信息丢失了,因此只能通过这种非常强制的方法来做。当然这也是因为df.na.fill方法中,并没有实现参数是Any类型的情况,所以只能处理。

那么有没有其它的方法呢?当然也可以,注意到collect方法之后,我们其实会把DataFrame转为一个Array[Row]

Note 4: Row是一个Spark的数据格式,表示一行数据,它实现了一些可以直接将数据转为不同格式的方法。

所以对代码,我们可以这么改一下。

代码语言:javascript复制
val meanResult = df.selectExpr("mean(age) AS age_mean").collect()(0).getDouble()
val df2 = df.na.fill(meanResult, Array("age"))
df2.show()

// 或者你可以使用first函数
val meanResult = df.selectExpr("mean(age) AS age_mean").first().getDouble()
val df2 = df.na.fill(meanResult, Array("age"))
df2.show()

虽然内部不知道它会怎么做,但至少这样看着好看一些了。这里的getDouble方法可以直接把数据转换为Double。但是要注意的是,这里的转换遵循Spark的默认转换规则,比方说对应的数不是一个整数,但我们使用getInt方法,那么就会报错

代码语言:javascript复制
Exception in thread "main" java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Integer

有的时候我们可能会有额外的需求。比方说希望填充的数经过上取整或者下取整等。只要学过SQL中的roundfloor函数,那都不是事。

代码语言:javascript复制
val meanResult = df.selectExpr("round(mean(age), 0) AS age_mean").collect()(0).getInt()
val df2 = df.na.fill(meanResult, Array("age"))
df2.show()

这样就可以了。

把这个需求变一变,就是下面的问题

Request 3: 对某一列中空值的部分填成这一列已有数据的

alpha

分位数

分位数在业界还是很有分量的一个统计量,但这个其实核心的问题还是落在SQL的书写上。SQL中有一个percentile函数,我们可以用上。

代码语言:javascript复制
val perResult = df.selectExpr("percentile(age, 0.5) AS age_percentile").first().getDouble()
val df2 = df.na.fill(perResult, Array("age"))
df2.show()

具体的执行细节,我们就不多说了,和上面完全一样。

Request 4: 对某一列中空值的部分填成这一列已有数据的众数。

按照“频率趋近于概率”的统计学思想,对缺失值填充为众数,也是一个非常常见的操作,因为众数是一类数据中,出现的频率最高的数据。因此很明显,我们要先找到这个数,保存下来,再填进去。这个思路和上面没有差别。

按照这个思路,我们可以写出这样的代码

代码语言:javascript复制
//import写在最上面
import org.apache.spark.sql.functions.desc

val mode = df.groupBy("age").count().sort(desc("count")).first().getDouble(0)
val dfTemp = df.na.fill(mode, Array("age"))
dfTemp.show()

Note 5: Spark中使用非常多的小的函数来封装SQL中的关键字功能。因此很多时候需要额外导入很多包。但如果Spark安装完整,IDEA会在没有引入包的时候提示,同样代码也不会通过编译。

运行的结果如下

代码语言:javascript复制
 ----- ------ 
|  age|  name|
 ----- ------ 
| 40.0|   Bob|
| 30.0|  Andy|
| 19.0|Justin|
| 20.0|     P|
| 20.0|     Q|
| 20.0|     R|
| 35.0|     S|
|100.0|     T|
 ----- ------ 

所以很明显,这个思路其实也是跟SQL来的。DataFrame里面非常多的算子都是和SQL语句有关的。在这里其实相当于翻译成了

代码语言:javascript复制
SELECT age, count(age) AS "count" 
FROM df 
GROUP BY age 
ORDER BY count DESC

使用的也是groupby归类的操作。Pandas中也具有这样的算子操作,感兴趣的可以看这一篇

https://zhuanlan.zhihu.com/p/83789325

那么提取出这个众数,其实就是相当于提取这个SQL查询出来的表中,第一行对应age的那一列。因为这里的语句很简单,一看就知道这个数据在第一行第一列,所以也很好写后续的操作。

说完平均数,中位数,众数之后,还有两个比较好解决的需求是最大值和最小值

Request 5: 对某一列中空值的部分填成这一列已有数据的最大值/最小值。

说它好处理的原因是,在SQL中有和mean类似的maxmin算子,所以代码也非常类似,这里就不解释了。

代码语言:javascript复制
// 最大值
val maxResult = df.selectExpr("max(age) AS age_max").collect()(0)(0).toString().toDouble
val df2 = df.na.fill(maxResult, Array("age"))
df2.show()

// 最小值
val minResult = df.selectExpr("min(age) AS age_min").collect()(0)(0).toString().toDouble
val df2 = df.na.fill(minResult, Array("age"))
df2.show()

如果只是做一列自然没有意思,如果要做多列呢?这里我们以平均值举一个例子。

Request 6: 对多列进行空值填充,填充结果为各列已有值的平均值。

为了展示我们的效果,把people.json修改成下面的文件,也就是加了一列

代码语言:javascript复制
{"name":"Bob","age": 40, "height": 170}
{"name":"Andy", "age":30, "height": 166}
{"name":"Justin", "age":19}
{"name":"P", "age":  20.0, "height": 175}
{"name":"Q"}
{"name":"R", "age": 20, "height": 178}
{"name":"S", "age": 35, "height": 183}
{"name": "T", "age": 100}

为了通用性,我们把这些代码组合成一个函数的形式。

代码语言:javascript复制
import org.apache.spark.sql.DataFrame

def meanValue(df: DataFrame, columns: Array[String]): DataFrame = {
  var dfTemp = df
  for (x <- columns) {
    val meanDF = df.selectExpr(s"mean($x)").collect()(0).getDouble(0)
    dfTemp = dfTemp.na.fill(meanDF, Array(x))
  }
  dfTemp
}

// 在main方法中执行
meanValue(df, Array("age", "height")).show()

最终的结果如下

代码语言:javascript复制
 ------------------ ------ ------ 
|               age|height|  name|
 ------------------ ------ ------ 
|              40.0|   170|   Bob|
|              30.0|   166|  Andy|
|              19.0|   174|Justin|
|              20.0|   175|     P|
|37.714285714285715|   174|     Q|
|              20.0|   178|     R|
|              35.0|   183|     S|
|             100.0|   174|     T|
 ------------------ ------ ------ 

这个代码其实就是通过for循环,把一个处理多列的方法拆成多个处理单列的方法。但这里还是用到了挺多scala中的一些语法特点,还是值得分析一下。

Remark 8: 代码中用到的for (x <- columns)称作增强for循环,比较类似的用法是Python中的for i in list或者Java中的for(x: list)。相当于枚举一个列表(可迭代对象)中的每一个元素。 Remark 9: s"mean($x)"是一个字符串的格式化用法,类似于Python中的"mean({})".format(x)。可以比较方便的把不同的字符串变量存储到其中。 Remark 10: varval不一样,使用val声明的变量是不可变的,因此不能够参与迭代的修改。但是var声明的变量可变。因为我们要多次变换我们的数据,所以就只能定义一个可变对象,也就是dfTemp。 Remark 11: 函数内容的最后一行只有一个变量dfTemp,这个就是函数的返回值,而上方定义函数名的部分规定了函数的返回类型为DataFrame对象。

这些都算是非常常见的用法。

有的时候,需求上会希望保留新列,为了保证变化是正确的。

Request 7: 和之前类似,按平均值进行空值填充,并保留产生的新列。

那应该如何操作呢?可以这样

代码语言:javascript复制
import org.apache.spark.sql.functions.{col, udf}

def meanValue(df: DataFrame, columns: Array[String]): DataFrame = {
  var dfTemp = df
  for (x <- columns) {
    val meanDF = df.selectExpr(s"mean($x)").collect()(0).getDouble(0)
    dfTemp = dfTemp.withColumn(x   "_mean", col(x))
    dfTemp = dfTemp.na.fill(meanDF, Array(x   "_mean"))
  }
  dfTemp
}

// 在main方法中执行
meanValue(df, Array("age", "height")).show()

得到的结果如下

代码语言:javascript复制
 ----- ------ ------ ------------------ ----------- 
|  age|height|  name|          age_mean|height_mean|
 ----- ------ ------ ------------------ ----------- 
| 40.0|   170|   Bob|              40.0|        170|
| 30.0|   166|  Andy|              30.0|        166|
| 19.0|  null|Justin|              19.0|        174|
| 20.0|   175|     P|              20.0|        175|
| null|  null|     Q|37.714285714285715|        174|
| 20.0|   178|     R|              20.0|        178|
| 35.0|   183|     S|              35.0|        183|
|100.0|  null|     T|             100.0|        174|
 ----- ------ ------ ------------------ ----------- 

这里用到的方法是withColumn,它的用法非常简单,第一个参数是列名,第二个参数是一个Column对象,表示这个列的相关信息。比方说这里我只填了一个col(x),所以表示新的列就是x(x是一个字符串)这一列的复制

Note 6: Column也是Spark内的一个独有的对象,简单来说就是一个“列”对象。col(x)的意思就是“一个列名为x的列“。它的主要用法体现在一些比较复杂的SQL中的join操作上,但这里简单理解为“一列数据“就可以了。

5. Spark使用UDF处理异常值

异常值(outlier)也是数据处理中非常常见到的情况,我们需要把它处理掉。那么这个时候,如何处理这些异常值呢?一种是丢弃,一种是截断。UDF的全称是user defined function,用户自定义函数。非常像Pandas中的apply方法。很明显,自然它会具备非常好的灵活性。

我们来看一下UDF是如何使用在这里的。

Request 8: 将异常值进行截断,即如果异常值大于上四分位数 1.5IQR,则截断至上四分位数 1.5IQR,小于下四分位数-1.5IQR,则同理操作。

我们可以写出这样的代码

代码语言:javascript复制
import org.apache.spark.sql.functions.udf

val quantiles = df.stat.approxQuantile("age", Array(0.25, 0.75), 0.0)
val Q1 = quantiles(0)
val Q3 = quantiles(1)
val IQR = Q3 - Q1
val lowerRange = Q1 - 1.5 * IQR
val upperRange = Q3   1.5 * IQR
val lowerRangeTrim = udf((x: Double) => if (x < lowerRange) lowerRange else x)
val upperRangeTrim = udf((x: Double) => if (x > upperRange) upperRange else x)
val dfTemp = df.withColumn( "age_new", upperRangeTrim(lowerRangeTrim(col("age"))))
dfTemp.drop("age").withColumnRenamed("age_new", "age").show()

最终得到的结果如下

代码语言:javascript复制
 ------ ------ ---- 
|height|  name| age|
 ------ ------ ---- 
|   170|   Bob|40.0|
|   166|  Andy|30.0|
|  null|Justin|19.0|
|   175|     P|20.0|
|  null|     Q|null|
|   178|     R|20.0|
|   183|     S|35.0|
|  null|     T|70.0|
 ------ ------ ---- 

看似没有区别,但要注意最后一个age为100的,已经被修改成70了

一步一步看,首先根据统计学的公式计算IQR,我们使用了DataFrame自带的stat.approxQuantile方法。

Note 7: 分布式计算会出现算不准的情况,所以有approx的前缀,表示近似的意思。

算完之后就是定义udf的地方,就是这两行。

代码语言:javascript复制
val lowerRangeTrim = udf((x: Double) => if (x < lowerRange) lowerRange else x)
val upperRangeTrim = udf((x: Double) => if (x > upperRange) upperRange else x)

udf就是所使用的函数,内部其实是scala中的匿名函数,也就是Python中的lambda函数。左端是函数的参数,必须要注明参数的数据类型,且加上括号。右端则是函数的操作,这里也就是一个三元表达式,即如果x > upperRange,那么就把这个值变为upperRange,否则就不变。

Note 8: 需要注明参数的数据类型的原因就是它依然是一个函数,你见过哪一个函数的参数数据类型是任意的呢?

那么接下来,我们传入了一个包装了两层udf的Column对象。相当于对这一列的每一个数据都做了两次处理,一次向上截断,一次则向下截断。

代码语言:javascript复制
val dfTemp = df.withColumn("age_new", upperRangeTrim(lowerRangeTrim(col("age"))))

最后一步实在是不太优雅,为了保证这一列相同,我们之前先创建了一个新列,再删除了旧列,再使用withColumnRenamed方法把它的名字改了。这可真是太离谱了,但是这也是因为我们的需求本身是一个比较通用性的需求的原因,如果我们对数据比较知根知底,了解它的各种特征,schema的话,是可以写出更加优雅的方法的

最后再来看一下异常值的丢弃,应该如何处理。

Request 9: 将异常值进行丢弃,即如果异常值大于上四分位数 1.5IQR或小于下四分位数-1.5IQR,则丢弃。

从业务上来说,如果我们希望丢弃这个异常值,其实就隐含了这一组数据是“无效”的意思。所以丢弃它也没什么大不了的。

我们可以写出这样的代码

代码语言:javascript复制
val quantiles = df.stat.approxQuantile("age", Array(0.25, 0.75), 0.0)
val Q1 = quantiles(0)
val Q3 = quantiles(1)
val IQR = Q3 - Q1
val lowerRange = Q1 - 1.5 * IQR
val upperRange = Q3   1.5 * IQR
df.filter(s"age <= ${upperRange} AND age >= ${lowerRange}").show()

运行的结果如下

代码语言:javascript复制
 ---- ------ ------ 
| age|height|  name|
 ---- ------ ------ 
|40.0|   170|   Bob|
|30.0|   166|  Andy|
|19.0|  null|Justin|
|20.0|   175|     P|
|20.0|   178|     R|
|35.0|   183|     S|
 ---- ------ ------ 

可以看到,那位百岁老人被干掉了。

在这里我们也用到了格式化字符串,将变量lowerRangeupperRange以SQL的形式传入了我们的条件中。这里用到了filter函数,意思是满足条件的才能留下

6. Spark的执行UI展示

如果你真的一直从头到尾实践了这一节所提到的这些需求,那么不难发现,在Spark执行的过程中,一直会产生各种各样的日志

虽然它标成了写代码最怕碰到的红色,但是事实上大部分日志都是无害的。这里我们也可以通过日志来告诉我们Spark的执行UI。但读懂它的UI信息,完全就可以再写一两篇文章了,所以这里只是做个简单的展示。

因为我们是在IDEA中运行代码,它会自动的开始和结束Spark进程,因此程序执行的很快的话,是看不到它的,所以我们运行下面的这一段代码。

代码语言:javascript复制
for(i <- 1 to 300)
 meanValue(df, Array("age", "height")).show()

然后我们在这运行的过程中,进入127.0.0.1:4040,这个IP地址可以通过Spark的日志找到。

所以我们可以看到类似下面的界面。

在这个界面中,画框的部分都是具有信息量的部分,可以看出来执行好和没有执行好的部分,看出不同的任务,它们完成的情况。点击不同的区域自然还会出现不同的任务。这里主要的观察是,Spark会把代码拆成不同的job,然后不同的job内会拆成不同的stage和task。当然这里具有一些Spark的专有的名词,它们都具有不同的意义。

小结

这一节我们主要介绍了一些Spark的基础操作和一些需求的代码编写,在这中间穿插着介绍了一些比较简单的数据处理的操作和注意事项。数据工程的相关任务中,通用性和数据格式的转换一直是需要考虑的重点,也是编写代码中容易出错的地方。

很显然这些还不足够说对Spark有了解,它可以做的还有很多,我们到之后再说。

0 人点赞