Spark Core入门1【Spark集群安装、高可用、任务执行流程、使用Scala/Java/Lambda编写Spark WordCount】

2021-05-14 17:31:20 浏览数 (1)

一、Spark介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目。

Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。

Spark的优点:

1、快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG(有向无环图)执行引擎,可以通过基于内存来高效处理数据流。

2、易用:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

3、通用:Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

4、Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

二、Spark集群安装

2.1   下载spark

(1)从spark官方下载spark安装包

(2)上传spark安装包到Linux上

(3)解压安装包到指定位置

代码语言:javascript复制
tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz -C /root/apps/spark

2.2    配置spark

spark配置文件都在spark/conf下

进入到spark安装目录

代码语言:javascript复制
cd /root/apps/spark

进入conf目录并重命名并修改spark-env.sh.template文件

代码语言:javascript复制
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh

在该配置文件中添加如下配置

代码语言:javascript复制
export JAVA_HOME=/usr/local/jdk1.8
export SPARK_MASTER_IP=hdp-01
export SPARK_MASTER_PORT=7077

保存退出

重命名并修改slaves.template文件

代码语言:javascript复制
mv slaves.template slaves
vi slaves

在该文件中添加子节点所在的位置(Worker节点,指定哪些机器需要作为从节点启动)

代码语言:javascript复制
hdp-02
hdp-03

保存退出 将配置好的Spark拷贝到其他节点上 将spark拷贝到其他机器上hdp-02 、hdp-03的/root/apps目录下

代码语言:javascript复制
for i in {2,3}; do scp -r /root/apps/spark/ hdp-0$i:/root/apps; done

Spark集群配置完毕,目前是1个Master,2个Worker,在hdp-01启动spark集群

代码语言:javascript复制
/root/apps/spark/sbin/start-all.sh

启动后执行jps命令,主节点(hdp-01)上有Master进程,其他子节点(hdp-02、hdp-03)上有Worker进行。

登录Spark管理界面查看集群状态(主节点):http://hdp-01:8080/   【检验】 注意7077是rpc通信端口,内部是Netty

到此为止,Spark集群安装完毕,但是有一个很大的问题,那就是Master节点存在单点故障,要解决此问题,就要借助zookeeper,并且启动至少两个Master节点来实现高可靠,配置方式比较简单:

Spark集群规划:hdp-01,hdp-04是Master;hdp-02、hdp-03是Worker

安装配置zk集群,并启动zk集群

停止spark所有服务,修改配置文件spark-env.sh,在该配置文件中删掉SPARK_MASTER_IP

并添加如下配置

代码语言:javascript复制
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp-01:2181,hdp-02:2181,hdp-03:2181 -Dspark.deploy.zookeeper.dir=/spark"

#除此之外还可以修改Worker的核数(线程数,不能超过实际物理机器的线程)、内存大小等配置
#配置文件上有示例,直接抄即可
export SPARK_WORKER_CORES=8
export SPARK_WORKER_MEMORY=2g

1.在hdp-01节点上修改slaves配置文件内容指定worker节点 2.在hdp-01上执行sbin/start-all.sh脚本,后在hdp-04上执行sbin/start-master.sh启动第二个Master【意味着只在hdp-04启动一个Master】如果连接http://hdo-04:8080,会发现Status显示STANDBY状态,且没有Workers信息。

在Spark集群启动的时候,所有的Master和Worker都连接到Zookeeper集群中。zk的作用如下:

1、zk集群会选举出一个Master作为活跃(alive)的Master,另外一个Master处于Stand By状态。

2、zk集群还会保存活跃的Master信息

3、zk集群还会保存所有Worker的资源信息和资源使用情况,如图中hdp-01作为活跃的Master,它能获取所有的Worker(hdp-02、hdp-03)的使用情况,如果hdp-01挂掉,那么会切换为hdp-04作为活跃的Master,它也应该能获取获取所有的Worker信息,那么Worker的资源信息和资源使用情况就应该保存在zk中。【为了故障切换】

2.3    总结:

1、先启动zk集群

2、启动spark集群,但只会启动一个Master,另外一台Master机器需要手动启动

