导语:Spark 为结构化数据处理引入了一个称为 Spark SQL 的编程模块。它提供了一个称为 DataFrame 的编程抽象,并且可以充当分布式 SQL 查询引擎。
一、使用SparkSQL交互式控制台操作hive
在使用 SparkSQL 之前请登录 EMR 集群的 Master 节点。登录 EMR 的方式请参考 登录 Linux 实例。这里我们可以选择使用 WebShell 登录。单击对应云服务器右侧的登录,进入登录界面,用户名默认为 root,密码为创建 EMR 时用户自己输入的密码。输入正确后,即可进入 EMR 命令行界面。
由于hive的超级用户是hadoop,所以 EMR 命令行先使用以下指令切换到 Hadoop 用户:
代码语言:javascript复制[root@172 ~]# su Hadoop
通过如下命令您可以进入 SparkSQL 的交互式控制台:
代码语言:javascript复制[hadoop@10~]$ spark-sql --master yarn --num-executors 64 --executor-memory 2g
其中 --master 表示您的 master URL,--num-executors 表示 executor 数量,--executor-memory 表示 executor 的储存容量。
下面介绍一些 SparkSQL 的基本操作。
新建一个数据库并查看:
代码语言:javascript复制 spark-sql> create database sparksql;
Time taken: 0.907 seconds
spark-sql> show databases;
default
sparksqltest
Time taken: 0.131 seconds, Fetched 5 row(s)
在新建的数据库中新建一个表,并进行查看:
代码语言:javascript复制 spark-sql> use sparksql;
Time taken: 0.076 seconds
spark-sql> create table sparksql_test(a int,b string);
Time taken: 0.374 second
sspark-sql> show tables;
sparksql_test
falseTime taken: 0.12 seconds, Fetched 1 row(s)
向表中插入两行数据并查看:
代码语言:javascript复制 spark-sql> insert into sparksql_test values (42,'hello'),(48,'world');
Time taken: 2.641 seconds
spark-sql> select * from sparksql_test;
42 hello
48 world
Time taken: 0.503 seconds, Fetched 2 row(s)
二、创建Maven工程操作Hive
1、新建一个maven工程
2、添加如下pom依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
继续在pom添加打包和编译插件
代码语言:javascript复制<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3、新建一个demo进行测试
代码语言:javascript复制public static void main(String[] args){
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> df = spark.read().json(args[0]);
RDD<Row> test = df.rdd();
test.saveAsTextFile(args[1]);
}
4、将工程进行编译打包
5、将jar包移动到集群的master节点
6、通过spark-submit提交任务
代码语言:javascript复制 [hadoop@10 /]$ spark-submit --class Demo --master yarn-client tyyz-emr-1.0-SNAPSHOT.jar /user/hadoop/people.json /user/hadoop/result
7、结果如下:
代码语言:javascript复制 [hadoop@172 spark]$ hadoop fs -cat /user/hadoop/$output/part-00000
[null,Michael]
[30,Andy]
[19,Justin]
附spark-submit参数提交详解:
命令行参数 | Spark 属性 | 环境变量 | 描述 | 默认值 |
---|---|---|---|---|
action | 默认动作,提交任务 | SUBMIT | ||
--master | spark.master | spark 地址 | ||
--archives | 归档文件列表,会被解压到每个executor的工作目录 | |||
--depoly-mode | spark.submit.deployMode | DELOY_MODE | 在client (提交机侧)或者 cluster (在集群中) 上运行driver程序 | client |
--class | Java/Scala 程序的主类,main class | |||
--files | spark.files | 使用逗号分隔的每个executor运行时需要的文件列表, 逗号分隔 | ||
--jars | spark.jars | 作业执行过程中使用到的其他jar,可以使用逗号分隔添加多个jar | ||
--keytab | spark.yarn.keytab | 包含keytab文件的全路径。 | ||
--kill | kill 指定的driver | |||
--name | spark.app.name | 程序名称 | ||
--packages | spark.jars.packages | 从maven添加作业执行过程中使用到的包,查找顺序先本地仓库再远程仓库。可以添加多个,每个的格式为:groupId:artifactId:version | ||
--exclude-packages | 需要排除的包,可以为多个,使用逗号分隔。 避免与--packages 中的冲突 | |||
--respositories | 远程仓库。可以添加多个,逗号分隔。 | |||
--properties-file | spark.yarn.principal | 配置文件 | conf/default.spark | |
--conf | 其他额外的spark配置属性。 | |||
--py-files | 使用逗号分隔的放置在python应用程序PYTHONPATH 上的.zip, .egg, .py的文件列表。 | |||
--status | ||||
--supervise | ||||
--total-executor-cores | spark.cores.max | |||
--verbose | 打印debug 信息 | |||
--version | 打印当前Spark版本 | |||
--usage-error | ||||
--driver-class-path | spark.driver.extraClassPath | 作用于driver的额外类路径,使用–jar时会自动添加路径。 | ||
--driver-library-path | spark.driver.extraLibraryPath | 作用于driver的外部lib包 | ||
--driver-java-options | spark.driver.extraJavaOptions | 作用于driver的额外java配置 | ||
--driver-memory | spark.driver.memory | SPAKR_DRIVER_MEMORY | driver程序所需内存量 | |
--driver-cores | spark.driver.cores | SPARK_DRIVER_CORES | driver程序所需CORES数量 | 默认值1 |
--executor-cores | spark.executor.cores | exeutor 程序所需CORES数量 | 默认值1 | |
--executor-memory | spark.executor.memory | executor 程序所需内存数量 | ||
--queue | 设置提交到yarn的队列名称 | 默认值 default | ||
--num-executors | spark.executor.instances | 所需executors的数量 | 默认值2 |