cloudoll 云端玩具(python)

2022-10-25 15:07:41 浏览数 (1)

cloudoll 云端玩具

更新日志

0.1.5 2022-09-19 - 修复logging level 错误的问题 - 修正默认依赖

0.1.4 2022-09-12 - 优化orm 超时的问题 - 增加websocket 支持

安装

代码语言:javascript复制
#for python3 
pip install cloudoll
# or
pip3 install cloudoll

Docs

Server

假设项目目录结构如下:

代码语言:javascript复制
├─controllers
│  ├─__init__.py
│  ├─api
│  │  ├─message.py
│  │  └─__init__.py
│  ├─view
│  │  ├─errors.py
│  │  ├─home.py
│  │  └─__init__.py
├─static
│  ├─css
│  │  └─index.css
│  ├─img
│  │  └─logo.png
│  ├─js
│  │  └─comment.js
├─template
│  │  ├─404.html
│  │  ├─500.html
│  │  └─index.html
│  └─layout
│      ├─footer.html
│      ├─header.html
│      └─index.html
├─app.py
├─configs.py
└─models.py

初始化

代码语言:javascript复制
from cloudoll.web.server import server
async def init(loop=None):
  # await mysql.connect(loop, **MYSQL) # 可以在这里初始化orm
  # tem_path = os.path.join(os.path.abspath("."), "template")
  # static = os.path.join(os.path.abspath("."), "static")
  server.create(
          loop=loop,
          # template=tem_path, #模板目录,可选
          # static=static, # 静态资源 ,测试用
          controllers="controllers", # 路由目录,路由会自动注册
          middlewares=[], # 中间件,可选
      )

  await server.run(port=9000)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

Rest API

get , post, delete ,put
代码语言:javascript复制
# /controllers/api/message.py

from cloudoll.web.server import get,post,delete,put ,jsons

#get
@get('/v2/message/list')
async def list(request,fm):
  return jsons(dict(code=1, msg='ok'))

#post
@post('/v2/message/list')
async def list(request,fm):
  return jsons(dict(code=1, msg='ok'))

#delete
@delete('/v2/message/list')
async def list(request,fm):
  return jsons(dict(code=1, msg='ok'))

#get
@get('/v2/message/list')
async def list(request,fm):
  return jsons(dict(code=1, msg='ok'))

#put
@put('/v2/message/list')
async def list(request,fm):
  return jsons(dict(code=1, msg='ok'))

访问 http://127.0.0.1:9000/v2/message/list , 返回:

代码语言:javascript复制
{
  "code": 1,
  "msg": "ok"
}
传参

formdata ,body ,还是url 传参 ,都通过路由第二个值接收

代码语言:javascript复制
// 前端
var data = new FormData()
data.append('a',1)
data.append('b',2)

$.post('/v2/message/list?id=1&age=20',data=data)
代码语言:javascript复制
# url = '/v2/message/list'
@get('/v2/message/list')
async def list(request,fm):
  id = fm['id'] # 1
  age = fm['age']  # 20
  a = fm['a'] # 1
  b = fm['b'] # 2
  return jsons(dict(code=1, msg='ok'))
上传文件
代码语言:javascript复制
from cloudoll.web.server import post , jsons

@post('/v2/upload/image')
async def upload_image(request,fm):
    file = fm['file']
    if not file:
        return jsons(dict(code='00001', msg='请上传图片'))
    if not file.content_type.startswith('image'):
        return jsons(dict(code='00002', msg='只支持图片上传'))

    content = file.file.read()
    savepath = '/home/chuchur/123.jpg'
    with open(savepath, 'wb') as f:
        f.write(content)

    return jsons(dict(code=0, msg='ok'))
动态路由
代码语言:javascript复制
@get('/v2/message/{id}')
async def list(request,fm):
  id = request.route["id"] #取得路由id 值

  return jsons(dict(code=1, msg='ok'))
Websocket
代码语言:javascript复制
from cloudoll.web.server import get, WebSocket

