代码语言:javascript复制
import requests
import logging
import re
import pymongo
from pyquery import PyQuery as pq
from urllib.parse import urljoin
import multiprocessing
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
BASE_URL = 'https://static1.scrape.cuiqingcai.com'
TOTAL_PAGE = 10
MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'movies'
MONGO_COLLECTION_NAME = 'movies'
client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
db = client['movies']
collection = db['movies']
def scrape_page(url):
"""
scrape page by url and return its html
:param url: page url
:return: html of page
"""
logging.info('scraping %s...', url)
try:
response = requests.get(url)
if response.status_code == 200:
return response.text
logging.error('get invalid status code %s while scraping %s', response.status_code, url)
except requests.RequestException:
logging.error('error occurred while scraping %s', url, exc_info=True)
def scrape_index(page):
"""
scrape index page and return its html
:param page: page of index page
:return: html of index page
"""
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
def parse_index(html):
"""
parse index page
:param html: html of index page
:return: generator of detail page url
"""
doc = pq(html)
links = doc('.el-card .name')
for link in links.items():
href = link.attr('href')
detail_url = urljoin(BASE_URL, href)
logging.info('get detail url %s', detail_url)
yield detail_url
def scrape_detail(url):
"""
scrape detail page and return its html
:param page: page of detail page
:return: html of detail page
"""
return scrape_page(url)
def parse_detail(html):
"""
parse detail page
:param html: html of detail page
:return: data
"""
doc = pq(html)
cover = doc('img.cover').attr('src')
name = doc('a > h2').text()
categories = [item.text() for item in doc('.categories button span').items()]
published_at = doc('.info:contains(上映)').text()
published_at = re.search('(d{4}-d{2}-d{2})', published_at).group(1)
if published_at and re.search('d{4}-d{2}-d{2}', published_at) else None
drama = doc('.drama p').text()
score = doc('p.score').text()
score = float(score) if score else None
return {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
def save_data(data):
"""
save to mongodb
:param data:
:return:
"""
collection.update_one({
'name': data.get('name')
}, {
'$set': data
}, upsert=True)
def main(page):
"""
main process
:return:
"""
index_html = scrape_index(page)
detail_urls = parse_index(index_html)
for detail_url in detail_urls:
detail_html = scrape_detail(detail_url)
data = parse_detail(detail_html)
logging.info('get detail data %s', data)
logging.info('saving data to mongodb')
save_data(data)
logging.info('data saved successfully')
if __name__ == '__main__':
pool = multiprocessing.Pool()
pages = range(1, TOTAL_PAGE 1)
pool.map(main, pages)
pool.close()