3、如果模拟hdp-01故障,那么hdp-04会由STANDBY状态切换为MASTER状态。当hdp-01修复后,hdp-01为STANDBY状态,hdp-04仍为MASTER状态。在故障切换的过程中,会短暂性终止spark服务。

三、执行Spark程序

3.1    入门案例——蒙特卡罗算法求Pi

实际上是通过数学采样的方式计算Pi,采样的次数越多,计算的Pi值越准确。

代码语言:javascript复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-submit 
--master spark://hdp-01:7077,spark://hdp-02:7077 
--class org.apache.spark.examples.SparkPi 
--executor-memory 2048mb 
--total-executor-cores 24 
/root/apps/spark-2.3.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.3.jar 
10000

/**
解析:
第一行是指通过spark提交任务的客户端spark-submit
第二行是指定master所在的机器 底层是rpc通信协议 spark://主机名或ip地址:7077  7077是rpc通信端口,而8080是http外部访问端口,需要区分开来。提交任务可以指定多个master地址,目的是为了提交任务高可用
第三行是指执行哪一个类 全路径类名,官方自带的蒙特卡罗求Pi样例(底层是通过反射执行)
第四、五行是指执行的内存大小,cpu核数(实际上这里的核数是执行的线程数)
第六行是指该样例所在的jar包位置   2.11为scala版本  2.3.3为spark版本
第六行是指采样的次数,采样次数越多,求Pi越精确
*/

最终求的:Pi is roughly 3.141852462837049   采样次数可以设置更高试试

此时登录http://hdp-01:8080中,即spark后台管理界面,查看到新增了一个已完成任务。

Completed Applications

Application ID

Name

Cores

Memory per Executor

Submitted Time

User

State

Duration

app-20190427200200-0004

Spark Pi

16

2.0 GB

2019/04/27 20:02:00

root

FINISHED

2.8 min

在执行过程中,有一些细节需要说一下:

假设我现在的集群架构是这样:

hdp-01为Master(alive)、hdp-02也为Master(stand by) 

hdp-03、hdp-04、hdp-05为Worker  , 假设我在机器hdp-05中提交了蒙特卡罗求Pi任务

在执行任务的过程中,给集群中的所有机器输入jps,查看后台java任务都有哪些?

(1)在hdp-05中,存在CoarseGrainedExecutorBackend(执行任务真正执行的地方)、SparkSubmit(提交任务到Spark集群,和Master通信、调度任务等功能)、Worker等

(2)在hdp-03、hdp-04【即Worker机器】中都多了CoarseGrainedExecutorBackend进程,但无SparkSubmit进程

(3)在任务执行完成后再jps,发现SparkSubmit和CoarseGrainedExecutorBackend都消失了,原因是被释放了,节约资源

总结:CoarseGrainedExecutorBackend(简称Executor)在Worker执行任务时候启动进程,SparkSubmit在提交任务的机器执行进程,在任务执行完毕后,Executor和SparkSubmit都被释放。

3.2    Spark shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

上面的方式没有指定master的地址,即用的是spark的local模式运行【模拟spark集群运行的过程】

代码语言:javascript复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell

只有书写master地址,才能与master建立连接,才能向master申请资源,才能将任务提交到集群

代码语言:javascript复制
/root/apps/spark-2.3.3-bin-hadoop2.7/bin/spark-shell  
--master spark://hdp-01:7077 
--executor-memory 2g 
--total-executor-cores 2

参数说明: --master spark://hdp-01:7077  指定Master的地址,如果需要指定多个Master地址,只需要使用逗号分割即可 --executor-memory 2g              指定每个worker可用内存为2G,如果不指定内存,默认运行内存1024mb --total-executor-cores 2            指定整个集群使用的cup核数为2个

在spark-shell运行后,执行jps命令,发现提交任务的机器存在CoarseGrainedExecutorBackend和SparkSubmit,而其他worker寄去存在CoarseGrainedExecutorBackend,Master机器的进程和执行spark-shell之前没有明显变化。说明spark-shell在执行后,即使任务未提交到spark集群中,进程也依旧在后台保持执行。【实际上就是创建SparkContext】

