Python | Python学习之常用项目代码(一)

2019-10-09 15:35:15 浏览数 (1)

写在前面

本篇是咸鱼日常撸视频的时候记录的一些代码实例,可以直接运用到项目中,但是有些代码的可用性没有那么好,旨在分享思路,不喜勿喷~

搭建ip代理池(简易版)

推荐两个scrapy代理的项目

第一个是免费的代理插件,无需付费 https://github.com/aivarsk/scrapy-proxies

第二个是需要付费的代理插件 https://github.com/scrapy-plugins/scrapy-crawlera

撸视频的时候学到的代理池实例

获取西刺代理的代理列表并存入mysql数据库:

代码语言:python
def crawl_xici(page_count=3411):
    """Scrape the xicidaili.com proxy listing and upsert it into MySQL.

    Requests ``/nn/<page>`` for every page index in ``range(page_count)``,
    reads each data row of the ``#ip_list`` table and upserts
    ``(ip, port, speed, proxy_type)`` into the ``proxy_ip`` table.

    Relies on names this snippet does not define: ``requests``, ``Selector``
    (presumably ``scrapy.selector.Selector`` -- confirm against the real
    module) and an open MySQL ``conn`` / ``cursor`` pair.

    Args:
        page_count: Number of listing pages to walk. Defaults to 3411, the
            value that was previously hard-coded.
    """
    headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
    insert_sql = """
                insert into proxy_ip(ip, port, speed, proxy_type)
                VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE ip=VALUES(ip), port=VALUES(port), speed=VALUES(speed), proxy_type=VALUES(proxy_type)
            """

    for page in range(page_count):
        # A timeout keeps one stalled connection from hanging the whole crawl.
        res = requests.get(
            'http://www.xicidaili.com/nn/{}'.format(page),
            headers=headers,
            timeout=10,
        )
        selector = Selector(text=res.text)

        rows = []
        # The first <tr> is skipped (treated as the table header).
        for tr in selector.css("#ip_list tr")[1:]:
            speed_titles = tr.css(".bar::attr(title)").extract()
            all_texts = tr.css("td::text").extract()
            # Skip malformed rows instead of raising IndexError.
            if not speed_titles or len(all_texts) < 6:
                continue
            try:
                # The title looks like "<seconds>秒"; keep the numeric part.
                speed = float(speed_titles[0].split("秒")[0])
            except ValueError:
                # Previously a row without a usable speed silently reused the
                # speed of the preceding row (or hit an unbound name).
                continue
            # Same order as the placeholders: ip, port, speed, proxy_type.
            rows.append((all_texts[0], all_texts[1], speed, all_texts[5]))

        for params in rows:
            cursor.execute(insert_sql, params)
        if rows:
            # One commit per page instead of one per row.
            conn.commit()

定义随机获取ip的类方法(包括删除无效代理)

代码语言:python
class GetIP(object):
    """Pick a random working proxy from the ``proxy_ip`` table.

    Uses the module-level MySQL ``conn`` / ``cursor`` pair and ``requests``,
    none of which are defined in this snippet.
    """

    # Seconds to wait for a proxy to answer before treating it as dead.
    CHECK_TIMEOUT = 5

    def delete_ip(self, ip):
        """Delete every ``proxy_ip`` row whose ip equals ``ip``.

        Returns:
            Always ``True``.
        """
        # Parameterized on purpose: formatting the address straight into the
        # statement left it unquoted (invalid SQL for a dotted IP) and open
        # to SQL injection.
        cursor.execute("delete from proxy_ip WHERE ip=%s", (ip,))
        conn.commit()
        print("删除成功")
        return True

    def judge_ip(self, ip, port):
        """Check whether ``ip:port`` works as an HTTP proxy.

        Fetches http://www.baidu.com through the proxy. Any request error or
        a non-2xx status marks the proxy invalid and deletes it from the
        database.

        Returns:
            ``True`` if the proxy answered with a 2xx status, else ``False``.
        """
        http_url = "http://www.baidu.com"
        proxy_url = "http://{0}:{1}".format(ip, port)
        try:
            res = requests.get(
                http_url,
                proxies={"http": proxy_url},
                timeout=self.CHECK_TIMEOUT,
            )
        except Exception:
            # Best-effort probe: whatever goes wrong, the proxy is unusable.
            usable = False
        else:
            usable = 200 <= res.status_code < 300

        if usable:
            print("effective ip")
            return True

        print("invalid ip and port")
        self.delete_ip(ip)
        return False

    def get_random_ip(self):
        """Return a verified proxy URL chosen at random from the database.

        Keeps sampling until a proxy passes ``judge_ip``. Failed proxies are
        deleted by ``judge_ip``, so the loop ends once the table is exhausted.

        Returns:
            ``"http://<ip>:<port>"`` for a working proxy, or ``None`` when the
            table holds no rows.
        """
        select_sql = """
          SELECT ip,port from  proxy_ip ORDER BY RAND() LIMIT 1
        """
        # Iterative on purpose: recursing once per dead proxy could exceed the
        # recursion limit on a table full of stale entries.
        while True:
            cursor.execute(select_sql)
            row = cursor.fetchone()
            if row is None:
                return None
            ip, port = row[0], row[1]
            if self.judge_ip(ip, port):
                return "http://{0}:{1}".format(ip, port)



