python-best-practices

2021-12-06 13:15:19 浏览数 (1)

文章目录

    • 导入
      • 新增路径导入
    • 语句
      • 单行 if else 语句
    • 字符串
      • 列表拼接为字符串
      • 去除字符串两边空格
    • 列表处理
      • 列表倒序
    • 时间
      • 当地时间
        • datetime 对象转换成字符串
        • 字符串转换成 datetime 对象
        • 时间戳转换成字符串
        • 字符串转换成时间戳
        • datetime 对象转换成时间戳
        • 时间戳转为当地时间字符串
      • UTC 时间
        • UTC datetime 对象转换成北京时间字符串
        • UTC 时间字符串转换成北京时间字符串
        • 时间戳转为 UTC 时间字符串
        • UTC 字符串转为时间戳
    • 网络请求
      • POST 请求
      • GET 请求
      • GET 请求封装
    • 并发
      • 多线程
      • GET 请求
    • 函数
      • 解包传参
    • 功能
      • 防止脚本重复运行
      • 精确小数
    • 日志
      • 日志打印
    • 单元测试
    • 数据库交互
      • ORM(对象-关系映射) - peewee
    • 文件
      • 打开文件
    • 装饰器
      • 函数运行显示运行日志
      • 函数报错显示错误跟踪
    • 常用代码
      • 两数组取交集
      • 生成 hash
      • 执行指令
    • 总结
      • requests 异常处理
      • 静态检查

导入

新增路径导入

代码语言:javascript复制
import sys

sys.path.append('project_root_path/')

语句

单行 if else 语句

代码语言:javascript复制
name = "kangkang" if age >18 else "maria"

字符串

列表拼接为字符串

代码语言:javascript复制
name_list = ["kangkang", "maria"]
str_name = ",".join(name_list)

# 结果为 str_name = "kangkang,maria"

去除字符串两边空格

代码语言:javascript复制
raw_name = "test "
name = raw_name.strip()

# 结果为 name = "test"

列表处理

列表倒序

代码语言:javascript复制
score_list = [91, 92, 78, 88]
reverse_score_list = score_list[::-1]

时间

当地时间

datetime 对象转换成字符串
代码语言:javascript复制
str_time = datetime_obj.strftime('%Y-%m-%d %H:%M:%S')
字符串转换成 datetime 对象
代码语言:javascript复制
datetime_obj = datetime.strptime(str_time, '%Y-%m-%d %H:%M:%S')
时间戳转换成字符串
代码语言:javascript复制
def timestamp_to_str_time(timestamp, _format="%Y-%m-%d %H:%M:%S"):
    """获得字符串时间

    :param timestamp: 时间戳
    :param _format: 时间格式
    :return: 字符串时间
    """

    struct_time = time.gmtime(timestamp)
    str_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)

    return str_time
字符串转换成时间戳
代码语言:javascript复制
def str_time_to_timestamp(str_time, _format='%Y-%m-%d %H:%M:%S'):
    """格式化时间转化为Unix时间戳
    
    :param str_time: 字符串时间
    :param _format: 时间格式
    :return:
    """
    struct_time = time.strptime(str_time, _format)
    timestamp = time.mktime(struct_time)

    return timestamp
datetime 对象转换成时间戳
代码语言:javascript复制
from datetime import datetime

datetime.timestamp(datetime_obj)
时间戳转为当地时间字符串
代码语言:javascript复制
string_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))

UTC 时间

UTC datetime 对象转换成北京时间字符串
代码语言:javascript复制
def utc_datetime_obj_to_bj_str_time(utc_datetime_obj):
    """ UTC datetime 对象转北京字符串时间

    :param utc_datetime_obj: UTC datetime 对象
    :return: 北京字符串时间
    """

    bj_datetime_obj = utc_datetime_obj   timedelta(hours=8)
    bj_str_time = bj_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')

    return bj_str_time
UTC 时间字符串转换成北京时间字符串
代码语言:javascript复制
def utc_str_time_to_bj_str_time(utc_str_time):
    """ UTC 字符串时间转北京字符串时间

    :param utc_str_time: UTC 字符串时间
    :return:
    """

    utc_datetime_obj = datetime.strptime(utc_str_time, '%Y-%m-%d %H:%M:%S')
    bj_datetime_obj = utc_datetime_obj   timedelta(hours=8)
    bj_str_time = bj_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')
    return bj_str_time
