添加kerberos后,Flink任务的运行认证及Hive使用JDBC连接的认证

2023-08-08 13:03:05 浏览数 (2)

Kerberos安装配置

https://www.psvmc.cn/article/2022-11-08-bigdata-kerberos-centos.html

Flink任务认证

flink on yarn

代码语言:javascript复制
flink run \
  -yD security.kerberos.login.keytab=/root/psvmc.keytab \
  -yD security.kerberos.login.principal=psvmc/hadoop@HADOOP.COM \
  yxzt-data-tcs-1.0-SNAPSHOT-jar-with-dependencies.jar -job /root/zjhome/task_trans.json

认证原理

  1. Flink程序启动时,会自动将keytab文件上传到HDFS,由YARN管理并分发给每个executor,缓存token并定时刷新。
  2. 基于以上原理,当自定义RichSinkFunction里需要使用基于kerberos认证的组件时,不需要再做认证操作。
  3. 比如:hive、hbase、kudu等等,直接建立连接就可以访问

Hive JDBC认证

需要两个文件

  • 配置文件krb5.conf
  • 认证文件krb5.keytab(下文示例代码中为psvmc.keytab),一般由服务器生成后获取

放到resources目录下

Kerberos认证

指定krb5配置文件:krb5.conf,根据实际情况替换

认证文件:krb5.keytab(示例代码中为psvmc.keytab),根据实际情况替换

认证用户:hive(示例代码中为psvmc/hadoop@HADOOP.COM),根据实际情况修改

这里是通过将配置文件和认证文件拷贝到临时目录进行认证,可以根据需要指定固定目录认证

认证方法KerberosAuth.java

代码语言:javascript复制
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;

public class KerberosAuth {
    private static final Logger log = LoggerFactory.getLogger(KerberosAuth.class);
    // kerberos配置文件,从服务上获取
    private static final String krbConfig = "krb5.conf";
    // kerberos认证文件
    private static final String krbKeytab = "psvmc.keytab";
    // kerberos认证用户
    private static final String principal = "psvmc/hadoop@HADOOP.COM";

    public static void init() {
        initkerberos();
    }

    public static void initkerberos() {
        log.info("Kerberos 登陆验证");
        try {
            // java临时目录,window为C:Users登录用户AppDataLocalTemp,linux为/tmp,需要根据情况添加斜杠
            String javaTempDir = System.getProperty("java.io.tmpdir");
            String tempDir = Paths.get(javaTempDir, "krb_"   System.currentTimeMillis()).toString();
            String configPath = getTempPath(tempDir, krbConfig);
            String keytabPath = getTempPath(tempDir, krbKeytab);
            log.error(configPath);
            log.error(keytabPath);
            System.setProperty("java.security.krb5.conf", configPath);//设置krb配置文件路径,注意一定要放在Configuration前面,不然不生效
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");//设置认证模式Kerberos
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);//设置认证用户和krb认证文件路径
            log.error("Kerberos 验证成功");
        } catch (Exception e) {
            log.error("Kerberos 验证失败", e);
        }
    }

    /**
     * 复制文件并根据文件名称获取文件路径(解决jar包不支持获取resource下文件问题)
     *
     * @param tempPath 临时目录
     * @param fileName 文件名称
     * @return 文件临时路径
     */
    @SuppressWarnings("ResultOfMethodCallIgnored")
    public static String getTempPath(String tempPath, String fileName) {
        InputStream in = KerberosAuth.class.getResourceAsStream("/"   fileName);
        String pathAll = tempPath   File.separator   fileName;
        File file = new File(pathAll);
        File tempPathFile = new File(tempPath);
        if (!tempPathFile.exists()) {
            tempPathFile.mkdirs();
        }
        try {
            copyInputStreamToFile(in, pathAll);
        } catch (Exception e) {
            log.error("getTempPath", e);
        }
        return file.getPath();
    }

    private static void copyInputStreamToFile(InputStream is, String strFileFullPath) throws IOException {
        long size = 0;
        BufferedInputStream in = new BufferedInputStream(is);
        BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(Paths.get(strFileFullPath)));
        int len = -1;
        byte[] b = new byte[1024];
        while ((len = in.read(b)) != -1) {
            out.write(b, 0, len);
            size  = len;
        }
        in.close();
        out.close();
        //修改文件的访问权限
        changeFolderPermission(strFileFullPath);
    }

    private static void changeFolderPermission(String dirPath) {
        File dirFile = new File(dirPath);
        dirFile.setReadable(true, false);
        dirFile.setExecutable(true, false);
        dirFile.setWritable(true, false);
    }

    public static void main(String[] args) {
        KerberosAuth.init();
    }
}

JDBC连接

Hive中配置Kerberos认证后,JDBC连接要进行kerberos认证。

认证后JDBC的URL也要添加认证相关的配置

如下

代码语言:javascript复制
jdbc:hive2://192.168.7.101:10000/zdb;principal=psvmc/hadoop@HADOOP.COM

其中

principal 的格式为 用户名/主机名@REALM,以上面的 psvmc/hadoop@HADOOP.COM 为例:

  • psvmc:用户名
  • hadoop:主机名,也可以理解为组
  • HADOOP.COM:realm,和krb5.conf文件里一致即可

工具类

代码语言:javascript复制
import com.gientech.schedule.config.KerberosConnect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.sql.*;
import java.util.*;
 
public class HiveUtils {
    private static Logger logger = LoggerFactory.getLogger(HiveUtils.class.getName());
 
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://192.168.7.101:10000/zdb;principal=psvmc/hadoop@HADOOP.COM";//端口默认10000
 
    /**
     * 获取Connection
     * @return conn
     * @throws SQLException
     * @throws ClassNotFoundException
     */
 
