Spark——底层操作RDD,基于内存处理数据的计算引擎

2020-07-27 14:26:21 浏览数 (1)

第一章 是什么

一 介绍

简介

Apache Spark是一个快速的通用集群计算框架 / 殷勤。它提供Java,Scala,Python和R中的高级API,以及支持常规执行图的优化引擎。它还支持一组丰富的更高级别的工具,包括Spark SQL用于SQL和结构化数据的处理,MLlib机器学习,GraphX用于图形处理和Spark Streaming. 。

作为Apache的顶级项目之一, 它的官网为 http://spark.apache.org

特点

  • 快速 运行工作负载的速度提高了100倍。 Apache Spark使用最先进的DAG调度程序,查询优化器和物理执行引擎,为批处理数据和流数据提供了高性能。
  • 易用 使用Java,Scala,Python,R和SQL快速编写应用程序。 Spark提供了80多个算子,可轻松构建并行应用程序。我们可以 从Scala,Python,R和SQL Shell 交互使用它。
  • 通用 结合使用SQL,流和复杂的分析。 Spark拥有一系列强大的库,包括 SQL和DataFrames,MLlib机器学习, GraphX和Spark Streaming。我们可以在同一应用程序中无缝组合这些库。
  • 到处运行 Spark可在Hadoop,Apache Mesos,Kubernetes,Standalone或云服务器中运行。它可以访问各种数据源。 我们可以在EC2,Hadoop YARN,Mesos或Kubernetes上使用其独立集群模式运行Spark 。访问HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive和数百种其他数据源中的数据。

The Berkeley Data Analytics Stack

回顾hadoop

二 Spark与MapReduce的区别

  • 都是分布式计算框架-
  • Spark基于内存,MR基于HDFS。
  • Spark处理数据的能力一般是MR的十倍以上,Spark中除了基于内存计算外,还有DAG有向无环图来切分任务的执行先后顺序。

三 Spark运行模式

  • Local 多用于本地测试,如在eclipse,idea中写程序测试等。
  • Standalone Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
  • Yarn Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
  • Mesos 资源调度框架。

要基于Yarn来进行资源调度,必须实现AppalicationMaster接口,Spark实现了这个接口,所以可以基于Yarn。

四 Spark Core

Spark RDD

面试如果问Spark的RDD 我们可以介绍RDD的五大特性以及相关注意实现

RDD Lineage

可以将下图中的每一个猴看成是一个RDD

RDD Lineage依靠他们之间的依赖关系形成了一个有向无环图DAG

但在复杂的逻辑中, 可能是多条lineage组成一个DAG

Spark任务执行原理

  • Driver: 发送task, 资源回收. 如果task的计算结果非常大就不要回收了。会造成oom。
  • Worker: 资源管理从节点
  • Master: 资源管理主节点, 管理多个Worker

注意:

Spark的Driver如果回收多个Worker可能会出现OOM问题

OOM问题 ; out of memery 内存溢出

这些角色都是以JVM进程形式存在

Spark代码流程

  1. 创建SparkConf对象 可以设置Application name。 可以设置运行模式及资源需求。
  2. 创建SparkContext对象
  3. 基于Spark的上下文创建一个RDD,对RDD进行处理。
  4. 应用程序中要有Action类算子来触发Transformation类算子执行。
  5. 关闭Spark上下文对象SparkContext。

Spark 中的算子

  • transformation算子: 懒执行, 也叫转换算子 例如我们的wordcount计算程序中的flatMap, 我们可以在匿名内部类中打印一句话, 看看能否执行. 然后将最后的 foreach注释看看能否执行.由此可以知道 flatMap是transformation算子. foreach是action算子
  • action算子 :也叫行动算子,触发Transformation算子执行,一个spark application中有一个action算子就有一个job
  • 持久化算子: 也叫控制算子, 控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

注意:

分布式文件系统(File system)–加载RDD

transformations延迟执行–针对RDD的操作 , 是某一类算子(函数)

Action触发执行 , Action也是一类算子(函数)

Transformations 转换算子

Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

Transformation类算子

  • filter 过滤符合条件的记录数,true保留,false过滤掉。
  • map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。 特点:输入一条,输出一条数据
  • flatMap 先map后flat。与map类似,每个输入项可以映射为0到多个输出项( 1对多 )。
  • sample 随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
  • reduceByKey 将相同的Key根据相应的逻辑进行处理。
  • sortByKey/sortBy 作用在K,V格式的RDD上,对key进行升序或者降序排序。可选参数: false:降序, true:默认,升序
  • join,leftOuterJoin,rightOuterJoin,fullOuterJoin 作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W)) join后的分区数与父RDD中分区数多的那一个相同。
  • union 合并两个数据集。两个数据集的类型要一致。 返回新的RDD的分区数是合并RDD分区数的总和。
  • intersection 取两个数据集的交集,返回新的RDD与父RDD分区多的一致
  • subtract 取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。
  • mapPartitions 与map类似,遍历的单位是每个partition上的数据。
  • distinct(map reduceByKey map) 去重
  • cogroup 当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))子RDD的分区与父RDD多的一致。
  • mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值。
  • repartition 增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle
  • coalesce coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。 true为产生shuffle,false不产生shuffle。默认是false。 如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
  • groupByKey 作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
  • zip 将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。
  • zipWithIndex 该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
Action行动算子

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

Action类算子

  • count 返回数据集中的元素数。会在结果计算完成后回收到Driver端。
  • take(n) 返回一个包含数据集前n个元素的集合。
  • first first=take(1),返回数据集中的第一个元素。
  • foreach 循环遍历数据集中的每个元素,运行相应的逻辑。
  • collect 将计算结果回收到Driver端。
  • foreachPartition 遍历的数据是每个partition的数据。
  • countByKey 作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。
  • countByValue 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
  • reduce 根据聚合逻辑聚合数据集中的每个元素。
控制算子

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

  • cache 默认将RDD的数据持久化到内存中。cache是懒执行。 注意:chche () = persist()=persist(StorageLevel.Memory_Only) 测试cache文件: 测试代码: /** * 验证控制算子cache :默认将RDD的数据持久化到内存中。cache是懒执行。 * Author TimePause * Create 2019-12-13 19:55 */ object CacheTest { def main(args: ArrayString): Unit = { val conf = new SparkConf().setMaster("local").setAppName("cache") val sc = new SparkContext(conf) val lines = sc.textFile("./data/persistData.txt") // val lines1: lines.type = lines.cache() val lines1 = lines.persist(StorageLevel.MEMORY_ONLY) val t1 = System.currentTimeMillis() val l: Long = lines1.count() val t2 = System.currentTimeMillis() println("第一次count:" l s",time=${t2-t1}") //用时5833ms val t3 = System.currentTimeMillis() val l1 = lines1.count() val t4 = System.currentTimeMillis() println("第二次count:" l1 s",time=${t4-t3}") //用时183ms, 由此可见基于内存速度要快很多 } }
  • persist: 可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。 持久化级别如下:

cache() : 默认将数据存在内存中

且 cache() = persist() = persist(StorageLevel.MEMORY_ONLY)

persist(): 可以手动指定持久化的级别, 我们经常使用的persist级别(推荐程度由高到低)

代码语言:txt复制
MEMORY_ONLY

MEMORY_ONLY_SER

MEMORY_AND_DISK_SER

MEMORY_AND_DISK

尽量避免使用“_2” 和 “DISK_ONLY” 级别

cache和persist的注意事项:

  1. cache和persist都是懒执行,必须有一个action类算子触发执行。
  2. cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
  3. cache和persist算子后不能立即紧跟action算子。
  4. cache和persist算子持久化的数据当applilcation执行完成之后会被清除。 错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。
  5. checkpoint checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。

checkpoint 的执行原理:

  1. 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
  2. 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
  3. Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。 使用:
代码语言:javascript复制
/**
 * 验证控制算子checkPoint
 * Author TimePause
 * Create  2019-12-13 19:55
 */
object CheckPointTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("cache")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("./ck/word.txt")
    val lines = sc.textFile("./data/word.txt")

    lines.checkpoint()
    lines.count()
  }

}

运行结果

第二章 安装

一 搭建Standalone集群

1).Spark官网下载安装包,解压

2).进入安装包的conf目录下,复制slaves.template文件,复制后名称为slaves

添加从节点(worker)名称。保存。

3).复制spark-env.sh.template文件,复制后名称为spark-env.sh

修改spark-env.sh

代码语言:javascript复制
JAVA_HOME jdk所在目录
SPARK_MASTER_IP:master的ip
SPARK_MASTER_PORT:提交任务的端口,默认是7077
SPARK_WORKER_CORES:每个worker从节点能够支配的core的个数
SPARK_WORKER_MEMORY:每个worker从节点能够支配的内存数

4).同步到其他节点上

5).启动集群

进入sbin目录下,执行当前目录下的./start-all.sh

6)访问集群的图形化界面, 默认为8080

二 提交任务

这是使用的jar是Spark自带的一个jar, 用于计算圆周率, 无需自己手动编写, 执行运行即可, 在此用于测试Spark能否正常提交任务undefined

如何提交任务

方法一

通过bin,目录下的 spark-submit来提交(在那一个节点都可以,命令都如下,不会改变)

代码语言:javascript复制
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

图1

方法二

因此如果我们使用方法二, 会在任务提交时一直占用当前shell以及网卡资源,为了消除这个影响我们选择方法二

将spark安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。

在bin目录下, 命令依旧如下

代码语言:javascript复制
# ./spark-submit --master spark提交任务的ip和端口 提交的jar的全限定路径 提交的jar的名称 运行jar/任务的task数(图1)
./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

这样占用的就不是Spark集群节点的网卡和shell, 而是无关的节点的相关资源了

Spark提交任务的jar包管理问题的解决

1) Spark基于Standalone模式提交任务

基于Standalone每次提交任务时,都会在Worker节点Spark安装目录的/work目录下生成一个命名为app-xxx-xxx的目录,这个目录下存放程序运行时所需的依赖的jar包。每次提交任务都会在这个work目录下生成一个application目录且不会自动清理。如果时间长了就有可能占用大量的磁盘空间。

清理:可以在worker节点的Spark-env.sh中配置如下参数,定期清理work目录。

代码语言:javascript复制
export SPARK_WORKER_OPTS="  
-Dspark.worker.cleanup.enabled=true   #是否开启自动清理
-Dspark.worker.cleanup.interval=1800     #每隔多长时间清理一次,单位s
-Dspark.worker.cleanup.appDataTtl=604800"  # 保留最近多长时间的数据,单位s

以上参数中:

