基于Python访问Hive的pytest测试代码实现

2024-09-10 16:23:32 浏览数 (1)

根据《用Java、Python来开发Hive应用》一文,建立了使用Python、来开发Hive应用的方法,产生的代码如下(做了修改):

代码语言:javascript复制
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
               
class Hive:
    def __init__(self):
        self.database= 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'
        
    def getConnect(self):
        conn = hive.Connection(host=self.host, port=self.port,database=self.database)
        return conn;
                
    def getEngine(self):    
        # 创建 Hive 数据库连接
        hive_uri = f"hive://" self.host ":" self.port "/" self.database
        return create_engine(hive_uri)
           
    def disconnect(self,engine,conn):
        engine.dispose()
        conn.close()
           
    #执行查询
    def query(self,sql,engine,condition=None):
        try:
            if condition is None:
            # 执行 SQL 查询
                rs = pd.read_sql(sql, engine)
                return rs
            else:
                values = []
                where = " where "
                for key in condition:
                    where = where key " = %s and "
                    values.append(condition[key])
                where = where "1=1"    
                sql = sql   where
                params = tuple(values)
                rs = pd.read_sql(sql, engine, params=params)
                return rs
        except Exception as e:
            print("Error occurred:", e)
           
    #添加数据
    def addDataToHiveTable(self,conn,tableName,data):
        like_array = f"array({', '.join(map(lambda x: f''{x}'', data['like']))})"  # 使用单引号包裹字符串
        address_map = f"map('{list(data['address'].keys())[0]}', '{list(data['address'].values())[0]}')"  # 创建 MAP 格式
        # 创建游标
        cursor = conn.cursor()
        insertSql = "INSERT INTO person SELECT %s,%s,%s," like_array "," address_map
        # 执行插入操作
        try:
            cursor.execute(insertSql, (data['id'], data['name'], data['age']))
        except Exception as e:
            print(f"Error inserting data: {e}")    
        conn.commit()
        cursor.close()
           
    #将文件中的数据加载到表中
    def loadDataForLocal(self,conn,tableName,path):
        cursor = conn.cursor()
        query = "LOAD DATA LOCAL INPATH '" path "' INTO TABLE " tableName
        cursor.execute(query)
        conn.commit()
        cursor.close()
    
    #清空数据表
    def truncateTable(self,conn,tableName):
        cursor = conn.cursor()
        query = "truncate table " tableName;
        #con.setAutoCommit(true) #确保自动提交
        cursor.execute(query)
        conn.commit()
        cursor.close()

现在,使用pytest来进行测试。

1)建立全局变量

代码语言:javascript复制
 hive = Hive()
    tableName = "person"
    sql = "SELECT * FROM " tableName
    conn = None
    engine = None

2)建立setup_class(self)和teardown_class(self)函数

代码语言:javascript复制
    def setup_class(self):
        #导入数据路径
        path = "/home/jerry/hive/person"
        #建立连接,conn用于查询相关的SQL
        self.conn = self.hive.getConnect()
        #建立引擎,engine用于非查询相关的SQL
        self.engine = self.hive.getEngine()
        #导入测试初始化数据
        self.hive.loadDataForLocal(self.conn,self.tableName,path)
           
    def teardown_class(self):
        #清空测试数据
        self.hive.truncateTable(self.conn,self.tableName)
        #断开链接
        self.hive.disconnect(self.engine,self.conn)

3)测试查询