@get("/v2/ws/test")
async def getrecord(req, fm):

    ws = WebSocket() # 初始化 WebSocket
    await ws.prepare(req)

    async for msg in ws:
        if msg.type == 1:
            text = msg.data # 收到客户端的消息
            if text == "close":
                await ws.close()
            else:
                await ws.send_str('收到消息:' text) #给客户端发送消息
        elif msg.type == 258:
            print("ws connection closed with exception %s" % ws.exception())

    print("websocket connection closed")

    return ws
Seesion
代码语言:javascript复制
from cloudoll.web.server import get

@get('/test')
async def test(request,fm):
  #读取Seesion
  a = request.session.get('a')
  #设置Seesion
  request.session['a'] = 'test'
JWT

JSON Web Token(缩写 JWT)是目前最流行的跨域认证解决方案。

代码语言:javascript复制
from cloudoll.web.server import post, jsons
import cloudoll.web.jwt as jwt

AUTH_KEY = 'fjkdsal&*(%^^&'

@post('/v2/login')
async def test(request,fm):
  # ...
  # 如果账号密码匹配成功
  user = {
    "nick": "chuchur",
    "uid": 100
  }
  # 加密存储
  token = jwt.encode( user, AUTH_KEY, exp=3600 * 24 * 2 ) #有效期两天
  # 返回加密后的 token
  return jsons(dict(code=0 , msg= 'ok' ,token=token))
中间件

下面是个登录验证的例子:

代码语言:javascript复制
from cloudoll.web.server import server, middleware, redirect
import cloudoll.web.jwt as jwt

AUTH_KEY = 'fjkdsal&*(%^^&'

def create_middleware():
    """
    验证token 有效性
    """
    @middleware
    async def middlewares(request, handler):
        try:
            if request.path.startswith("/v2") and request.path != "/v2/login":
                token = request.headers["Authorization"]
                if not token:
                    return jsons(dict(code="00001", msg="登录失效"), status=403)
                else:
                    token = token.replace("bearer", "").strip()
                    user = jwt.decode(token, AUTH_KEY) # JWT解密 token
                    if not user:
                        return jsons(dict(code="00001", msg="登录失效"), status=403)
                    else:
                        request.__user__ = user # 部分数据避免再次查询可以存储起来
            return await handler(request)
        except Exception as e:
            logging.error(e)
            if hasattr(e, "status") and e.get("status") == 404:
                return jsons(dict(code="00004", msg="数据被外星人偷走了!"), status=404)
                # or
                # return redirect("/404")
            else:
                return jsons(dict(code="00005", msg="工程师被外星人偷走了!"), status=500)
                # or
                # return redirect("/500")
            raise

    return middlewares

async def init(loop=None):
    # await mysql.connect(loop, **MYSQL) # 可以在这里初始orm
    tem_path = os.path.join(os.path.abspath("."), "template")
    # static = os.path.join(os.path.abspath("."), "static")

    server.create(
        loop=loop,
        template=tem_path,
        # static=static,
        controllers="controllers",
        middlewares=[create_middleware()], # 可以有多个中间件
    )

    await server.run(port=9000)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init(loop))
    loop.run_forever()

视图模板

默认模板引擎为 jinja2

代码语言:javascript复制
from cloudoll.web.server import get, view

@get('/')
async def home(request, fm):

    data = {
      "user_name":"chuchur",
      "age": 28
    }

    return view(
        template="index.html", #模板名称
        data=data
    )

数据渲染

代码语言:javascript复制
<!-- index.html -->
<!DOCTYPE html>
<html lang="zh">

<head>
  <title>cloudoll demo</title>
  <meta charset="utf-8">
</head>

<body>
  姓名:{{ data.user_name }} <br/>
  年龄:{{ data.age }}
</body>
</html>
Cookie

cookie 渲染视图时可用

代码语言:javascript复制
from cloudoll.web.server import get, view

