根据《用Java、Python来开发Hive应用》一文,建立了使用Python、来开发Hive应用的方法,产生的代码如下(做了修改):
代码语言:javascript复制import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
class Hive:
def __init__(self):
self.database= 'demo'
self.host = '192.168.31.184'
self.port = '10000'
def getConnect(self):
conn = hive.Connection(host=self.host, port=self.port,database=self.database)
return conn;
def getEngine(self):
# 创建 Hive 数据库连接
hive_uri = f"hive://" self.host ":" self.port "/" self.database
return create_engine(hive_uri)
def disconnect(self,engine,conn):
engine.dispose()
conn.close()
#执行查询
def query(self,sql,engine,condition=None):
try:
if condition is None:
# 执行 SQL 查询
rs = pd.read_sql(sql, engine)
return rs
else:
values = []
where = " where "
for key in condition:
where = where key " = %s and "
values.append(condition[key])
where = where "1=1"
sql = sql where
params = tuple(values)
rs = pd.read_sql(sql, engine, params=params)
return rs
except Exception as e:
print("Error occurred:", e)
#添加数据
def addDataToHiveTable(self,conn,tableName,data):
like_array = f"array({', '.join(map(lambda x: f''{x}'', data['like']))})" # 使用单引号包裹字符串
address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')" # 创建 MAP 格式
# 创建游标
cursor = conn.cursor()
insertSql = "INSERT INTO person SELECT %s,%s,%s," like_array "," address_map
# 执行插入操作
try:
cursor.execute(insertSql, (data['id'], data['name'], data['age']))
except Exception as e:
print(f"Error inserting data: {e}")
conn.commit()
cursor.close()
#将文件中的数据加载到表中
def loadDataForLocal(self,conn,tableName,path):
cursor = conn.cursor()
query = "LOAD DATA LOCAL INPATH '" path "' INTO TABLE " tableName
cursor.execute(query)
conn.commit()
cursor.close()
#清空数据表
def truncateTable(self,conn,tableName):
cursor = conn.cursor()
query = "truncate table " tableName;
#con.setAutoCommit(true) #确保自动提交
cursor.execute(query)
conn.commit()
cursor.close()
现在,使用pytest来进行测试。
1)建立全局变量
代码语言:javascript复制 hive = Hive()
tableName = "person"
sql = "SELECT * FROM " tableName
conn = None
engine = None
2)建立setup_class(self)和teardown_class(self)函数
代码语言:javascript复制 def setup_class(self):
#导入数据路径
path = "/home/jerry/hive/person"
#建立连接,conn用于查询相关的SQL
self.conn = self.hive.getConnect()
#建立引擎,engine用于非查询相关的SQL
self.engine = self.hive.getEngine()
#导入测试初始化数据
self.hive.loadDataForLocal(self.conn,self.tableName,path)
def teardown_class(self):
#清空测试数据
self.hive.truncateTable(self.conn,self.tableName)
#断开链接
self.hive.disconnect(self.engine,self.conn)
3)测试查询
代码语言:javascript复制 @allure.feature('Python访问Hive数据库')
@allure.story('根据query进行查询')
@allure.severity('Critical')
#测试根据Query查询
def test_query(self):
#建立查询
rs = self.hive.query(self.sql,self.engine)
#获得记录个数
row_count = len(rs)
#验证记录个数
assert row_count == 6
#遍历记录
for index, row in rs.iterrows():
#验证编号行是不是数字
assert isinstance(row.iloc[0], int)
#验证姓名行是不是包含"elite"
assert "elite" in row.iloc[1]
#验证年龄行是不是数字
assert isinstance(row.iloc[2], int)
#验证爱好行是不是包含"basketball"
assert "basketball" in row.iloc[3]
#验证地址行是不是包含"address"
assert "address" in row.iloc[4]
@allure.feature('Python访问Hive数据库')
@allure.story('带一个的条件查询')
@allure.severity('Normal')
def test_query_with_one_condition(self):
#查询条件
condition={"name":"elite1"}
#建立查询
rs = self.hive.query(self.sql,self.engine,condition)
#获得记录个数
row_count = len(rs)
#验证记录个数
assert row_count == 1
#遍历记录
for index, row in rs.iterrows():
#验证是不是符合查询条件
#验证姓名行是不是包含"elite"
assert "elite1" == row.iloc[1]
@allure.feature('Python访问Hive数据库')
@allure.story('带两个的条件查询')
@allure.severity('Normal')
def test_query_with_Two_condition(self):
#查询条件
condition={"name":"elite1","age":"20"}
#建立查询
rs = self.hive.query(self.sql,self.engine,condition)
#获得记录个数
row_count = len(rs)
#验证记录个数
assert row_count == 1
#遍历记录
for index, row in rs.iterrows():
#验证是不是符合查询条件
#验证姓名行是不是"elite"
assert "elite1" == row.iloc[1]
#验证年龄行是不是50
assert "50" == row.iloc[2]
#验证编号行是不是数字
assert isinstance(row.iloc[0], int)
#验证姓名行是不是包含"elite"
assert "elite" in row.iloc[1]
#验证年龄行是不是数字
assert isinstance(row.iloc[2], int)
#验证爱好行是不是包含"basketball"
assert "basketball" in row.iloc[3]
#验证地址行是不是包含"address"
assert "address" in row.iloc[4]
@allure.feature('Python访问Hive数据库')
@allure.story('带两个的条件查询')
@allure.severity('Normal')
def test_query_with_Two_condition(self):
#查询条件
condition={"name":"elite1","age":"20"}
#建立查询
rs = self.hive.query(self.sql,self.engine,condition)
#获得记录个数
row_count = len(rs)
#验证记录个数
assert row_count == 1
#遍历记录
for index, row in rs.iterrows():
#验证是不是符合查询条件
#验证姓名行是不是"elite"
assert "elite1" == row.iloc[1]
#验证年龄行是不是20
assert 20 == row.iloc[2]
@allure.feature('Python访问Hive数据库')
@allure.story('带三个的条件查询')
@allure.severity('Normal')
def test_query_with_three_condition(self):
#查询条件
condition={"id":"1","name":"elite0","age":"10"}
#建立查询
rs = self.hive.query(self.sql,self.engine,condition)
#获得记录个数
row_count = len(rs)
#验证记录个数
assert row_count == 1
#遍历记录
for index, row in rs.iterrows():
#验证是不是符合查询条件
#验证编号行是不是数字
assert 1 == row.iloc[0]
#验证姓名行是不是包含"elite"
assert "elite0" in row.iloc[1]
#验证年龄行是不是数字
assert 10 == row.iloc[2]
4)测试添加数据
代码语言:javascript复制 @allure.feature('Python访问Hive数据库')
@allure.story('插入数据')
@allure.severity('Normal')
def test_addDataToHiveTable(self):
#构造插入数据
data = {
'id': "50",
'name': "Jerry",
'age': "50",
'like': ["basketball", "music", "dance"],
'address': {"address": "xx"}
}
#插入数据
self.hive.addDataToHiveTable(self.conn,self.tableName,data)
#查询插入数据是否可以查询出来
condition = {"name":"Jerry","age":"50"}
rs = self.hive.query(self.sql,self.engine,condition)
row_count = len(rs)
assert row_count == 1
#验证插入数据
for index, row in rs.iterrows():
assert "Jerry" in row.iloc[1]
assert "50" in str(row.iloc[2])
主函数改为
代码语言:javascript复制if __name__ == '__main__':
pytest.main(['-sv', '-q', '--alluredir', './report/xml'])
建立项目文件
代码语言:javascript复制environment.properties
Project Name=Hive
Author = Jerry Gu
System Version= Win11
java version "17.0.10"
Allure Version= 2.20.1
代码语言:javascript复制pytest --alluredir=.reportxml
copy environment.properties .reportxml
allure serve .reportxml