spark.worker.cleanupenabled=true 只有运行完成的application才会被清理。

spark.worker.cleanup.interval 清理周期,单位s,默认值为30分钟。

spark.worker.clearnup.appDataTtl 保存多长时间的数据,单位s,默认是一周

  • standalone-client模式: 这种模式提交任务时,以下方式可以解决jar包依赖问题:
  • 可以将依赖的jar包全部打入一个jar包中,直接提交任务,jar包相对大。这个jar包会被复制到每个work节点的work目录app-xx-xx目录下。 ./spark-submit --master spark://node1:7077 --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze /root/test/TrafficProject-1.0-SNAPSHOT-jar-with-dependencies.jar
  1. 可以将依赖的jar包在提交任务时使用–jars 来指定,这种方式每次都会给每个work节点的worker目录中application-xx-xx路径复制一份依赖的jar包(提交的任务jar包也会被复制到这个路径下)。提交任务时,使用不含有依赖的jar包。 ./spark-submit --master spark://node1:7077 --jars /root/test/mysql-connector-java-5.1.47.jar,/root/test/fastjson-1.2.11.jar --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze /root/test/TrafficProject-1.0-SNAPSHOT.jar 4
  2. 提交任务时使用不含有依赖的jar包,将依赖的jar包放入每台worker节点的spark安装目录下jars目录下(如果客户端时单独一台,客户端这个目录下也要放jar包),这种模式不会将依赖的jar包复制到worker节点worke下application-xxx-xxx目录,class所在的提交jar包会在这个目录下: ./spark-submit --master spark://node1:7077 --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze /root/test/TrafficProject-1.0-SNAPSHOT.jar
  • standalone-cluster模式: cluster模式提交任务时,由于Driver是在worker节点中启动,会去worker节点路径中寻找class所在的jar包,这样需要将class所在的jar包在每个worker点中有一份,比较麻烦,最好将class所在的jar包上传到hdfs中的某个路径中,提交任务时指定hdfs路径即可。
  • 提交任务时将所有依赖包打入一个jar包,使用含有依赖的jar包,这个jar包会被复制到每台worker节点的worke目录app-xx-xx下。 ./spark-submit --master spark://node1:7077 --deploy-mode cluster --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT-jar-with-dependencies.jar
  1. 提交任务时使用不含有依赖包的jar包,使用- -jars 来指定依赖的jar包。由于Driver会在worker节点启动,所以每台worker节点上都要含有所有- - jars指定的路径和包。-- jars 也可以指定hdfs中的路径,这样就不需要每台worker节点要含有 - -jars的路径和包,但是依赖的hdfs中的jar包会被复制到每台worker节点的work目录app-xx-xx下。 ./spark-submit --master spark://node1:7077 --deploy-mode cluster --jars hdfs://node1:9000/spark/fastjson-1.2.11.jar,hdfs://node1:9000/spark/mysql-connector-java-5.1.47.jar --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT.jar
  2. 提交任务时使用不含有依赖包的jar包,将所有依赖的jar包在每台worker节点的spark安装目录下的jas目录中,class所在的jar包会被复制到每台worker节点的work目录app-xx-xx下,依赖的jar包不会被放在这个路径下。 ./spark-submit --master spark://node1:7077 --deploy-mode cluster --class com.bjsxt.spark.areaRoadFlow.AreaTop3RoadFlowAnalyze hdfs://node1:9000/spark/TrafficProject-1.0-SNAPSHOT.jar
2) Spark基于Yarn模式提交任务
  • 无论spark基于Yarn提交任务的client模式还是cluster模式,都会在HDFS中/user/${username}/.sparkStaging目录下产生一个application的目录,这个目录存放class所在的jar包以及- -jars 指定的依赖的jar包,客户端提交任务后,spark任务节点的spark安装目录jars目录下的所有jar包以__spark_libs__5619457283046496725.zip的形式上传到这个路径下。
  • 如果spark_home/jars目录下的jar包很多,会上传很久,导致任务执行很慢,可以通过在客户端spark_home/conf/spark-defaults.conf中配置spark.yarn.jars hdfs://node1:9000/sparkjars/* (sparkjars 目录需要在hdfs中创建,要配置访问权限755),将spark_home/jars下的所有jar包都上传到hdfs中sparkjars目录下,这样每次提交任务时,就不会从客户端的spark_home/jars下上传所有jar包,只是从hdfs中sparkjars下读取,速度会很快,省略了上传的过程。
  • 每运行一个application都会在sparkStaging路径下产生一个目录,这个路径默认是没有读取权限的,如果读取查看,可以使用命令:hdfs dfs -chmod -R 755 /user/root/.sparkStaging 来更改权限查看。当application运行完成,这个路径会自动删除,可以通过参数spark.yarn.preserve.staging.files false来配置,这个参数配置在Spark提交任务节点的spark安装目录下的conf/spark-defaults.conf中,默认为false,每次运行完成任务之后会自动清除,如果改成true,每次运行完成任务之后不会清除目录。

提交任务有依赖jar包时,有以下三种方式选择:

  1. 提交任务时,可以将所有依赖的jar包打入一个jar包,这样运行任务时,这个jar包会被上传到HDFS中/user/${username}/.sparkStaging/application-xxx下,如果jar包很大,上传速度慢。
  2. 提交任务时,不将所有的依赖jar包打入一个jar包,可以使用—jars来指定,但是每次指定的jar包会上传到hdfs中/user/${username}/.sparkStaging/application-xxx路径下,如果依赖jar包很多,上传会很慢,导致任务执行慢。
  3. 提交任务时,不将所有的依赖jar包打入一个jar包,将所有的依赖放入hdfs路径sparkjars中 ,这样提交任务时不需要指定- - jars,直接运行即可,默认在执行任务时,会将hdfs中sparkjars包下的所有包复制到/user/${username}/.sparkStaging/application-xxx下,省去上传的过程。 以上所有提交任务的命令,类似standalone 命令,只是master 为yarn 。

三 Standalone 模式两种提交任务方式

Standalone-client 提交任务方式

提交命令

代码语言:txt复制
真实提交时必须将这个命令修改成一行然后运行
代码语言:javascript复制
## 不指定参数
./spark-submit 
--master spark://node1:7077
--class org.apache.spark.examples.SparkPi 
../examples/jars/spark-examples_2.11-2.3.1.jar 100

./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100


## 添加一行参数--deploy-mode client 
./spark-submit 
--master spark://node1:7077 
--deploy-mode client 
--class org.apache.spark.examples.SparkPi 
../examples/jars/spark-examples_2.11-2.3.1.jar 100

./spark-submit --master spark://node1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行原理图解

执行流程

在client节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果

  1. client模式提交任务后,会在客户端启动Driver进程。
  2. Driver会向Master申请启动Application启动的资源。
  3. 资源申请成功,Driver端将task发送到worker端执行。
  4. worker将task执行结果返回到Driver端。

总结

client模式适用于测试调试程序 。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。

在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。

Standalone-cluster 提交任务方式

提交命令

代码语言:txt复制
真实提交时必须将这个命令修改成一行然后运行
代码语言:javascript复制
./spark-submit 
--master spark://node1:7077 
--deploy-mode cluster  
--class org.apache.spark.examples.SparkPi 
../examples/jars/spark-examples_2.11-2.3.1.jar 100

./spark-submit --master spark://node1:7077 --deploy-mode cluster  --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100

执行原理图解

执行流程

在worker节点启动Driver进程后,Driver向Master申请资源,向woker发送task,并接受worker的执行结果

  1. cluster模式提交应用程序后,会向Master请求启动Driver.
  2. Master接受请求,随机在集群一台节点启动Driver进程。
  3. Driver启动后为当前的应用程序申请资源。
  4. Driver端发送task到worker节点上执行(节点可能不止一个)。
  5. worker将执行情况和执行结果返回给Driver端。

总结

cluster模式适合在生产模式(项目上线环境)使用, Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况(包括执行结果!!!)。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。

如何通过WEB UI界面查看结果?
  1. 重启一下spark集群(方便查找), 以集群命令运行,查看web ui,待Completed Applications 出现我们东西, 说明任务执行完成
  2. 找到对应的Completed Drivers ,点击蓝色连接, 如图1
  3. 在 Finished Drivers 中查看执行结果日志, 如图2
  4. 结果在输出的日志中, 需要仔细查看, 如图3

图1

图2

图3

四 Yarn模式两种提交任务方式

使用前提

需要有dhfs集群和yarn框架的支持, 但是无需启动 spark Standalone集群

使用前的步骤

  1. 启动Zookeeper集群 zKServer.sh start
  2. 启动hdfs集群 start-dfs.sh
  3. 启动yarn框架start-yarn.sh
  4. 启动resourccemanager yarn-daemon.sh start resourcemanager
  5. 修改用于提交任务Spark客户端的配置文件 spark-env.sh, 添加自己hadoop配置文件所在目录

yarn-client提交任务方式

注意: 下面三种方式效果相同, 但是在输入要整合成一行命令输入

代码语言:javascript复制
格式: 
./spark-submit --master yarn --class 类所在全限定路径 jar所在位置  task数量
举例: 
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 100


# --master yarn–client
./spark-submit 
--master yarn–client
--class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

# --deploy-mode  client 
./spark-submit 
--master yarn 
--deploy-mode  client 
--class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

结果输出在当前命令行

执行原理图解

执行流程

RS接收Driver发送的资源请求, 在NM上启动AM, 接收AM启动成功后的资源请求, 分配给NM给AM, 启动Executor. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。

  1. 客户端提交一个Application,在客户端启动一个Driver进程。
  2. 应用程序启动后会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
  3. RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
  4. AM启动后,会向RS请求一批container资源,用于启动Executor.
  5. RS会找到一批NM返回给AM,用于启动Executor。
  6. AM会向NM发送命令启动Executor。
  7. Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。

总结

Yarn-client模式同样是适用于测试 ,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.

ApplicationMaster的作用:

  1. 为当前的Application申请资源
  2. 给NameNode发送消息启动Executor。 注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。

yarn-cluster提交任务方式

提交命令

代码语言:javascript复制
格式
./spark-submit (--master yarn-cluster 或者--deploy-mode cluster ) --class 类的全限定路径 jar所在位置  task数目
举例
方式一
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.3.1.jar 1000

方式二
./spark-submit --master yarn --deploy-mode cluster ../examples/jars/spark-examples_2.11-2.3.1.jar 1000

停止集群任务命令:
yarn application -kill applicationID

执行原理图解

执行流程