时间戳转为 UTC 时间字符串
代码语言:javascript复制
utc_time = datetime.utcfromtimestamp(timestamp)
utc_string_time = utc_time.strftime("%Y-%m-%d %H:%M:%S")
UTC 字符串转为时间戳
代码语言:javascript复制
import time
from datetime import datetime, timezone

str_time = "2020-06-02 04:58:49"
struct_time = datetime.strptime(str_time, "%Y-%m-%d %H:%M:%S")
utc_struct_time = struct_time.replace(tzinfo=timezone.utc)
timestamp = utc_struct_time.timestamp()

网络请求

POST 请求

代码语言:javascript复制
api_url = base_url   'login'
body_json = {
    'username': '123',
    'password': '123'
}

res_api = requests.post(url=api_url, data=json.dumps(body_json))

GET 请求

代码语言:javascript复制
api_url = base_url   'is_login'
params_json = {
    'key1': 'value1',
    'key2': 'value2'
}

ret = requests.get(url=api_url, params=params_json)

GET 请求封装

使用场景:适用于解析 Json API。 封装特性:异常获取后延迟重试。

代码语言:javascript复制
import time
import urllib3
from functools import wraps

import requests

from loguru import logger


TIME_OUT_SEC = 120


urllib3.disable_warnings()
logger.add("get_method.log",
           format="{time} {level} {message}", level="INFO")


def retry(exception_to_check, tries=3, delay=0.1):
    """
    Request an attempt to retry the decorator

    Args:
        exception_to_check: Exception to check
        tries: Number of attempts, default is 3
        delay: Pause time, default is 0.1

    Returns:
        Decorator functions
    """

    def deco_retry(f):
        @wraps(f)
        def f_retry(*args, **kwargs):
            mtries = tries
            while mtries > 0:
                try:
                    return f(*args, **kwargs)
                except exception_to_check as e:
                    logger.warning(e)
                    time.sleep(delay)
                    mtries -= 1

            try:
                return f(*args, **kwargs)
            except exception_to_check as e:
                logger.error('Fatal Error: {}'.format(e))

        return f_retry

    return deco_retry


@retry(exception_to_check=Exception, tries=3, delay=5)
def get_method(url, params=None):
    logger.info(f"[get_method] url:{url}, params: {params}")
    res = requests.get(url=url, params=params, timeout=TIME_OUT_SEC, verify=False)
    if res.status_code == 200:
        return res.json()
    raise Exception

并发

多线程

代码语言:javascript复制
with ThreadPoolExecutor(max_workers=2 * multiprocessing.cpu_count()   1) as executor:
    furs = executor.map(fuc_1, data_list)
    for fur in furs:
        print(fur.exception())  # 用于异常调试,抛出异常。

GET 请求

代码语言:javascript复制
import asyncio
import aiohttp
from loguru import logger

urllib3.disable_warnings()

class AsyncGetMethod:
    def __init__(self, url, params_list):
        self.url = url
        self.params_list = params_list
        self.results = []
        self.max_threads = 60

    async def handle_tasks(self, work_queue):
        while not work_queue.empty():
            current_params = await work_queue.get()
            try:
                await self.get_results(current_params)
            except Exception as e:
                logger.error('Error for {} {}'.format(current_params, e), exc_info=True)

    async def get_results(self, current_params):
        html = await self.get_body(current_params)
        self.results = self.results   html['transactions']

    @retry(ExceptionToCheck=Exception, tries=3, delay=5)
    async def get_body(self, current_params):
        async with aiohttp.ClientSession() as session:
            async with session.get(url=self.url, params=current_params, timeout=120, verify_ssl=False) as response:
                return await response.json()

    def eventloop(self):
        q = asyncio.Queue()
        [q.put_nowait(params) for params in self.params_list]
        loop = asyncio.new_event_loop()
        tasks = [self.handle_tasks(q, ) for task_id in range(self.max_threads)]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

a = AsyncGetMethod(get_api_url,
                       [{'parameter1': value1, 'parameter2': value2} for value1 in value1_list])
a.eventloop()
print(a.results)

函数

解包传参

代码语言:javascript复制
def func_1(**property_dic):
	print(property_dic)

func_1(field_1=1, field_2=2)
# 输出结果:property_dic={field_1: 1, field_2: 2}

功能

防止脚本重复运行