@get('/')
async def home(request, fm):
    data = {
      "user_name":"chuchur",
      "age": 28
    }

    v = view(
        template="index.html", #模板名称
        data=data
    )
    # 读取 Cookie
    a =  request.cookies.get('a')

    # 设置 Cookie 24 小时
    v.set_cookie('a','100' ,max_age=86400 ,httponly=True)
    # 设置 Cookie 过期
    v.set_cookie('a','-deleted-' ,max_age=0 ,httponly=True)

    return v

Orm

假定数据模型如下:

代码语言:javascript复制
class Users(Model):

    __table__ = 'users'

    id = models.IntegerField(primary_key=True,auto_increment=True,not_null=True,comment='主键')
    user_name = models.CharField(max_length='255',not_null=True,comment='登录用户名')
    password = models.CharField(max_length='255',not_null=True,comment='登录密码')
    age = models.IntegerField(comment='年龄')

数据模型可以通过全局api tables2models 生成,接口说明在后面

初始化

代码语言:javascript复制
from cloudoll.orm import mysql

MYSQL = {
    "debug": False,
    "db": {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "abcdefg",
        "db": "test",
        "charset":"utf8mb4"
    }
}

await mysql.connect(loop=None,**MYSQL)

table_name = 'user'

模型API

分页查询
代码语言:javascript复制
page = 1
size = 20
items = await Users.findAll(where="uid=?" ,
              cols=['id'], #列,默认*
              limit=size,
              offset=(page-1) * size
              orderBy="id desc"
              params=[100])
主键查询
代码语言:javascript复制
# 查询id为2的用户
item = await Users(id=2).find()
条件查询
代码语言:javascript复制
# 查询id为2 , user_name 为chuchur的用户
item = await Users(id=2, user_name="chuchur").findBy()
主键更新
代码语言:javascript复制
# 更新id为2的用户,把user_name 更新为chuchur
item = await Users(id=2, user_name="chuchur").update()
# or
item = await Users(id=2, user_name="chuchur").save()
条件更新
代码语言:javascript复制
# 更新user_name 包含 mayun 的用户, 把user_name 更新为 啤酒云
result = await Users.updateAll( 
      where="user_name like %?%" , 
      params=['mayun'] ,
      user_name="啤酒云")
主键删除
代码语言:javascript复制
# 删除id为2的用户
result = await Users(id=2).delete()
条件删除
代码语言:javascript复制
# 删除user_name 包含 mayun 的用户
result = await Users.deleteAll(where="user_name like %?%" , params=['mayun'])
新增
代码语言:javascript复制
# 包含主键会 执行更新,不包含执行新增
user = {
  "user_name":"chuchur",
  "password":"1234",
  "age":28
}
result = await Users(**user).save()
统计
代码语言:javascript复制
# 统计id大于30的用户
count = await Users.count(where="id>30")
# output 30
判断存在
代码语言:javascript复制
# 判断id为1的用户 是否存在
result = await Users.exists(where="uid=1")
# output True or False

全局API

分页查询
代码语言:javascript复制
res = msyql.lists(table_name ,
    cols:['uid','age'],
    where="age>? and name like %?% ",
    limit=10,
    offset=20 ,
    params=[15,'mayun'])

for item in res:
  print(item)
唯一查询
代码语言:javascript复制
item = mysql.find(table_name,where="uid=1")
条件唯一查询
代码语言:javascript复制
# findBy(table_name,key,value)

item = mysql.findBy(table_name,'uid',1)
新增
代码语言:javascript复制
data = {
  "name": '马云',
  "age": 1
}
result = mysql.insert(table_name,**data)

print(result)
# output : { id: 101}
修改
代码语言:javascript复制
data = {
  "id":100,
  "name": '马云',
  "age": 1
}
# pk 为主键
result = mysql.update(table_name,pk=id, **data)
print(result)
# output : True
保存
代码语言:javascript复制
data = {
  "id":100,
  "name": '马云',
  "age": 1
}
# pk 为主键,有主键会自动执行更新,否则为新增
result = mysql.save(table_name,pk=id, **data)
print(result)
# output : True
删除
代码语言:javascript复制
result = mysql.delete(table_name,where="uid=100")
print(result)
# output : True
统计
代码语言:javascript复制
result = mysql.count(table_name,where="age>30")
print(result)
# output : 100
批量修改
代码语言:javascript复制
# 符合条件的数据的 name 修改为 啤酒云
result = mysql.updateAll(table_name,
        where="name=? and age=1" ,
        params=['mayun'] ,
        name="啤酒云")
