问题
现在有一张很大的数据表(格式.csv)。内容量很多,记录着往年所有的历年时间和温度,并且升序存储。
要求1:给定一个历年时间,只用python中的内置函数去查找对应的温度,并且让使用的内存尽可能的小。
要求2:如果使用python中的第三方库,会不会使效率变高,为什么?
准备工作:生成数据测试用
为了更加方便的做内存性能分析。我这里写了个简单的生成满足要求数据的脚本。如下:
# _*_coding:utf-8_*_
'''
生成一些大数据
'''
import time
# 生成100万个日期,温度
dateStart = '2010-01-01 12:00:00'
def generateRandom(rangeFrom, rangeTo):
import random
return random.uniform(rangeFrom,rangeTo)
def generageMassiveIPAddr(fileLocation,numberOfLines):
IP = []
file_handler = open(fileLocation, 'a ')
for i in range(numberOfLines):
current_milli_time = lambda: str(round(time.mktime(time.strptime(dateStart, "%Y-%m-%d %H:%M:%S")) * 1000) i)
IP.append(current_milli_time() ',' str(generateRandom(-40,50)) 'n')
file_handler.writelines(IP)
file_handler.close()
if __name__ == '__main__':
print(time.ctime())
generageMassiveIPAddr('temperature.csv', 1000000)
print(time.ctime())
分析
python特性 & 需求特性
当时拿到这个题目,我考虑了以下几点。
- 确认题目要求的数据存在了多行还是一行。
- 使用第三方库很简单,pandas,numpy完全可以满足要求,那么使用内置函数怎么实现。
- 如何进行性能优化。
经过确认,这里的数据使多行,这样就可以用python中的readline去获取每一行的数据了。
要使内存尽可能的小,我想到了分片读,因为日期为顺序的,可以先分片,然后按照不同的偏移量,依次读一行然后顺序去筛选。
读文件是IO操作,那是不是可以加一个线程去优化读的流程。但是线程之间的时间片切换,也需要占用CPU资源,需要自测来确定线程的数量为多少最合适。
下面我们来根据python的特性来分析以下这些方法可行不可行。
#1
如何实现分片读
python的全局解释器锁GIL对线程的影响
#2
#3
如何测试使用的内存大小,这里我为了方便观察内存引入了profile模块。
什么是全局解释器锁(GIL)
python是一个动态语言,python中一个线程对应于c语言中的一个线程,GIL使得同一个时刻只有一个线程在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行。所以同一时间,cpu实际上只能执行一个线程,这也就是为什么有很多人说python中的线程鸡肋。但是GIL是会被释放的,GIL在遇到IO的操作时候主动释放。所以在IO操作很多的情况下,python中的多线程还是有用的。
什么是IO操作
在网络编程中,经常会说到IO这个词。文件读写IO,网络IO。那么这些IO的底层原理是什么呢?看下图。
计算机的核心(CPU和内存),与其它设备之间数据转移的过程就是IO。比如数据从磁盘读入到内存,或内存的数据写回到磁盘,都是IO操作。在计算机的世界里,这就是IO的本质。
从编程的角度理解
应用程序的IO操作有两种:IO调用和IO执行。IO调用是进程发起的,IO执行是操作系统的工作,因此,这里说的IO是应用程序对操作系统IO功能的触发。
因此,应用程序里IO调用的目的是将进程的内部数据输出到外部,或将外部数据输入到进程内部。这里的外部指的是非进程空间数据。
总结,一个进程的IO输入操作流程如下:
a)进程向操作系统请求外部数据
b)操作系统将数据从外部磁盘拷贝到内存缓冲区
c)操作系统将数据从内存缓冲区拷贝到进程缓冲区
d)进程读取数据进行操作
结论
总结 & 代码
上面也说了,在IO密集性应用中。python的多线程还是可以起到一些作用的。那么线程之前的切换也会占用资源。所以得需要调整线程数量参数以使得性能最优。
所有代码如下:
# _*_coding:utf-8_*_
'''
多线程分块读大表
'''
import time, threading
# from memory_profiler import profile
'''
Reader类,继承threading.Thread
@__init__方法初始化
@run方法实现了读文件的操作
'''
class Reader(threading.Thread):
def __init__(self, file_name, start_pos, end_pos, key):
super(Reader, self).__init__()
self.file_name = file_name
self.start_pos = start_pos
self.end_pos = end_pos
self.key = key
# profile分析内存
# @profile()
def run(self):
fd = open(self.file_name, 'r')
'''
该if块主要判断分块后的文件块的首位置是不是行首
是行首的话,不做处理
否则,将文件块的首位置定位到下一行的行首
'''
if self.start_pos != 0:
fd.seek(self.start_pos-1)
if fd.read(1) != 'n':
line = fd.readline()
self.start_pos = fd.tell()
fd.seek(self.start_pos)
'''
对该文件块进行处理
'''
index = 0
while (self.start_pos <= self.end_pos):
index = 1
line = fd.readline()
'''
因为这里的历元时间是顺序排列,所以可以直接比较
'''
date,temperature = line.split(',')
if self.key == date:
# 通过结束位置和偏移可以确定行数
print('in file end_pos' str(self.end_pos) 'offset' str(index) ' temperature' temperature)
break
elif self.key > date:
self.start_pos = fd.tell()
continue
elif self.key < date:
break
'''
对文件进行分块,文件块的数量和线程数量一致
'''
class Partition(object):
def __init__(self, file_name, thread_num):
self.file_name = file_name
self.block_num = thread_num
def part(self):
fd = open(self.file_name, 'r')
'''
给offset参数一个定义,2代表从文件末尾算起
'''
fd.seek(0, 2)
pos_list = []
file_size = fd.tell()
block_size = file_size/self.block_num
start_pos = 0
for i in range(self.block_num):
if i == self.block_num-1:
end_pos = file_size-1
pos_list.append((start_pos, end_pos))
break
end_pos = start_pos block_size-1
if end_pos >= file_size:
end_pos = file_size-1
if start_pos >= file_size:
break
pos_list.append((start_pos, end_pos))
start_pos = end_pos 1
print(pos_list)
fd.close()
return pos_list
if __name__ == '__main__':
#文件名
file_name = 'temperature.csv'
key = '1262318499996'
#线程数量
thread_num = 6
#起始时间
start_time = time.perf_counter()
p = Partition(file_name, thread_num)
t = []
pos = p.part()
print(pos)
#生成线程
for i in range(thread_num):
t.append(Reader(file_name, *pos[i], key))
#开启线程
for i in range(thread_num):
t[i].start()
for i in range(thread_num):
t[i].join()
#结束时间
end_time = time.perf_counter()
print("Cost time is %f" % (end_time - start_time))
思考1
迭代器能帮忙吗
在我的上篇文章中讲了迭代器的本质。
迭代器有个特征是将函数又封装了一层,可以快速的实现上下文切换。那么我们是不是可以将这个特性用到这里,去掉线程,一行一行读数据,然后yield出去呢?
我觉得可以但是没必要,因为yield只针对于需要获取全部数据。这里是时间是顺序的,我不需要全部遍历,只需要一行一行读,然后去比较就可以了。
思考2
为什么第三方库这么快
关于第三方库我也写了一个简单代码,使用到了pandas,pandas可以将数据全部读出,然后因为时间为顺序,完全可以使用二分法去找。代码如下:
# _*_coding:utf-8_*_
'''
用python第三方库pandas读取大表格
'''
import numpy as np
import pandas as pd
import time
''' 递归实现二分法 '''
def dichotomy(data_source, key):
"""定义二分法"""
#先取数组中的中间值
mid = int(len(data_source)/2)
if len(data_source) > 1:
#判断整个数组中的数字个数
if data_source[mid][0] > key:
dichotomy(data_source[:mid], key)
return dichotomy(data_source[:mid], key)
elif data_source[mid][0] < key:
dichotomy(data_source[mid:], key)
return dichotomy(data_source[mid:], key)
else:
return data_source[mid]
#跳出条件
elif len(data_source) == 1:
if data_source[mid][0] == key:
return data_source[mid]
else:
# 如果没有找到就返回None
return None
if __name__ == '__main__':
#起始时间
start_time = time.perf_counter()
csv_result = pd.read_csv('temperature.csv')
row_list = csv_result.values.tolist()
key = 444
rst = dichotomy(row_list, key)
if not rst is None:
# 打印位置
print('find it!The temperature is' str(rst[1]))
else:
print('not find it')
#结束时间
end_time = time.perf_counter()
print("Cost time is %f" % (end_time - start_time))
pandas的基类是用Cython C写的,然后被编译成parser.pyd文件,用C进行文件读写,因此速度非常快,在读大文件时会比python快很多倍,具体不赘述了。
新浪微博|是yancy呀
公众号| 我和bug只能活一个