sqlparse 是 Python 的非验证 SQL 解析器。它提供对 SQL 语句的解析、拆分和格式化的支持。不废话,我们直接上代码。
import sqlparse
def parse_sql(sql):
    """Parse the first SQL statement in ``sql`` into a plain dictionary.

    Only the first statement returned by ``sqlparse.parse`` is inspected;
    any further statements in ``sql`` are ignored.

    Args:
        sql: SQL text to analyse.

    Returns:
        A dict ``{'type': ..., 'target': ...}``. ``type`` is the value of
        ``Statement.get_type()`` (for example ``'SELECT'``). ``target`` is
        the dict produced by the matching ``parse_*`` helper, or ``None``
        when the statement type has no helper. Input that contains no
        statement at all yields ``{'type': 'UNKNOWN', 'target': None}``.
    """
    statements = sqlparse.parse(sql)
    if not statements:
        # Empty input produces no statements; report it the same way as an
        # unrecognised statement instead of failing with IndexError on [0].
        return {'type': 'UNKNOWN', 'target': None}

    parsed = statements[0]
    query_type = parsed.get_type()

    # Statement type -> helper that extracts the details for that type.
    handlers = {
        'SELECT': parse_select,
        'INSERT': parse_insert,
        'UPDATE': parse_update,
        'DELETE': parse_delete,
    }
    handler = handlers.get(query_type)
    target = handler(parsed) if handler is not None else None
    return {'type': query_type, 'target': target}
def parse_select(parsed):
    """Extract the select list, source table and WHERE condition.

    The statement's top-level tokens are walked once, in order:

    * everything before the ``FROM`` keyword is treated as the select list;
    * the first non-keyword token after ``FROM`` is taken as the table;
    * the ``Where`` group, if present, supplies the condition text.

    Args:
        parsed: A ``sqlparse.sql.Statement`` for a SELECT query.

    Returns:
        A dict with keys ``'fields'`` (list of str), ``'table'`` (str or
        ``None``) and ``'condition'`` (str or ``None``). Values are the raw
        token text, so aliases and qualifiers are kept as written.

    Note:
        Bare keywords in the select list (``DISTINCT``, and also literals
        that sqlparse lexes as keywords such as ``NULL``) are skipped.
        Joins and subqueries are not analysed further; only the first
        item after ``FROM`` is reported as the table.
    """
    fields = []
    table = None
    condition = None
    seen_from = False

    for token in parsed.tokens:
        if token.is_whitespace or isinstance(token, sqlparse.sql.Comment):
            continue

        if isinstance(token, sqlparse.sql.Where):
            # tokens[0] is the WHERE keyword itself; the rest is the
            # condition. The group can carry trailing whitespace and the
            # statement's closing ';', so trim both.
            text = ''.join(part.value for part in token.tokens[1:])
            condition = text.strip().rstrip(';').rstrip() or None
            break

        if token.is_keyword:
            # Covers the leading SELECT as well as FROM, DISTINCT, JOIN, ...
            if token.value.upper() == 'FROM':
                seen_from = True
            continue

        if not seen_from:
            # Select list: either a comma separated group or a single item
            # such as '*', a column, a function call or a literal.
            if isinstance(token, sqlparse.sql.IdentifierList):
                fields.extend(item.value for item in token.get_identifiers())
            else:
                fields.append(token.value)
        elif table is None:
            table = token.value

    return {'fields': fields, 'table': table, 'condition': condition}
