Note_Spark_Day02:Standalone集群模式和使用IDEA开发应用程序

2021-12-07 09:36:39 浏览数 (1)

Spark Day02:Spark 基础环境(二)

代码语言:javascript复制
Hadoop3.0-HDFS
	https://www.bilibili.com/video/BV1yX4y1K7Lq

Hadoop3.0-MapReduce
	https://www.bilibili.com/video/BV1Tf4y167U8

Hadoop3.0-yarn
	https://www.bilibili.com/video/BV1wh411S76Z

01-[了解]-上次课程内容回顾

主要讲解2个方面的内容:Spark 框架概述和Spark 快速入门。

代码语言:javascript复制
1、Spark 框架概述
	- Spark 框架诞生背景
		加州大学、伯克利分校、APMLab实验室、2009年
	- Spark 框架功能(官方定义),类似MapReduce框架,分析处理数据
		Apache Spark™ is a unified analytics engine for large-scale data processing.
		分析引擎、统一的(任意类型分析基本都可以完成)、大规模数据集(海量数据)
	- Spark 发展史
		2009年、2010年发布论文(RDD)、2014年(1.0)、2016年(2.0)、2020年(3.0)
	- Spark 官方四个特性
		快Speed,与MapReduce相比较,2个方面比较
		统一
		支持多语言,Scala、Java、Python、R、SQL
	- 框架模块
		Core、SQL、Streaming(StructuredStreaming)、MLlib及GraphX、PySpark和SparkR等
	- 运行方式
		本地模型运行(1JVM进程,运行Task,线程方式)、集群模式运行和容器(云端):K8s
		
2、Spark 快速入门
	- 环境准备
		导入虚拟机、基本配置
		Spark 框架基本配置(设置):解压、设置JAVA和Scala环境变量
	- spark-shell
		本地模式运行交互式命令行
		$SPARK_HOME/bin/spark-shell --master local[2]
	- 经典案例:词频统计WordCount
		mapflatMap
		reduceByKey
		数据结构:RDD,认为就是一个集合,比如列表List,存储很多数据,调用高价函数处理数据
	- 圆周率PI
		使用提交命令:spark-submit --class xxx --master yyyy xxx.jar parameter

02-[了解]-今日课程内容提纲

讲解2个方面的内容:Standalone集群模式和使用IDEA开发应用程序。

代码语言:javascript复制
1、Standalone 集群
	Spark框架自身提供类似Hadoop YARN分布式集群资源管理集群Standalone功能,管理集群资源和分配资源运行Spark应用程序。
	集群架构组成,类似Hadoop YARN集群架构
	配置、部署、启动和测试
	Spark应用运行在集群上架构组成
	Spark 应用运行WEB UI监控

2、IDEA应用开发,编写入门案例词频统计
	创建Maven Project
	SparkContext实例创建
	WordCount代码编写
	使用spark-submit提交应用执行

03-[掌握]-Standalone集群【架构组成】

​ Spark Stanadlone集群类似Hadoop YARN集群功能,管理整个集群中资源(CUP Core核数、内存Memory、磁盘Disk、网络带宽等) ​ Standalone集群使用了分布式计算中的master-slave模型,master是集群中含有Master进程的节点,slave是集群中的Worker节点含有Executor进程。

  • Standalone集群主从架构:Master-Slave
  • 主节点:老大,管理者,Master
  • 从节点:小弟,干活的,Workers

Spark Standalone集群,类似Hadoop YARN,管理集群资源和调度资源:

  • Master,管理整个集群资源,接收提交应用,分配资源给每个应用,运行Task任务
  • Worker,管理每个机器的资源,分配对应的资源来运行Task;每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
  • HistoryServer,Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查 看应用运行相关信息。

04-[掌握]-Standalone 集群【配置和部署】

Standalone集群安装服务规划与资源配置:

需要将三台虚拟机,全部恢复到【04、分布式集群环境】快照。

按照讲义上步骤进行配置即可,具体步骤如下:

05-[掌握]-Standalone 集群【服务启动和运行应用】

​ 在Master节点node1.itcast.cn上启动,进入$SPARK_HOME,必须配置主节点到所有从节点的SSH无密钥登录,集群各个机器时间同步

  • 主节点Master启动命令
代码语言:javascript复制
[root@node1 ~]# /export/server/spark/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.itcast.cn.out
[root@node1 ~]# 
[root@node1 ~]# jps
15076 DataNode
15497 Master
15545 Jps
14973 NameNode