print(result)
# output : True
判断
代码语言:javascript复制
result = mysql.exists(table_name,
        where="name=? and age=1" ,
        params=['mayun'] ,
        name="马云")
print(result)
# output : True
Table 转 Model

sql 如下:

代码语言:javascript复制
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users`  (
  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '登录用户名',
  `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '登录密码',
  `avatar` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '头像',
  `type` int NOT NULL DEFAULT 1 COMMENT '账户类型',
  `state` int NOT NULL DEFAULT 1 COMMENT '账户状态',
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `last_login_ip` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '最后登录IP',
  `nick_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '帐号昵称',
  `email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '联系邮件',
  `birthday` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '生日',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

执行建表sql 之后,就可以通过 tables2models 把表转为 models

代码语言:javascript复制
# tables2models([表1,表2...],模型保存路径)
# 不传表,将导出所有的表模型

await mysql.tables2models(['users'],savepath= '/home/chuchur/models.py')

得到数据模型:

代码语言:javascript复制
from cloudoll.orm.mysql import models, Model

class Users(Model):

    __table__ = 'users'

    id = models.IntegerField(primary_key=True,auto_increment=True,not_null=True,comment='主键')
    user_name = models.CharField(max_length='255',not_null=True,comment='登录用户名')
    password = models.CharField(max_length='255',not_null=True,comment='登录密码')
    avatar = models.CharField(max_length='255',comment='头像')
    user_type = models.IntegerField(default='1',not_null=True,comment='账户类型')
    state = models.IntegerField(default='1',not_null=True,comment='账户状态')
    created_at = models.DatetimeField(default='CURRENT_TIMESTAMP',not_null=True,comment='创建时间')
    last_login_ip = models.CharField(max_length='255',comment='最后登录IP')
    nick_name = models.CharField(max_length='255',comment='帐号昵称')
    email = models.CharField(max_length='255',comment='联系邮件')
    birthday = models.CharField(max_length='255',comment='生日')

http 爬虫模块

爬取网页

代码语言:javascript复制
from cloudoll.web import http

result  =  http.get('https://baidu.com')
print(result)

# output:  <html>....</html>

请求rest api

支持 post ,delete , put ,head ,option

代码语言:javascript复制
# 错误次数 尝试5次
json = http.get('https://api.xxxx.com/v2/xxxx' ,trytimes=5)
print(json)

# output : { code: 0, message: 'ok'}

传参

代码语言:javascript复制
# get url 传参
params=dict(a=1,b=2)
http.get('https://api.xxxx.com/v2/xxxx',params=params)

# get payload 传参
params=dict(a=1,b=2)
http.get('https://api.xxxx.com/v2/xxxx',data=params)

# post 传参
data=dict(a=1,b=2)
http.get('https://api.xxxx.com/v2/xxxx',json=data)

# 上传文件
def upload():
    path = '/home/chuchur/123.jpg'
    with open(path, "rb") as f:
        files = {"file": f}
        # or
        # files = {"file": f, "a": (None, 1), "file_type": (None, 'image')}

        headers = {"authorization": AUTH}
        req = http.post(
            "https://xxx.abc.com/v1/api/upload",
            files=files,
            data={"a": 1 , "file_type": "image" }, # 带其它参数
            headers=headers,
        )

        if req["code"] == 0:
            logging.info("upload ok")
            return True
        else:
            logging.info("upload faild")
            return False

html 简易解析器

代码语言:javascript复制
from cloudoll.web.html import parser
from cloudoll.web import http