# crawl_xici()
# When run as a script, sample one proxy; the returned URL is discarded.
if __name__ == '__main__':
    GetIP().get_random_ip()

在middleware文件中应用我们写的代理

代码语言:python
# 使用前要记得在setting中添加RadomProxyMiddleware
from tools.crawl_xici_ip import GetIP
# 随机ip代理
class RadomProxyMiddleware(object):
    """Middleware that assigns a proxy from ``GetIP`` to every request."""

    def process_request(self, request, spider):
        """Store the URL from ``GetIP().get_random_ip()`` in ``request.meta['proxy']``."""
        proxy_url = GetIP().get_random_ip()
        request.meta['proxy'] = proxy_url

自定义pipeline的使用实例

pipeline存储json(自定义json存储)
代码语言:python
import codecs
import json
class JsonWithEncodingPipeline(object):
    """Item pipeline that appends each item to ``article.json`` as one JSON line."""

    def __init__(self):
        # Opened once for the pipeline's lifetime; closed in close_spider().
        self.file = codecs.open('article.json', 'w', encoding="utf-8")

    def process_item(self, item, spider):
        """Serialize ``item`` to a JSON line, write it, and return the item."""
        # ensure_ascii=False writes non-ASCII text as-is instead of \uXXXX.
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

    def close_spider(self, spider):
        """Close the output file; Scrapy calls this hook when the spider closes."""
        self.file.close()

    # Kept so existing callers of the old method name keep working.
    spider_closed = close_spider
pipeline存储json(使用scrapy自带的组件)
代码语言:python
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
    """Item pipeline that writes items to ``export.json`` via ``JsonItemExporter``."""

    def __init__(self):
        # The exporter is handed a binary file handle.
        output = open('export.json', 'wb')
        exporter = JsonItemExporter(output, encoding="utf-8", ensure_ascii=False)
        exporter.start_exporting()
        self.file = output
        self.exporter = exporter

    def process_item(self, item, spider):
        """Pass ``item`` to the exporter and return it unchanged."""
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        """Finish the export, then close the underlying file."""
        self.exporter.finish_exporting()
        self.file.close()