WEB UI页面地址:http://node1.itcast.cn:8080

  • 从节点Workers启动命令
代码语言:javascript复制
/export/server/spark/sbin/start-slaves.sh
  • 历史服务器HistoryServer
代码语言:javascript复制
/export/server/spark/sbin/start-history-server.sh

WEB UI页面地址:http://node1.itcast.cn:18080

​ 将上述运行在Local Mode的圆周率PI程序,运行在Standalone集群上,修改【--master】地址为Standalone集群地址:spark://node1.itcast.cn:7077,具体命令如下:

代码语言:javascript复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit 
--master spark://node1.itcast.cn:7077 
--class org.apache.spark.examples.SparkPi 
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar 
10

查看Master主节点WEB UI界面:

06-[掌握]-Spark 应用架构组成

登录到Spark HistoryServer历史服务器WEB UI界面,点击刚刚运行圆周率PI程序:

切换到【Executors】Tab页面:

从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors

每个Executor相当于线程池,每个线程运行Task任务,需要1Core CPU。

  • 第一、Driver Program
    • 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
    • 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
    • 一个SparkApplication仅有一个;
  • 第二、Executors
    • 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务, 一个Task运行需要1 Core CPU,所有可以认为Executor中线程数就等于CPU Core核数;
    • 一个Spark Application可以有多个,可以设置个数和资源信息;

07-[掌握]-Spark 应用WEB UI 监控

Spark 提供了多个监控界面,当运行Spark任务后可以直接在网页对各种信息进行监控查看。 运行spark-shell交互式命令在Standalone集群上,命令如下:

代码语言:javascript复制
/export/server/spark/bin/spark-shell --master spark://node1.itcast.cn:7077

在spark-shell中执行词频统计WordCount程序代码,运行如下:

代码语言:javascript复制
val inputRDD = sc.textFile("/datas/wordcount.data")
val wordcountsRDD = inputRDD.flatMap(line => line.split("\s ")).map(word => (word, 1)).reduceByKey((tmp, item) => tmp  item)
wordcountsRDD.take(5)

截图如下:

可以发现在一个Spark Application中,包含多个Job,每个Job有多个Stage组成,每个Job执行按照DAG图进行的。

其中每个Stage中包含多个Task任务,每个Task以线程Thread方式执行,需要1Core CPU。

Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rAyWpkFn-1627098684378)(/img/image-20210420160752870.png)]

Job和Stage及Task之间关系:

08-[理解]-Standalone 集群【Standalone HA】

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF:single Point of Failover)的问题。

ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。 使用Zookeeper集群:选举leader、监控leader

基于Zookeeper实现HA:http://spark.apache.org/docs/2.4.5/spark-standalone.html#high-availability

09-[掌握]-IDEA 应用开发【构建Maven Project】

Spark课程代码,创建一个Maven Project工程,每天创建Maven Module模块,方便复习。

创建Maven Project工程【bigdata-spark_2.11】,设置GAV三要素的值如下:

创建Maven Module模块【spark-chapter01_2.11】,对应的GAV三要素值如下:

至此,将Maven Module模块创建完成,可以开始编写第一个Spark程序。

10-[掌握]-IDEA 应用开发【应用入口SparkContext】

Spark Application程序入口为:SparkContext,任何一个应用首先需要构建SparkContext对象,如下两步构建:

11-[掌握]-IDEA 应用开发【编程实现:WordCount】

​ 从HDFS上读取数据,所以需要将HDFS Client配置文件放入到Maven Module资源目录下,同时设置应用运行时日志信息。

完整代码如下:

代码语言:javascript复制
package cn.itcast.spark.start

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用Spark实现词频统计WordCount程序
 */
object SparkWordCount {
	
