大家好,又见面了,我是你们的朋友全栈君。
python selenium 实现,定时自动结算购物车,邮件提醒结果。
说是秒杀脚本,但其实根本做不到毫秒级(看很多文章写毫秒级也是跪了)。自己在mac上实测,大约10s左右会收到邮件。用selenium秒杀是不要想了,用作自动提交订单,还算ok。
整个流程比较简单:扫码登录,跳转到购物车界面并不停刷新,等时间差不多到了,全选,提交订单。做个练手脚本,简单实用。
通知截图:
整个代码如下,邮箱相关参数读取部分懒得写全了,就这样吧。
# -*- coding: UTF-8 -*-
import argparse
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import datetime
import time
import logging
import sys
from retry import retry
# Root logging setup: INFO level, each record tagged with time, emitting
# function name and level. All code in this script logs through `logger`.
_LOG_FORMAT = '%(asctime)s - %(funcName)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger('Buyer')
class EmailInfo(object):
    """SMTP settings used when mailing the purchase result.

    Every argument defaults to the placeholder value that was previously
    hard-coded, so ``EmailInfo()`` behaves exactly as before; real values
    can now be supplied at construction time instead of editing the class.

    Args:
        port: SMTP server port.
        mail_host: SMTP server host name.
        mail_user: Login name for the SMTP server.
        mail_pass: Login password for the SMTP server.
        sender: Address placed in the ``From`` header / envelope sender.
        receivers: Address placed in the ``To`` header / envelope recipient.
    """

    def __init__(self, port=25, mail_host='xxxxxxxxx', mail_user='xxxxxxxxx',
                 mail_pass='xxxxxxx', sender='xxxxxx', receivers='xxxxxxxxx'):
        self.port = port
        self.mail_host = mail_host
        self.mail_user = mail_user
        self.mail_pass = mail_pass
        self.sender = sender
        self.receivers = receivers
class retryTimes():
    """Mutable countdown of remaining attempts.

    Nested retry helpers decrement ``retry_times`` on a shared instance so
    the remaining budget survives across repeated calls of the helper.

    Args:
        retry_times: Initial number of attempts available (default 3).
    """

    def __init__(self, retry_times=3):
        self.retry_times = retry_times
