Spark SQL 快速入门系列(8) | | Hive与Spark SQL的读写操作

2020-10-28 17:41:47 浏览数 (1)

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQ L编译时可以包含 Hive 支持,也可以不包含。

  包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

  若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

一. 使用内嵌的 Hive

  如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.

  Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse

  然而在实际使用中, 几乎没有任何人会使用内置的 Hive

二. 使用外置的 Hive

2.1 准备工作

  • 1. Spark 要接管 Hive 需要把 hive-site.xml copy 到conf/目录下.
代码语言:javascript复制
[bigdata@hadoop002 conf]$ cp /opt/module/hive/conf/hive-site.xml hive-site.xml

// 内容如下
<configuration>
        <property>
                  				<name>javax.jdo.option.ConnectionURL</name>
                  <value>jdbc:mysql://hadoop002:3306/metastore?createDatabaseIfNotExist=true</value>
                  <description>JDBC connect string for a JDBC metastore</description>
        </property>

        <property>
                  <name>javax.jdo.option.ConnectionDriverName</name>
                     <value>com.mysql.jdbc.Driver</value>
                      <description>Driver class name for a JDBC metastore</description>
         </property>

        <property>
                  <name>javax.jdo.option.ConnectionUserName</name>
                       <value>root</value>
                    <description>username to use against metastore database</description>
         </property>

        <property>
                  <name>javax.jdo.option.ConnectionPassword</name>
                   <value>199712</value>
                   <description>password to use against metastore database</description>
        </property>
        <property>
                      <name>hive.zookeeper.quorum</name>
                      <value>hadoop002,hadoop003,hadoop004</value>
   <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
        </property>
        <property>
                      <name>hive.zookeeper.client.port</name>
                      <value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
        </property>

</configuration>
  • 2. 把 Mysql 的驱动 copy 到 jars/目录下.
代码语言:javascript复制
[bigdata@hadoop002 jars]$ cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar ./
  • 3. 如果访问不到hdfs, 则需要把core-site.xml和hdfs-site.xml 拷贝到conf/目录下.
代码语言:javascript复制
[bigdata@hadoop002 conf]$ cp /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml  ./
[bigdata@hadoop002 conf]$ cp /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml  ./

2.2 启动 spark-shell

  • 1. 查看默认的数据仓库
代码语言:javascript复制
scala> spark.sql("show tables").show
  • 2. 查看某个数据库
代码语言:javascript复制
scala> spark.sql("select * from emp").show

//  显示100行
scala> spark.sql("select * from emp").show(100)

//  显示截断的内容
scala> spark.sql("select * from emp").show(100,false)
  • 3. 测试聚合(看运行速度相比hive而言)
代码语言:javascript复制
scala> spark.sql("select count(*) from emp").show(100,false)

// 退出
scala> :q

2.2 启动 spark-sql

  在spark-shell执行 hive 方面的查询比较麻烦.spark.sql("").show

  Spark 专门给我们提供了书写 HiveQL 的工具: spark-sql

代码语言:javascript复制
// 一般用于测试学习
[bigdata@hadoop002 spark]$ bin/spark-sql 

spark-sql> select count(*) from emp;

2.3 使用hiveserver2 beeline

  spark-sql 得到的结果不够友好, 所以可以使用hiveserver2 beeline

  • 1. 启动 thrift服务器
代码语言:javascript复制
[bigdata@hadoop002 spark]$ sbin/start-thriftserver.sh 
  • 2. 启动beeline客户端
代码语言:javascript复制
[bigdata@hadoop002 spark]$ bin/beeline 
# 然后输入
!connect jdbc:hive2://hadoop002:10000
# 然后按照提示输入用户名和密码
  • 3. 运行

三. 在代码中访问 Hive

  • 1. 拷贝 hive-site.xml 到 resources 目录下
  • 2. 添加依赖
代码语言:javascript复制
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

3.1 从hive中读数据

  • 1. 源码
代码语言:javascript复制
package com.buwenbuhuo.spark.sql.day02.hive

import org.apache.spark.sql.SparkSession

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-05 14:19
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object HiveRead {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveRead")
      // 添加支持外部hive
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    spark.sql("show databases")
    spark.sql("select * from emp").show

    spark.close()

  }

}
  • 2. 运行结果

3.2 从hive中写数据

3.2.1 使用hive的insert语句去写

3.2.1.1 写入数据(默认保存到本地)
  • 1.源码
代码语言:javascript复制
package com.buwenbuhuo.spark.sql.day02.hive

import org.apache.spark.sql.SparkSession

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-05 14:26
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object HiveWrite {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveWrite")
      // 添加支持外部hive
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    // 先创建一个数据库
    // 创建一次就行否则会报错
//    spark.sql("create database spark0805").show
    spark.sql("use spark0805")
//    spark.sql("create table user1(id int,name string)").show
    spark.sql("insert into user1 values(10,'lisi')").show


  }
}
  • 2. 运行结果
  • 3. 插入结果并没有在hive中,而在本地中(默认情况下创建的数据是在本地)
3.2.1.2 通过参数修改数据库仓库的地址
  • 1. 源码
代码语言:javascript复制
package com.buwenbuhuo.spark.sql.day02.hive

import org.apache.spark.sql.SparkSession

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-05 14:26
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object HiveWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","bigdata")
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveWrite")
      // 添加支持外部hive
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "hdfs://hadoop002:9000/user/hive/warehouse")
      .getOrCreate()
    import spark.implicits._
    // 先创建一个数据库
    // 创建一次就行否则会报错
    spark.sql("create database spark0806").show
    spark.sql("use spark0806")
    spark.sql("create table user1(id int,name string)").show
    spark.sql("insert into user1 values(10,'lisi')").show

  }
}
  • 2. 运行结果
  • 3. 在web端查看结果

3.2.2 df.svaeAsTable(" ")

  • 源码
代码语言:javascript复制
package com.buwenbuhuo.spark.sql.day02.hive

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-05 14:26
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object HiveWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","bigdata")
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveWrite")
      // 添加支持外部hive
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "hdfs://hadoop002:9000/user/hive/warehouse")
      .getOrCreate()
    import spark.implicits._
    val df: DataFrame = spark.read.json("d:/users.json")
    spark.sql("user spark1016")
    // 可以把数据写入到hive中,表可以存着也可以不存在
    df.write.mode("append").saveAsTable("user2")
    spark.close()
    
    }
  }

3.2.3 df.svaeAsTable(" ")

  • 源码
代码语言:javascript复制
package com.buwenbuhuo.spark.sql.day02.hive

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-08-05 14:26
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object HiveWrite {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","bigdata")
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("HiveWrite")
      // 添加支持外部hive
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "hdfs://hadoop002:9000/user/hive/warehouse")
      .getOrCreate()
    import spark.implicits._
    val df: DataFrame = spark.read.json("d:/users.json")
    spark.sql("user spark1016")
    df.write.insertInto("user2") // 基本等价于mode("append").saveAsTable("user2")

    spark.close()
    
    }
  }

  本次的分享就到这里了

0 人点赞