数据分析的本质是为了解决问题,以逻辑梳理为主,分析人员会将大部分精力集中在问题拆解、思路透视上面,技术上的消耗总希望越少越好,而且分析的过程往往存在比较频繁的沟通交互,几乎没有时间百度技术细节。
因此,熟练常用技术是良好分析的保障和基础。
笔者认为熟练记忆数据分析各个环节的一到两个技术点,不仅能提高分析效率,而且将精力从技术中释放出来,更快捷高效的完成逻辑与沟通部分。
笔者习惯将一些常用的技术点梳理出来,下次用到可以轻松复制出来,节省不少精力,随着时间的积累,逐渐成型了一套技术集合。本文基于数据分析的基本流程,整理了SQL、pandas、pyspark、EXCEL(本文暂不涉及数据建模、分类模拟等算法思路)在分析流程中的组合应用,希望对大家有所助益。
1、数据导入
将数据导入到python的环境中相对比较简单,只是工作中些许细节,如果知道可以事半功倍:
1.1、导入Excel/csv文件:
代码语言:javascript复制# 个人公众号:livandata
import pandas as pd
def fun(x):
x = int(x) - 1000
return x
data = pd.read_csv('total_data_append_ssl.txt',
dtype='str',
nrows=5,
sep=',',
header=[1,2],
names=['a','b','c'],
prefix='x',
converters={'a': fun, 'b': fun})
常用的导入Excel/CSV文件的方法为:read_csv()与read_excel()。
在使用过程中会用到一些基本的参数,如上代码:
1) dtype='str':以字符串的形式读取文件;
2) nrows=5:读取多少行数据;
3) sep=',:以逗号分隔的方式读取数据;
4) header=[1,2]:取哪一行作为列名。
如果将第2行作为列名,则header=1;
如果将第2,3行作为列名,则header=[1,2];
5) names=['a','b','c']如果要指定行名,则可以选用names参数:
6) prefix='x':对列名添加前缀,例如:列名为a,加入prefix之后显示为xa。
7) converters={'a': fun, 'b': fun}:对a和b两列做如上fun函数的处理。
1.2、导入txt文件:
代码语言:javascript复制with open('total_data_append_ssl.txt', 'r') as file_to_read:
while True:
lines = file_to_read.readline() # 整行读取数据
if not lines:
break
读取数据主要有两个:
1) r:覆盖式读取;
2) r :追加式读取;
1.3、读入mysql中的数据:
代码语言:javascript复制import sqlalchemy as sqla
# 用sqlalchemy构建数据库链接engine
con = sqla.create_engine('mysql pymysql://root:123456@localhost:3306/livan?charset=utf8')
# 如果读写数据中有汉字,则用charset=utf8mb4:
# con = sqla.create_engine('mysql pymysql://root:123456@localhost:3306/livan?charset=utf8mb4')
# sql 命令
sql_cmd = "SELECT * FROM table"
df = pd.read_sql(sql=sql_cmd, con=con)
在构建连接的时候,笔者遇到一个有意思的操作,就是charset=utf8mb4,由于mysql不支持汉字,则在有汉字读写的时候需要用到utf8mb4编码,而不是单纯的utf8结构。
1.4、使用pyspark读取数据:
代码语言:javascript复制from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("PythonWordCount")
.master("local")
.getOrCreate()
spark.conf.set("spark.executor.memory", "500M")
sc = spark.sparkContext
pyspark是一个相对较新的包,主要是采用python的方式连接了spark环境,他可以对应的读取一些数据,例如:txt、csv、json以及sql数据,可惜的是pyspark没有提供读取excel的api,如果有excel的数据,需要用pandas读取,然后转化成sparkDataFrame使用。
1) 读取csv数据:
代码语言:javascript复制data = spark.read.
options(header='True', inferSchema='True', delimiter=',').
csv("/Users/livan/PycharmProjects/spark_workspace/total_data_append_1.csv")
2)读取txt数据:
代码语言:javascript复制df1 = spark.read.text("/spark_workspace/ssssss.txt")
lines = sc.textFile("data.txt")
3) 读取json数据:
代码语言:javascript复制df = spark.read.json('file:///Users/wangyun/Documents/BigData/script/data/people.json')
4) 读取SQL数据:
代码语言:javascript复制sqlDF = spark.sql("SELECT * FROM people")
读取sql时,需要连接对应的hive库或者数据库,有需要可以具体百度,这里就不详细描述了。
我们可以看到,pyspark读取上来的数据是存储在sparkDataFrame中,打印出来的方法主要有两个:
代码语言:javascript复制print(a.show())
print(b.collect())
show()是以sparkDataFrame格式打印;
collect()是以list格式打印。
2、分批读取数据:
遇到数据量较大时,我们往往需要分批读取数据,等第一批数据处理完了,再读入下一批数据,python也提供了对应的方法,思路是可行的,但是使用过程中会遇到一些意想不到的问题,例如:数据多批导入过程中,内存并不释放,最终导致内存溢出。
所以,正常情况下,如果遇到较大的数据量,我们会采用pyspark方式,这里只是记录分批读数的方案思路,有兴趣的小伙伴可以尝试一下:
代码语言:javascript复制# 分批读取文件:
def read_in_chunks(filePath, chunk_size=10*10):
file_object = open(filePath)
time.sleep(2)
while True:
chunk_data = file_object.read(chunk_size)
if not chunk_data:
break
yield chunk_data
if __name__ == "__main__":
path = '/Users/livan/PycharmProjects/data/Page Data/Facebook Insights Data Export - Visit Beijing - 2014-07.xml'
for chunk in read_in_chunks(path):
print(chunk)
另外,pandas中也提供了对应的分块方法:
代码语言:javascript复制loop = true
chunkSize = 10000
path = '../data/result.csv'
reader = pd.read_csv(path, iterator = True, dtype=str)
while loop:
try:
chunk = reader.get_chunk(chunkSize).fillna('nan')
except StopIteration:
loop = False
print('iteration is stopped~')
3、数据导出
3.1、导出到csv/excel中:
代码语言:javascript复制df.to_csv('tses.csv', sep=',',columns=['a','b','c'],
na_rep='', header=True,
index=True, encoding='utf_8_sig')
数据写入csv和excel 的函数主要有:to_csv和to_excel两个。1) sep=',':输出的数据以逗号分隔;
2) columns=['a','b','c']:制定输出哪些列;
3) na_rep='':缺失值用什么内容填充;
4) header=True:是导出表头;
5) index=True:是否写入行名;
6) encoding='utf_8_sig':以字符串形式输出到文件中,汉字的编码有两种形式encoding='utf_8'和encoding='utf_8_sig',如果一种情况出现乱码,可以再换另一种方式。
2.2、导出到txt中:
代码语言:javascript复制url='ssdsdsd'
with open('teete.txt', 'a', encoding="utf-8") as file_handle: # .txt可以不自己新建,代码会自动新建
file_handle.write(url)
将数据写入到txt文件中,a为追加模式,w为覆盖写入。
Open()函数中添加encoding参数,即以utf-8格式写入。
2.3、导出到mysql中:
代码语言:javascript复制columns = ['aaa', 'bbb', 'ccc', 'ddd']
index = ['chinese', 'math', 'English']
data = np.random.randint(0, 100, size=(3, 4))
test = DataFrame(columns=columns, index=index, data=data)
# 例如:
con = sqla.create_engine('mysql pymysql://root:123456@localhost:3306/livan?charset=utf8')
# 导入数据库
test.to_sql('test_table', con, index=False, if_exists='replace',
dtype={'aaa': sqla.types.INT,
'bbb': sqla.types.INT,
'ccc': sqla.types.INT,
'ddd': sqla.types.INT
})
其中if_exists参数是表示数据的追加模式:append追加模式和replace覆盖模式。
导出数据时如果数据量过大,to_sql的效率会很慢,有些大佬给出了对应的方案:
代码语言:javascript复制import cStringIO
output = cStringIO.StringIO()
# ignore the index
df_a.to_csv(output, sep='t', index=False, header=False)
output.getvalue()
# jump to start of stream
output.seek(0)
connection = engine.raw_connection() # engine 是 from sqlalchemy import create_engine
cursor = connection.cursor()
# null value become ''
cursor.copy_from(output, table_name, null='')
connection.commit()
cursor.close()
存入效率瞬间提升,在此做个感慨,沟通提高效率,例如上面的问题,不问不知道,一问有高人。
2.4、使用pyspark做数据导出:
代码语言:javascript复制from pyspark.sql import SparkSession
spark = SparkSession
.builder
.appName("PythonWordCount")
.master("local")
.getOrCreate()
spark.conf.set("spark.executor.memory", "500M")
sc = spark.sparkContext
sqlDF = spark.sql("SELECT * FROM people")
try:
sqlDF.write.csv("sss.csv")
sqlDF.write.text("seses.txt")
sqlDF.write.format("orc").mode("append").
saveAsTable("kuming.biaoming")
except Exception as e:
raise e
我们可以看到pyspark中的导出结构相对比较统一,即write函数,可以导出为csv、text和导出到hive库中,可以添加format格式和追加模式:append 为追加;overwrite为覆盖。
如上即为数据的导入导出方法,笔者在分析过程中,将常用的一些方法整理出来,可能不是最全的,但却是高频使用的,如果有新的方法思路,欢迎大家沟通。