class BUY_MASHINE():
    """Selenium-driven helper that checks out the Taobao cart at a set time.

    Typical call order (as used by the script entry point): ``init()`` to
    start Chrome, ``login()`` to wait for a QR-code login, then ``working()``
    which keeps the cart page alive, selects everything, submits the order
    and mails the result.

    Args:
        buy_time: Target time as a ``'%Y-%m-%d %H:%M:%S'`` string.
        max_retry: Maximum number of login attempts (default 3).

    Raises:
        ValueError: If ``buy_time`` does not match the expected format.
    """

    def __init__(self, buy_time, max_retry=3):
        self.BUY_TIME = buy_time
        self.BUY_TIME_OBJECT = datetime.datetime.strptime(self.BUY_TIME, '%Y-%m-%d %H:%M:%S')
        self.LOG_STATUS = 0  # 0: not logged in
        self.MAX_RETRY = max_retry
        self.TAOBAO = "https://login.taobao.com/"
        self.MAX_SUBMIT = 50  # budget of "submit order" click attempts
        self.Email = EmailInfo()

    def init(self):
        """Create the Chrome driver with automation banners suppressed."""
        self.option = Options()
        # Hide the "Chrome is being controlled by automated software" bar.
        self.option.add_argument('disable-infobars')
        self.option.add_argument('log-level=3')  # LOG_FATAL
        self.option.add_experimental_option('excludeSwitches', ['enable-automation'])
        self.driver = webdriver.Chrome(chrome_options=self.option)

    def switch_to_qr_load(self):
        """Wait for the login-mode toggle and click it to show the QR code.

        Raises:
            selenium TimeoutException: If the toggle element does not appear
                within 30 seconds.
        """
        locator_qr = (By.ID, 'J_Static2Quick')
        WebDriverWait(self.driver, 30, 0.5).until(EC.presence_of_element_located(locator_qr))
        logger.info('二维码已载入')
        qr = self.driver.find_element_by_id('J_Static2Quick')
        if qr.is_displayed():
            # The toggle is only visible while the password form is shown.
            logger.info('账号密码已显示')
            # A plain move_to_element(...).click() was reported not to work
            # here, hence the explicit zero offset.
            ActionChains(self.driver).move_to_element_with_offset(qr, 0, 0).click().perform()
            logger.info('已切换为二维码登录')

    def login(self):
        """Open the login page and wait for the user to scan the QR code.

        Each attempt refreshes the page, switches to QR-code login and then
        polls once per second, for up to 60 seconds, until the browser has
        left the login URL. At most ``MAX_RETRY`` attempts are made.

        Returns:
            True once the URL changes away from the login page, False when
            every attempt timed out.
        """
        logger.info("开始登陆===>>>")
        self.driver.get(self.TAOBAO)
        time.sleep(1)
        max_tries = retryTimes(self.MAX_RETRY)

        @retry(tries=self.MAX_RETRY, delay=1)
        def login_with_retry():
            max_tries.retry_times -= 1
            # Done inside the retried function so that every attempt gets a
            # fresh page and QR code, not just the first one.
            self.driver.refresh()
            self.switch_to_qr_load()
            for _ in range(60):  # poll for one minute
                if self.driver.current_url != self.TAOBAO:
                    logger.info('扫描成功')
                    return True
                time.sleep(1)
            if max_tries.retry_times > 0:
                logger.info("retrying...")
                # Raising is how the retry decorator is told to run again.
                raise NotImplementedError('login')
            return False

        return login_with_retry()

    def working(self):
        """Keep the cart alive until the buy time, then select all and buy.

        Returns:
            The result of ``buying()``: True if the order was submitted,
            False otherwise.
        """
        self.driver.get("https://cart.taobao.com/cart.htm")
        time.sleep(1)
        self.wait()
        # Reload so the cart reflects the state right before checkout.
        self.driver.get("https://cart.taobao.com/cart.htm")
        time.sleep(1)
        # find_elements_* returns an empty list when nothing matches, so the
        # presence check is meaningful (find_element_* raises instead).
        select_all = self.driver.find_elements_by_id("J_SelectAll1")
        if select_all:
            select_all[0].click()
            logger.info("已经选中购物车中全部商品 ...")
        else:
            logger.warning("select-all checkbox (J_SelectAll1) not found.")
        return self.buying()

    def buying(self):
        """Click "checkout" once the buy time is reached, then submit.

        Sleeps until the buy time, then polls for the checkout button for up
        to 60 seconds. After clicking it, tries to click the submit-order
        link within a budget of ``MAX_SUBMIT`` retries. The outcome is mailed
        via ``sendMail``.

        Returns:
            True if the submit-order link was clicked, False otherwise.
        """
        max_submit = retryTimes(self.MAX_SUBMIT)

        @retry(tries=-1)
        def submit_with_retry(driver):
            try:
                driver.find_element_by_link_text('提交订单').click()
                logger.info("已经点击提交订单按钮")
                return True
            except Exception:
                # Deliberately broad: any failure to find/click is treated as
                # "page not ready yet" until the budget is used up.
                if max_submit.retry_times <= 0:
                    logger.info("提交订单失败...")
                    return False
                logger.info("没发现提交订单按钮,可能页面还没加载出来,重试...")
                raise NotImplementedError('submit')
            finally:
                max_submit.retry_times -= 1

        @retry(tries=-1)
        def buying_with_retry(driver, deadline, ending=60):
            now = datetime.datetime.now()
            if now < deadline:
                logger.info('finally waiting...')
                raise NotImplementedError('buying')
            go_buttons = driver.find_elements_by_id("J_Go")
            if go_buttons:
                go_buttons[0].click()
                logger.info("已经点击结算按钮...")
                return submit_with_retry(driver)
            # total_seconds() rather than .seconds: the latter drops the
            # days component and wraps around for negative differences.
            if (now - deadline).total_seconds() >= ending:
                logger.info("Orz...buying failed at {}".format(now))
                return False
            raise NotImplementedError('buying')

        # Sleep through the remaining gap instead of spinning through
        # exception-driven retries until the deadline.
        remaining = (self.BUY_TIME_OBJECT - datetime.datetime.now()).total_seconds()
        if remaining > 0:
            logger.info('finally waiting...')
            time.sleep(remaining)

        ret = buying_with_retry(self.driver, self.BUY_TIME_OBJECT)
        self.sendMail(ret, datetime.datetime.now())
        return ret

    def wait(self):
        """Refresh the page once a minute until 90 s before the buy time.

        Returns immediately when the buy time is 90 seconds away or less,
        including when it already lies in the past.
        """
        while True:
            remaining = (self.BUY_TIME_OBJECT - datetime.datetime.now()).total_seconds()
            if remaining > 90:
                self.driver.refresh()
                logger.info("刷新购物车界面,防止登录超时...")
                time.sleep(60)
            else:
                logger.info("抢购时间点将近,停止自动刷新,准备进入抢购阶段...")
                break

    def sendMail(self, Result, ok_time):
        """Mail the purchase result using the settings in ``self.Email``.

        Up to three delivery attempts are made (30 s, then 60 s apart).

        Args:
            Result: Truthy if the purchase succeeded; selects the mail text.
            ok_time: Timestamp embedded in the mail text.

        Returns:
            True if the message was handed to the SMTP server, else False.
        """
        # Mutable state shared with the retried inner function.
        state = {'left': 3, 'ok': False}

        message = MIMEMultipart('mixed')
        message['Subject'] = '秒杀通知'
        message['From'] = self.Email.sender
        message['To'] = self.Email.receivers
        if Result:
            text = '{} 秒杀成功,请尽快付款~'.format(ok_time)
        else:
            text = '{} 秒杀失败,省钱啦~'.format(ok_time)
        message.attach(MIMEText(text, 'plain', 'utf-8'))

        @retry(tries=3, delay=30, backoff=2)
        def sendMailKernel():
            state['left'] -= 1
            smtpObj = smtplib.SMTP()
            try:
                smtpObj.connect(self.Email.mail_host, self.Email.port)
                smtpObj.ehlo()
                smtpObj.starttls()
                smtpObj.login(self.Email.mail_user, self.Email.mail_pass)
                smtpObj.sendmail(
                    self.Email.sender, self.Email.receivers, message.as_string())
            except (smtplib.SMTPException, OSError):
                # OSError covers DNS/connection failures, which are not
                # SMTPException instances on every Python version.
                state['ok'] = False
                logger.info('send mail failed')
                if state['left'] > 0:
                    logger.info('SendEmail retry')
                    raise NotImplementedError('Email')
            else:
                # Only flagged as sent after sendmail() actually returned.
                state['ok'] = True
                logger.info('send mail succeed.')
                try:
                    smtpObj.quit()
                except (smtplib.SMTPException, OSError):
                    pass  # message already accepted; a failed QUIT is harmless
            finally:
                smtpObj.close()  # release the socket on every path

        sendMailKernel()
        return state['ok']