RM接收客户端请求, 在NM上启动AM(相当于Driver), 接收AM请求, 返回AM一批NM节点 AM连接NM发送请求启动Executor, 接收Executor的反向注册, 最后发送任务到Executor

  1. 客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
  2. RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
  3. AM启动,AM发送请求到RS,请求一批container用于启动Executor。
  4. RS返回一批NM节点给AM。
  5. AM连接到NM,发送请求到NM启动Executor。
  6. Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。

总结

Yarn-Cluster主要用于生产环境中, 因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

ApplicationMaster的作用:

  1. 为当前的Application申请资源
  2. 给NameNode发送消息启动Excutor。
  3. 任务调度。
如何通过WEB UI界面查看结果?

访问自己 ResourceManager所在节点的 8088端口, eg: http://node3:8088

  1. 任务正在执行时效果图
  1. 任务结束点击该id
  1. 查看该任务的 logs 日志文件
  1. 选择标准输出日志
  1. 结果就在日志内容中

第三章 宽窄依赖和资源任务调度

一 术语解释

简单关系图

二 窄依赖和宽依赖

RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。

  • 窄依赖 父RDD和子RDD partition之间的关系是一对一 的。或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。
  • 宽依赖 父RDD与子RDD partition之间的关系是一对多会有shuffle的产生

宽窄依赖图理解

三 Stage

Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。 stage是由一组并行的task组成。undefined

stage切割规则

切割规则:从后往前,遇到宽依赖就切割stage

stage计算模式

pipeline管道计算模式,pipeline只是一种计算思想,模式。

粗粒度资源申请和细粒度资源申请

粗粒度资源申请(Spark)

  • 在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。
  • 优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。
  • 缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。

细粒度资源申请(MapReduce)

  • Application执行之前不需要先去申请资源,而是直接执行,让job中的每一个task在执行前自己去申请资源,task执行完成就释放资源。
  • 优点:集群的资源可以充分利用。
  • 缺点:task自己去申请资源,task启动变慢,Application的运行就相应的变慢了。

第四章Spark 源码分析与算法案例

一 Spark-Submit提交参数

  • –master MASTER_URL, 可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
  • –deploy-mode DEPLOY_MODE, Driver程序运行的地方,client或者cluster,默认是client。
  • –class CLASS_NAME, 主类名称,含包名
  • –jars 逗号分隔的本地JARS, Driver和executor依赖的第三方jar包
  • –files 用逗号隔开的文件列表,会放置在每个executor工作目录中
  • –conf spark的配置属性
  • –driver-memory Driver程序使用内存大小(例如:1000M,5G),默认1024M
  • –executor-memory 每个executor内存大小(如:1000M,2G),默认1G

Spark standalone with cluster deploy mode only:

  • –driver-cores Driver程序的使用core个数(默认为1),仅限于Spark standalone模式 Spark standalone or Mesos with cluster deploy mode only:
  • –supervise 失败后是否重启Driver,仅限于Spark alone或者Mesos模式

Spark standalone and Mesos only:

  • –total-executor-cores executor使用的总核数,仅限于SparkStandalone、Spark on Mesos模式

Spark standalone and YARN only:

  • –executor-cores 每个executor使用的core数,Spark on Yarn默认为1,standalone默认为worker上所有可用的core。

YARN-only:

  • –driver-cores driver使用的core,仅在cluster模式下,默认为1。
  • –queue QUEUE_NAME 指定资源队列的名称,默认:default
  • –num-executors 一共启动的executor数量,默认是2个。

二 资源调度源码分析

资源请求简单图

资源调度Master路径:

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala

提交应用程序,submit的路径:

路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala

总结:

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置–executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加–executor-cores这个选项。
  4. 默认情况下没有设置–total-executor-cores,一个Application会使用Spark集群中所有的cores。

总结如下图:

Spark RPCEnv 环境准备 Master注册启动

SparkSubmit_资源申请完毕启动Executor

三 任务调度源码分析

  • Action算子开始分析 任务调度可以从一个Action类算子开始。因为Action类算子会触发一个job的执行。
  • 划分stage,以taskSet形式提交任务DAGScheduler 类中getMessingParentStages()方法是切割job划分stage。可以结合以下这张图来分析:

四 WordCount实现

java代码

代码语言:javascript复制
public class wc3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wc3");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./data/word.txt");// {hello,world}
        JavaRDD<String> stringJavaRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);//(hello,1),(world,1)
            }
        });
        JavaPairRDD<String, Integer> stringIntegerJavaPairRDD1 = stringIntegerJavaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1   v2;//(hello,27),(world,1)
            }
        });
        JavaPairRDD<Integer, String> integerStringJavaPairRDD = stringIntegerJavaPairRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.swap();//(27,hello),(1,world)
            }
        });
        JavaPairRDD<Integer, String> integerStringJavaPairRDD1 = integerStringJavaPairRDD.sortByKey();
        JavaPairRDD<String, Integer> result = integerStringJavaPairRDD1.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                return integerStringTuple2.swap();//正序(world,1),(hello,27)
            }
        });
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2);
            }
        });
        System.out.println("qqq");
        sc.stop();
    }
}

Scala代码

代码语言:javascript复制
object Wc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("wc1")
    val sc = new SparkContext(conf)
    sc.textFile("./data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).sortBy(_._2,false).foreach(println)
    sc.stop()
  }
}

数据格式

运行结果

五 统计网站Pv和Uv

Scala代码

代码语言:javascript复制
/**
 * 网站访问量统计, 多多练习和理解
 *
 * Author TimePause
 * Create  2019-12-16 21:09
 */
object PvUv {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("pvuv")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/pvuvdata")

    //pv页面访问量: 用户访问某个网站次数,无需去重
    //这里没有flatmap, 直接通过map返回了split分割后形成的数组的第五个元素n,组成(n,1)的元组,然后进行wc操作
    val result = lines.map(line => {(line.split("t")(5), 1)}).reduceByKey(_   _).sortBy(_._2,false)
      .foreach(tp=>{println(s"网站 ${tp._1} 的页面访问量为: ${tp._2}")})


    //uv 独立访问用户数,一般为1天统计一次, 需要去重
    //别老是忘记去去重过后的元素line2.split("_")(1)
    lines.map(line=>{(line.split("t")(0) "_" line.split("t")(5))}).distinct()
      .map(line2=>{(line2.split("_")(1),1)}).reduceByKey(_ _).sortBy(_._2,false)
      .foreach(tp=>{println(s"网站 ${tp._1} 的独立访问用户数为: ${tp._2}")})
  }
}

数据格式

运行结果

六 二次排序

java代码

代码语言:javascript复制
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);

JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");

JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Tuple2<SecondSortKey, String> call(String line) throws Exception {
           String[] splited = line.split(" ");
           int first = Integer.valueOf(splited[0]);
           int second = Integer.valueOf(splited[1]);
           SecondSortKey secondSortKey = new SecondSortKey(first,second);
           return new Tuple2<SecondSortKey, String>(secondSortKey,line);
	}
});

pairSecondRDD.sortByKey(false).foreach(new  
               VoidFunction<Tuple2<SecondSortKey,String>>() {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
             System.out.println(tuple._2);
	}
});
代码语言:javascript复制
public class SecondSortKey  implements Serializable,Comparable<SecondSortKey>{
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private int first;
	private int second;
	public int getFirst() {
		return first;
	}
	public void setFirst(int first) {
		this.first = first;
	}
	public int getSecond() {
		return second;
	}
	public void setSecond(int second) {
		this.second = second;
	}
	public SecondSortKey(int first, int second) {
		super();
		this.first = first;
		this.second = second;
	}
	@Override
	public int compareTo(SecondSortKey o1) {
		if(getFirst() - o1.getFirst() ==0 ){
			return getSecond() - o1.getSecond();
		}else{
			return getFirst() - o1.getFirst();
		}
	}
}

Scala代码

代码语言:javascript复制
/**
 * 样例类: 实现二次排序的逻辑实现
 * @param first
 * @param second
 */
case class SecondSortKey(val first:Int,val second:Int) extends  Ordered[SecondSortKey]  {
  def compare(that: SecondSortKey): Int = {
    if(this.first-that.first==0)
      this.second- that.second
    else
      this.first-that.first
  }
}

/**
  * 二次排序问题
  */
object SecondSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("secondarySort")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/secondSort.txt")
    val transRDD: RDD[(SecondSortKey,String)] = lines.map(s=>{(SecondSortKey(s.split(" ")(0).toInt,s.split(" ")(1).toInt),s)})
    transRDD.sortByKey(false).map(_._2).foreach(println)

  }
}

数据样式

运行结果

七 分组取TopN

java代码

代码语言:javascript复制
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("scores.txt");

JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {

/**
 * 
 */
private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, Integer> call(String str) throws Exception {
	String[] splited = str.split("t");
	String clazzName = splited[0];
	Integer score = Integer.valueOf(splited[1]);
	return new Tuple2<String, Integer> (clazzName,score);
        }
});

pairRDD.groupByKey().foreach(new 
            VoidFunction<Tuple2<String,Iterable<Integer>>>() {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
	String clazzName = tuple._1;
	Iterator<Integer> iterator = tuple._2.iterator();
	
	Integer[] top3 = new Integer[3];
	
	while (iterator.hasNext()) {
         Integer score = iterator.next();

           for (int i = 0; i < top3.length; i  ) {
	     if(top3[i] == null){
                top3[i] = score;
                break;
	      }else if(score > top3[i]){
                 for (int j = 2; j > i; j--) {
	            top3[j] = top3[j-1];
                 }
                top3[i] = score;
                break;
	     }
       }
 }
 System.out.println("class Name:" clazzName);
 for(Integer sscore : top3){
      System.out.println(sscore);
  }
}
});	

Scala代码

代码语言:javascript复制
/**
 * 分组取topN
 *
 * Author TimePause
 * Create  2019-12-18 15:50
 */
object TopNTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("topn")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./data/scores.txt")
    //class1 100=>(class1,int(100))
    val pairInfo = lines.map(one => {
      (one.split(" ")(0), one.split(" ")(1).toInt)
    })
    val value = pairInfo.groupByKey().map(tp => {
      val classname = tp._1
      val iter = tp._2.iterator

      val top3Score = new Array[Int](3)
      val loop = new Breaks
      while (iter.hasNext) {
        val currScore = iter.next()

        loop.breakable {
          for (i <- 0.until(top3Score.size)) {
            if (top3Score(i) == 0) {
              top3Score(i) = currScore
              loop.break()
            } else if (currScore > top3Score(i)) {
              // 2到i, 步长为-1
              for (j <- 2.until(i, -1)) {
                top3Score(j) = top3Score(j - 1)
              }
              top3Score(i) = currScore
              loop.break()
            }
          }
        }
      }
      (classname,top3Score.toBuffer)
    }).collect()
    value.foreach(println)
  }
}

