背景
平常做一些简单的demo测试,或者数据量比较小的数据存储都是放json文件,或者csv文件,感觉连接MySQL比较麻烦。但有些测试的项目最终是要转产的,且用的是MySQL一类关系型数据库,就要改代码,很麻烦。SQLite就很方便做临时数据了,不用安装什么软件,可以写sql语句。
SQLite 是一种关系型数据库管理系统,它提供了一种轻量级的、基于文件的数据库管理解决方案。相比于简单的文件格式,SQLite 有以下优势:
- 1. 数据结构化:SQLite 允许你创建表格、定义数据类型、建立索引等,使得数据更加结构化和易于管理。
- 2. SQL 支持:SQLite 支持标准的 SQL 查询语言,这使得对数据进行查询、更新、删除等操作更加方便和灵活。
- 3. ACID 事务支持:SQLite 支持 ACID(原子性、一致性、隔离性、持久性)事务,保证了数据的完整性和一致性。
- 4. 并发性:SQLite 支持多个连接同时对数据库进行读取操作,虽然不支持多个连接同时进行写入操作,但对于轻量级的应用来说,这通常不是问题。
- 5. 跨平台性:SQLite 是跨平台的,可以在各种操作系统上运行,包括 Windows、Mac 和各种 Linux 发行版。
总的来说,SQLite 提供了一种简单、轻量级、易于集成的数据库解决方案,适用于许多小型应用和嵌入式系统。
可以理解成就一个文件数据库,但有SQL的服务(不用安装任何数据库服务都可以执行sql语句) 由于其轻量级的特性,SQLite 并不适合大规模的、高并发的应用场景
在 SQLite 中,当进行搜索时,并不会把整个数据加载到内存中。SQLite 使用一种称为“查询优化器”的技术来处理查询,它会根据查询条件和索引等信息,尽可能地减少对磁盘的访问,以提高查询效率。
工具类
先来构建一个工具类
代码语言:javascript复制import sqlite3
class SQLiteDatabase:
def __init__(self, db_name):
self.db_name = db_name
def connect(self):
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
return conn, cursor
def connect_close(self, conn, cursor):
cursor.close()
conn.close()
def create_table(self, sql):
conn, cursor = self.connect()
cursor.execute(sql)
self.connect_close(conn, cursor)
def insert(self, sql):
conn, cursor = self.connect()
try:
row_count = cursor.execute(sql).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
def insert_many(self, sql, values):
conn, cursor = self.connect()
try:
row_count = cursor.executemany(sql, values).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
def update(self, sql):
conn, cursor = self.connect()
try:
row_count = cursor.execute(sql).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
def update_many(self, sql, values):
conn, cursor = self.connect()
try:
row_count = cursor.executemany(sql, values).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
def select(self, sql):
conn, cursor = self.connect()
try:
cursor.execute(sql)
row_data = cursor.fetchone()
finally:
self.connect_close(conn, cursor)
return row_data
def select_many(self, sql):
conn, cursor = self.connect()
try:
cursor.execute(sql)
row_data = cursor.fetchall()
finally:
self.connect_close(conn, cursor)
return row_data
def select_define_data(self, sql, num):
conn, cursor = self.connect()
try:
cursor.execute(sql)
result = cursor.fetchmany(num)
finally:
self.connect_close(conn, cursor)
return result
def delete(self, sql):
conn, cursor = self.connect()
try:
row_count = cursor.execute(sql).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
def delete_many(self, sql, values):
conn, cursor = self.connect()
try:
row_count = cursor.executemany(sql, values).rowcount
conn.commit()
except:
row_count = 0
conn.rollback()
finally:
self.connect_close(conn, cursor)
return row_count
使用
创建数据表
代码语言:javascript复制# 使用示例
db = SQLiteDatabase('testdb.sqlite')
users_sqls = '''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER,
score DOUBLE
);
'''
db.create_table(users_sqls)
插入数据
代码语言:javascript复制# 插入单条数据
insert_sql = f"INSERT INTO users (name, age, score) VALUES ('Alice', 25, 60.65)"
insert_count = db.insert(insert_sql)
print("单条插入", insert_count, "条数据!")
# 插入多条数据
sql = f'INSERT INTO users (name, age, score) VALUES (?, ?, ?)'
insert_counts = db.insert_many(sql, [('Lim', 20, 61.89), ('Kurt', 23, 50.01), ('Tom', 34, 67.89), ('Nics', 26, 52.01)])
print("批量插入", insert_counts, "条数据!")
更新数据
代码语言:javascript复制# 单数据更新
sql = f"UPDATE users SET `name` = 'Mikey' WHERE id = 2"
result = db.update(sql)
print("单条更新", result, "条数据!")
# 批量更新
sql = f'UPDATE users SET `name` = ? WHERE id = ?'
result = db.update_many(sql, [('Carry', 3), ('Tiya', 4)])
print("批量更新", result, "条数据!")
查询数据
代码语言:javascript复制# 查询单条数据
sql = f'SELECT * FROM users WHERE id = 3'
result = db.select(sql)
print("查询单条数据:", result)
# 查询score>60的所有数据
sql = f'SELECT * FROM users WHERE score > 60'
result = db.select_many(sql)
print("查询score>60的所有数据:", result)
# 查询score>60前两条数据
sql = f'SELECT * FROM users WHERE score > 2'
result = db.select_define_data(sql, 2)
print("查询score>60前两条数据:", result)
删除数据
代码语言:javascript复制# 删除单条数据
sql = f'DELETE FROM users WHERE id = 1'
delete_count = db.delete(sql)
print("单条删除", delete_count, "条数据!")
# 批量删除
sql = f'DELETE FROM users WHERE id = ?'
delete_counts = db.delete_many(sql, [(3,), (4,)])
print("批量删除", delete_counts, "条数据!")