0.序言
本文主要以基于AWS 搭建的EMR spark 托管集群,使用pandas pyspark 对合作单位的业务数据进行ETL —- EXTRACT(抽取)、TRANSFORM(转换)、LOAD(加载) 等工作为例介绍大数据数据预处理的实践经验,很多初学的朋友对大数据挖掘,数据分析第一直观的印象,都只是业务模型,以及组成模型背后的各种算法原理。往往忽视了整个业务场景建模过程中,看似最普通,却又最精髓的数据预处理或者叫数据清洗过程。
1. 数据接入
我们经常提到的ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,首先第一步就是根据不同来源的数据进行数据接入,主要接入方式有三:
- 1.批量数据 可以考虑采用使用备份数据库导出dmp,通过ftp等多种方式传送,首先接入样本数据,进行分析
- 2.增量数据 考虑使用ftp,http等服务配合脚本完成
- 2.实时数据 消息队列接入,kafka,rabbitMQ 等
数据接入对应ETL 中的E—-EXTRACT(抽取),接入过程中面临多种数据源,不同格式,不同平台,数据吞吐量,网络带宽等多种挑战。
python 这种胶水语言天然可以对应这类多样性的任务,当然如果不想编程,还有:Talend,Kettle,Informatica,Inaplex Inaport等工具可以使用.
e.g. 一个kettle 的作业流
以上不是本文重点,不同数据源的导入导出可以参考:
数据库,云平台,oracle,aws,es导入导出实战
我们从数据接入以后的内容开始谈起。
2. 脏数据的清洗
比如在使用Oracle等数据库导出csv file时,字段间的分隔符为英文逗号,字段用英文双引号引起来,我们通常使用大数据工具将这些数据加载成表格的形式,pandas ,spark中都叫做dataframe
对与字段中含有逗号,回车等情况,pandas 是完全可以handle 的,spark也可以但是2.2之前和gbk解码共同作用会有bug
数据样例
代码语言:javascript复制1,2,3
"a","b,
c","d"
"4","6,7","8"
pandas
代码语言:javascript复制# -*- coding:utf-8 -*-
"""@author:season@file:testCSV.py@time:2018/5/3110:49"""
import pandas
def sum_analysis(filename,col_names):
# 读csv文件
data = pandas.read_csv(filename,names=col_names,
engine='python', dtype=str)
# 返回前n行
first_rows = data.head(n=2)
print(first_rows)
# 返回全部列名
cols = data.columns
print(cols)
# 返回维度
dimensision = data.shape
print(dimensision)
print(data.info())
return data
def main():
col_names = ['1','2','3']
file_test = u'''test.csv'''
print(sum_analysis(file_test,col_names))
if __name__=='__main__':
main()
pandas 加载的 result
pyspark
代码语言:javascript复制sdf = spark.read.option("header","true")
.option("charset","gbk")
.option("multiLine", "true")
.csv("s3a://your_file*.csv")
pdf = sdf.limit(1000).toPandas()
linux 命令
强大的sed命令,去除两个双引号中的换行
代码语言:javascript复制**处理结果放入新文件**
sed ':x;N;s/nPO/ PO/;b x' INPUTFILE > OUTPUTFILE
代码语言:javascript复制**处理结果覆盖源文件**
sed -i ':x;N;s/nPO/ PO/;b x' INPUTFILE
当然,有些情况还有由于文件编码造成的乱码情况,这时候就轮到linux命令大显神威了。
比如 使用enconv 将文件由汉字编码转换成utf-8
代码语言:javascript复制enconv -L zh_CN -x UTF-8 filename
或者要把当前目录下的所有文件都转成utf-8
代码语言:javascript复制enca -L zh_CN -x utf-8 *
在Linux中专门提供了一种工具convmv进行文件名编码的转换,可以将文件名从GBK转换成UTF-8编码,或者从UTF-8转换到GBK。
下面看一下convmv的具体用法:
代码语言:javascript复制convmv -f 源编码 -t 新编码 [选项] 文件名
代码语言:javascript复制#将目录下所有文件名由gbk转换为utf-8
convmv -f GBK -t UTF-8 -r --nosmart --notest /your_directory
3. 缺失值的处理
pandas
pandas使用浮点值NaN(Not a Number)表示浮点数和非浮点数组中的缺失值,同时python内置None值也会被当作是缺失值。
如果其中有值为None,Series会输出None,而DataFrame会输出NaN,但是对空值判断没有影响。DataFrame使用isnull方法在输出空值的时候全为NaN
例如对于样本数据中的年龄字段,替换缺失值,并进行离群值清洗
代码语言:javascript复制pdf["AGE"] = pd.to_numeric(pdf["AGE"],"coerce").fillna(500.0).astype("int")
pdf[(pdf["AGE"] > 0) & (pdf["AGE"] < 150)]
自定义过滤器过滤
代码语言:javascript复制#Fix gender
def fix_gender(x):
if x is None:
return None
if "男" in x:
return "M"
if "女" in x:
return "F"
pdf["PI_SEX"] = pdf["PI_SEX"].map(fix_gender)
or
pdf["PI_SEX"] = pdf["PI_SEX"].apply(fix_gender)
或者直接删除有缺失值的行
代码语言:javascript复制data.dropna()
pyspark
spark 同样提供了,.dropna(…) ,.fillna(…) 等方法,是丢弃还是使用均值,方差等值进行填充就需要针对具体业务具体分析了
4. 数据质量核查与基本的数据统计
对于多来源场景下的数据,需要敏锐的发现数据的各类特征,为后续机器学习等业务提供充分的理解,以上这些是离不开数据的统计和质量核查工作,也就是业界常说的让数据自己说话。
4.1 统一单位
多来源数据 ,突出存在的一个问题是单位不统一,比如度量衡,国际标准是米,然而很多北美国际习惯使用英尺等单位,这就需要我们使用自定义函数,进行单位的统一换算。
4.2 去重操作
pandas
去重操作可以帮助我们统计业务的核心数据,从而迅速抓住主要矛盾。例如,对于互联网公司来说,每天有很多的业务数据,然而发现其中的独立个体的独立行为才是数据分析人员应该注意的点。
代码语言:javascript复制data.drop_duplicates(['column'])
pyspark
使用dataframe api 进行去除操作和pandas 比较类似
代码语言:javascript复制sdf.select("column1","column2").dropDuplicates()
当然如果数据量大的话,可以在spark环境中算好再转化到pandas的dataframe中,利用pandas丰富的统计api 进行进一步的分析。
代码语言:javascript复制pdf = sdf.select("column1","column2").dropDuplicates().toPandas()
使用spark sql,其实我觉的这个spark sql 对于传统的数据库dba 等分析师来说简直是革命性产品, 例如:如下代码统计1到100测试中每一个测试次数的人员分布情况
代码语言:javascript复制count_sdf.createOrReplaceTempView("testnumber")
count_sdf_testnumber = spark.sql("
SELECT tests_count,count(1) FROM
testnumber where tests_count < 100 and lab_tests_count > 0
group by tests_count
order by count(1) desc")
count_sdf_testnumber.show()
4.3 聚合操作与统计
pyspark 和pandas 都提供了类似sql 中的groupby 以及distinct 等操作的api,使用起来也大同小异,下面是对一些样本数据按照姓名,性别进行聚合操作的代码实例
代码语言:javascript复制sdf.groupBy("SEX").agg(F.count("NAME")).show()
labtest_count_sdf = sdf.groupBy("NAME","SEX","PI_AGE").agg(F.countDistinct("CODE").alias("tests_count"))
顺带一句,pyspark 跑出的sql 结果集合,使用toPandas() 转换为pandas 的dataframe 之后只要通过引入matplotlib, 就能完成一个简单的可视化demo 了。
样例数据
代码语言:javascript复制d2 = pd.DataFrame({
'label': [1,2,3],
'count': [10,2,3],})
d2.plot(kind='bar')
plt.show()
d2.plot.pie(labels=['1', '2', '3'],subplots=True, figsize=(8, 4))
plt.show()
直方图,饼图
参考文献
做Data Mining,其实大部分时间都花在清洗数据
http://www.raincent.com/content-10-8092-1.html
基于PySpark大规模数据预处理
https://www.jianshu.com/p/b7882e9616c7----