pipeline中的存储mysql(阻塞)
代码语言:python
import MySQLdb
import MySQLdb.cursors
class MysqlPipeline(object):
    """Item pipeline that writes each item to MySQL synchronously.

    Every ``process_item`` call runs one INSERT and one COMMIT on a single
    shared connection, so the caller waits for the database on each item.
    """

    def __init__(self):
        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'root', 'spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        """Insert ``item`` into ``jobbole_article`` and return it.

        Reads the keys ``title``, ``url``, ``create_date`` and ``fav_nums``.
        """
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()
        # Scrapy passes the return value on to the next pipeline; without
        # this the following pipelines would receive None.
        return item

    def close_spider(self, spider):
        """Release the cursor and connection when the spider closes."""
        self.cursor.close()
        self.conn.close()
pipeline中存储mysql(异步)
代码语言:python
class MysqlTwistedPipline(object):
    """Item pipeline that inserts into MySQL through a Twisted connection pool.

    Inserts are handed to ``dbpool.runInteraction`` so ``process_item`` does
    not wait for the database.

    NOTE(review): ``adbapi`` and ``MySQLdb`` are not imported in this snippet;
    presumably ``from twisted.enterprise import adbapi`` is expected at module
    level -- confirm.
    """

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the ``MYSQL_*`` entries in ``settings``."""
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule the insert for ``item`` on the pool and return the item."""
        query = self.dbpool.runInteraction(self.do_insert, item)
        # Failures surface through the errback, not as exceptions here.
        query.addErrback(self.handle_error, item, spider)
        # Scrapy passes the return value on to the next pipeline; without
        # this the following pipelines would receive None.
        return item

    def handle_error(self, failure, item, spider):
        """Print the failure produced by an insert scheduled in process_item."""
        print (failure)

    def do_insert(self, cursor, item):
        """Run the INSERT for ``item`` using the cursor supplied by the pool."""
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))

    # Alternative: let each item type supply its own statement. This requires
    # the item class to define get_insert_sql().
    # def do_insert(self, cursor, item):
    #     # Build the SQL for this item type and run it.
    #     insert_sql, params = item.get_insert_sql()
    #     print (insert_sql, params)
    #     cursor.execute(insert_sql, params)

如何在scrapy中随机切换UA?

随机UA下载中间件(初始版)

setting文件:

代码语言:python
# Register the random User-Agent middleware; Scrapy uses the number (543) to
# order downloader middlewares.
DOWNLOADER_MIDDLEWARES = {'ArticleSpider.middlewares.RandomUserAgentMiddleware': 543}

middlewares文件:

代码语言:python
from fake_useragent import UserAgent
class RandomUserAgentMiddleware(object):
    """Downloader middleware that fills in a User-Agent from ``fake_useragent``.

    The ``RANDOM_UA_TYPE`` setting names the ``UserAgent`` attribute to read
    for each request; it defaults to ``'random'``.
    """

    def __init__(self, crawler):
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    # from_crawler and process_request must sit at class level. They used to
    # be indented inside __init__, so they were never defined on the class.
    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from a crawler."""
        return cls(crawler)

    def process_request(self, request, spider):
        """Set the request's ``User-Agent`` header if it is not already set."""
        # The attribute is read on every request, so each request gets a
        # fresh value from the UserAgent object.
        # NOTE(review): setdefault leaves an existing header untouched; if
        # Scrapy's built-in UserAgentMiddleware runs first it may need to be
        # disabled for this to take effect -- confirm.
        request.headers.setdefault('User-Agent', getattr(self.ua, self.ua_type))

数据存错怎么办?

将redis数据库导入mongodb数据库
代码语言:python
import json, redis, pymongo

def main():
    """Move items from a Redis list into a MongoDB collection, forever.

    Pops JSON documents from the Redis list ``sinainfospider_redis:items``
    and inserts each one into the ``sina_items`` collection of the ``sina``
    database. ``blpop`` blocks until an item is available, so this function
    never returns on its own.
    """
    # Redis connection settings
    rediscli = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # MongoDB connection settings
    mongocli = pymongo.MongoClient(host='localhost', port=27017)
    # Database ``sina``, collection ``sina_items``
    sheet = mongocli['sina']['sina_items']
    offset = 0
    while True:
        # Per the original note: blpop gives FIFO order, brpop gives LIFO.
        _, data = rediscli.blpop(["sinainfospider_redis:items"])
        item = json.loads(data.decode("utf-8"))
        # insert_one replaces Collection.insert, which PyMongo 4 removed.
        sheet.insert_one(item)
        # Running count of migrated items; it must accumulate, not reset to 1.
        offset += 1
        print(offset)
        print("Processing: %s " % item)


if __name__ == '__main__':
    main()
将redis数据存入mysql数据库
代码语言:python
import redis, json, time
from pymysql import connect

# Redis connection
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
# MySQL connection
mysql_client = connect(host="127.0.0.1", user="root", password="mysql",
                 database="sina", port=3306, charset='utf8')
cursor = mysql_client.cursor()

# Pop JSON items from the Redis list and insert them into sina_items one at a
# time. The loop only ends on an exception (including Ctrl-C); the finally
# block then releases the MySQL handles.
i = 1
try:
    while True:
        print(i)
        time.sleep(1)
        source, data = redis_client.blpop(["sinainfospider_redis:items"])
        item = json.loads(data.decode())
        print("source===========", source)
        print("item===========", item)
        sql = "insert into sina_items(parent_url,parent_title,sub_title,sub_url,sub_file_name,son_url,head,content,crawled,spider) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        params = [item["parent_url"], item["parent_title"], item["sub_title"], item["sub_url"], item["sub_file_name"],
                  item["son_url"], item["head"], item["content"], item["crawled"], item["spider"], ]
        cursor.execute(sql, params)
        mysql_client.commit()
        # Advance the counter; assigning 1 here left it stuck at 1.
        i += 1
finally:
    cursor.close()
    mysql_client.close()

0 人点赞