八 广播变量和累加器

广播变量

原理图

测试代码

代码语言:javascript复制
object BroadCastTest {
  def main(args:Array[String]): Unit ={
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)

    val list: List[String] = List[String]("hello timepause")
    // 将该数组定义成广播变量
    val bcValue: Broadcast[List[String]] = sc.broadcast(list)
    val lines = sc.textFile("./data/word.txt")

    lines.filter(one=>{
      val value: List[String] = bcValue.value //获取广播变量数组的值
      // list.contains(one)
      value.contains(one)
    }).foreach(println)
  }
}

注意事项

  • 能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。
  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

累加器的使用

原理图

演示代码

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()

注意事项

累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。


第五章 Spark的深入使用

一 SparkShell 的使用

概念:

SparkShell是Spark自带的一个快速原型开发工具,也可以说是Spark的scala REPL(Read-Eval-Print-Loop),即交互式shell。支持使用scala语言来进行Spark的交互式编程。

使用:

启动Standalone集群,./start-all.sh ( sbin )

在客户端bin目录下启动 spark-shell:

代码语言:javascript复制
./spark-shell --master spark://node1:7077

启动hdfs,创建目录spark/test,上传文件wc.txt

代码语言:javascript复制
启动hdfs集群:
    start-all.sh
创建目录:
    hdfs dfs -mkdir -p /spark/test
上传wc.txt
    hdfs dfs -put /root/test/wc.txt /spark/test/

word,txt部分内容

代码语言:javascript复制
# 如果直接使用foreach进行输出, 结果会在执行的日志中显示,需要通过图形化界面查看
scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).sortBy(_._2,false).foreach(println)
 
# 如果通过其他的 Action算子触发执行将会显示结果, 如下                                                                               
scala> sc.textFile("hdfs://node2:8020/spark/data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).sortBy(_._2,false).collect()
res2: Array[(String, Int)] = Array((hello,110), (myself,36), (world,36), (timepause,14), (ah,12), (sz,8), (worldd,4))

# 将hdfs文件赋给一个rdd变量
scala> var rdd=sc.textFile("hdfs://node2:8020/spark/data/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24

# 可以这样进行导包
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

# 设置为只使用内存并序列化
scala> rdd.persist(StorageLevel.MEMORY_ONLY_SER)
res1: org.apache.spark.rdd.RDD[String] = hdfs://node2:8020/spark/data/word.txt MapPartitionsRDD[11] at textFile at <console>:24

二 SparkUI

  1. SparkUI界面介绍 可以指定提交Application的名称 ./spark-shell --master spark://node1:7077 --name myapp
  2. 配置historyServer 临时配置,对本次提交的应用程序起作用 ./spark-shell --master spark://node1:7077 --name myapp1 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://node1:9000/spark/test 停止程序,在Web Ui中Completed Applications对应的ApplicationID中能查看history。
  3. 永久配置, spark-default.conf配置文件中配置HistoryServer 对所有提交的Application都起作用, 在客户端节点,进入…/spark-2.3.1/conf/ spark-defaults.conf最后加入: ##开启记录事件日志的功能 spark.eventLog.enabled true ##设置事件日志存储的目录 spark.eventLog.dir hdfs://node1:9000/spark/test ##设置HistoryServer加载事件日志的位置 spark.history.fs.logDirectory hdfs://node1:9000/spark/test ##日志优化选项,压缩日志 spark.eventLog.compress true 或者像本人这样配置 spark.eventLog.enabled true spark.eventLog.dir hdfs://mycluster/spark/log spark.history.fs.logDirectory hdfs://mycluster/spark/log spark.eventLog.compress true

mycluster为我的Hadoop集群名称. 如何查找自己Hadoop集群名称?

位于自己 /hadoop/etc/hadoop/hdfs-site.xml 文件下

  1. 启动HistoryServer:undefined./start-history-server.sh 访问HistoryServer:node4:18080,之后所有提交的应用程序运行状况都会被记录。

三 Spark MasterHA

  • Standalone集群只有一个Master,如果Master挂了就无法提交应用程序,需要给Master进行高可用配置,Master的高可用可以使用fileSystem(文件系统)和zookeeper(分布式协调服务)。
  • fileSystem只有存储功能,可以存储Master的元数据信息,用fileSystem搭建的Master高可用,在Master失败时,需要我们手动启动另外的备用Master,这种方式不推荐使用。
  • zookeeper有选举和存储功能,可以存储Master的元素据信息,使用zookeeper搭建的Master高可用,当Master挂掉时,备用的Master会自动切换,推荐使用这种方式搭建Master的HA。
  • 搭建Master HA的官方介绍

原理图

搭建步骤

  1. 在Spark Master节点上配置主Master,配置spark-env.sh 指定Zookeeper集群ip port以及在ZK中存放Master状态行业选举信息的文件名称 export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node2:2181,node3:2181,node4:2181 -Dspark.deploy.zookeeper.dir=/sparkmaster1219"
  2. 发送到其他worker节点上 不需要发送客户端所在节点node4
  1. 找一台节点(非主Master节点, 例如node2)配置备用 Master,修改spark-env.sh配置节点上的MasterIP
  1. 启动集群之前启动zookeeper集群: ../zkServer.sh start
  2. 启动spark Standalone集群,启动备用Master
  1. 打开主Master和备用Master WebUI页面,观察状态。
  2. 测试验证 提交SparkPi程序,kill主Master观察现象。

注意点

  • 每次使用这个环境都需要手动启动Master ./start-master,sh
  • 主备切换过程中不能提交Application。
  • 主备切换过程中不影响已经在集群中运行的Application。因为Spark是粗粒度资源调度。(资源先申请完毕后使用)

第六章 Spark Shuffle

SparkShuffle概念

  • reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合起来的value。
  • 问题:聚合之前,每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,RDD的partition极有可能分布在各个节点上。
  • 如何聚合? – Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。 – Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
  • Spark中有两种Shuffle管理类型,HashShufflManager和SortShuffleManager,Spark1.2之前是HashShuffleManager, Spark1.2引入SortShuffleManager,在Spark 2.0 版本中已经将HashShuffleManager丢弃。

一 Spark 两种shuffleManager管理机制

HashShuffleManager

1) 普通机制

普通机制示意图

执行流程

  1. 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。
  2. 每个buffer文件最后对应一个磁盘小文件。
  3. reduce task来拉取对应的磁盘小文件。

总结

  1. map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。
  2. 产生的磁盘小文件的个数: M(map task的个数)*R(reduce task的个数)

存在的问题

产生的磁盘小文件过多,会导致以下问题:

  1. 在Shuffle Write过程中会产生很多写磁盘小文件的对象
  2. 在Shuffle Read过程中会产生很多读取磁盘小文件的对象
  3. 在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存 的话,就会OOM。
  4. 在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。

2) 合并机制

合并机制示意图

总结

产生磁盘小文件的个数:E(Executor的个数)*R(reduce的个数)

SortShuffleManager

  1. 普通机制 普通机制示意图

执行流程

  1. map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
  2. 在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
  3. 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
  4. 在溢写之前内存结构中的数据会进行排序分区
  5. 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据,
  6. map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。
  7. reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

总结

产生磁盘小文件的个数: 2*M(map task的个数)

2) bypass机制

bypass机制示意图

总结

  1. bypass运行机制的触发条件如下: shuffle reduce task的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。这个值默认是200。
  2. 产生的磁盘小文件为:2*M(map task的个数)

二 Shuffle文件寻址

shuffle文件寻址图

Shuffle文件寻址流程

  1. 当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MpStatus对象中,通过MapOutputTrackerWorker对象向Driver中的MapOutputTrackerMaster汇报。
  2. 在所有的map task执行完毕后,Driver中就掌握了所有的磁盘小文件的地址。
  3. 在reduce task执行之前,会通过Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster获取磁盘小文件的地址。
  4. 获取到磁盘小文件的地址后,会通过BlockManager连接数据所在节点,然后通过BlockTransferService进行数据的传输。
  5. BlockTransferService默认启动5个task去节点拉取数据。默认情况下,5个task拉取数据量不能超过48M。

BlockManager

BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。

  • BlockManagerMaster,主对象,存在于Driver中。 BlockManagerMaster会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。
  • BlockManagerSlave,从对象,存在于Excutor中。 BlockManagerSlave会与BlockManagerSlave之间通信。
  • 无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有三个对象: ① DiskStore:负责磁盘的管理。 ② MemoryStore:负责内存的管理。 ③ BlockTransferService:负责数据的传输。

三 Spark 内存管理

  • Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。
  • Spark内存管理分为静态内存管理和统一内存管理,Spark1.6之前使用的是静态内存管理,Spark1.6之后引入了统一内存管理。
  • 静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的但用户可以应用程序启动前进行配置。
  • 统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。

Spark1.6以上版本默认使用的是统一内存管理,可以通过参数spark.memory.useLegacyMode 设置为true(默认为false)使用静态内存管理。

静态内存管理分布图

  1. 统一内存管理分布图

reduce 中OOM如何处理?

  1. 减少每次拉取的数据量
  2. 提高shuffle聚合的内存比例
  3. 提高Excutor的总内存

四 Shuffle调优

SparkShuffle调优配置项如何使用?

  1. 在代码中,不推荐使用,硬编码。 new SparkConf().set(“spark.shuffle.file.buffer”,”64”)
  2. 在提交spark任务的时候,推荐使用。 spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
  3. 在conf下的spark-default.conf配置文件中,不推荐,因为是写死后, 所有应用程序都要用。

附其他的调优参数

代码语言:javascript复制
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。


spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage


spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。


spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。


spark.shuffle.manager
默认值:sort|hash
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。


spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。


spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

五 SparkCore总结


第七章 SparkSQL

一 SparkSQL

Shark

  • Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。
  • 除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上Shark底层依赖于Hive的解析器,查询优化器,但正是由于Shark的整体设计架构对Hive的依赖性太强,难以支持其长远发展,比如不能和Spark的其他组件进行很好的集成,无法满足Spark的一栈式解决大数据处理的需求。

SparkSQL介绍

Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。

  • SparkSQL支持查询原生的RDD。 RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础。
  • 能够在Scala中写SQL语句。支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。

Spark on Hive和Hive on Spark

  • Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行。
  • Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。

DataFrame

  • DataFrame也是一个分布式数据容器。与RDD类似,然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。
  • 从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。
  • DataFrame就Row类型的DataSet。

SparkSQL的数据源

SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。

SparkSQL底层架构

