上一节代码中,定义了一个全局变量PARSE_EXIT,用于标识网页源代码队列是否为空。PARSE_EXIT 不为空,则解析线程继续解析下一个源代码;如果 PARSE_EXIT 为空,表明源代码队列中的源代码全部解析完毕,解析线程就可以退出。
ThreadParse 类的 run() 方法中,循环判断 PARSE_EXIT 的值,当 PARSE_EXIT 为 False 时,取出 dataQueue 中的网页源代码,调用 parse() 方法对源代码进行解析。如果 PARSE_EXIT 为 True,表明网页源代码队列为空,所有的源代码已经解析完毕,这个解析线程就可以退出。
- 解析 html 文档,获取文档内容
# 解析 html 文档,获取文档内容
def parse(self, html):
text = etree.HTML(html)
# 模糊查询
node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
for node in node_list:
try:
# 用户名
username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
# 图片链接
image = node.xpath("./li").xpath(".//@src")
# 取出标题
title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
# 点赞数
like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
# 评论数
comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
items = {
"username": username,
"title": title,
"image": image,
"zan": like,
"comments": comments
}
# 打开锁,向文件添加内容,释放锁
with self.lock:
self.localFile.write(json.dumps(items, ensure_ascii=False) "n")
在多线程开发中,为了维护资源的完整性,在访问共享资源时要使用共享锁 lock。线程获得了锁之后,才可以访问文件 localFile,并往里写入数据;写入完毕后,将锁释放,其它线程就可以访问这个文件。同一时刻,只允许一个线程访问该文件。
- 完成采集和解析网页内容
def main():
# 页码队列,存储 20 个页码,先进先出
pageQueue = Queue(20)
for i in range(1, 21):
pageQueue.put(i)
# 采集结果的数据队列,参数为空表示不限制
dataQueue = Queue()
# 以追加的方式打开本地文件
localFile = open("duanzi.json", "a")
# 互斥锁
lock = threading.Lock()
crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
# 创建、启动和存储 3 个采集线程
threadCrawls = []
for threadName in crawlList:
thread = ThreadCrawl(threadName, pageQueue, dataQueue)
thread.start()
threadCrawls.append(thread)
parseList = ["解析线程1号", "解析线程2号", "解析线程3号"]
# 创建、启动和存储 3 个解析线程
threadParses = []
for threadName in parseList:
thread = ThreadParse(threadName, dataQueue, localFile, lock)
thread.start()
threadParses.append(thread)
# 如果 pageQueue 为空,采集线程退出循环
while not pageQueue.empty():
pass
global CRAWL_EXIT
CRAWL_EXIT = True
for thread in threadCrawls:
# 阻塞子线程
thread.join()
while not dataQueue.empty:
pass
global PARSE_EXIT
PARSE_EXIT = True
for thread in threadParses:
thread.join()
with lock:
# 关闭文件,在文件之前,内容都存在内存中
localFile.close()