某大厂面试题:如何只用python的内置函数处理10G的大文件并使使用内存最小

2022-09-20 14:56:48 浏览数 (1)

问题

现在有一张很大的数据表(格式.csv)。内容量很多,记录着往年所有的历年时间和温度,并且升序存储。

要求1:给定一个历年时间,只用python中的内置函数去查找对应的温度,并且让使用的内存尽可能的小。

要求2:如果使用python中的第三方库,会不会使效率变高,为什么?

准备工作:生成数据测试用

为了更加方便的做内存性能分析。我这里写了个简单的生成满足要求数据的脚本。如下:

# _*_coding:utf-8_*_

'''

生成一些大数据

'''

import time

# 生成100万个日期,温度

dateStart = '2010-01-01 12:00:00'

def generateRandom(rangeFrom, rangeTo):

import random

return random.uniform(rangeFrom,rangeTo)

def generageMassiveIPAddr(fileLocation,numberOfLines):

IP = []

file_handler = open(fileLocation, 'a ')

for i in range(numberOfLines):

current_milli_time = lambda: str(round(time.mktime(time.strptime(dateStart, "%Y-%m-%d %H:%M:%S")) * 1000) i)

IP.append(current_milli_time() ',' str(generateRandom(-40,50)) 'n')

file_handler.writelines(IP)

file_handler.close()

if __name__ == '__main__':

print(time.ctime())

generageMassiveIPAddr('temperature.csv', 1000000)

print(time.ctime())

分析

python特性 & 需求特性

当时拿到这个题目,我考虑了以下几点。

  1. 确认题目要求的数据存在了多行还是一行。
  2. 使用第三方库很简单,pandas,numpy完全可以满足要求,那么使用内置函数怎么实现。
  3. 如何进行性能优化。

经过确认,这里的数据使多行,这样就可以用python中的readline去获取每一行的数据了。

要使内存尽可能的小,我想到了分片读,因为日期为顺序的,可以先分片,然后按照不同的偏移量,依次读一行然后顺序去筛选。

读文件是IO操作,那是不是可以加一个线程去优化读的流程。但是线程之间的时间片切换,也需要占用CPU资源,需要自测来确定线程的数量为多少最合适。

下面我们来根据python的特性来分析以下这些方法可行不可行。

#1

如何实现分片读

python的全局解释器锁GIL对线程的影响

#2

#3

如何测试使用的内存大小,这里我为了方便观察内存引入了profile模块。

什么是全局解释器锁(GIL)

python是一个动态语言,python中一个线程对应于c语言中的一个线程,GIL使得同一个时刻只有一个线程在一个cpu上执行字节码, 无法将多个线程映射到多个cpu上执行。所以同一时间,cpu实际上只能执行一个线程,这也就是为什么有很多人说python中的线程鸡肋。但是GIL是会被释放的,GIL在遇到IO的操作时候主动释放。所以在IO操作很多的情况下,python中的多线程还是有用的。

什么是IO操作

在网络编程中,经常会说到IO这个词。文件读写IO,网络IO。那么这些IO的底层原理是什么呢?看下图。

计算机的核心(CPU和内存),与其它设备之间数据转移的过程就是IO。比如数据从磁盘读入到内存,或内存的数据写回到磁盘,都是IO操作。在计算机的世界里,这就是IO的本质。

从编程的角度理解

应用程序的IO操作有两种:IO调用和IO执行。IO调用是进程发起的,IO执行是操作系统的工作,因此,这里说的IO是应用程序对操作系统IO功能的触发。

因此,应用程序里IO调用的目的是将进程的内部数据输出到外部,或将外部数据输入到进程内部。这里的外部指的是非进程空间数据。

总结,一个进程的IO输入操作流程如下:

a)进程向操作系统请求外部数据

b)操作系统将数据从外部磁盘拷贝到内存缓冲区

c)操作系统将数据从内存缓冲区拷贝到进程缓冲区

d)进程读取数据进行操作

结论

总结 & 代码

上面也说了,在IO密集性应用中。python的多线程还是可以起到一些作用的。那么线程之前的切换也会占用资源。所以得需要调整线程数量参数以使得性能最优。

所有代码如下:

# _*_coding:utf-8_*_

'''

多线程分块读大表

'''

import time, threading

# from memory_profiler import profile

'''

Reader类,继承threading.Thread

@__init__方法初始化

@run方法实现了读文件的操作

'''

class Reader(threading.Thread):

def __init__(self, file_name, start_pos, end_pos, key):

super(Reader, self).__init__()

self.file_name = file_name

self.start_pos = start_pos