	def main(args: Array[String]): Unit = {
		// TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
		val sc: SparkContext = {
			// 其一、构建SparkConf对象,设置应用名称和master
			val sparkConf: SparkConf = new SparkConf()
    			.setAppName("SparkWordCount")
    			.setMaster("local[2]")
			// 其二、创建SparkContext实例,传递sparkConf对象
			new SparkContext(sparkConf)
		}
		
		// TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中
		val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
		
		// TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
		/*
				mapreduce spark spark hive
						| flatMap() = map   flatten
				mapreduce
				spark
				spark
				hive
						|map
				mapreduce,1
				spark,1
				spark,1
				hive,1
						|	reduceByKey
				spark, 2
				mapreduce, 1
				hive, 1
		 */
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 按照分隔符分割单词
			.flatMap(line => line.split("\s "))
			// 转换单词为二元组,表示每个单词出现一次
			.map(word => word -> 1)
			// 按照单词分组,对组内执进行聚合reduce操作,求和
			.reduceByKey((tmp, item) => tmp   item)
		
		// TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台
		resultRDD.saveAsTextFile("/datas/spark-wordcount")
		resultRDD.foreach(tuple => println(tuple))
		
		// 为了查看应用监控,可以让进程休眠
		Thread.sleep(100000)
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

12-[掌握]-IDEA 应用开发【编程实现:TopKey】

​ 在上述词频统计WordCount代码基础上,对统计出的每个单词的词频Count,按照降序排序,获取词频次数最多Top3单词

数据结构RDD中关于排序函数有如下三个:

  • 1)、sortByKey:针对RDD中数据类型key/value对时,按照Key进行排序
  • 2)、sortBy:针对RDD中数据指定排序规则
  • 3)、top:按照RDD中数据采用降序方式排序,如果是Key/Value对,按照Key降序排序

具体演示代码如下,建议使用sortByKey函数进行数据排序操作,慎用top函数。

代码语言:javascript复制
package cn.itcast.spark.top

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用Spark实现词频统计WordCount程序,按照词频降序排序
 */
object SparkTopKey {
	