def parse_insert(parsed):
    """Extract the target table, column list and value rows of an INSERT.

    Args:
        parsed: A ``sqlparse.sql.Statement`` for an INSERT query.

    Returns:
        A dict with keys ``'table'`` (str or ``None``), ``'fields'`` (list
        of str, empty when no column list is given) and ``'values'`` (list
        of rows, each row a list of the raw value strings).

    Note:
        NOTE(review): sqlparse is expected to group ``tbl (a, b)`` into a
        single token with the column parenthesis nested inside it, so the
        parenthesis is searched for below the target token rather than
        only at the top level. A column parenthesis that appears as its
        own top-level token directly after the table is handled as well.
        ``INSERT ... SELECT`` statements yield an empty ``'values'`` list.
    """

    def outer_parentheses(node):
        """Yield the outermost Parenthesis groups at or below ``node``."""
        if isinstance(node, sqlparse.sql.Parenthesis):
            yield node
        elif isinstance(node, sqlparse.sql.TokenList):
            for child in node.tokens:
                yield from outer_parentheses(child)

    def entries(parenthesis):
        """Return the comma separated items inside ``parenthesis`` as text."""
        items = []
        # Drop the enclosing '(' and ')' tokens.
        for inner in parenthesis.tokens[1:-1]:
            if isinstance(inner, sqlparse.sql.IdentifierList):
                items.extend(item.value for item in inner.get_identifiers())
            elif not (inner.is_whitespace
                      or inner.ttype in sqlparse.tokens.Punctuation):
                items.append(inner.value)
        return items

    table = None
    fields = []
    values = []

    for token in parsed.tokens:
        if (token.is_whitespace or token.is_keyword
                or isinstance(token, sqlparse.sql.Comment)):
            # Skips INSERT, INTO and any other bare keyword.
            continue

        if isinstance(token, sqlparse.sql.Values):
            values.extend(entries(row) for row in outer_parentheses(token))
        elif table is None:
            # First non-keyword token is the target. Its text is either
            # 'tbl' or 'tbl (col, ...)'; keep only the part before '('.
            table = token.value.split('(', 1)[0].strip()
            column_list = next(outer_parentheses(token), None)
            if column_list is not None:
                fields = entries(column_list)
        elif (isinstance(token, sqlparse.sql.Parenthesis)
                and not fields and not values):
            # Column list that was not grouped together with the table.
            fields = entries(token)

    return {'table': table, 'fields': fields, 'values': values}
def parse_update(parsed):
    """Extract the table, SET assignments and WHERE condition of an UPDATE.

    Args:
        parsed: A ``sqlparse.sql.Statement`` for an UPDATE query.

    Returns:
        A dict with keys ``'table'`` (str or ``None``), ``'fields'`` and
        ``'values'`` (parallel lists: ``fields[i]`` is assigned
        ``values[i]``, both as raw text) and ``'condition'`` (str or
        ``None``).

    Note:
        Each ``col = expr`` item in the SET clause is read from a
        ``sqlparse.sql.Comparison`` group via its ``left`` and ``right``
        tokens. Items after SET that are not comparisons are ignored.
    """
    table = None
    fields = []
    values = []
    condition = None
    seen_set = False

    for token in parsed.tokens:
        if token.is_whitespace or isinstance(token, sqlparse.sql.Comment):
            continue

        if isinstance(token, sqlparse.sql.Where):
            # tokens[0] is the WHERE keyword; the remainder is the
            # condition, possibly followed by whitespace and a final ';'.
            text = ''.join(part.value for part in token.tokens[1:])
            condition = text.strip().rstrip(';').rstrip() or None
            break

        if token.is_keyword:
            # Covers the leading UPDATE as well as SET.
            if token.value.upper() == 'SET':
                seen_set = True
            continue

        if not seen_set:
            if table is None:
                table = token.value
            continue

        # One assignment arrives as a single Comparison; several arrive
        # wrapped in an IdentifierList.
        if isinstance(token, sqlparse.sql.IdentifierList):
            assignments = token.get_identifiers()
        else:
            assignments = [token]
        for assignment in assignments:
            if isinstance(assignment, sqlparse.sql.Comparison):
                fields.append(assignment.left.value)
                values.append(assignment.right.value)

    return {'table': table, 'fields': fields, 'values': values, 'condition': condition}
def parse_delete(parsed):
    """Extract the target table and WHERE condition of a DELETE.

    Args:
        parsed: A ``sqlparse.sql.Statement`` for a DELETE query.

    Returns:
        A dict with keys ``'table'`` (str or ``None``) and ``'condition'``
        (str, or ``None`` when the statement has no WHERE clause).
    """
    table = None
    condition = None

    for token in parsed.tokens:
        if token.is_whitespace or isinstance(token, sqlparse.sql.Comment):
            continue

        if isinstance(token, sqlparse.sql.Where):
            # tokens[0] is the WHERE keyword; the remainder is the
            # condition, possibly followed by whitespace and a final ';'.
            text = ''.join(part.value for part in token.tokens[1:])
            condition = text.strip().rstrip(';').rstrip() or None
            break

        # DELETE and FROM are keywords; the first other token is the table.
        if not token.is_keyword and table is None:
            table = token.value

    return {'table': table, 'condition': condition}