Spark On Hive
Spark SQL模块从发展来说,从Apache Hive框架而来,发展历程:Hive(MapReduce)-> Shark (Hive on Spark) -> Spark SQL(SchemaRDD -> DataFrame -> Dataset),所以SparkSQL天然无缝集成Hive,可以加载Hive表数据进行分析。
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
spark-sql中集成Hive
SparkSQL集成Hive本质就是:读取Hive框架元数据MetaStore,此处启动Hive MetaStore服务即可。
nohup /export/server/hive/bin/hive --service metastore &
编写配置文件hive-site.xml,并放于node1的【$SPARK_HOME/conf】目录
cd /export/server/spark/conf/
vim hive-site.xml
代码语言:javascript复制<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
</configuration>
也可以将hive-site.xml分发到集群中所有Spark的conf目录,此时任意机器启动应用都可以访问Hive表数据。
使用sparksql操作hive
/export/server/spark/bin/spark-sql --master local[2] --conf spark.sql.shuffle.partitions=2
show database;
show tables;
CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' ';
LOAD DATA LOCAL INPATH 'file:///root/person.txt' INTO TABLE person;
show tables;
select * from person;
Spark代码中集成Hive
在IDEA中开发应用,集成Hive,读取表的数据进行分析,构建SparkSession时需要设置HiveMetaStore服务器地址及集成Hive选项,首先添加MAVEN依赖包:
代码语言:javascript复制<!--SparkSQL Hive依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
范例演示代码如下:
代码语言:javascript复制package cn.it.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
* SparkSQL集成Hive
*/
object SparkSQLHive {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
.config("hive.metastore.uris", "thrift://node3:9083")
.enableHiveSupport()//开启hive语法的支持
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
import org.apache.spark.sql.functions._
//查看有哪些表
spark.sql("show tables").show()
//创建表
spark.sql("CREATE TABLE person2 (id int, name string, age int) row format delimited fields terminated by ' '")
//加载数据
spark.sql("LOAD DATA LOCAL INPATH 'file:///D:/person.txt' INTO TABLE person2")
//查看有哪些表
spark.sql("show tables").show()
//查询数据
spark.sql("select * from person2").show()
}
}