	def main(args: Array[String]): Unit = {
		// TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
		val sc: SparkContext = {
			// 其一、构建SparkConf对象,设置应用名称和master
			val sparkConf: SparkConf = new SparkConf()
    			.setAppName("SparkWordCount")
    			.setMaster("local[2]")
			// 其二、创建SparkContext实例,传递sparkConf对象
			new SparkContext(sparkConf)
		}
		
		// TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中
		val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
		
		// TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
		/*
				mapreduce spark spark hive
						| flatMap() = map   flatten
				mapreduce
				spark
				spark
				hive
						|map
				mapreduce,1
				spark,1
				spark,1
				hive,1
						|	reduceByKey
				spark, 2
				mapreduce, 1
				hive, 1
		 */
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 按照分隔符分割单词
			.flatMap(line => line.split("\s "))
			// 转换单词为二元组,表示每个单词出现一次
			.map(word => word -> 1)
			// 按照单词分组,对组内执进行聚合reduce操作,求和
			.reduceByKey((tmp, item) => tmp   item)
		
		
		// TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台
		/*
			(spark,11)
			(hadoop,3)
			(hive,6)
			(hdfs,2)
			(mapreduce,4)
			(sql,2)
		 */
		resultRDD.foreach(tuple => println(tuple))
		println("===========================")
		
		// =========================== sortByKey =========================
		resultRDD
			// 将单词和词频互换
			.map(tuple => tuple.swap) // (tuple => (tuple._2, tuple._1))
			// 调用sortByKey安装,按照Key进行排序,设置降序排序
			.sortByKey(ascending = false)
			// 打印结果
    		.take(3)
			.foreach(tuple => println(tuple))
		println("===========================")
		// =========================== sortBy =========================
		/*
		  def sortBy[K](
		      f: (T) => K, // 指定排序规则
		      ascending: Boolean = true,
		      numPartitions: Int = this.partitions.length
		  )
		  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
		 */
		resultRDD
			.sortBy(tuple => tuple._2, ascending = false)
			// 打印结果
			.take(3)
			.foreach(tuple => println(tuple))
		println("===========================")
		// =========================== top =========================
		/*
		  def top(num: Int)(implicit ord: Ordering[T]): Array[T]
		 */
		resultRDD
			.top(3)(Ordering.by(tuple => - tuple._2))
			.foreach(tuple => println(tuple))
		
		
		// 为了查看应用监控,可以让进程休眠
		Thread.sleep(100000)
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

13-[理解]-Spark 应用提交命令【spark-submit】

​ 使用IDEA集成开发工具开发测试Spark Application程序以后,类似MapReduce程序一样,打成jar包,使用命令【spark-submit】提交应用的执行,提交命令帮助文档:

代码语言:javascript复制
[root@node1 ~]# /export/server/spark/bin/spark-submit --help
Usage: spark-submit [options]  [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.

提交一个应用命令: Usage: spark-submit [options] [app arguments]

  • 第一种:基本参数配置
  • 第二种:Driver Program 参数配置
  • 第三种:Executor 参数配置

​ 每个Spark Application运行时,需要启动Executor运行任务Task,需要指定Executor个数及每个Executor资源信息(内存Memory和CPU Core核数)。

官方案例,提交Spark应用运行设置

14-[掌握]-IDEA应用开发【应用打包运行】

​ 将开发测试完成的WordCount程序打成jar保存,使用【spark-submit】分别提交运行在本地模式LocalMode和集群模式Standalone集群。 先修改代码,通过master设置运行模式及传递处理数据路径

代码语言:javascript复制
package cn.itcast.spark.submit

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用Spark实现词频统计WordCount程序
 */
object SparkSubmit {
	
	def main(args: Array[String]): Unit = {
		
		//判断是否传递2个参数,如果不是,直接抛出异常
		if(args.length < 2){
			println("Usage: SparkSubmit   ...................")
			System.exit(-1)
		}
		
		// TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
		val sc: SparkContext = {
			// 其一、构建SparkConf对象,设置应用名称和master
			val sparkConf: SparkConf = new SparkConf()
    			.setAppName("SparkWordCount")
    			//.setMaster("local[2]")
			// 其二、创建SparkContext实例,传递sparkConf对象
			new SparkContext(sparkConf)
		}
		
		// TODO: 第一步、从HDFS读取文件数据,sc.textFile方法,将数据封装到RDD中
		val inputRDD: RDD[String] = sc.textFile(args(0))
		
		// TODO: 第二步、调用RDD中高阶函数,进行处理转换处理,函数:flapMap、map和reduceByKey
		/*
				mapreduce spark spark hive
						| flatMap() = map   flatten
				mapreduce
				spark
				spark
				hive
						|map
				mapreduce,1
				spark,1
				spark,1
				hive,1
						|	reduceByKey
				spark, 2
				mapreduce, 1
				hive, 1
		 */
		val resultRDD: RDD[(String, Int)] = inputRDD
			// 按照分隔符分割单词
			.flatMap(line => line.split("\s "))
			// 转换单词为二元组,表示每个单词出现一次
			.map(word => word -> 1)
			// 按照单词分组,对组内执进行聚合reduce操作,求和
			.reduceByKey((tmp, item) => tmp   item)
		
		// TODO: 第三步、将最终处理结果RDD保存到HDFS或打印控制台
		resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}")
		
		
		// 应用结束,关闭资源
		sc.stop()
	}
	
}

打成jar包,上传至HDFS文件系统:/spark/apps

代码语言:javascript复制
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit 
--master local[2] 
--class cn.itcast.spark.submit.SparkSubmit 
hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar 
/datas/wordcount.data /datas/swc-output



SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit 
--master spark://node1.itcast.cn:7077,node2.itcast.cn:7077 
--class cn.itcast.spark.submit.SparkSubmit 
--driver-memory 512m 
--executor-memory 512m 
--executor-cores 1 
--total-executor-cores 2 
hdfs://node1.itcast.cn:8020/spark/apps/spark-day02_2.11-1.0.0.jar 
/datas/wordcount.data /datas/swc-output

附录一、创建Maven模块

1)、Maven 工程结构

​ MAVEN工程GAV三要素:

代码语言:javascript复制
    <parent>
        <artifactId>bigdata-spark_2.11artifactId>
        <groupId>cn.itcast.sparkgroupId>
        <version>1.0.0version>
    parent>
    <modelVersion>4.0.0modelVersion>

    <artifactId>spark-chapter01_2.11artifactId>

2)、POM 文件内容

​ Maven 工程POM文件中内容(依赖包):

代码语言:javascript复制
            aliyun
            http://maven.aliyun.com/nexus/content/groups/public/
        
        
            cloudera
            https://repository.cloudera.com/artifactory/cloudera-repos/
        
        
            jboss
            http://repository.jboss.com/nexus/content/groups/public
        
    

    
        2.11.12
        2.11
        2.4.5
        2.6.0-cdh5.16.2
    

    
        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
        
            org.apache.spark
            spark-core_${scala.binary.version}
            ${spark.version}
        
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
        
    

    
        target/classes
        target/test-classes
        
            
                ${project.basedir}/src/main/resources
            
        
        
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.0
                
                    1.8
                    1.8
                    UTF-8
                
            
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.0
                
                    
                        
                            compile
                            testCompile

IDEA中配置远程连接服务器

.0 1.8 1.8 UTF-8 net.alchim31.maven scala-maven-plugin 3.2.0 compile testCompile

代码语言:javascript复制
> IDEA中配置远程连接服务器

[外链图片转存中...(img-Isvrrx8P-1627098684386)]

0 人点赞