首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划

再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划

随后经过消费模型转换成一个个的Spark任务执行

谓词下推(predicate Pushdown)

二 创建DataFrame的几种方式

官网关于创建DataFrame的介绍

1. 读取json格式的文件创建DataFrame

注意:

  • json文件中的json数据不能嵌套json格式数据。
  • DataFrame是一个一个Row类型的RDD,df.rdd()/df.javaRdd()。
  • 可以两种方式读取json格式的文件。
  • df.show()默认显示前20行数据。
  • DataFrame原生API可以操作DataFrame。
  • 注册成临时表时,表中的列默认按ascii顺序显示列。

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonfile");
SparkContext sc = new SparkContext(conf);
		
//创建sqlContext
SQLContext sqlContext = new SQLContext(sc);
		
/**
 * DataFrame的底层是一个一个的RDD  RDD的泛型是Row类型。
 * 以下两种方式都可以读取json格式的文件
 */
 DataFrame df = sqlContext.read().format("json").load("sparksql/json");
// DataFrame df2 = sqlContext.read().json("sparksql/json.txt");
// df2.show();
 /**
  * DataFrame转换成RDD
  */
 RDD<Row> rdd = df.rdd();
/**
 * 显示 DataFrame中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)
 * 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
 */
// df.show();
/**
 * 树形的形式显示schema信息
 */
 df.printSchema();
		
 /**
  * dataFram自带的API 操作DataFrame
  */
  //select name from table
 // df.select("name").show();
 //select name age 10 as addage from table
	 df.select(df.col("name"),df.col("age").plus(10).alias("addage")).show();
 //select name ,age from table where age>19
	 df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show();
 //select count(*) from table group by age
 df.groupBy(df.col("age")).count().show();
		
 /**
   * 将DataFrame注册成临时的一张表,这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘
  */
 df.registerTempTable("jtable");
		
 DataFrame sql = sqlContext.sql("select age,count(1) from jtable group by age");
 DataFrame sql2 = sqlContext.sql("select * from jtable");
		
 sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonfile")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("sparksql/json")  
//val df1 = sqlContext.read.format("json").load("sparksql/json")

df.show()
df.printSchema()
//select * from table
df.select(df.col("name")).show()
//select name from table where age>19
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(19)).show()
//select count(*) from table group by age
df.groupBy(df.col("age")).count().show();
 
/**
 * 注册临时表
 */
df.registerTempTable("jtable")
val result  = sqlContext.sql("select  * from jtable")
result.show()
sc.stop()

2. 通过json格式的RDD创建DataFrame

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
	"{"name":"zhangsan","age":"18"}",
	"{"name":"lisi","age":"19"}",
	"{"name":"wangwu","age":"20"}"
));
JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
"{"name":"zhangsan","score":"100"}",
"{"name":"lisi","score":"200"}",
"{"name":"wangwu","score":"300"}"
));

DataFrame namedf = sqlContext.read().json(nameRDD);
DataFrame scoredf = sqlContext.read().json(scoreRDD);
namedf.registerTempTable("name");
scoredf.registerTempTable("score");

DataFrame result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name");
result.show();

sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonrdd")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val nameRDD = sc.makeRDD(Array(
  "{"name":"zhangsan","age":18}",
  "{"name":"lisi","age":19}",
  "{"name":"wangwu","age":20}"
))
val scoreRDD = sc.makeRDD(Array(
		"{"name":"zhangsan","score":100}",
		"{"name":"lisi","score":200}",
		"{"name":"wangwu","score":300}"
		))
val nameDF = sqlContext.read.json(nameRDD)
val scoreDF = sqlContext.read.json(scoreRDD)
nameDF.registerTempTable("name") 		
scoreDF.registerTempTable("score") 		
val result = sqlContext.sql("select name.name,name.age,score.score from name,score where name.name = score.name")
result.show()
sc.stop()

3. 非json格式的RDD创建DataFrame

1) 通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)
  • 自定义类要可序列化
  • 自定义类的访问级别是Public
  • RDD转成DataFrame后会根据映射将字段按Assci码排序
  • 将DataFrame转换成RDD时获取字段两种方式,一种是df.getInt(0)下标获取(不推荐使用),另一种是df.getAs(“列名”)获取(推荐使用)

java

代码语言:javascript复制
/**
* 注意:
* 1.自定义类必须是可序列化的
* 2.自定义类访问级别必须是Public
* 3.RDD转成DataFrame会把自定义类中字段的名称按assci码排序
*/
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("RDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("sparksql/person.txt");
JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {

	/**
	* 
	*/
	private static final long serialVersionUID = 1L;

	@Override
	public Person call(String s) throws Exception {
          Person p = new Person();
          p.setId(s.split(",")[0]);
          p.setName(s.split(",")[1]);
          p.setAge(Integer.valueOf(s.split(",")[2]));
          return p;
	}
});
/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();
df.registerTempTable("person");
sqlContext.sql("select  name from person where id = 2").show();

/**
* 将DataFrame转成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
* 2.可以使用row.getAs("列名")来获取对应的列值。
* 
*/
JavaRDD<Row> javaRDD = df.javaRDD();
JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {

	/**
	* 
	*/
	private static final long serialVersionUID = 1L;

	@Override
	public Person call(Row row) throws Exception {
            Person p = new Person();
            //p.setId(row.getString(1));
            //p.setName(row.getString(2));
            //p.setAge(row.getInt(0));

            p.setId((String)row.getAs("id"));
            p.setName((String)row.getAs("name"));
            p.setAge((Integer)row.getAs("age"));
            return p;
	}
});
map.foreach(new VoidFunction<Person>() {
	
	/**
	* 
	*/
	private static final long serialVersionUID = 1L;

	@Override
	public void call(Person t) throws Exception {
          System.out.println(t);
	}
});

sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddreflect")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
/**
 * 将RDD隐式转换成DataFrame
 */
import sqlContext.implicits._

val personRDD = lineRDD.map { x => {
  val person = Person(x.split(",")(0),x.split(",")(1),Integer.valueOf(x.split(",")(2)))
  person
} }
val df = personRDD.toDF();
df.show()

/**
 * 将DataFrame转换成PersonRDD
 */
val rdd = df.rdd
val result = rdd.map { x => {
  Person(x.getAs("id"),x.getAs("name"),x.getAs("age"))
} }
result.foreach { println}
sc.stop()
2) 动态创建Schema将非json格式的RDD转换成DataFrame

java:

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./sparksql/person.txt");
/**
 * 转换成Row类型的RDD
 */
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
          return RowFactory.create(
                String.valueOf(s.split(",")[0]),
                String.valueOf(s.split(",")[1]),
                Integer.valueOf(s.split(",")[2])
	);
	}
});
/**
 * 动态构建DataFrame中的元数据,一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
 */
List<StructField> asList =Arrays.asList(
	DataTypes.createStructField("id", DataTypes.StringType, true),
	DataTypes.createStructField("name", DataTypes.StringType, true),
	DataTypes.createStructField("age", DataTypes.IntegerType, true)
);

StructType schema = DataTypes.createStructType(asList);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

df.show();
sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("rddStruct")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
val rowRDD = lineRDD.map { x => {
  val split = x.split(",")
  RowFactory.create(split(0),split(1),Integer.valueOf(split(2)))
} }

val schema = StructType(List(
  StructField("id",StringType,true),
  StructField("name",StringType,true),
  StructField("age",IntegerType,true)
))

val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.printSchema()
sc.stop()
  1. 读取parquet文件创建DataFrame

注意:

可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种

代码语言:javascript复制
df.write().mode(SaveMode.Overwrite)format("parquet")
                                    .save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

SaveMode指定文件保存时的模式。

  • Overwrite:覆盖
  • Append:追加
  • ErrorIfExists:如果存在就报错
  • Ignore:如果存在就忽略

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> jsonRDD = sc.textFile("sparksql/json");
DataFrame df = sqlContext.read().json(jsonRDD);
/**
 * 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式
 * 保存成parquet文件有以下两种方式:
 */
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
df.show();
/**
 * 加载parquet文件成DataFrame	
 * 加载parquet文件有以下两种方式:	
 */

DataFrame load = sqlContext.read().format("parquet").load("./sparksql/parquet");
load = sqlContext.read().parquet("./sparksql/parquet");
load.show();

sc.stop();

Scala

代码语言:javascript复制
 val conf = new SparkConf()
 conf.setMaster("local").setAppName("parquet")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
 val jsonRDD = sc.textFile("sparksql/json")
 val df = sqlContext.read.json(jsonRDD)
 df.show()
  /**
  * 将DF保存为parquet文件
  */
df.write.mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet")
 df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")
 /**
  * 读取parquet文件
  */
 var result = sqlContext.read.parquet("./sparksql/parquet")
 result = sqlContext.read.format("parquet").load("./sparksql/parquet")
 result.show()
 sc.stop()

4. 读取JDBC中的数据创建DataFrame(MySql为例)

两种方式创建DataFrame

java:

代码语言:javascript复制
parkConf conf = new SparkConf();
conf.setMaster("local").setAppName("mysql");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
/**
 * 第一种方式读取MySql数据库表,加载为DataFrame
 */
Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();
person.show();
person.registerTempTable("person");
/**
 * 第二种方式读取MySql数据表加载为DataFrame
 */
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();
score.show();
score.registerTempTable("score");

DataFrame result = 
sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name");
result.show();
/**
 * 将DataFrame结果保存到Mysql中
 */
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");
result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties);

sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
/**
 * 第一种方式读取Mysql数据库表创建DF
 */
val options = new HashMap[String,String]();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password", "123456")
options.put("dbtable","person")
val person = sqlContext.read.format("jdbc").options(options).load()
person.show()
person.registerTempTable("person")
/**
 * 第二种方式读取Mysql数据库表创建DF
 */
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark")
reader.option("driver","com.mysql.jdbc.Driver")
reader.option("user","root")
reader.option("password","123456")
reader.option("dbtable", "score")
val score = reader.load()
score.show()
score.registerTempTable("score")
val result = sqlContext.sql("select person.id,person.name,score.score from person,score where person.name = score.name")
result.show()
/**
 * 将数据写入到Mysql表中
 */
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.179.4:3306/spark", "result", properties)

sc.stop()

5. 读取Hive中的数据加载成DataFrame

  • HiveContext是SQLContext的子类,连接Hive建议使用HiveContext。
  • 由于本地没有Hive环境,要提交到集群运行,提交命令:
