用Java、Python来开发Hive应用

2024-09-10 16:22:51 浏览数 (4)

1 预先配置

在hive配置文件:%HIVE_HOME%/conf/hive-site.xml添加

代码语言:javascript复制
<!-- 禁用 impersonation -->
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value> 
</property>

在 Hadoop 的配置文件目录 %HADOOP_HOME%/etc/hadoop/ 下的 core-site.xml 和 hdfs-site.xml 中添加

代码语言:javascript复制
<property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
</property>

确保没有设置限制 root 用户的权限

修改访问数据库表person的权限

代码语言:javascript复制
#hdfs dfs -chmod -R 775 /user/hive/warehouse/demo.db/person

由于Hive是数据仓库,而不是数据库,一般不支持行级的更新和删除,所以这里仅介绍如何通过Java向Hive插入、查询数据。

2 用Java来开发Hive应用

pom.xml

代码语言:javascript复制
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.jerry</groupId>
  <artifactId>hive</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <description>Java How to connect Hive</description>

    <dependencies>
        <!-- Hive JDBC Driver -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!-- Hadoop Common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>
        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>

Java文件

代码语言:javascript复制
package com.jerry;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal Hive JDBC client for the tutorial: connect, query, insert, load and truncate.
 *
 * <p>An instance remembers the {@link Statement} created by {@link #getConnection()} and the
 * resources of the most recent query. These are plain unsynchronized fields, so use one
 * instance per thread.
 *
 * <p>Table names, column names and the {@code like}/{@code map} literals are concatenated into
 * the SQL text because JDBC cannot bind identifiers; only pass trusted values for them.
 */
public class HiveClient {
    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
    private static final String CONNECTION_URL = "jdbc:hive2://192.168.31.184:10000/demo";

    /** Statement opened by {@link #getConnection()}; used by the connection-less methods. */
    private Statement statement;
    /** Prepared statement backing {@link #resultSet}, when the last query was conditional. */
    private PreparedStatement preparedStatement;
    /** Result of the most recent query, or {@code null} when there is none. */
    private ResultSet resultSet;

    /**
     * Opens a connection to HiveServer2 and prepares the shared statement.
     *
     * @return the open connection; hand it back to {@link #disconnect(Connection)} when done
     * @throws SQLException if the driver class is missing or the connection cannot be opened
     */
    private Connection getConnection() throws SQLException {
        try {
            Class.forName(DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            // Keep the original exception as the cause instead of only its message.
            throw new SQLException("Hive JDBC driver not found: " + DRIVER_CLASS, e);
        }
        Connection con = DriverManager.getConnection(CONNECTION_URL);
        try {
            statement = con.createStatement();
        } catch (SQLException e) {
            // Do not leak the connection when the statement cannot be created.
            try {
                con.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return con;
    }

    /**
     * Closes the last query's resources, the shared statement and the connection.
     *
     * <p>Safe to call when no query has been executed.
     *
     * @param con connection to close; {@code null} is ignored
     * @throws SQLException if closing any resource fails
     */
    public void disconnect(Connection con) throws SQLException {
        try {
            releaseLastQuery();
        } finally {
            try {
                if (statement != null) {
                    statement.close();
                    statement = null;
                }
            } finally {
                if (con != null) {
                    con.close();
                }
            }
        }
    }

    /**
     * Runs a query on the shared statement and keeps its result for printing.
     *
     * @param query complete SQL text
     * @throws SQLException if the query fails
     * @throws IllegalStateException if {@link #getConnection()} has not been called
     */
    public void query(String query) throws SQLException {
        releaseLastQuery();
        resultSet = requireStatement().executeQuery(query);
    }

    /**
     * Runs a query restricted by equality conditions and keeps its result for printing.
     *
     * <p>Each map entry becomes {@code column = ?}; the values are bound as strings. With a
     * {@code null} or empty map the query runs unchanged.
     *
     * @param con open connection
     * @param query SQL text without a WHERE clause
     * @param condition column name to required value
     * @throws SQLException if preparing or executing the query fails
     */
    public void query(Connection con, String query, Map<String, String> condition)
            throws SQLException {
        releaseLastQuery();
        int count = condition == null ? 0 : condition.size();
        String[] values = new String[count];
        StringBuilder sql = new StringBuilder(query);
        if (count > 0) {
            int index = 0;
            for (Map.Entry<String, String> entry : condition.entrySet()) {
                sql.append(index == 0 ? " WHERE " : " AND ").append(entry.getKey()).append(" = ?");
                values[index++] = entry.getValue();
            }
        }
        PreparedStatement ps = con.prepareStatement(sql.toString());
        try {
            for (int j = 0; j < count; j++) {
                ps.setString(j + 1, values[j]); // JDBC parameters are 1-based
            }
            resultSet = ps.executeQuery();
            // Closing the statement would close the result set, so keep it until release.
            preparedStatement = ps;
        } catch (SQLException e) {
            try {
                ps.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Prints every row of the result set, each column value followed by a comma.
     *
     * @param resultSet result to print; it is read to the end but not closed here
     * @throws SQLException if reading the result fails
     */
    public void printQueryResult(ResultSet resultSet) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            StringBuilder row = new StringBuilder();
            for (int i = 1; i <= columnCount; i++) {
                row.append(resultSet.getString(i)).append(',');
            }
            System.out.println(row);
        }
    }

    /**
     * Runs the query, prints its rows and releases the result.
     *
     * @param query complete SQL text
     * @throws SQLException if the query or reading its result fails
     */
    public void queryAndPrint(String query) throws SQLException {
        query(query);
        try {
            printQueryResult(resultSet);
        } finally {
            releaseLastQuery();
        }
    }

    /**
     * Runs the conditional query, prints its rows and releases the result.
     *
     * @param con open connection
     * @param query SQL text without a WHERE clause
     * @param condition column name to required value
     * @throws SQLException if the query or reading its result fails
     */
    public void queryAndPrint(Connection con, String query, Map<String, String> condition)
            throws SQLException {
        query(con, query, condition);
        try {
            printQueryResult(resultSet);
        } finally {
            releaseLastQuery();
        }
    }

    /**
     * Inserts one row into {@code tableName}.
     *
     * <p>The first three columns are bound as int, string, int from {@code newValue}; the array
     * and map columns are supplied as Hive literal expressions. A failed insert is reported on
     * standard error and does not propagate, matching the method's no-throw signature.
     *
     * @param con open connection
     * @param tableName target table
     * @param newValue id, name and age, in that order
     * @param like Hive {@code array(...)} expression
     * @param map Hive {@code map(...)} expression
     */
    public void addDataToHiveTable(Connection con, String tableName, String[] newValue,
            String like, String map) {
        String insertSql = "INSERT INTO " + tableName + " SELECT ?,?,?," + like + "," + map;
        System.out.println(like);
        try (PreparedStatement insert = con.prepareStatement(insertSql)) {
            insert.setInt(1, Integer.parseInt(newValue[0]));
            insert.setString(2, newValue[1]);
            insert.setInt(3, Integer.parseInt(newValue[2]));
            insert.executeUpdate();
        } catch (SQLException e) {
            System.err.println("Insert into " + tableName + " failed: " + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Loads a local file into the table with {@code LOAD DATA LOCAL INPATH}.
     *
     * @param tableName target table
     * @param path file path placed inside the single-quoted INPATH literal
     * @throws SQLException if the statement fails
     * @throws IllegalStateException if {@link #getConnection()} has not been called
     */
    public void loadDataForLocal(String tableName, String path) throws SQLException {
        String query = "LOAD DATA LOCAL INPATH '" + path + "' INTO TABLE " + tableName;
        requireStatement().execute(query);
    }

    /**
     * Removes all rows from the table.
     *
     * @param con open connection
     * @param tableName table to truncate
     * @throws SQLException if the statement fails
     */
    public void truncateTable(Connection con, String tableName) throws SQLException {
        con.setAutoCommit(true); // make sure auto-commit is on
        try (Statement truncate = con.createStatement()) {
            truncate.execute("truncate table " + tableName);
        }
    }

    /** Returns the shared statement, failing fast when the client is not connected. */
    private Statement requireStatement() {
        if (statement == null) {
            throw new IllegalStateException("Not connected: call getConnection() first");
        }
        return statement;
    }

    /** Closes the result set and prepared statement left over from the previous query. */
    private void releaseLastQuery() throws SQLException {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
        } finally {
            resultSet = null;
            PreparedStatement stale = preparedStatement;
            preparedStatement = null;
            if (stale != null) {
                stale.close();
            }
        }
    }

    /**
     * Demo: full scan, insert, conditional query, truncate, reload from a local file.
     *
     * @param args unused
     * @throws SQLException if connecting or disconnecting fails
     */
    public static void main(String[] args) throws SQLException {
        HiveClient hive = new HiveClient();
        String tableName = "person";
        String like = "array('basketball', 'music', 'dance')";
        String map = "map('address','xxxx')";
        String[] newAddValue = {"10", "elite0", "50"};
        String query = "SELECT * FROM " + tableName;
        Map<String, String> condition = new HashMap<String, String>();
        condition.put("name", "elite0");
        condition.put("age", "50");
        String inpath = "/home/jerry/hive/person";

        Connection con = hive.getConnection();
        try {
            System.out.println("全表查询:");
            hive.queryAndPrint(query);
            hive.addDataToHiveTable(con, tableName, newAddValue, like, map);
            System.out.println("插入数据后全表查询:");
            hive.queryAndPrint(query);
            System.out.println("条件查询:");
            hive.queryAndPrint(con, query, condition);
            hive.truncateTable(con, tableName);
            System.out.println("清空表:");
            hive.queryAndPrint(query);
            hive.loadDataForLocal(tableName, inpath);
            System.out.println("从文件中加载:");
            hive.queryAndPrint(query);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, even when a step above failed.
            hive.disconnect(con);
        }
    }
}

运行结果

代码语言:javascript复制
全表查询:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
array('basketball', 'music', 'dance')
插入数据后全表查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},
条件查询:
10,elite0,50,["basketball","music","dance"],{"address":"xxxx"},
清空表:
从文件中加载:
1,elite0,10,["basketball","music","dance"],{"adderss":"xx"},
2,elite1,20,["basketball","music","dance"],{"adderss":"xx"},
3,elite2,10,["basketball","music","dance"],{"adderss":"xx"},
4,elite3,20,["basketball","music","dance"],{"adderss":"xx"},
5,elite4,10,["basketball","music","dance"],{"adderss":"xx"},
6,elite5,20,["basketball","music","dance"],{"adderss":"xx"},

3 用Python来开发Hive应用

pip3

代码语言:javascript复制
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

Python

代码语言:javascript复制
import pandas as pd
from pyhive import hive
from sqlalchemy import create_engine
from pyhive import hive
class Hive:
    """Small helper around PyHive and SQLAlchemy for the tutorial's ``demo`` database.

    A PyHive connection is used for statements that change data (insert, load,
    truncate); a SQLAlchemy engine is used for pandas queries.

    Table and column names are interpolated into the SQL text because DB-API
    parameters cannot stand in for identifiers; only pass trusted names.
    """

    def __init__(self):
        self.database = 'demo'
        self.host = '192.168.31.184'
        self.port = '10000'

    @staticmethod
    def _quote(value):
        """Return ``value`` as a single-quoted SQL string literal with backslash escapes."""
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def getconnect(self):
        """Open and return a PyHive connection to the configured host and database."""
        # The port is stored as text for URI building; the socket layer wants a number.
        return hive.Connection(host=self.host, port=int(self.port), database=self.database)

    def getEngine(self):
        """Create and return a SQLAlchemy engine for the ``hive://`` URI."""
        hive_uri = f"hive://{self.host}:{self.port}/{self.database}"
        return create_engine(hive_uri)

    def disconnect(self, engine, conn):
        """Dispose the engine and close the connection, even if disposing fails."""
        try:
            engine.dispose()
        finally:
            conn.close()

    def query(self, sql, engine, condition=None):
        """Run ``sql`` through pandas and print the resulting DataFrame.

        :param sql: SELECT statement without a WHERE clause.
        :param engine: SQLAlchemy engine (or connection) accepted by ``pd.read_sql``.
        :param condition: optional mapping of column name to required value; each
            entry becomes ``column = %s`` joined with ``and``.

        Errors are printed rather than raised.
        """
        try:
            params = None
            if condition:
                clauses = [f"{column} = %s" for column in condition]
                sql = sql + " where " + " and ".join(clauses)
                params = tuple(condition.values())
            df = pd.read_sql(sql, engine, params=params)
            print(df)
        except Exception as e:
            print("Error occurred:", e)

    def addDataToHiveTable(self, conn, tableName, data):
        """Insert one row into ``tableName``.

        :param conn: open DB-API connection.
        :param tableName: target table.
        :param data: mapping with ``id``, ``name``, ``age`` (bound as parameters),
            ``like`` (iterable of strings, stored as an array) and ``address``
            (dict, stored as a map with all of its entries).

        An execution error is printed rather than raised.
        """
        like_array = "array(" + ", ".join(self._quote(item) for item in data['like']) + ")"
        address_map = "map(" + ", ".join(
            self._quote(part) for pair in data['address'].items() for part in pair
        ) + ")"
        # The statement is %-formatted with the bound parameters, so a literal
        # percent sign inside the inlined values has to be doubled.
        inline = f"{like_array},{address_map}".replace("%", "%%")
        insertSql = f"INSERT INTO {tableName} SELECT %s,%s,%s,{inline}"
        cursor = conn.cursor()
        try:
            try:
                cursor.execute(insertSql, (data['id'], data['name'], data['age']))
            except Exception as e:
                print(f"Error inserting data: {e}")
            conn.commit()
        finally:
            cursor.close()

    def loadDataForLocal(self, conn, tableName, path):
        """Load the local file at ``path`` into ``tableName`` with LOAD DATA LOCAL INPATH."""
        query = f"LOAD DATA LOCAL INPATH {self._quote(path)} INTO TABLE {tableName}"
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        finally:
            cursor.close()

    def truncateTable(self, conn, tableName):
        """Remove all rows from ``tableName``."""
        query = "truncate table " + tableName
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            conn.commit()
        finally:
            cursor.close()
        
# Demo script: full scan, conditional query, insert, truncate, reload from a local file.
if __name__ == "__main__":
    sql = "SELECT * FROM person"
    condition = {"name": "elite1", "age": "20"}
    # Row to insert
    data = {
        'id': "50",
        'name': "Jerry",
        'age': 50,  # must be an integer
        'like': ["basketball", "music", "dance"],
        'address': {"address": "xx"}
    }
    tableName = "person"
    path = "/home/jerry/hive/person"
    myhive = Hive()
    print("建立连接")
    conn = myhive.getconnect()
    engine = myhive.getEngine()
    try:
        print("全表查询")
        myhive.query(sql, engine)
        print("条件查询")
        myhive.query(sql, engine, condition)
        print("加数据进入表")
        myhive.addDataToHiveTable(conn, tableName, data)
        myhive.query(sql, engine)
        print("清空表中所有数据")
        myhive.truncateTable(conn, tableName)
        print("从文件中导入数据")
        myhive.loadDataForLocal(conn, tableName, path)
        myhive.query(sql, engine)
    finally:
        # Release the engine and connection even when truncate or load raises.
        print("断开连接")
        myhive.disconnect(engine, conn)
  • conn(PyHive 连接):用于插入、加载、清空等其他操作
  • engine(SQLAlchemy 引擎):用于查询

运行结果

代码语言:javascript复制
建立连接
全表查询
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
条件查询
   id    name  age                           likes           address
0   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
加数据进入表
   id    name  age                           likes           address
0  50   Jerry   50  ["basketball","music","dance"]  {"address":"xx"}
1   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
2   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
3   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
4   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
5   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
6   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
清空表中所有数据
从文件中导入数据
   id    name  age                           likes           address
0   1  elite0   10  ["basketball","music","dance"]  {"adderss":"xx"}
1   2  elite1   20  ["basketball","music","dance"]  {"adderss":"xx"}
2   3  elite2   10  ["basketball","music","dance"]  {"adderss":"xx"}
3   4  elite3   20  ["basketball","music","dance"]  {"adderss":"xx"}
4   5  elite4   10  ["basketball","music","dance"]  {"adderss":"xx"}
5   6  elite5   20  ["basketball","music","dance"]  {"adderss":"xx"}
断开连接

0 人点赞