微信小程序-云开发 数据库http接口封装

2020-05-20 15:24:43 浏览数 (1)

最近学习了一下微信小程序的云开发。作为 serverless 架构,它着实省去了很多麻烦:省得搞 https 证书,也不用写 api。但后台系统总是要建的,而云开发的数据库是 NoSQL,为了用上分页,我就封装了一下接口,如下:

代码语言:txt复制
import requests
import json
from flask_sqlalchemy import Pagination

class WeChatCloudDB(object):
    """
    Wrapper for the WeChat Mini Program cloud-development database HTTP API.
    https://developers.weixin.qq.com/miniprogram/dev/wxcloud/reference-http-api/

    The builder methods (``get``, ``count``, ``update_one``, ``delete_one``,
    ``add_one``) store the endpoint URL and the query string on the instance
    and return ``self``; ``commit()`` or ``paginate()`` then send the request.
    """

    # Response keys that may carry the result, in the order they are checked.
    _RESULT_KEYS = ('data', 'modified', 'deleted', 'count', 'id_list')

    def __init__(self, env, access_token):
        """Store the cloud environment id and the access token.

        :param env: cloud environment id, sent as ``env`` in every request body.
        :param access_token: token appended to the URL as ``access_token=...``.
        """
        self.env = env
        self.access_token = "access_token=" + access_token
        self.query = ''
        self.url = ''
        self.method = ''
        # Set up front so paginate() can read it even when a response
        # carries no ``pager`` section.
        self.total = 0

    @staticmethod
    def _quote(value):
        """Return *value* as a double-quoted, escaped string literal."""
        return json.dumps(str(value), ensure_ascii=False)

    @staticmethod
    def _format_value(value):
        """Render one Python value as a literal for the query string.

        Strings are quoted and escaped, booleans become ``true``/``false``
        and ``None`` becomes ``null``; anything ``json`` cannot encode falls
        back to ``str(value)``.
        """
        try:
            return json.dumps(value, ensure_ascii=False)
        except (TypeError, ValueError):
            return str(value)

    @classmethod
    def _format_fields(cls, fields):
        """Render a mapping as comma-separated ``key:value`` pairs."""
        return ','.join(
            '{0}:{1}'.format(key, cls._format_value(value))
            for key, value in fields.items()
        )

    def _execute(self, query):
        """POST *query* to ``self.url`` and unpack the JSON response.

        ``.get()`` is appended when the pending operation is a query.
        On ``errcode == 0`` the value of the first result key present in the
        response is returned (``None`` if there is none) and ``self.total``
        is refreshed from the pager, if any. Otherwise the error code and
        message are printed and ``[]`` is returned.
        """
        url = self.url + '?' + self.access_token
        if self.method == 'get':
            query += '.get()'
        payload = {'env': self.env, 'query': query}
        response = requests.post(url, data=json.dumps(payload))
        results = json.loads(response.text)
        if not isinstance(results, dict):
            results = {}

        if results.get('errcode') != 0:
            print(results.get('errcode'), results.get('errmsg'))
            return []

        pager = results.get('pager')
        if pager:
            self.total = pager.get('Total')
        # Test for key presence, not truthiness, so an empty result list or
        # a count of 0 is returned as-is instead of being skipped.
        for key in self._RESULT_KEYS:
            if key in results:
                return results[key]
        return None

    def paginate(self, page=None, per_page=None, max_per_page=100):
        """Run the pending query for one page and wrap it in ``Pagination``.

        :param page: 1-based page number; ``None`` or values below 1 mean 1.
        :param per_page: page size; ``None`` or negative values mean 20.
        :param max_per_page: upper bound for ``per_page``; ``None`` disables it.
        :return: ``Pagination(self, page, per_page, self.total, items)``.
        """
        if page is None:
            page = 1

        if per_page is None:
            per_page = 20

        if max_per_page is not None:
            per_page = min(per_page, max_per_page)

        if page < 1:
            page = 1

        if per_page < 0:
            per_page = 20

        # Build the paged query locally: mutating self.query would stack
        # another .limit().skip() suffix on every subsequent call.
        paged_query = '{0}.limit({1}).skip({2})'.format(
            self.query, per_page, (page - 1) * per_page)
        items = self._execute(paged_query)
        return Pagination(self, page, per_page, self.total, items)

    def commit(self):
        """Send the pending operation and return its result (see ``_execute``)."""
        return self._execute(self.query)

    def get(self, table, _id=None, **kwargs):
        """Prepare a query on *table*.

        :param table: collection name.
        :param _id: document id; when truthy the query targets that document
            and *kwargs* are ignored.
        :param kwargs: equality filters used in ``.where({...})``.
        :return: ``self``, ready for ``commit()`` or ``paginate()``.
        """
        self.method = 'get'
        self.url = 'https://api.weixin.qq.com/tcb/databasequery'
        self.query = 'db.collection({})'.format(self._quote(table))
        if _id:
            self.query += '.doc({})'.format(self._quote(_id))
        elif kwargs:
            self.query += '.where({%s})' % self._format_fields(kwargs)
        return self

    def update_one(self, table, _id, **kwargs):
        """Prepare an update of document *_id* in *table* with *kwargs* as data.

        :return: ``self``, ready for ``commit()``.
        """
        self.method = 'update'
        self.url = 'https://api.weixin.qq.com/tcb/databaseupdate'
        self.query = 'db.collection({0}).doc({1})'.format(
            self._quote(table), self._quote(_id))
        if kwargs:
            self.query += '.update({data:{%s}})' % self._format_fields(kwargs)
        return self

    def delete_one(self, table, _id):
        """Prepare removal of document *_id* from *table*.

        :return: ``self``, ready for ``commit()``.
        """
        self.method = 'delete'
        self.url = 'https://api.weixin.qq.com/tcb/databasedelete'
        self.query = 'db.collection({0}).doc({1}).remove()'.format(
            self._quote(table), self._quote(_id))
        return self

    def add_one(self, table, **kwargs):
        """Prepare insertion of one record built from *kwargs* into *table*.

        :return: ``self``, ready for ``commit()``.
        """
        self.method = 'add'
        self.url = 'https://api.weixin.qq.com/tcb/databaseadd'
        self.query = 'db.collection({})'.format(self._quote(table))
        if kwargs:
            self.query += '.add({data:[{%s}]})' % self._format_fields(kwargs)
        return self

    def count(self, table, **kwargs):
        """Prepare a count of the records in *table* matching *kwargs*.

        :return: ``self``, ready for ``commit()``.
        """
        self.method = 'count'
        self.url = 'https://api.weixin.qq.com/tcb/databasecount'
        self.query = 'db.collection({})'.format(self._quote(table))
        if kwargs:
            self.query += '.where({%s})' % self._format_fields(kwargs)
        self.query += '.count()'
        return self

link:https://github.com/xiaoyeqiannian/alita/blob/master/util/wx_api.py

0 人点赞