代码语言:javascript复制
./spark-submit 
--master spark://node1:7077,node2:7077 
--executor-cores 1 
--executor-memory 2G 
--total-executor-cores 1
--class com.bjsxt.sparksql.dataframe.CreateDFFromHive 
/root/test/HiveTest.jar

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
//HiveContext是SQLContext的子类。
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("USE spark");
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
//在hive中创建student_infos表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by 't' ");
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");

hiveContext.sql("DROP TABLE IF EXISTS student_scores"); 
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by 't'");  
hiveContext.sql("LOAD DATA "
  "LOCAL INPATH '/root/test/student_scores'"
  "INTO TABLE student_scores");
/**
 * 查询表生成DataFrame
 */
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
  "FROM student_infos si "
  "JOIN student_scores ss "
  "ON si.name=ss.name "
  "WHERE ss.score>=80");

hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");

goodStudentsDF.registerTempTable("goodstudent");
DataFrame result = hiveContext.sql("select * from goodstudent");
result.show();

/**
 * 将结果保存到hive表 good_student_infos
 */
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");

Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();  
for(Row goodStudentRow : goodStudentRows) {
	System.out.println(goodStudentRow);  
}
sc.stop();

Scala

代码语言:javascript复制
val conf = new SparkConf()
 conf.setAppName("HiveSource")
 val sc = new SparkContext(conf)
 /**
  * HiveContext是SQLContext的子类。
  */
 val hiveContext = new HiveContext(sc)
 hiveContext.sql("use spark")
 hiveContext.sql("drop table if exists student_infos")
 hiveContext.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by 't'")
 hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
 
 hiveContext.sql("drop table if exists student_scores")
 hiveContext.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by 't'")
 hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
 
 val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
 hiveContext.sql("drop table if exists good_student_infos")
 /**
  * 将结果写入到hive表中
  */
 df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
 
 sc.stop()

6. Spark On Hive的配置

  1. 在Spark客户端配置Hive On Spark 在Spark客户端安装包下spark-1.6.0/conf中创建文件hive-site.xml:(或者从hive配置文件复制) 配置hive的metastore路径(启动数据源服务hive --service metastore &所在的节点)
代码语言:javascript复制
<configuration>
   <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node3:9083</value>
   </property>
</configuration>
  1. 启动Hive的metastore服务(node3)
代码语言:javascript复制
# 阻塞式启动
hive --service metastore 
# 后台启动
hive --service metastore &
  1. 启动zookeeper集群,启动HDFS集群。
  2. 启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。
代码语言:javascript复制
./spark-shell --master spark://node1:7077,node2:7077  
--executor-cores 1 
--executor-memory 1g 
--total-executor-cores 1


## 方式一
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show
hc.sql("user default").show
hc.sql("select count(*) from jizhan").show
## 方式二 
spark.sql("show  tables").show
  1. 注意: 如果使用Spark on Hive 查询数据时,出现错误:

找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置HDFS的路径

如何提交Spark-hive任务
  1. 将下面代码所在的项目打包, 将含有依赖的jar上传至虚拟机
代码语言:javascript复制
/**
  * 读取Hive中的数据
  * 要开启 :enableHiveSupport
  */
object CreateDataFrameFromHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate()
    spark.sql("use spark")
    spark.sql("drop table if exists student_infos")
    spark.sql("create table if not exists student_infos (name string,age int) row format  delimited fields terminated by 't'")
    spark.sql("load data local inpath '/root/test/student_infos' into table student_infos")

    spark.sql("drop table if exists student_scores")
    spark.sql("create table if not exists student_scores (name string,score int) row format delimited fields terminated by 't'")
    spark.sql("load data local inpath '/root/test/student_scores' into table student_scores")
//    val frame: DataFrame = spark.table("student_infos")  可以将表转换成DataFrame
//    frame.show(100)

    val df = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    df.show(100)
    spark.sql("drop table if exists good_student_infos")
    /**
      * 将结果写入到hive表中
      */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

  }
}
  1. hive创建表后所需的文件内容
代码语言:javascript复制
student_infos
-------------------
zhangsan	18
lisi	19
wangwu	20


student_scores
---------------
zhangsan	100
lisi	200
wangwu	300
  1. 可以看到打包后会有两个, 第一个包没有相关的jar,只有404kb, 只含有代码; 第二个jar有209MB, 含有运行该代码所需要的所有jar包, 可以直接运行

2.通过相关命令上传并执行该jar任务

代码语言:javascript复制
./spark-submit 
--master spark://node1:7077 
--class  com.bjsxt.scalaspark.sql.DataSetAndDataFrame.CreateDataFrameFromHive /root/test/MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 

3.进入hive中, 进入相应的DB查看

注意: 如果没有该DB, 可以手动创建

7.自定义函数UDF和UDAF

UDF:用户自定义函数

可以自定义类实现UDFX接口。

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));

StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");

/**
 * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx
 */
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Integer call(String t1) throws Exception {
             return t1.length();
	}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();

//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length() t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();

sc.stop();	

scala:

代码语言:javascript复制
val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu"))
val rowRDD = rdd.map { x => {
  RowFactory.create(x)
} }
val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")
//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show
sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length() i})
sqlContext.sql("select name ,StrLen(name,10) as length from user").show
sc.stop()
UDAF:用户自定义聚合函数

实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类

java

代码语言:javascript复制
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
              return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
 * 注册一个UDAF函数,实现统计相同值得个数
 * 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
 */
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
	
   /**
    * 
    */
   private static final long serialVersionUID = 1L;
   /**
    * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
    * buffer.getInt(0)获取的是上一次聚合后的值
    * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 
    * 大聚和发生在reduce端.
    * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
    */
   @Override
   public void update(MutableAggregationBuffer buffer, Row arg1) {
         buffer.update(0, buffer.getInt(0) 1);

   }
   /**
    * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
    * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
    * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       
    * buffer2.getInt(0) : 这次计算传入进来的update的结果
    * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
    */
   @Override
   public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
     buffer1.update(0, buffer1.getInt(0)   buffer2.getInt(0));
   }
   /**
    * 指定输入字段的字段及类型
    */
   @Override
   public StructType inputSchema() {
     return DataTypes.createStructType(
      Arrays.asList(DataTypes.createStructField("name", 
          DataTypes.StringType, true)));
   }
   /**
    * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
    */
   @Override
   public void initialize(MutableAggregationBuffer buffer) {
         buffer.update(0, 0);
   }
   /**
    * 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
    */
   @Override
   public Object evaluate(Row row) {
      return row.getInt(0);
   }
   
   @Override
   public boolean deterministic() {
     //设置为true
     return true;
   }
   /**
    * 指定UDAF函数计算后返回的结果类型
    */
   @Override
   public DataType dataType() {
      return DataTypes.IntegerType;
   }
   /**
    * 在进行聚合操作的时候所要处理的数据的结果的类型
    */
   @Override
   public StructType bufferSchema() {
       return 
       DataTypes.createStructType(
   Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, 
            true)));
   }
   
});

sqlContext.sql("select name ,StringCount(name) from user group by name").show();

sc.stop();

Scala

代码语言:javascript复制
class MyUDAF extends UserDefinedAggregateFunction  {
  // 聚合操作时,所处理的数据的类型
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }
  // 最终函数返回值的类型
  def dataType: DataType = {
    DataTypes.IntegerType
  }

  def deterministic: Boolean = {
    true
  }
  // 最后返回一个最终的聚合值     要和dataType的类型一一对应
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }
  // 为每个分组的数据执行初始化值
  def initialize(buffer: MutableAggregationBuffer): Unit = {
     buffer(0) = 0
  }
  //输入数据的类型
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }
  // 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) buffer2.getAs[Int](0) 
  }
  // 每个组,有新的值进来的时候,进行分组对应的聚合值的计算
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) 1
  }
}

object UDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udaf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi"))
    val rowRDD = rdd.map { x => {RowFactory.create(x)} }
    
    val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
    df.registerTempTable("user")
    /**
     * 注册一个udaf函数
     */
    sqlContext.udf.register("StringCount", new MyUDAF())
    sqlContext.sql("select name ,StringCount(name) from user group by name").show()
    sc.stop()
  }
}

8. 开窗函数

注意:

row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个值,相当于分组取topN

如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建。在MySql8之后也增加了开窗函数。(一般在Spark集群中运行,将任务提交至集群中运行)

开窗函数格式:

代码语言:javascript复制
row_number() over (partitin by XXX order by XXX)

java代码

代码语言:javascript复制
SparkConf conf = new SparkConf();
   conf.setAppName("windowfun");
   JavaSparkContext sc = new JavaSparkContext(conf);
   HiveContext hiveContext = new HiveContext(sc);
   hiveContext.sql("use spark");
   hiveContext.sql("drop table if exists sales");
   hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
        "row format delimited fields terminated by 't'");
   hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
   /**
    * 开窗函数格式:
    * 【 rou_number() over (partitin by XXX order by XXX) 】
    */
   DataFrame result = hiveContext.sql("select riqi,leibie,jine "
         	  "from ("
              "select riqi,leibie,jine,"
              "row_number() over (partition by leibie order by jine desc) rank "
              "from sales) t "
           "where t.rank<=3");
   result.show();
   sc.stop();

Scala代码

代码语言:javascript复制
 val conf = new SparkConf()
 conf.setAppName("windowfun")
 val sc = new SparkContext(conf)
 val hiveContext = new HiveContext(sc)
 hiveContext.sql("use spark");
 hiveContext.sql("drop table if exists sales");
 hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
    "row format delimited fields terminated by 't'");
 hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
 /**
  * 开窗函数格式:
  * 【 rou_number() over (partitin by XXX order by XXX) 】
  */
 val result = hiveContext.sql("select riqi,leibie,jine "
   	  "from ("
      "select riqi,leibie,jine,"
      "row_number() over (partition by leibie order by jine desc) rank "
      "from sales) t "
     "where t.rank<=3");
 result.show();
 sc.stop()

三 Spark SQL总结


第八章 SparkStreaming简介

SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是 :Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。

一 SparkStreaming与Storm的区别

  1. Storm是纯实时的流式处理框架,SparkStreaming是准实时的处理框架(微批处理)。因为微批处理,SparkStreaming的吞吐量比Storm要高。
  2. Storm 的事务机制要比SparkStreaming的要完善。
  3. Storm支持动态资源调度。(spark1.2开始和之后也支持)
  4. SparkStreaming擅长复杂的业务处理,Storm不擅长复杂的业务处理,擅长简单的汇总型计算。