指定了Master地址,那么就会将任务提交到集群中,开始时sparksubmit(客户端)要连接Master,并向Master申请计算资源(内存和核数等),Master进行资源调度(就是让哪些Worker启动Executor)。在准备工作时,这些进程都准备好了【实际上该过程底层就是创建SparkContext的过程】

注意: 如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

WordCount代码:【本地文件系统】

代码语言:javascript复制
scala> sc.textFile("/root/w.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).collect

注意:需要具备的条件是:所有Worker机器上都需要有/root/w.txt,否则将会报错。真正执行计算的不是Master,也不是Worker,而是进程CoarseGrainedExecutorBackend。上述的方式是从本地文件系统读取数据的WordCount计算,真实环境应该是基于HDFS分布式文件系统读取文件。Spark先与namenode通信,找到数据存在哪些datanode中,最后从具体的datanode中读取数据。如果当前的机器或者集群的其他机器,其本地文件系统没有数据文件也没关系,基于HDFS分布式文件系统,集群上的每个节点都可以通过网络从HDFS中读取数据进行计算。

WordCount代码:【HDFS分布式文件系统】

代码语言:javascript复制
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).collect
res1: Array[(String, Int)] = Array((scala,1), (hello,3), (java,1), (spark,2), (hi,2), (dianxin,2))

排序:

代码语言:javascript复制
scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).sortBy(_._2).collect
res2: Array[(String, Int)] = Array((scala,1), (java,1), (spark,2), (hi,2), (dianxin,2), (hello,3))

scala> sc.textFile("hdfs://hdp-01:9000/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).sortBy(_._2,false).collect 
res3: Array[(String, Int)] = Array((hello,3), (spark,2), (hi,2), (dianxin,2), (scala,1), (java,1))

四、Scala和Java执行WordCount对比

4.1    Scala执行WordCount

1、导入pom.xml

代码语言:javascript复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcats</groupId>
    <artifactId>spark-wordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- 导入spark的依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- 指定hadoop-client API的版本 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2、WordCount Scala代码

代码语言:javascript复制
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkConfig配置对象,配置Spark应用程序的名字
    val conf: SparkConf = new SparkConf()
    conf.setAppName("scalaWordCount")
    //2.创建Spark执行入口————SparkContext Driver和Master通信就是通过SparkContext进行通信
    val sc = new SparkContext(conf)
    //3.指定以后从哪读取数据创建RDD(弹性分布式数据集)
    val lines: RDD[String] = sc.textFile(args(0)) //返回的结果是读取的一行行文件数据集
    //4.切分压平
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //5.将单词和1组合在一起变成元组
    val wordWithOne: RDD[(String, Int)] = words.map((_,1))
    //6.按照key进行聚合
    val reduced: RDD[(String, Int)] = wordWithOne.reduceByKey(_   _)
    //7.排序
    val sortReduced = reduced.sortBy(_._2, false)   //_为元组(key,出现次数),按出现次数降序排列
    //8.将结果存入HDFS中
    sortReduced.saveAsTextFile(args(1))
    //9.释放资源sc
    sc.stop()
  }
}

3、使用Maven命令打包

4、上传至服务器且确保HDFS处于运行状态,执行命令

代码语言:javascript复制
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.ScalaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res

需要注意的是:args(1),即结果存入HDFS中的文件路径不应该为HDFS中已存在的路径,否则将会抛出异常

代码语言:javascript复制
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hdp-01:9000/wordcount already exists

5、查看执行结果

代码语言:javascript复制
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r--   3 root supergroup         10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r--   3 root supergroup         48 2019-04-28 21:42 /wordcount_res/part-00001

实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。

6、数据结果实际上被写入多个文件中,全局有序

代码语言:javascript复制
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)

在MapRecue中,有多少个ReduceTask决定了有多少个结果文件,可以通过指定ReduceTask数量来决定最后结果文件的数量。在我们上文在写Spark程序的时候我并没有指定以后生成多少个结果文件?那么为什么最终是三个结果文件呢?

4.2    Java执行WordCount

1、导入pom.xml依赖,可以直接使用4.1中的pom依赖文件

2、WordCount Java代码

