第三方库 requests, fake_useragent
代码语言:javascript复制import random, requests, re, logging, time
from fake_useragent import UserAgent
日志
代码语言:javascript复制logging.basicConfig(level=logging.INFO , format='%(asctime)s[%(levelname)s]: %(message)s')
logger = logging.getLogger(__name__)
随机获取请求头User-Agent
代码语言:javascript复制def getHeader():
return {'Referer': 'https://blog.csdn.net','User-Agent': UserAgent().random}
解析读取量
代码语言:javascript复制def parse_html(text:str):
return int(re.search(r'<span class="read-count">([0-9]*)</span>', text).group(1))
请求网页
代码语言:javascript复制def requestUrlText(url):
try:
header = getHeader()
response = requests.get(url, headers=header)
if response.status_code == 200:
return parse_html(response.text)
except (requests.RequestException, AttributeError):
return False
封装为请求url类
代码语言:javascript复制class request_url():
def __init__(self, url:str):
self.url = url
self.read = 0
def succeeded(self): #弃用
logging.debug(f"Request Url{self.url} successfully.")
def _update(self):
_res = requestUrlText(self.url)
if isinstance(_res, int):
self.succeeded()
_var = _res - self.read
self.read = _res
return _var
return 0
def update(self):
while True:
r = self._update()
if r: return r
def readnum(self) -> int:
return self.read
封装总调用类
代码语言:javascript复制class request_urls():
def __init__(self, *args):
self.__urls = []
self.add(*args)
self.all_add = 0
def add(self, *urls):
for url in urls:
self.__urls.append(request_url(url))
def get(self) -> int:
return sum([u.update() for u in self.__urls])
def total(self) -> int:
return sum([u.readnum() for u in self.__urls])
def update(self) -> None:
add = self.get()
self.all_add = add
logger.info(f"total {self.total()}({len(self.__urls)}blogs), ↑{add} [ALL {self.all_add}]")
运行与主程序
代码语言:javascript复制def run(urls):
req = request_urls(*urls)
req.get() #初始化.
while True:
req.update()
time.sleep(random.randint(60, 70))
if __name__ == '__main__':
run(["https://blog.csdn.net/m0_60394896/article/details/124571653?spm=1001.2014.3001.5502",
"https://blog.csdn.net/m0_60394896/article/details/124530993",
"https://blog.csdn.net/m0_60394896/article/details/124529941?spm=1001.2014.3001.5502",
"https://blog.csdn.net/m0_60394896/article/details/124519531",
"https://blog.csdn.net/m0_60394896/article/details/124508776",
"https://blog.csdn.net/m0_60394896/article/details/124361092",
"https://blog.csdn.net/m0_60394896/article/details/124094245",
"https://blog.csdn.net/m0_60394896/article/details/124034831",
"https://blog.csdn.net/m0_60394896/article/details/124033445",
"https://blog.csdn.net/m0_60394896/article/details/123981398",
"https://blog.csdn.net/m0_60394896/article/details/123772011",
"https://blog.csdn.net/m0_60394896/article/details/123583566",
"https://blog.csdn.net/m0_60394896/article/details/122505828",
"https://blog.csdn.net/m0_60394896/article/details/122371110"])
全部代码
代码语言:javascript复制import random, requests, re, logging, time
from fake_useragent import UserAgent
logging.basicConfig(level=logging.INFO , format='%(asctime)s[%(levelname)s]: %(message)s'); logger = logging.getLogger(__name__)
def getHeader():
return {'Referer': 'https://blog.csdn.net','User-Agent': UserAgent().random}
def parse_html(text:str):
return int(re.search(r'<span class="read-count">([0-9]*)</span>', text).group(1))
def requestUrlText(url):
try:
header = getHeader()
response = requests.get(url, headers=header)
if response.status_code == 200:
return parse_html(response.text)
except (requests.RequestException, AttributeError):
return False
class request_url():
def __init__(self, url:str):
self.url = url
self.read = 0
def succeeded(self): #弃用
logging.debug(f"Request Url{self.url} successfully.")
def _update(self):
_res = requestUrlText(self.url)
if isinstance(_res, int):
self.succeeded()
_var = _res - self.read
self.read = _res
return _var
return 0
def update(self):
while True:
r = self._update()
if r: return r
def readnum(self) -> int:
return self.read
class request_urls():
def __init__(self, *args):
self.__urls = []
self.add(*args)
self.all_add = 0
def add(self, *urls):
for url in urls:
self.__urls.append(request_url(url))
def get(self) -> int:
return sum([u.update() for u in self.__urls])
def total(self) -> int:
return sum([u.readnum() for u in self.__urls])
def update(self) -> None:
add = self.get()
self.all_add = add
logger.info(f"total {self.total()}({len(self.__urls)}blogs), ↑{add} [ALL {self.all_add}]")
def run(urls):
req = request_urls(*urls)
req.get() #初始化.
while True:
req.update()
time.sleep(60 * 60 * 2) #自动两小时请求一次
if __name__ == '__main__':
run(["https://blog.csdn.net/m0_60394896/article/details/124571653?spm=1001.2014.3001.5502",
"https://blog.csdn.net/m0_60394896/article/details/124530993",
"https://blog.csdn.net/m0_60394896/article/details/124529941?spm=1001.2014.3001.5502",
"https://blog.csdn.net/m0_60394896/article/details/124519531",
"https://blog.csdn.net/m0_60394896/article/details/124508776",
"https://blog.csdn.net/m0_60394896/article/details/124361092",
"https://blog.csdn.net/m0_60394896/article/details/124094245",
"https://blog.csdn.net/m0_60394896/article/details/124034831",
"https://blog.csdn.net/m0_60394896/article/details/124033445",
"https://blog.csdn.net/m0_60394896/article/details/123981398",
"https://blog.csdn.net/m0_60394896/article/details/123772011",
"https://blog.csdn.net/m0_60394896/article/details/123583566",
"https://blog.csdn.net/m0_60394896/article/details/122505828",
"https://blog.csdn.net/m0_60394896/article/details/122371110"])
什么? 刷取csdn阅读量?那是不可能 csdn主要靠这几种方式获取你是否在requests
项目 | Value |
---|---|
1 | 时间戳 time.time() 时间计算是一个重要方式 如果你再过少时间内请求多次,则视为增加一次阅读量 |
2 | 请求头 header 这是大多数网站返回结果考虑的因素 |
3 | Referer 你从哪里来? |
4 | Cookie IP 单个IP或者是cookie文件 请求次数过多,也会减少阅读量,但其实这一步可以用代理IP 但效果不怎么样 |
5 | 用户信息 如(4), 单个用户只能点赞收藏一次 |
阅读量并不能代表什么 点赞等,综合指标才能上去所以有时间还是要想想发布一些优质的博文.