storm 和 spark streaming 在实时性,吞吐量等方面的对比 1、实时性:一般Storm的时延性比spark streaming要低,原因是Spark Streaming是小的批处理,通过间隔时长生成批次,一个批次触发一次计算,比如我在程序里面设置间隔时长为5秒,那就是五秒接收到的数据触发一次计算,Storm是实时处理,来一条数据,触发一次计算,所以可以称spark streaming为流式计算,Storm 为实时计算,阿里的JStorm通过实现Trident,也支持小的批处理计算 2、吞吐量 :Storm的吞吐量要略差于Spark Streaming,原因一是Storm从spout组件 接收源数据,通过发射器发送到bolt,bolt对接收到的数据进行处理,处理完以后,写入到外部存储系统中或者发送到下个bolt进行再处理,所以storm是移动数据,不是移动计算;Spark Streaming获取Task要计算的数据在哪个节点上,然后TaskScheduler把task发送到对应节点上进行数据处理,所以Spark Streaming是移动计算不是移动数据,移动计算也是当前计算引擎的主流设计思想;原因二大家很容易看出来,一个是批处理,一个是实时计算,批处理的吞吐量一般要高于实时触发的计算 3、容错机制:storm是acker(ack/fail消息确认机制)确认机制确保一个tuple被完全处理,Spark Streaming是通过存储RDD转化逻辑进行容错,也就是如果数据从A数据集到B数据集计算错误了,由于存储的有A到B的计算逻辑,所以可以从A重新计算生成B,容错机制不一样,暂时无所谓好坏

二 SparkStreaming初始

  1. SparkStreaming初始理解

注意:

  • receiver task是7*24小时一直在执行,一直接受数据,将一段时间内接收来的数据保存到batch中。
  • 假设batchInterval为5s,那么会将接收来的数据每隔5秒封装到一个batch中,batch没有分布式计算特性,这一个batch的数据又被封装到一个RDD中,RDD最终封装到一个DStream中。
  • 例如:假设batchInterval为5秒,每隔5秒通过SparkStreaming将得到一个DStream,在第6秒的时候计算这5秒的数据,假设执行任务的时间是3秒,那么第6~9秒一边在接收数据,一边在计算任务,9 ~10秒只是在接收数据。然后在第11秒的时候重复上面的操作。
  • 如果job执行的时间大于batchInterval会有什么样的问题? 如果接受过来的数据设置的级别是仅内存,接收来的数据会越堆积越多,最后可能会导致OOM(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟 )。

三 SparkStreaming代码

代码注意事项

  • 启动socket server 服务器:nc –lk 9999 ( 下载网络工具netcat yum install -y nc )
  • receiver模式下接受数据,local的模拟线程必须大于等于2,一个线程用来receiver用来接受数据,另一个线程用来执行job。
  • Durations时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
  • 创建JavaStreamingContext有两种方式(SparkConf,SparkContext)
  • 所有的代码逻辑完成后要有一个output operation类算子。
  • JavaStreamingContext.start() Streaming框架启动后不能再次添加业务逻辑。
  • JavaStreamingContext.stop() 无参的stop方法将SparkContext一同关闭,stop(false),不会关闭SparkContext。
  • JavaStreamingContext.stop()停止之后不能再调用start。

四 SparkStreaming算子操作

  1. foreachRDD output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行。
  2. transform transformation类算子 可以通过transform算子,对Dstream做RDD到RDD的任意操作。
  3. updateStateByKey transformation算子updateStateByKey作用:
代码语言:txt复制
1. 为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
2. 通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。

使用到updateStateByKey要开启checkpoint机制和功能。

多久会将内存中的数据写入到磁盘一份?

如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。

WordCount

对一定间隔时间内的Wc,而不是全局的Wc

代码语言:javascript复制
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用SparkStreaming进行WordCount: 注意这种wc只是对一定间隔时间内的Wc,而不是全局的Wc
 * 注意
 *   1. batchDuration: 代表我们能够处理数据接收的延迟度, 批次数据处理的间隔时间, 可以集合WebUI调节
 *   2. 创建StreamingContext的两种方式
 * val ssc=new StreamingContext(SparkContext, batchDuration)
 * val ssc=new StreamingContext(SparkConf,batchDuration)
 *   3. SparkStreaming操作的是Dstream, 可以使用的DStream的TransFormation算子, 要使用outputOperation类算子触发执行
 *   4. StreamingContext.start()后, 不能添加新的业务逻辑
 *   5. StreamingContext.stop()后, 不能调用StreamingContext,start()重新启动, 因为对象已经被回收
 *   6. StreaningContext.stop(stopSparkContext=true), 默认关闭关闭StreamingContext关闭时会将SparkContext
 *
 * Author TimePause
 * Create  2019-12-21 10:10
 */
object SparkStreamingForWc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sswc") //创建2个线程,一个用于接收,一个用于处理
    val sc = new SparkContext(conf)
    // 创建StreamingContext对象,Durations指定批处理间隔时间. 通过socketTextStream设置Socket通信ip和端口
    val ssc = new StreamingContext(sc, Durations.seconds(5))
    val socket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999)
    val words = socket.flatMap(one => {
      (one.split(" "))
    })
    val pairWords = words.map(word => {
      (word, 1)
    })
    val reduceResult = pairWords.reduceByKey((v1, v2) => {
      v1   v2
    })
    // print可以指定输出的行数
    //reduceResult.print(10)

    /**
     *  foreachRDD
     *  1. 拿到DataStream中的RDD.对RDD进行Transformation或者action操作
     *  2. 只有foreachRDD这个operation算子,不会触发执行,必须还要有action算子的支持
     *  3. foreachRDD算子内map算子外的地方的代码是在Driver执行的, 我们可以通过这里动态的改变广播变量, 实现对配置的热部署
     */
    val resultRDD = reduceResult.foreachRDD(rdd => {
      println("******************")   //外部代码在Driver端执行

      val resultRdd = rdd.map(one => {
        println(s"实时接收的数据为-------------${one}")  //内部代码在Executor端执行
      })
      resultRdd.foreach(println)
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

全局的WordCount, 会将实时的结果持久化到磁盘中

代码语言:javascript复制
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * UpdateStateByKey 根据key更新状态(全局)
  *   1、为Spark Streaming中每一个Key维护一份state状态,state类型可以是任意类型的, 可以是一个自定义的对象,那么更新函数也可以是自定义的。
  *   2、通过更新函数对该key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新
  */
object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("UpdateStateByKey")
    val ssc = new StreamingContext(conf,Durations.seconds(5))
    //设置日志级别
    ssc.sparkContext.setLogLevel("ERROR")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node4",9999)
    val words: DStream[String] = lines.flatMap(line=>{line.split(" ")})
    val pairWords: DStream[(String, Int)] = words.map(word => {(word, 1)})

    /**
      * 根据key更新状态,需要设置 checkpoint来保存状态
      * 默认key的状态在内存中 有一份,在checkpoint目录中有一份。
      *
      *    多久会将内存中的数据(每一个key所对应的状态)写入到磁盘上一份呢?
      * 	      如果你的batchInterval小于10s  那么10s会将内存中的数据写入到磁盘一份
      * 	      如果bacthInterval 大于10s,那么就以bacthInterval为准
      *
      *    这样做是为了防止频繁的写HDFS
      */
    ssc.checkpoint("./data/streamingCheckpoint")
//    ssc.sparkContext.setCheckpointDir("./data/streamingCheckpoint")
    /**
      * currentValues :当前批次某个 key 对应所有的value 组成的一个集合
      * preValue : 以往批次当前key 对应的总状态值
      */
    val result: DStream[(String, Int)] = pairWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
      var totalValues = 0
      if (!preValue.isEmpty) {
        totalValues  = preValue.get
      }
      for(value <- currentValues){
        totalValues  = value
      }

      Option(totalValues)
    })
    result.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

指定信息的过滤

代码语言:javascript复制
/**
 * 使用transform算子实现指定信息的过滤
 *
 * Author TimePause
 * Create  2019-12-21 16:20
 */
object TransFormTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun")
    // 规定处理的时间为5秒
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    // 通过获取sparkContext来设置Spark输出的日志级别
    ssc.sparkContext.setLogLevel("error")
    val scoket: ReceiverInputDStream[String] = ssc.socketTextStream("node4", 9999)
    // transform可以将一种格式的DStream转换成另一种格式的DStream
    val transRDD: DStream[(String, String)] = scoket.transform(rdd => {
      val filterRDD: RDD[String] = rdd.filter(line => {
        println(s"====需要被过滤的字符====$line")
        //不加!是只显示所过滤的字符串,反之是显示除了过滤的字符串
        !"world".equals(line.split(" ")(1))
      })
      val mapRDD: RDD[(String, String)] = filterRDD.map(one => {
        (one.split(" ")(0), one.split(" ")(1))
      })
      mapRDD
    })
    transRDD.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

窗口操作

  • 假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。
  • 窗口长度和滑动间隔必须是batchInterval的整数倍。如果不是整数倍会检测报错。
  • 窗口操作理解图:

窗口函数

实现每隔n秒, 打印nm秒的数据

代码语言:javascript复制
/**
 * 窗口函数
 *
 * Author TimePause
 * Create  2019-12-21 15:28
 */
object reduceByKeyAndWindowTest {
    def main(args:Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("windowfun")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Durations.seconds(5))
      ssc.sparkContext.setLogLevel("error")
      // 配置窗口函数优化后需要checkpoint保存数据
      ssc.checkpoint("./data/window")
      val socket = ssc.socketTextStream("node4", 9999)
      val words: DStream[String] = socket.flatMap(line => {
        line.split(" ")
      })
      // map 处理后成为一个元组(k,v)形式pairwords
      val pairwords = words.map(word => {
        (word, 1)
      })
      // pairwords调用窗口函数, 结束后紧跟窗口函数参数, 第一个代表窗口时间长度, 第二个代表窗口的滑动间隔
 /*     val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
        v1   v2
      }, Durations.seconds(15), Durations.seconds(5))*/

      // 窗口函数优化, 在表窗口长度结束后, 仍保存其k, v设置为0
     val result = pairwords.reduceByKeyAndWindow((v1: Int, v2: Int) => {v1   v2},
                (v1:Int,v2:Int)=>{v1-v2}, Durations.seconds(15), Durations.seconds(5))
     // val value = pairwords.window(Durations.seconds(15), Durations.seconds(5))

      //打印结果
      result.print()
      // 启动和关闭StreamingContext对象
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
    }
}

五 Driver HA(Standalone或者Mesos)

因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。

实现Driver的高可用有两个步骤:

第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。

第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

Driver中元数据包括:

  1. 创建应用程序的配置信息。
  2. DStream的操作逻辑。
  3. job中没有完成的批次数据,也就是job的执行进度。

示例代码

