使用aioredis操作列表对象:插入、批量插入和数据检索

2023-11-15 15:55:37 浏览数 (1)

哈哈哈~~~

这篇文章只讲如何使用aioredis操作列表和列表的基本属性。

redis列表可以应用于代理池等功能上。

最后将代码封装一下, 随便学一下封装方法, 写的多了, 慢慢就熟练了

插入单个元素

将字符串 'element1' 插入到 Redis 列表 my_list 的左侧。

代码语言:javascript复制
# -*- coding: utf-8 -*-
# @Author: Mehaei
# @Date:   2023-11-11 17:40:14
# @Last Modified by:   Mehaei
# @Last Modified time: 2023-11-11 17:43:13
import asyncio
import aioredis
async def insert_single_element():
    """
    插入单个元素
    """
    redis = await aioredis.from_url('redis://localhost')
    await redis.lpush('my_list', 'element1')
    await redis.close()

批量插入多个元素

有的时候会批量插入,这个示例将列表 elements 中的多个元素一次性插入到 Redis 列表my_list的左侧。

代码语言:javascript复制
async def insert_multiple_elements():
    """
    插入多个元素
    """
    redis = await aioredis.from_url('redis://localhost')
    elements = ['element1', 'element2', 'element3']
    await redis.lpush('my_list', *elements)
    await redis.close()

获取数据并转换为字符串

要检索 Redis 列表中的数据并将其转换为字符串,您可以使用 lrange 方法和 Python 的 str() 函数。这个示例使用 lrange 获取 Redis 列表 my_list 中的所有元素,然后将它们转换为字符串并打印出来。:

代码语言:javascript复制
async def retrieve_and_convert():
    """
    取出多个元素
    """
    redis = await aioredis.from_url('redis://localhost')
    # 按元素索引获取
    result = await redis.lrange('my_list', 0, -1)
    await redis.close()
    # 将结果列表转换为字符串
    result_as_strings = [str(item, 'utf-8') for item in result]
    print(result_as_strings)

使用

代码语言:javascript复制
# 使用异步事件循环执行插入操作
loop = asyncio.get_event_loop()
# loop.run_until_complete(insert_single_element())
# loop.run_until_complete(insert_multiple_elements())
loop.run_until_complete(retrieve_and_convert())

封装一下

简单封装一下, 主要是学思路和方法, 后续需要什么属性, 可以再加。代码中有注释, 这里就不多做解释了,今天又完了!!!

代码语言:javascript复制
# -*- coding: utf-8 -*-
# @Author: Mehaei
# @Date:   2023-11-11 15:05:11
# @Last Modified by:   Mehaei
# @Last Modified time: 2023-11-11 18:15:57
import asyncio
import aioredis


class RedisArray(object):
    def __init__(self, url="redis://localhost", kname="my_list"):
        """
        :param url: redis url
        :param kname: list name
        """
        self.url = url
        self.kname = kname

    async def create_pool(self):
        """
        创建连接池
        """
        self.aioClient = await aioredis.from_url('redis://localhost')

    async def lpush(self, kvalue):
        """
        往列表左边, 也就是头部插入元素
        :param kvalue: element value
        """
        try:
            return await self.aioClient.lpush(self.kname, kvalue)
        except Exception as e:
            print(f"lpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
            return None

    async def rpush(self, kvalue):
        """
        往列表右边, 也就是尾部插入元素
        :param kvalue: element value
        """
        try:
            return await self.aioClient.rpush(self.kname, kvalue)
        except Exception as e:
            print(f"rpush 插入: {self.kname}/{kvalue}报错: {str(e)}")
            return None

    async def get(self, start=0, end=-1):
        """
        获取元素
        :param start: element start index
        :param end: element end index
        """
        try:
            result = await self.aioClient.lrange(self.kname, 0, -1)
            result = [str(item, 'utf-8') for item in result]

            print(result)
            return result
        except Exception as e:
            print(f"读取: {self.kname}/{start}/{end}错误: {str(e)}")
            return []

    async def close(self):
        """
        关闭redis连接
        """
        await self.aioClient.close()


async def main():
    # 使用异步事件循环执行插入操作
    RA = RedisArray()
    await RA.create_pool()
    await RA.lpush("test22")
    await RA.rpush("test23")
    await RA.get()
    await RA.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

0 人点赞