result  =  http.get('https://baidu.com')
print(result)
# output:  <html>....</html>

ps = parser().parser(result)

# 拿到所有 text
text = ps.text

# 拿到所有 图片链接
images = ps.images

# 拿到所有 视频链接
videos = ps.videos

下载文件

代码语言:javascript复制
src = 'https://www.baidu.com/img/flexible/logo/pc/result.png'
savepath = '/home/chuchur/download/baidu-logo.png'
http.download(src,savepath)

代理/头/cookies

代码语言:javascript复制
url = 'https://xxx.xxx.com'

proxies = {
  'http':'127.0.0.1',
  'https':'127.0.0.1'
}

headers = {
  'token':'xxxxxx'
}

cookies = {
  'username':'admin'
}

data = http.get(url,headers=headers ,cookies=cookies ,proxies=proxies)

logging

日志辅助 ,生成日志文件, 控制台打印彩色文字

代码语言:javascript复制
# /home/chuchur/work/test.py

from cloudoll import logging

logging.getLogger(__name__)
# or
# logging.getLogger(__file__)


logging.debug('I am debug...')
logging.info('I am info...')
logging.warning('I am warning...')
logging.error('I am error...')
logging.critical('I am critical...')

控制台:

日志文件:**-all.log

代码语言:javascript复制
2022-07-26 18:36:27-root-__init__.py-[line:151]-DEBUG-[日志信息]: I am debug...
2022-07-26 18:36:27-root-__init__.py-[line:149]-INFO-[日志信息]: I am info...
2022-07-26 18:36:27-root-__init__.py-[line:153]-WARNING-[日志信息]: I am warning...
2022-07-26 18:36:27-root-__init__.py-[line:155]-ERROR-[日志信息]: I am error...
2022-07-26 18:36:27-root-__init__.py-[line:157]-CRITICAL-[日志信息]: I am critical...

日志文件:**-error.log

代码语言:javascript复制
2022-07-26 18:36:27-root-__init__.py-[line:155]-ERROR-[日志信息]: I am error...

Mail

快速配置发送邮件

代码语言:javascript复制
# test_mail.py

from cloudoll.mail import smtp

MAIL = {
    "smtp_server": "smtp.qq.com",
    "account": "123456789@qq.com",
    "account_name": "chuchur",
    "password": "abcdefg",
    "prot": 465,  # 587
    "debug_level": 1,
}

client = smtp.Client(**MAIL)
# 标题
client.subject = "test title"
# 正文
client.content = "long long ago..."
#收件人
client.add_to_addr("chuchur", "chuchur@qq.com")

# 发送
client.send()

多个收件人

代码语言:javascript复制
client.add_to_addr("李彦宏", "liyanhong@baidu.com")
client.add_to_addr("马云", "jackma@alibaba.com")

附件

代码语言:javascript复制
filepathA = '/home/chuchur/img/a.jpg'
filepathB = '/home/chuchur/img/b.jpg'

client.addfile(filepathA)
client.addfile(filepathB)

嵌入html 和 html 调用附件

代码语言:javascript复制
client.addfile("/home/chuchur/img/a.jpg") # cid 0
client.addfile("/home/chuchur/img/b.jpg") # cid 1
client.addhtml("<html><body><h1>Hello</h1>"   '<p><img src="cid:0"><img src="cid:1"></p>'   "</body></html>")

Robot

快速接入 钉钉,飞书机器人

钉钉机器人

代码语言:javascript复制
from cloudoll.robot import dingtalk

webhook = '机器人地址'
secret = '机器人密钥'
access_token = '机器人token' # 可以不设定,上传文件必填

client = dingtalk.Client(
    webhook=webhook,
    secret=secret,
    access_token=access_token,
)

client.sendtext("代码出bug了!")

飞书机器人

代码语言:javascript复制
from cloudoll.robot import feishu

webhook = '机器人地址'
secret = '机器人密钥'

client = feishu.Client(
    webhook=webhook,
    secret=secret,
)

client.sendtext("代码出bug了!")

0 人点赞