代码语言:javascript复制
/**
  * Driver HA :
  * 1.在提交application的时候  添加 --supervise 选项  如果Driver挂掉 会自动启动一个Driver
  * 2.代码层面恢复Driver(StreamingContext)
  *
  */
object SparkStreamingDriverHA {
  //设置checkpoint目录
  val ckDir = "./data/streamingCheckpoint"
  def main(args: Array[String]): Unit = {
    /**
      * StreamingContext.getOrCreate(ckDir,CreateStreamingContext)
      *   这个方法首先会从ckDir目录中获取StreamingContext【 因为StreamingContext是序列化存储在Checkpoint目录中,恢复时会尝试反序列化这些objects。
      *   如果用修改过的class可能会导致错误,此时需要更换checkpoint目录或者删除checkpoint目录中的数据,程序才能起来。】
      *
      *   若能获取回来StreamingContext,就不会执行CreateStreamingContext这个方法创建,否则就会创建
      */
    val ssc: StreamingContext = StreamingContext.getOrCreate(ckDir,CreateStreamingContext)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }

  def CreateStreamingContext() = {
    println("=======Create new StreamingContext =======")
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("DriverHA")
    val ssc: StreamingContext = new StreamingContext(conf,Durations.seconds(5))
    ssc.sparkContext.setLogLevel("Error")

    /**
      *   默认checkpoint 存储:
      *     1.配置信息
      *   	2.DStream操作逻辑
      *   	3.batch执行的进度 或者【offset】
      */
    ssc.checkpoint(ckDir)
    val lines: DStream[String] = ssc.textFileStream("./data/streamingCopyFile")
    val words: DStream[String] = lines.flatMap(line=>{line.trim.split(" ")})
    val pairWords: DStream[(String, Int)] = words.map(word=>{(word,1)})
    val result: DStream[(String, Int)] = pairWords.reduceByKey((v1:Int, v2:Int)=>{v1 v2})

//    result.print()

    /**
      * 更改逻辑
      */
    result.foreachRDD(pairRDD=>{
      pairRDD.filter(one=>{
        println("*********** filter *********")
        true
      }).foreach(println)
    })

    ssc
  }
}

六 SparkStreaming整合Kafka

Kafka

kafka是什么?使用场景?

kafka是一个高吞吐的分布式消息队列系统。特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。消息列队常见场景:系统之间解耦合、峰值压力缓冲、异步通信

Kafka更多介绍以及安装请跳转至本页面查看

kafka集群搭建

链接介绍的是Kafka 0.1.0版本, 这里介绍 kafka 0.8.2环境搭建

  1. 上传kafka_2.10-0.8.2.2.tgz包到三个不同节点上,解压。
  2. 配置…/ kafka_2.10-0.8.2.2/config/server.properties文件 节点编号:(不同节点按0,1,2,3整数来配置)

真实数据存储位置:

zookeeper的节点:

  1. 启动zookeeper集群。进入zk集群 zkCli.sh # 查看每个borker所对应的唯一id信息(可以看到当前kafka集群中启动了几个borker) ls /brokers/ids
  2. 三个节点上,启动kafka: bin/kafka-server-start.sh config/server.properties 最好使用自己写的脚本启动,将启动命令写入到一个文件: (放在与bin同一级别下,注意创建后要修改权限:chmod 755 startkafka.sh) nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

SparkStreaming Kafka

receiver模式
Direct模式

SparkStreaming2.3 kafka_2.11-0.11.0.3 改变

kafka_2.11-0.11.0.3 安装同kafka 0.8.2 不过更换版本时需要删除zk中存放 kafka信息删除方式如下

向 kafka 中生产数据
代码语言:javascript复制
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.util.Random

/**
  * 向 kafka 中生产数据
  */
object ProduceDataToKafka {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    //批次大小
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
    //等待时间
    props.put(ProducerConfig.LINGER_MS_CONFIG,"1")
    props.put("enable.auto.commit","true")


    val producer = new KafkaProducer[String,String](props)
    var counter = 0
    var keyFlag = 0
    while(true){
      counter  =1
      keyFlag  =1
      val content: String = userlogs()
      print(content)
      producer.send(new ProducerRecord[String, String]("testKafka,mytopic1222", s"key-$keyFlag", content))
      if(0 == counter0){
        counter = 0
        Thread.sleep(5000)
      }
    }

    producer.close()
  }

  def userlogs()={
    val userLogBuffer = new StringBuffer("")
    val timestamp = new Date().getTime();
    var userID = 0L
    var pageID = 0L

    //随机生成的用户ID
    userID = Random.nextInt(2000)

    //随机生成的页面ID
    pageID =  Random.nextInt(2000);

    //随机生成Channel
    val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
    val channel = channelNames(Random.nextInt(10))

    val actionNames = Array[String]("View", "Register")
    //随机生成action行为
    val action = actionNames(Random.nextInt(2))

    val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    userLogBuffer.append(dateToday)
      .append("t")
      .append(timestamp)
      .append("t")
      .append(userID)
      .append("t")
      .append(pageID)
      .append("t")
      .append(channel)
      .append("t")
      .append(action)
    System.out.println(userLogBuffer.toString())
    userLogBuffer.toString()
  }


}
消费者接收数据
代码语言:javascript复制
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
  * SparkStreaming2.3版本 读取kafka 中数据 :
  *  1.采用了新的消费者api实现,类似于1.6中SparkStreaming 读取 kafka Direct模式。并行度 一样。
  *  2.因为采用了新的消费者api实现,所有相对于1.6的Direct模式【simple api实现】 ,api使用上有很大差别。未来这种api有可能继续变化
  *  3.kafka中有两个参数:
  *      heartbeat.interval.ms:这个值代表 kafka集群与消费者之间的心跳间隔时间,kafka 集群确保消费者保持连接的心跳通信时间间隔。这个时间默认是3s.
  *             这个值必须设置的比session.timeout.ms 小,一般设置不大于 session.timeout.ms  的1/3
  *      session.timeout.ms :
  *             这个值代表消费者与kafka之间的session 会话超时时间,如果在这个时间内,kafka 没有接收到消费者的心跳【heartbeat.interval.ms 控制】,
  *             那么kafka将移除当前的消费者。这个时间默认是30s。
  *             这个时间位于配置 group.min.session.timeout.ms【6s】 和 group.max.session.timeout.ms【300s】之间的一个参数,
  *             如果SparkSteaming 批次间隔时间大于5分钟,也就是大于300s,那么就要相应的调大group.max.session.timeout.ms 这个值。
  *  4.大多数情况下,SparkStreaming读取数据使用 LocationStrategies.PreferConsistent 这种策略,这种策略会将分区均匀的分布在集群的Executor之间。
  *    如果Executor在kafka 集群中的某些节点上,可以使用 LocationStrategies.PreferBrokers 这种策略,那么当前这个Executor 中的数据会来自当前broker节点。
  *    如果节点之间的分区有明显的分布不均,可以使用 LocationStrategies.PreferFixed 这种策略,可以通过一个map 指定将topic分区分布在哪些节点中。
  *
  *  5.新的消费者api 可以将kafka 中的消息预读取到缓存区中,默认大小为64k。默认缓存区在 Executor 中,加快处理数据速度。
  *     可以通过参数 spark.streaming.kafka.consumer.cache.maxCapacity 来增大,也可以通过spark.streaming.kafka.consumer.cache.enabled 设置成false 关闭缓存机制。
  *     "注意:官网中描述这里建议关闭,在读取kafka时如果开启会有重复读取同一个topic partition 消息的问题,报错:KafkaConsumer is not safe for multi-threaded access"
  *
  *  6.关于消费者offset
  *    1).如果设置了checkpoint ,那么offset 将会存储在checkpoint中。
  *     这种有缺点: 第一,当从checkpoint中恢复数据时,有可能造成重复的消费。
  *                第二,当代码逻辑改变时,无法从checkpoint中来恢复offset.
  *    2).依靠kafka 来存储消费者offset,kafka 中有一个特殊的topic 来存储消费者offset。新的消费者api中,会定期自动提交offset。这种情况有可能也不是我们想要的,
  *       因为有可能消费者自动提交了offset,但是后期SparkStreaming 没有将接收来的数据及时处理保存。这里也就是为什么会在配置中将enable.auto.commit 设置成false的原因。
  *       这种消费模式也称最多消费一次,默认sparkStreaming 拉取到数据之后就可以更新offset,无论是否消费成功。自动提交offset的频率由参数auto.commit.interval.ms 决定,默认5s。
  *       *如果我们能保证完全处理完业务之后,可以后期异步的手动提交消费者offset.
  *       注意:这种模式也有弊端,这种将offset存储在kafka中方式,参数offsets.retention.minutes=1440控制offset是否过期删除,默认将offset信息保存一天,
  *       如果停机没有消费达到时长,存储在kafka中的消费者组会被清空,offset也就被清除了。
  *    3).自己存储offset,这样在处理逻辑时,保证数据处理的事务,如果处理数据失败,就不保存offset,处理数据成功则保存offset.这样可以做到精准的处理一次处理数据。
  *
  */
object SparkStreamingOnKafkaDirect {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("SparkStreamingOnKafkaDirect")
    val ssc = new StreamingContext(conf,Durations.seconds(5))
    //设置日志级别
//    ssc.sparkContext.setLogLevel("ERROR")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node4:9092,node2:9092,node3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "MyGroupId",//
      /**
        *
        *  earliest :当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始
        *  latest:自动重置偏移量为最大偏移量【默认】*
        *  none:没有找到以前的offset,抛出异常
        */
      "auto.offset.reset" -> "earliest",
      /**
        * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交
        */
      "enable.auto.commit" -> (false: java.lang.Boolean)//默认是true
    )

    val topics = Array[String]("mytopic1222")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,//消费策略
      Subscribe[String, String](topics, kafkaParams)
    )

    val transStrem: DStream[String] = stream.map((record:ConsumerRecord[String, String]) => {
      val key_value = (record.key, record.value)
      println("receive message key = " key_value._1)
      println("receive message value = " key_value._2)
      key_value._2
    })
    val wordsDS: DStream[String] = transStrem.flatMap(line=>{line.split("t")})
    val result: DStream[(String, Int)] = wordsDS.map((_,1)).reduceByKey(_ _)
    result.print()

    /**
      * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset
      * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。
      */
    stream.foreachRDD { rdd =>
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // some time later, after outputs have completed
      for(or <- offsetRanges){
        println(s"current topic = ${or.topic},partition = ${or.partition},fromoffset = ${or.fromOffset},untiloffset=${or.untilOffset}")
      }
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

七 SparkStreaming 总结

0 人点赞