self.end_pos = end_pos

self.key = key

# profile分析内存

# @profile()

def run(self):

fd = open(self.file_name, 'r')

'''

该if块主要判断分块后的文件块的首位置是不是行首

是行首的话,不做处理

否则,将文件块的首位置定位到下一行的行首

'''

if self.start_pos != 0:

fd.seek(self.start_pos-1)

if fd.read(1) != 'n':

line = fd.readline()

self.start_pos = fd.tell()

fd.seek(self.start_pos)

'''

对该文件块进行处理

'''

index = 0

while (self.start_pos <= self.end_pos):

index = 1

line = fd.readline()

'''

因为这里的历元时间是顺序排列,所以可以直接比较

'''

date,temperature = line.split(',')

if self.key == date:

# 通过结束位置和偏移可以确定行数

print('in file end_pos' str(self.end_pos) 'offset' str(index) ' temperature' temperature)

break

elif self.key > date:

self.start_pos = fd.tell()

continue

elif self.key < date:

break

'''

对文件进行分块,文件块的数量和线程数量一致

'''

class Partition(object):

def __init__(self, file_name, thread_num):

self.file_name = file_name

self.block_num = thread_num

def part(self):

fd = open(self.file_name, 'r')

'''

给offset参数一个定义,2代表从文件末尾算起

'''

fd.seek(0, 2)

pos_list = []

file_size = fd.tell()

block_size = file_size/self.block_num

start_pos = 0

for i in range(self.block_num):

if i == self.block_num-1:

end_pos = file_size-1

pos_list.append((start_pos, end_pos))

break

end_pos = start_pos block_size-1

if end_pos >= file_size:

end_pos = file_size-1

if start_pos >= file_size:

break

pos_list.append((start_pos, end_pos))

start_pos = end_pos 1

print(pos_list)

fd.close()

return pos_list

if __name__ == '__main__':

#文件名

file_name = 'temperature.csv'

key = '1262318499996'

#线程数量

thread_num = 6

#起始时间

start_time = time.perf_counter()

p = Partition(file_name, thread_num)

t = []

pos = p.part()

print(pos)

#生成线程

for i in range(thread_num):

t.append(Reader(file_name, *pos[i], key))

#开启线程

for i in range(thread_num):

t[i].start()

for i in range(thread_num):

t[i].join()

#结束时间

end_time = time.perf_counter()

print("Cost time is %f" % (end_time - start_time))

思考1

迭代器能帮忙吗

在我的上篇文章中讲了迭代器的本质。

迭代器有个特征是将函数又封装了一层,可以快速的实现上下文切换。那么我们是不是可以将这个特性用到这里,去掉线程,一行一行读数据,然后yield出去呢?

我觉得可以但是没必要,因为yield只针对于需要获取全部数据。这里是时间是顺序的,我不需要全部遍历,只需要一行一行读,然后去比较就可以了。

思考2

为什么第三方库这么快

关于第三方库我也写了一个简单代码,使用到了pandas,pandas可以将数据全部读出,然后因为时间为顺序,完全可以使用二分法去找。代码如下:

# _*_coding:utf-8_*_

'''

用python第三方库pandas读取大表格

'''

import numpy as np

import pandas as pd

import time

''' 递归实现二分法 '''

def dichotomy(data_source, key):

"""定义二分法"""

#先取数组中的中间值

mid = int(len(data_source)/2)

if len(data_source) > 1:

#判断整个数组中的数字个数

if data_source[mid][0] > key:

dichotomy(data_source[:mid], key)

return dichotomy(data_source[:mid], key)

elif data_source[mid][0] < key:

dichotomy(data_source[mid:], key)

return dichotomy(data_source[mid:], key)

else:

return data_source[mid]

#跳出条件

elif len(data_source) == 1:

if data_source[mid][0] == key:

return data_source[mid]

else:

# 如果没有找到就返回None

return None

if __name__ == '__main__':

#起始时间

start_time = time.perf_counter()

csv_result = pd.read_csv('temperature.csv')

row_list = csv_result.values.tolist()

key = 444

rst = dichotomy(row_list, key)

if not rst is None:

# 打印位置

print('find it!The temperature is' str(rst[1]))

else:

print('not find it')

#结束时间

end_time = time.perf_counter()

print("Cost time is %f" % (end_time - start_time))

pandas的基类是用Cython C写的,然后被编译成parser.pyd文件,用C进行文件读写,因此速度非常快,在读大文件时会比python快很多倍,具体不赘述了。

新浪微博|是yancy呀

公众号| 我和bug只能活一个

0 人点赞