2021 Big Data Spark (11): Application Development with the IDEA IDE



Spark Application Development with IDEA

In real projects, Spark applications are developed in the IDEA integrated development environment. All of the code in this Spark course is written in Scala, using functional programming to analyze and process data, which keeps the code clear and concise.

Some companies also develop Spark programs in Java, but this is less common; a Java example is shown later in this article.

Creating the Project

Create a Maven project.

Add the dependencies to the POM file; the contents are as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast</groupId>
    <artifactId>spark_v8_bak</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <properties>
        <encoding>UTF-8</encoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.5</hadoop.version>
        <spark.version>2.4.5</spark.version>
    </properties>
    <dependencies>
        <!-- Scala language dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL Hive dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming Kafka dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Plugin for compiling Java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin for compiling Scala -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Running WordCount Locally

http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html

package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        //1. Create the SparkContext
        val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")//set the run parameters
        val sc: SparkContext = new SparkContext(conf)//create sc
        sc.setLogLevel("WARN") //set the log level

        //2. Read the text file
        //RDD: A Resilient Distributed Dataset (RDD)
        //A resilient distributed dataset, which can be thought of as a distributed collection:
        //Spark's wrapper around Scala collections that is as easy to work with as a local collection
        //RDD[one line of text per element]
        val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")

        //3. Process the data: split each line on " ", map each word to 1, then aggregate by word
        //3.1 Split each line on " "
        //RDD[word]
        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_ stands for each line
        //3.2 Map each word to 1
        //val unit: RDD[(String, Int)] = wordRDD.map(word=>(word,1))
        //(hello,1),(hello,1),(hello,1),(hello,1)
        val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_ stands for each word
        //3.3 Aggregate by word
        //reduceByKey is a Spark API; plain Scala has no equivalent, you would groupBy first and then reduce the values
        //reduceByKey means reduce (aggregate) the values by key
        //_ + _
        //the 1st _ is the accumulated value so far
        //the 2nd _ is the value of the current element
        //RDD[(hello,4)]....
        val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)

        //4. Collect the result to the driver as a local collection
        val result: Array[(String, Int)] = resultRDD.collect()

        //5. Print
        //result.foreach(println)
        println(result.toBuffer)//converting the array to a buffer lets us print its contents directly

        //Sleep so the Web UI can be inspected during testing
        Thread.sleep(1000 * 120)

        //6. Stop
        sc.stop()
    }
}
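
For reference, with a small input file like the one below (the exact contents are an assumption; any space-separated text works), the program prints something close to the following. The element order inside the buffer may differ from run to run:

data/input/words.txt:
hello spark
hello hadoop hello
spark flink

Console output:
ArrayBuffer((spark,2), (hadoop,1), (hello,3), (flink,1))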

Running WordCount on a Cluster

Note

If you run into permission errors when writing to HDFS:

apply the following setting:

hadoop fs -chmod -R 777  /

and add the following line to the code (see the sketch after this note):

System.setProperty("HADOOP_USER_NAME", "root")
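
A minimal sketch of where this call sits inside main, before the SparkContext is created (the user name "root" matches the course environment and is an assumption; substitute your own HDFS user):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        // Set the HDFS user before the SparkContext (and the Hadoop FileSystem) is created.
        // "root" is an assumption based on the course cluster; use your own HDFS user in practice.
        System.setProperty("HADOOP_USER_NAME", "root")
        val conf: SparkConf = new SparkConf().setAppName("wc")
        val sc: SparkContext = new SparkContext(conf)
        // ... the rest of the WordCount job, as shown in the cluster version below ...
        sc.stop()
    }
}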

Modify the code as follows

Package the developed and tested WordCount program into a JAR and save it, then submit it with spark-submit to run both in local mode (LocalMode) and on the Standalone cluster. First modify the code so that the run mode is set via master and the input/output paths are passed in as arguments, as follows:

package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        //For robustness, check that the arguments were passed
        if(args.length != 2){
            println("Usage: SparkSubmit <input> <output>............")
            System.exit(1)//a non-zero code means the program exited abnormally
        }

        //1. Create the SparkContext
        val conf: SparkConf = new SparkConf().setAppName("wc")//.setMaster("local[*]")//set the run parameters
        val sc: SparkContext = new SparkContext(conf)//create sc
        sc.setLogLevel("WARN") //set the log level

        //2. Read the text file
        //RDD: A Resilient Distributed Dataset (RDD)
        //A resilient distributed dataset, which can be thought of as a distributed collection:
        //Spark's wrapper around Scala collections that is as easy to work with as a local collection
        //RDD[one line of text per element]
        val fileRDD: RDD[String] = sc.textFile(args(0))

        //3. Process the data: split each line on " ", map each word to 1, then aggregate by word
        //3.1 Split each line on " "
        //RDD[word]
        val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))//_ stands for each line
        //3.2 Map each word to 1
        //val unit: RDD[(String, Int)] = wordRDD.map(word=>(word,1))
        //(hello,1),(hello,1),(hello,1),(hello,1)
        val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1))//_ stands for each word
        //3.3 Aggregate by word
        //reduceByKey is a Spark API; plain Scala has no equivalent, you would groupBy first and then reduce the values
        //reduceByKey means reduce (aggregate) the values by key
        //_ + _
        //the 1st _ is the accumulated value so far
        //the 2nd _ is the value of the current element
        //RDD[(hello,4)]....
        val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)

        //4. Collect the result to the driver as a local collection
        //val result: Array[(String, Int)] = resultRDD.collect()

        //5. Output
        //result.foreach(println)
        //println(result.toBuffer)//converting the array to a buffer lets us print its contents directly

        resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}")//output path for the result files

        //Sleep so the Web UI can be inspected during testing
        Thread.sleep(1000 * 120)

        //6. Stop
        sc.stop()
    }
}

Build the JAR

Rename it

Upload the JAR

Upload it to the HDFS directory /spark/apps/ so that it can also be read when submitting jobs from other machines.

Create the HDFS directory

hdfs dfs -mkdir -p /spark/apps/

Upload the JAR

hdfs dfs -put /root/wc.jar /spark/apps/

Submit to YARN

SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output

http://node1:8088/cluster
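
For comparison, submitting the same JAR in local mode or to the Standalone cluster only changes the --master value. A sketch is shown below; the Standalone master URL spark://node1:7077 assumes the default master port, and the resource settings mirror the YARN example rather than any value from the original course material:

SPARK_HOME=/export/server/spark

# Local mode: driver and executors run in a single local JVM
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output

# Standalone cluster: spark://node1:7077 assumes the default master port
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 2 \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output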

WordCount in Java 8 [for reference]

Notes:

In Scala, a function is essentially an object

In Java 8, a function can be understood as an anonymous inner class object, so Java 8 functions are essentially objects as well

Java 8's functional-programming syntax is the lambda expression:

(parameters) -> { body }

Writing rule: omit whatever can be omitted, and write out whatever cannot

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;


public class WordCountJava8 {
    public static void main(String[] args){
        //1. Create the JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("WARN");
        //2. Read the file
        JavaRDD<String> fileRDD = jsc.textFile("data/input/words.txt");
        //3. Process the data
        //3.1 Split each line on " "
        //Java 8 lambda syntax: (parameter list) -> { body; }  again: omit whatever can be omitted
        //public interface FlatMapFunction<T, R> extends Serializable {
        //  Iterator<R> call(T t) throws Exception;
        //}
        //Looking at the source, the function flatMap needs takes a T (here a String)
        //and returns an Iterator,
        //so the lambda body has to return an Iterator
        JavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        //3.2 Map each word to 1: (word,1)
        //public interface PairFunction<T, K, V> extends Serializable {
        //  Tuple2<K, V> call(T t) throws Exception;
        //}
        JavaPairRDD<String, Integer> wordAndOneRDD = wordRDD.mapToPair(word -> new Tuple2<>(word, 1));
        //3.3 Aggregate by key
        //public interface Function2<T1, T2, R> extends Serializable {
        //  R call(T1 v1, T2 v2) throws Exception;
        //}
        JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);

        //4. Collect the result and print it
        List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
        //result.forEach(t->System.out.println(t));
        result.forEach(System.out::println);
        //The idea of functional programming is behavior parameterization: whatever you want done, pass it along as a parameter

        //5. Stop
        jsc.stop();
    }
}

WordCount Flow Diagram

The main flow of WordCount is shown in the figure below: the input file is read into an RDD of lines, flatMap splits each line into words, map turns every word into a (word, 1) pair, reduceByKey aggregates the counts by word, and the result is collected back to the driver (or saved to HDFS).