# Script entry point: parse the command line, start the browser, wait for
# the QR-code login and run the timed checkout.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--buy_time', type=str, help="buy time like '2019-11-11 00:00:00'")
    parser.add_argument('--login_retry', type=int, help='max retry login times.(default: 3)', default=3)
    # argparse's type=bool turns every non-empty string (even "False") into
    # True, so the value is parsed explicitly instead.
    parser.add_argument('--email_reminder',
                        type=lambda value: str(value).lower() in ('1', 'true', 'yes', 'y', 'on'),
                        help='use email to notice result.(default: False)', default=False)
    # NOTE(review): the mail-related options below are parsed but not yet
    # applied to the buyer's e-mail settings.
    parser.add_argument('--port', type=int, help='set email port.(default: 25)', default=25)
    parser.add_argument('--mail_host', type=str, help="email host.(default: 163--'smtp.163.com')", default='smtp.163.com')
    parser.add_argument('--mail_count', type=str, help='email count.')
    parser.add_argument('--mail_pass', type=str, help='email password.')
    parser.add_argument('--mail_receivers', type=str, help='receive email count.(default: sender)')
    args = parser.parse_args()

    if not args.buy_time:
        logger.info("buy_time must specify.")
        sys.exit(-1)

    buy_time = args.buy_time
    max_retry = args.login_retry
    buyer = BUY_MASHINE(buy_time, max_retry)
    buyer.init()
    # Without a logged-in session the cart cannot be checked out, so stop
    # here instead of running the whole wait/buy cycle for nothing.
    if not buyer.login():
        logger.info("login failed.")
        sys.exit(-1)
    buyer.working()
参考链接:https://www.jianshu.com/p/5fe9bd8f7674
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/134376.html 原文链接:https://javaforall.cn