Hadoop体系回顾
Hadoop 的概念可追溯到 2003,2004 Google2篇论文(老版三辆马车),2011年发布1.0版本,2012年发布稳定版。Hadoop 在2.0版本之前组件主要是 HDFS跟MapReduce。
1.0版本框架主要如下:
职责:
HDFS负责文件存储 MapReduce负责资源调度跟计算
缺点:
- mr基于数据集的计算,所以面向数据
- 基本运算规则从存储介质中获取(采集)数据,然后进行计算,最后将结果存储到介质中,所以主要应用于
一次性
计算,不适合于数据挖掘和机器学习这样的迭代计算
和图形挖掘
计算。 - MR基于文件存储介质的操作,所以性能非常的慢
- MR和hadoop紧密耦合在一起,无法动态替换。违背了OCP原则
2.0版本框架如下:
重点
:在RM中 通过ApplicationMaster隔离Driver跟RM,在NM中 通过Container 来隔离NM跟Task . 细节调度图如下:
一般也就是客户端提交job后,RM负责给提交的job分配一个NM,在这个NM上会有AppMaster
的启动,AppMaster
启动后通知RM,然后RM负责找若干个NM,在每个NM中开辟Container来给任务分配资源, 这些container反向注册到Appmaster中 Appmaster来关联这些任务。核心思想
就是引入了Yarn来负责资源的的调度,细节部分见上图。
Spark介绍
在Hadoop1.0时代由于MR太累赘,很多不方便的地方,因此在Hadoop的Yarn发布前,外部人员开发出来Spark。大致的架构思想跟MR类似就是基于内存,迭代式计算。
什么是Spark
Spark 是一种基于内存
的快速、通用、可扩展的大数据分析引擎。
历史
2009年诞生于加州大学伯克利分校
AMPLab,项目用Scala
语言编写。2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。
Spark内置模块
Spark Core
:
实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API定义。
Spark SQL
:
是Spark用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源,比如Hive表、Parquet以及JSON等。
Spark Streaming
:
是Spark提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与Spark Core中的 RDD API高度对应。
Spark MLlib
:
提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能。
集群管理器
:
Spark 设计为可以高效地在一个计算节点到数千个计算节点之间
伸缩计算
。为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器
(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark自带的一个简易调度 器,叫作独立调度器。
Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。
Spark特点
快
跟Hadoop中的MapReduce相比,spark是基于内存的运算,速度提高100倍以上,基于硬盘的计算也提高10倍以上,spark实现了高效的DAG执行引擎。可以通过基于内存来高效处理数据流,计算的中间结果存储在内存中。
易用
Spark支持Java、Python、Scala的API,还支持超过80种高级算法,帮助用户快速构建不同的应用,而且Spark支持交互式的Python跟Scala的Shell,可以非常方便的在这些spark-shell中使用Spark集群验证问题。
通用
Spark提供了统一的解决方案,Spark可以用于批处理,交互式查询(Spark SQL)、实时流处理(Spark Streaming)、集群学习(Spark MLlib) 跟图计算(GraphX)。这些不同类型的处理都可以在一个应用各种无缝使用,减少了开发跟维护人员的成本跟不熟平台的物力成本。
兼容性
Spark非常方便的跟其他开源产品融合,比如Spark可以使用Hadoop中的YARN或者Apache MEsos作为它的资源管理跟调度器,并且可以处理所以Hadoop支持的数据,包括HDFS、HBase等,这对已经不熟Hadoop集群的用户特别重要,不需要任何数据迁移就可以使用Spark的强大处理能力。
Spark 运行模式
官方信息
- 官网地址http://spark.apache.org/
- 文档查看地址https://spark.apache.org/docs/2.1.1/
- 下载地址https://spark.apache.org/downloads.html
重要俩角色
在这里插入图片描述
Driver(驱动器)
Spark的驱动器是执行开发程序中的main方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD的转化操作和行动操作代码的执行。如果你是用spark-shell,那么当你启动Spark shell的时候,系统后台自启了一个Spark驱动器程序,就是在Spark shell中预加载的一个叫作 sc
的SparkContext
对象。如果驱动器程序终止,那么Spark应用也就结束了。主要负责:
- 把用户程序转为作业(JOB)
- 跟踪Executor的运行状况
- 为执行器节点调度任务
- UI展示应用运行状况
Executor(执行器)
Spark Executor是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,Executor节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。主要负责:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
- 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
Driver跟Executor关系
看上图也就知道算子放到executor中其余在driver中一般,但是如果算子中用到了外部数据,则外部数据需要在driver跟executor中进行序列化跟反序列化到传输。切记!
Local模式
Local模式就是运行在一台计算机上的模式,通常就是用于在本机练手跟测试,它可以通过以下方式设置Master。
local:s所有计算都运行在一个线程中,没有任何并行计算,通常在本机执行测试代码或者练手。
local[K]:指定使用几个线程来运行计算,比如local[4]就是运行4个Worker线程,通常为哦们CPU有几个Core就指定几个线程,最大化利用CPU计算能力。
local[*]:这种2模式直接帮我们按照CPU最多Cores来设置线程数。
安装使用
- 上传并解压spark安装包
[atguigu@hadoop102 sorfware]$ tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
[atguigu@hadoop102 module]$ mv spark-2.1.1-bin-hadoop2.7 spark
- 官方求PI案例
[atguigu@hadoop102 spark]$ bin/spark-submit
--class org.apache.spark.examples.SparkPi
--executor-memory 1G --total-executor-cores 2
./examples/jars/spark-examples_2.11-2.1.1.jar 100
(spark版本不同example不同)
结果:
Pi is roughly 3.1419875141987514
基本语法
代码语言:javascript复制bin/spark-submit
--class <main-class> --master <master-url>
--deploy-mode <deploy-mode> --conf <key>=<value>
... # other options
<application-jar>
参数说明
参数 | 含义 |
---|---|
--master | 指定Master的地址,默认为Local |
--class | 你的应用的启动类 (如 org.apache.spark.examples.SparkPi) |
--deploy-mode | 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)* |
--conf | 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value” |
application-jar | 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar |
application-arguments | 传给main()方法的参数 |
--executor-memory 1G | 指定每个executor可用内存为1G |
--total-executor-cores 2 | 指定每个executor使用的cup核数为2个 |
WordCount数据准备:在spark/bin 目录下创建input文件夹,然后input里面创建文件:
代码语言:javascript复制1.txt
hello word
hello scala
2.txt
hello spark
启动spark-shell
代码语言:javascript复制bin/spark-shell
...
Spark context Web UI available at http://10.0.0.153:4040
Spark context available as 'sc' (master = local[*], app id = local-1591940789459).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ '_/
/___/ .__/_,_/_/ /_/_ version 2.2.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
-----
[atguigu@hadoop102 spark]$ jps (查看Java进程)
3627 SparkSubmit
4047 Jps
分析流程:
spark-shell启动后有个sc
是系统的SparkContext,可调用很多自带的方法。
sc.textFile("input").flatMap(.split(" ")).map((,1)).reduceByKey(_ _).collect
WordCount程序分析:
参数 | 含义 |
---|---|
textFile("input") | 读取本地文件input文件夹数据; |
flatMap(_.split(" ")) | 压平操作,按照空格分割符将一行数据映射成一个个单词; |
map((_,1)) | 对每一个元素操作,将单词映射为元组; |
reduceByKey( ) | 按照key将值进行聚合,相加; |
collect | 将数据收集到Driver端展示。 |
Standalone模式
概述:构建一个由Master Slave构成的Spark集群(资源的调度跟管理用Spark自带的),Spark运行在集群中。
- 修改配置文件名称: 主要是设定好spark集群启动的若干个节点,跟hadoop类似的。
mv slaves.template slaves
vim slaves
hadoop102
hadoop103
hadoop104
- 修改 spark-env.sh
mv spark-env.sh.template spark-env.sh
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
也可以百度配置写别的配置参数哦
- 分发spark包
xsync spark/
- 启动Spark集群
spark/sbin/start-all.sh
- 查看集群服务
util.sh (自写shell脚本)
================atguigu@hadoop102================
3330 Jps
3238 Worker
3163 Master
================atguigu@hadoop103================
2966 Jps
2908 Worker
================atguigu@hadoop104================
2978 Worker
3036 Jps
ssh kg@IP jps (前提配置好无秘钥登陆)
- xsync脚本
#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi
#2 获取文件名称
p1=$1
fname=`basename $p1` # 获得文件名
echo fname=$fname
#3 获取上级目录到绝对路径
pdir=`cd -P $(dirname $p1); pwd` # -P 是进入到软链接 实际路径
echo pdir=$pdir
#4 获取当前用户名称
user=`whoami`
#5 循环 一般就是在102上进行修改 然后运行这个脚本实现 103 跟104的同步
for((host=103; host<105; host )); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done
注意
:如果遇到JAVA_HOME not set
异常,可以在sbin目录下的spark-config.sh
跟conf 目录下的spark-env.sh
文件中加入如下配置(随意一个配置即可):
export JAVA_HOME=/usr/local/java/jdk1.8.0_152
- 集群启动
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077
--executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.1.jar 100
- 启动spark shell
参数
:--master spark://hadoop102:7077指定要连接的集群的master
spark-shell --master `spark://hadoop102:7077` --executor-memory 1g --total-executor-cores 2
JobHistoryServer配置
Standalone模式下配置History如下
- spark-default.conf 配置
mv spark-defaults.conf.template spark-defaults.conf
开启Log
vi spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:9000/directory
注意
:HDFS上的目录需要提前存在。 2. spark-env.sh
vi spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory"
参数 | 含义 |
---|---|
spark.eventLog.dir | Application在运行过程中所有的信息均记录在该属性指定的路径下 |
spark.history.ui.port=18080 | WEBUI访问的端口号为18080 |
spark.history.fs.logDirectory=hdfs://hadoop102:9000/directory | 配置了该属性后,在start-history-server.sh时就无需再显式的指定路径,Spark History Server页面只展示该指定路径下的信息 |
spark.history.retainedApplications=30 | 指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。 |
- 分发配置文件
xsync spark-defaults.conf
xsync spark-env.sh
- 启动历史服务
sbin/start-history-server.sh
- 再次执行demo
bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077
--executor-memory 1G --total-executor-cores 2 ./examples/jars/spark-examples_2.11-2.1.1.jar 100
- 查看历史服务: hadoop102:18080
Yarn模式(重点)
Spark客户端直接连接Yarn
,不需要额外构建Spark集群。有yarn-client
和yarn-cluster
两种模式,主要区别在于:Driver程序的运行节点。yarn-client
:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出yarn-cluster
:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。
安装使用
- 修改hadoop配置文件yarn-site.xml,添加如下内容:
[atguigu@hadoop102 hadoop]$ vi yarn-site.xml
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
- 修改spark-env.sh,添加如下配置:
[atguigu@hadoop102 conf]$ vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
- 分发配置文件
[atguigu@hadoop102 conf]$ xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml
[atguigu@hadoop102 conf]$ xsync spark-env.sh
- 执行一个程序
[atguigu@hadoop102 spark]$ bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn --deploy-mode client
./examples/jars/spark-examples_2.11-2.1.1.jar 100
注意
:在提交任务之前需启动HDFS以及YARN集群。
日志查看
- 修改配置文件spark-defaults.conf 添加如下内容:
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
- 分发文件
xsxync spark-defaults.conf
- 重启spark历史服务
[atguigu@hadoop102 spark]$ sbin/stop-history-server.sh
stopping org.apache.spark.deploy.history.HistoryServer
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out
- 提交任务到Yarn执行
[atguigu@hadoop102 spark]$ bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn --deploy-mode client
./examples/jars/spark-examples_2.11-2.1.1.jar 100
- Web页面查看日志
Mesos模式(了解)
Spark客户端直接连接Mesos;不需要额外构建Spark集群。国内应用比较少,更多的是运用yarn调度
几种模式对比
模式 | Spark安装机器数 | 需启动的进程 | 所属者 |
---|---|---|---|
Local | 1 | 无 | Spark |
Standalone | 3 | Master及Worker | Spark |
Yarn | 1 | Yarn及HDFS | Hadoop |
注意 Spark跟YARN的模式,只需要一台机器提交任务即可了,反正任务的计算跟调度都是通过YARN来搞定的
HA模式
HA 环境的搭建,整体的原因跟思路跟Hadoop中是一样的(spark YARN的模式配置)。操作步骤如下:
- zookeeper正常安装并启动
- 修改spark-env.sh
vi spark-env.sh
注释掉如下内容:
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
添加上如下内容:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102:2181,hadoop103:2181,hadoop104:2181
-Dspark.deploy.zookeeper.dir=/spark"
- 分发配置文件
xsync spark-env.sh
- 在hadoop102上启动全部节点
[atguigu@hadoop102 spark]$ sbin/start-all.sh
- 在hadoop103上单独启动master节点
[atguigu@hadoop103 spark]$ sbin/start-master.sh
- spark HA 集群访问
spark-shell --master spark://hadoop102:7077,hadoop103:7077 --executor-memory 2g --total-executor-cores 2
案例实操
Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。准备条件
java按照配置完毕,scala安装完毕,spark安装完毕,maven安装完毕。选择spark本地调试不用hadoop模式,这样简单啊!
代码阶段:
- maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sowhat.demo</groupId>
<artifactId>sparktest</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<!-- 这个组件让我们不用再 在项目上add frame 选择scala了,可以自动创建 *.scala 文件 -->
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<!-- 主要是定制化打包
https://www.cnblogs.com/fnlingnzb-learner/p/10537228.html-->
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
<!--将依赖的第三方jar包打包到jar中,这样方便我们发布可执行的jar包。 -->
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!--名字任意 -->
<phase>package</phase>
<!-- 绑定到package生命周期阶段上 -->
<goals>
<goal>single</goal>
<!-- 只运行一次 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- WordCount 文件
package com.sowhat
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author sowhat
* @create 2020-06-12 15:55
*/
object WordCount {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("WC")
val sc = new SparkContext(conf)
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ _).sortBy(_._2).saveAsTextFile(args(1))
sc.stop()
}
}
maven打包:
一般将有依赖的jar放到spark服务器上运行即可。
代码语言:javascript复制bin/spark-submit --class com.sowhat.WordCount WordCount-jar-with-dependencies.jar ./input ./output
屏蔽掉烦人的INFO信息,两种方法。 第一种:
编辑您的conf/log4j.properties文件,然后更改以下行: log4j.rootCategory=INFO, console 至 log4j.rootCategory=ERROR, console
第二种:
代码语言:javascript复制import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
- WordCountLocal文件(本地debug模式)
本地Spark程序调试需要使用local
提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。如下: 创建SparkConf的时候设置额外属性,表明本地执行:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
代码语言:javascript复制package com.sowhat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author sowhat
* @create 2020-06-12 15:55
*/
object WordCountLocal {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local[*]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc = new SparkContext(conf)
//3.使用sc创建RDD并执行相应的transformation和action
val tuples = sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ _).sortBy(_._2).collect()
//4.关闭连接
tuples.foreach(println)
sc.stop()
}
}
如果本机操作系统是windows,如果在程序中使用了hadoop相关的东西,比如写入文件到HDFS,则会遇到如下异常:
出现这个问题的原因,并不是程序的错误,而是用到了hadoop相关的服务,解决办法 是https://github.com/srccodes/hadoop-common-2.2.0-bin解压到任意目录,比如(C:Program Fileshadoop)。
常用端口号
端口号 | 含义 |
---|---|
50070 | HDFSwebUI的端口号 |
8485 | journalnode默认的端口号 |
9000 | 非高可用访问数rpc端口 |
8020 | 高可用访问数据rpc |
8088 | yarn的webUI的端口号 |
8080 | master的webUI,Tomcat的端口号 |
7077 | spark基于standalone的提交任务的端口号 |
8081 | worker的webUI的端口号 |
18080 | historyServer的webUI的端口号 |
4040 | application的webUI的端口号 |
2181 | zookeeper的rpc端口号 |
9083 | hive的metastore的端口号 |
60010 | Hbase的webUI的端口号 |
6379 | Redis的端口号 |
8080 | sparkwebUI的端口号 |
9092 | kafka broker的端口号 |