代码语言:javascript复制
def exec_cmd(cmd):
    """执行指令的函数
    
    :param cmd: 指令
    :return:
    """
    res = os.popen(cmd)
    res_text = res.read()
    res.close()

    return res_text

def script_scheduler():
    """脚本调度
    调度方案:防止两个监控进程同时在跑

    :return:
    """
    filename = os.path.basename(__file__)
    ps_cmd = "ps -ef|grep "   filename   "|grep -v grep| wc -l"
    cmd_res = exec_cmd(ps_cmd)
    running_len = int(cmd_res.split('n')[0].strip())

    if running_len > 2:
        class_log.info('%s is running' % filename)
    else:
        pass

精确小数

代码语言:javascript复制
PRECISE_DECIMAL_DIGIT = 4


def precise_decimal(_value):
    """精确小数

    :param _value: 需要精确小数的值
    :return: 精确小数后的值
    """

    return round(_value, PRECISE_DECIMAL_DIGIT)

日志

日志打印

代码语言:javascript复制
pip3 install loguru
代码语言:javascript复制
from loguru import logger

# 添加日志配置
logger.add("[日志路径]",
           format="{time} {level} {message}", level="INFO")


logger.info("这是一条 info 日志")

# Traceback 记录
@logger.catch
def my_function(x, y, z):
    # An error? It's caught anyway!
    return 1 / (x   y   z)

单元测试

单元测试保证后续对代码的更改不会引入新的 Bug。

代码语言:javascript复制
import unittest

class Testxxx(unittest.TestCase):
    
    def test_xxx(self):

        self.assertEqual(xxx, True)  # 当 xxx 与 True 不一致时报错
        self.assertNotEqual(xxx, 0)  # 当 xxx 为 0 时报错

数据库交互

ORM(对象-关系映射) - peewee

代码语言:javascript复制
from peewee import *

# 连接数据库
MYSQL_DB = MySQLDatabase(host=conf.MYSQL_IP, 
                        port=conf.MYSQL_PORT,
                        user=conf.MYSQL_USERNAME,
                        passwd=conf.MYSQL_PASSWD,
                        database=conf.MYSQL_DB)

class BaseModel(Model):
    """基础模型类"""

    class Meta:
        database = MYSQL_DB


class xxxRecord(BaseModel):
    """xxxRecord 模型类"""
    pass

    class Meta:
        db_table = 'xxx_record'
        constraints = [SQL('UNIQUE KEY(field1, field2)')]

# 查询数据
xxx_data = xxxRecord.get(field1=field1_value)
xxx_value = xxx_data.value

文件

打开文件

代码语言:javascript复制
with open(FILE_PATH   FILE_NAME, 'r') as xxx_file:
    for xxx_file_line in xxx_file.readlines():
        pass

装饰器

函数运行显示运行日志

代码语言:javascript复制
import functools

from loguru import logger

def func_run_log(func):
    """函数运行显示运行日志"""

    @functools.wraps(func)
    def wrapper(*args, **kw):
        logger.info('调用函数: %s, 参数: %s' % (func.__name__,
                                          kw))
        return func(*args, **kw)
    return wrapper

函数报错显示错误跟踪

代码语言:javascript复制
import functools
import traceback

from loguru import logger

def validate(f):
    @functools.wraps(f)
    def decorator(*args,**kw):
        try:
            data = f(*args,**kw)
        except Exception as e:
            logger.error('发生错误:{}',traceback.format_exc())
            data = None
        return data
    return decorator

常用代码

两数组取交集

代码语言:javascript复制
def get_intersection(list1, list2):
    return [x for x in list1 if x in list2]

get_intersection(["1", "2", "3", "5"], ["2", "4", "5"])

生成 hash

代码语言:javascript复制
# 生成 xxx_hash_id
raw_string = 字段1   字段2   str(int(time.time()))
bytes_string = bytes(raw_string, encoding="utf8")
xxx_hash_id = hashlib.md5(bytes_string).hexdigest()

执行指令

代码语言:javascript复制
import os

def execution_command(command):
    """执行指令
    :param command: 指令
    :return:
    """

    res = os.popen(command)
    res_text = res.read()
    res.close()

    return res_text

总结

requests 异常处理

  • try…except 处理理连接异常
  • 状态码判断 处理理访问异常
  • res.text 判断 处理理数据接收异常
  • 数据返回状态码判断 处理理 API 数据返回异常

静态检查

  • 对代码运行 pylint。

0 人点赞