    public static Connection getConnection() throws SQLException {
        Connection conn = null;
        try {
            KerberosAuth.init();
            conn = DriverManager.getConnection(url);
        } catch (SQLException e) {
            logger.info("获取数据库连接失败!");
            throw e;
        }
        return conn;
    }
 
    // 创建数据库
    public static void createDatabase(String databaseName) throws Exception {
        String sql = "create database " databaseName;
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询所有数据库
    public static void showDatabases() throws Exception {
        String sql = "show databases";
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    /**
     * 创建表(分割符为“,”)
     * 如create table tableName(name string,sex string) row format delimited fields terminated by ','
     * @param sql
     * @throws Exception
     */
    public static void createTable(String sql) throws Exception {
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询所有表
    public static void showTables() throws Exception {
        String sql = "show tables";
        logger.info("Running: "   sql);
        getConnection();
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 查看表结构
    public static void descTable(String tableName) throws Exception {
        String sql = "desc formatted " tableName;
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1)   "t"   rs.getString(2));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 加载数据(请确保文件权限)
    public static void loadData(String filePath,String tableName) throws Exception {
        String sql = "load data inpath '"   filePath   "' into table tableName";
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 查询数据
    public static void selectData(String sql) throws Exception {
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        rs = stmt.executeQuery(sql);
        while (rs.next()) {
            logger.info(rs.getString(1));
        }
        closeConnection(rs,stmt,conn);
    }
 
    // 删除数据库
    public static void dropDatabase(String databaseName) throws Exception {
        String sql = "drop database if exists " databaseName;
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
    // 删除数据库表
    public static void deopTable(String tableName) throws Exception {
        String sql = "drop table if exists " tableName;
        logger.info("Running: "   sql);
        Connection conn = getConnection();
        Statement stmt = conn.createStatement();
        stmt.execute(sql);
        closeConnection(conn);
        closeStatement(stmt);
    }
 
 
    public static Map<String,Object> queryMapBySql(String sql){
        //定义数据库连接
        Connection conn = null;
        //定义PreparedStatement对象
        PreparedStatement ps = null;
        //定义查询的结果集
        ResultSet rs = null;
        try {
            conn = getConnection();
            //定义执行的sql语句
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            return getMapFromResultSet(rs);
        } catch (Exception e) {
            logger.info("queryDataListBySql" e.getMessage());
        }finally {
            closeConnection(rs,ps,conn);
        }
        return Collections.emptyMap();
    }
 
    /**
     * 关闭ResultSet、Statement、Connection
     *
     * @param rs
     * @param stmt
     * @param con
     */
 
    public static void closeConnection(ResultSet rs, Statement stmt, Connection con) {
        closeResultSet(rs);
        closeStatement(stmt);
        closeConnection(con);
    }
 
    /**
     * 关闭ResultSet
     *
     * @param rs
     */
 
    public static void closeResultSet(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 关闭Statement
     *
     * @param stmt
     */
 
    public static void closeStatement(Statement stmt) {
        if (stmt != null) {
            try {
                stmt.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 关闭Connection
     *
     * @param con
     */
 
    public static void closeConnection(Connection con) {
        if (con != null) {
            try {
                con.close();
            } catch (Exception e) {
                logger.info(e.getMessage());
            }
        }
    }
 
    /**
     * 将resultset结果转为sonObject
     * @param rs ResultSet
     * @return List
     * @throws SQLException 异常
     */
    public static Map<String,Object> getMapFromResultSet(ResultSet rs)
            throws SQLException {
        Map<String,Object> hm = new HashMap();
        ResultSetMetaData rsmd = rs.getMetaData();
        int count = rsmd.getColumnCount();// 获取列的数量
        while(rs.next()) {
            for (int i = 1; i <= count; i  ) {
                String key = rsmd.getColumnLabel(i);
                Object value = rs.getObject(i);
                hm.put(key, value);
            }
        }
        return hm;
    }
 
    public static List<Map<String,Object>> queryListBySql(String sql){
        //定义数据库连接
        Connection conn = null;
        //定义PreparedStatement对象
        PreparedStatement ps = null;
        //定义查询的结果集
        ResultSet rs = null;
        try {
            conn = getConnection();
            //定义执行的sql语句
            ps = conn.prepareStatement(sql);
            rs = ps.executeQuery();
            return getListFromResultSet(rs);
        } catch (Exception e) {
            logger.info("queryDataListBySql" e.getMessage());
        }finally {
            closeConnection(rs,ps,conn);
        }
        return Collections.emptyList();
    }
 
    /**
     * 将resultset结果转为list
     * @param rs ResultSet
     * @return List
     * @throws SQLException 异常
     */
    private static List<Map<String,Object>> getListFromResultSet(ResultSet rs)
            throws SQLException {
        List<Map<String,Object>> results= new ArrayList<>();//结果数据
        ResultSetMetaData metaData = rs.getMetaData(); // 获得列的结果
        List<String> colNameList= new ArrayList<>();
        int cols_len = metaData.getColumnCount(); // 获取总的列数
        for (int i = 0; i < cols_len; i  ) {
            colNameList.add(metaData.getColumnName(i 1));
        }
        while (rs.next()) {
            Map<String, Object> map= new HashMap<>();
            for(int i=0;i<cols_len;i  ){
                String key=colNameList.get(i);
                Object value=rs.getString(colNameList.get(i));
                map.put(key, value);
            }
            results.add(map);
        }
        return results;
    }
 
    public static void main(String[] args) throws Exception {
        String sql = "SELECT * FROM `t1` LIMIT 1";
        List<Map<String, Object>> maps = queryListBySql(sql);
        logger.info(maps.toString());
    }
}

0 人点赞