代码语言:javascript复制
     @allure.feature('Python访问Hive数据库')
    @allure.story('根据query进行查询')
    @allure.severity('Critical')
    #测试根据Query查询
    def test_query(self):
        #建立查询
        rs = self.hive.query(self.sql,self.engine)
        #获得记录个数
        row_count = len(rs)
        #验证记录个数
        assert row_count == 6
        #遍历记录
        for index, row in rs.iterrows():
            #验证编号行是不是数字
            assert isinstance(row.iloc[0], int)
            #验证姓名行是不是包含"elite"
            assert "elite" in row.iloc[1]
            #验证年龄行是不是数字
            assert isinstance(row.iloc[2], int)
            #验证爱好行是不是包含"basketball"
            assert "basketball" in row.iloc[3]
            #验证地址行是不是包含"address"    
            assert "address" in row.iloc[4]
           
    @allure.feature('Python访问Hive数据库')
    @allure.story('带一个的条件查询')
    @allure.severity('Normal')
    def test_query_with_one_condition(self):
        #查询条件
        condition={"name":"elite1"}
        #建立查询
        rs = self.hive.query(self.sql,self.engine,condition)
        #获得记录个数
        row_count = len(rs)
        #验证记录个数
        assert row_count == 1
        #遍历记录
        for index, row in rs.iterrows():
            #验证是不是符合查询条件
            #验证姓名行是不是包含"elite"
            assert "elite1" == row.iloc[1]
            
           
    @allure.feature('Python访问Hive数据库')    
    @allure.story('带两个的条件查询')
    @allure.severity('Normal')
    def test_query_with_Two_condition(self):
        #查询条件
        condition={"name":"elite1","age":"20"}
        #建立查询
        rs = self.hive.query(self.sql,self.engine,condition)
        #获得记录个数
        row_count = len(rs)
        #验证记录个数
        assert row_count == 1
        #遍历记录
        for index, row in rs.iterrows():
            #验证是不是符合查询条件
            #验证姓名行是不是"elite"
            assert "elite1" == row.iloc[1]
            #验证年龄行是不是50
            assert "50" == row.iloc[2]
            #验证编号行是不是数字
            assert isinstance(row.iloc[0], int)
            #验证姓名行是不是包含"elite"
            assert "elite" in row.iloc[1]    
            #验证年龄行是不是数字
            assert isinstance(row.iloc[2], int)
            #验证爱好行是不是包含"basketball"
            assert "basketball" in row.iloc[3]
            #验证地址行是不是包含"address"
            assert "address" in row.iloc[4]
           
    @allure.feature('Python访问Hive数据库')
    @allure.story('带两个的条件查询')
    @allure.severity('Normal')
    def test_query_with_Two_condition(self):
        #查询条件
        condition={"name":"elite1","age":"20"}
        #建立查询
        rs = self.hive.query(self.sql,self.engine,condition)
        #获得记录个数
        row_count = len(rs)
        #验证记录个数
        assert row_count == 1
        #遍历记录
        for index, row in rs.iterrows():
            #验证是不是符合查询条件    
            #验证姓名行是不是"elite"
            assert "elite1" == row.iloc[1]
            #验证年龄行是不是20
            assert 20 == row.iloc[2]
                     
    @allure.feature('Python访问Hive数据库')
    @allure.story('带三个的条件查询')
    @allure.severity('Normal')
    def test_query_with_three_condition(self):
        #查询条件
        condition={"id":"1","name":"elite0","age":"10"}
        #建立查询
        rs = self.hive.query(self.sql,self.engine,condition)
        #获得记录个数
        row_count = len(rs)
        #验证记录个数
        assert row_count == 1
        #遍历记录
        for index, row in rs.iterrows():
            #验证是不是符合查询条件
            #验证编号行是不是数字    
            assert 1 == row.iloc[0]
            #验证姓名行是不是包含"elite"
            assert "elite0" in row.iloc[1]
            #验证年龄行是不是数字
            assert 10 == row.iloc[2]

4)测试添加数据

代码语言:javascript复制
    @allure.feature('Python访问Hive数据库')
    @allure.story('插入数据')
    @allure.severity('Normal')
    def test_addDataToHiveTable(self):
        #构造插入数据
        data = {
        'id': "50",
        'name': "Jerry",
        'age': "50",
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
        }
        #插入数据
        self.hive.addDataToHiveTable(self.conn,self.tableName,data)
        #查询插入数据是否可以查询出来
        condition = {"name":"Jerry","age":"50"}    
        rs = self.hive.query(self.sql,self.engine,condition)
        row_count = len(rs)
        assert row_count == 1
        #验证插入数据
        for index, row in rs.iterrows():
            assert "Jerry" in row.iloc[1]
            assert "50" in str(row.iloc[2])

主函数改为

代码语言:javascript复制
if __name__ == '__main__':
pytest.main(['-sv', '-q', '--alluredir', './report/xml'])

建立项目文件

代码语言:javascript复制
environment.properties
Project Name=Hive
Author = Jerry Gu
System Version= Win11
java version "17.0.10"
Allure Version= 2.20.1
代码语言:javascript复制
pytest --alluredir=.reportxml
copy environment.properties .reportxml
allure serve .reportxml

0 人点赞