Python-并发下载-多线程实现-下

2019-07-18 15:22:03 浏览数 (1)

上一节代码中,定义了一个全局变量PARSE_EXIT,用于标识网页源代码队列是否为空。PARSE_EXIT 不为空,则解析线程继续解析下一个源代码;如果 PARSE_EXIT 为空,表明源代码队列中的源代码全部解析完毕,解析线程就可以退出。

ThreadParse 类的 run() 方法中,循环判断 PARSE_EXIT 的值,当 PARSE_EXIT 为 False 时,取出 dataQueue 中的网页源代码,调用 parse() 方法对源代码进行解析。如果 PARSE_EXIT 为 True,表明网页源代码队列为空,所有的源代码已经解析完毕,这个解析线程就可以退出。

  • 解析 html 文档,获取文档内容
代码语言:javascript复制
# 解析 html 文档,获取文档内容
def parse(self, html):
  text = etree.HTML(html)
  # 模糊查询
  node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
  
  for node in node_list:
    try:
      # 用户名 
      username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
      # 图片链接
      image = node.xpath("./li").xpath(".//@src")
      # 取出标题
      title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
      # 点赞数
      like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
      # 评论数
      comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
      
      items = {
        "username": username,
        "title": title,
        "image": image,
        "zan": like,
        "comments": comments
      }
      
      # 打开锁,向文件添加内容,释放锁
      with self.lock:
        self.localFile.write(json.dumps(items, ensure_ascii=False)   "n")
        

在多线程开发中,为了维护资源的完整性,在访问共享资源时要使用共享锁 lock。线程获得了锁之后,才可以访问文件 localFile,并往里写入数据;写入完毕后,将锁释放,其它线程就可以访问这个文件。同一时刻,只允许一个线程访问该文件。

  • 完成采集和解析网页内容
代码语言:javascript复制
def main():
  # 页码队列,存储 20 个页码,先进先出
  pageQueue = Queue(20)
  for i in range(1, 21):
    pageQueue.put(i)
  
  # 采集结果的数据队列,参数为空表示不限制
  dataQueue = Queue()
  # 以追加的方式打开本地文件
  localFile = open("duanzi.json", "a")
  # 互斥锁
  lock = threading.Lock()
  
  crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
  # 创建、启动和存储 3 个采集线程
  threadCrawls = []
  for threadName in crawlList:
    thread = ThreadCrawl(threadName, pageQueue, dataQueue)
    thread.start()
    threadCrawls.append(thread)
    
  parseList = ["解析线程1号", "解析线程2号", "解析线程3号"]
  # 创建、启动和存储 3 个解析线程
  threadParses = []
  for threadName in parseList:
    thread = ThreadParse(threadName, dataQueue, localFile, lock)
    thread.start()
    threadParses.append(thread)
    
  # 如果 pageQueue 为空,采集线程退出循环
  while not pageQueue.empty():
    pass
  
  global CRAWL_EXIT
  CRAWL_EXIT = True
  for thread in threadCrawls:
    # 阻塞子线程
    thread.join()
  while not dataQueue.empty:
    pass
  
  global PARSE_EXIT
  PARSE_EXIT = True
  for thread in threadParses:
    thread.join()
    
  with lock:
    # 关闭文件,在文件之前,内容都存在内存中
    localFile.close()

0 人点赞