代码语言:javascript复制
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class JavaWordCount {
    public static void main(String[] args) {
        //1.创建SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("javaWordCount");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //2.指定以后从哪读数据
        JavaRDD<String> lines = sc.textFile(args[0]);
        //3.读取的数据为一行行的RDD数据集  切分压平   输入为String类型  输出也为String类型
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String lines) throws Exception {
                //lines.split(" ")返回类型为数组类型,需要返回Iterator类型
                return Arrays.asList(lines.split(" ")).iterator();
            }
        });
        //4.将words组装为元组类型  传入String的words 返回元组 需要调用mapToPair
        JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        //5.将key相同的元素聚合在一起
        JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1   v2;
            }
        });
        //6.对结果进行排序  发现只有sortByKey  所以应该将Tuple中的键值对换位置,调用mapToPair方法
        JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                //return new Tuple2<Integer, String>(tuple._2, tuple._1);
                return tuple.swap();
            }
        });
        //排序后的结果  (次数, Key)
        JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
        //再换回 (Key, 次数的顺序)
        JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
                return tuple2.swap();
            }
        });
        //7.将结果保存到HDFS中
        res.saveAsTextFile(args[1]);
        //8.关闭sparkContext资源
        sc.stop();
    }
}

3、使用Maven命令打包

4、上传至服务器且确保HDFS处于运行状态,执行命令,同4.1中的操作。需要注意的是修改主函数全包名引用

代码语言:javascript复制
[root@hdp-01 bin]# ./spark-submit --master spark://hdp-01:7077 --class cn.itcats.spark.JavaWordCount /root/spark-wordcount-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/wordcount hdfs://hdp-01:9000/wordcount_res

5、查看执行结果

代码语言:javascript复制
[root@hdp-01 bin]# hadoop fs -ls /wordcount_res
Found 3 items
-rw-r--r--   3 root supergroup          0 2019-04-28 21:42 /wordcount_res/_SUCCESS
-rw-r--r--   3 root supergroup         10 2019-04-28 21:42 /wordcount_res/part-00000
-rw-r--r--   3 root supergroup         48 2019-04-28 21:42 /wordcount_res/part-00001

实际上Spark读写HDFS中的数据是基于Hadoop中的HDFSClient,即基于HDFS的API读取数据。

6、数据结果实际上被写入多个文件中,全局有序

代码语言:javascript复制
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00000
(hello,3)
[root@hdp-01 bin]# hadoop fs -cat /wordcount_res/part-00001
(spark,2)
(hi,2)
(dianxin,2)
(scala,1)
(java,1)

4.2    Lambda表达式执行WordCount

编写Lambda表达式代码

代码语言:javascript复制
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class LambdaWordCount {
    public static void main(String[] args) {
        //1.创建SparkContext对象
        SparkConf sparkConf = new SparkConf().setAppName("lambdaWordCount");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //2.指定以后从哪读数据
        JavaRDD<String> lines = sc.textFile(args[0]);
        //3.读取的数据为一行行的RDD数据集  切分压平
        JavaRDD<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        //4.将words组装为元组类型 w为words中的每个单词
        JavaPairRDD<String, Integer> wordWithOne = words.mapToPair(x -> new Tuple2<String, Integer>(x, 1));
        //5.根据Key进行聚合
        JavaPairRDD<String, Integer> reduced = wordWithOne.reduceByKey((m, n) -> m   n);
        //6.调整顺序  (次数,Key)
        JavaPairRDD<Integer, String> swaped1 = reduced.mapToPair(x -> x.swap());
        //7.排序
        JavaPairRDD<Integer, String> sorted = swaped1.sortByKey(false);
        //7.调整顺序 (Key,次数)
        JavaPairRDD<Integer, String> swaped2 = reduced.mapToPair(x -> x.swap());
        //8.将结果保存到HDFS中
        swaped2.saveAsTextFile(args[1]);
        //9.关闭资源
        sc.stop();
    }
}

4.3    本地调试代码

代码上只有一行改动:

代码语言:javascript复制
//1.创建SparkConfig配置对象,配置Spark应用程序的名字
    //2.local为本地单线程执行  local[4]为本地4线程执行   local[*]本地多少线程就多少线程执行
    val conf: SparkConf = new SparkConf().setAppName("sparkWordCount").setMaster("local[4]")

0 人点赞