Spark Application Development with IDEA
In practice, Spark applications are developed in the IDEA integrated development environment. All code in this Spark course is written in Scala, using functional programming to analyze and process data, which keeps the code clear and concise.
Companies also develop Spark programs in Java, though less commonly; a Java version is demonstrated at the end of this section.
Creating the Project
Create a Maven project in IDEA.
Add the dependencies to the POM file; its content is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast</groupId>
    <artifactId>spark_v8_bak</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <properties>
        <encoding>UTF-8</encoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.5</hadoop.version>
        <spark.version>2.4.5</spark.version>
    </properties>
    <dependencies>
        <!-- Scala language -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL Hive support -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming Kafka integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Structured Streaming Kafka integration -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Running WordCount Locally
http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html
package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]") // run parameters
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN") // set the log level
    // 2. Read the text file
    // RDD: A Resilient Distributed Dataset
    // Think of an RDD as a distributed collection: Spark's wrapper around Scala collections
    // that you operate on almost as if it were a local collection.
    // RDD[one line of text]
    val fileRDD: RDD[String] = sc.textFile("data/input/words.txt")
    // 3. Process the data: split each line on " ", map each word to 1, then aggregate by word
    // 3.1 Split each line on " "
    // RDD[word]
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) // _ stands for each line
    // 3.2 Map each word to 1
    // val unit: RDD[(String, Int)] = wordRDD.map(word => (word, 1))
    // (hello,1),(hello,1),(hello,1),(hello,1)
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1)) // _ stands for each word
    // 3.3 Aggregate by word
    // reduceByKey is a Spark API with no direct counterpart on plain Scala collections
    // (in plain Scala you would groupBy first and then operate on the values);
    // it reduces (aggregates) the values of each key.
    // In _ + _, the first _ is the accumulated value so far and the second _ is the current value.
    // RDD[(hello,4)]....
    val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    // 4. Collect the result to the driver as a local collection
    val result: Array[(String, Int)] = resultRDD.collect()
    // 5. Print
    // result.foreach(println)
    println(result.toBuffer) // converting the Array to a Buffer prints its contents directly
    // Sleep so the web UI can be inspected while testing
    Thread.sleep(1000 * 120)
    // 6. Shut down
    sc.stop()
  }
}
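The comments above point out that reduceByKey has no direct counterpart on plain Scala collections and that an RDD feels much like a local collection. For comparison, here is a minimal pure-Scala sketch of the same word count on a local collection (the input lines and object name are invented for illustration):
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    val lines = Seq("hello spark", "hello scala hello")      // stand-in for the file contents
    val counts: Map[String, Int] = lines
      .flatMap(_.split(" "))                                 // same flatMap step as the RDD version
      .map((_, 1))                                           // each word mapped to 1
      .groupBy(_._1)                                         // plain Scala: group by the word...
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) }   // ...then sum the 1s per word
    println(counts)                                          // e.g. Map(hello -> 3, spark -> 1, scala -> 1)
  }
}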
Running WordCount on a Cluster
Note
If you run into permission problems when writing to HDFS, apply the following setting:
hadoop fs -chmod -R 777 /
and add the following line to the code:
System.setProperty("HADOOP_USER_NAME", "root")
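As a minimal sketch (the object name below is hypothetical, and "root" is simply the HDFS user assumed in this course environment), the property is set at the top of main, before the SparkContext is created, so that the Hadoop client picks it up:
import org.apache.spark.{SparkConf, SparkContext}

object WordCountOnHdfs { // illustrative name; the real class is the WordCount listing below
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root") // must run before the SparkContext is created
    val sc = new SparkContext(new SparkConf().setAppName("wc"))
    // ... word-count logic as in the listing below ...
    sc.stop()
  }
}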
Modify the code as follows
Package the developed and tested WordCount program into a jar and submit it with spark-submit, both in local mode (LocalMode) and in cluster mode (Standalone). First modify the code so that the master is set at submit time and the input/output paths are passed in as arguments:
package cn.itcast.hello

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // For robustness, check that the expected arguments were passed
    if (args.length != 2) {
      println("Usage: SparkSubmit <input> <output>")
      System.exit(1) // a non-zero code signals abnormal termination
    }
    // 1. Create the SparkContext; the master is now set at submit time
    val conf: SparkConf = new SparkConf().setAppName("wc") //.setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN") // set the log level
    // 2. Read the text file (RDD: Resilient Distributed Dataset, see the local version above)
    // RDD[one line of text]
    val fileRDD: RDD[String] = sc.textFile(args(0))
    // 3. Process the data: split each line on " ", map each word to 1, then aggregate by word
    // 3.1 Split each line on " "
    // RDD[word]
    val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) // _ stands for each line
    // 3.2 Map each word to 1
    // val unit: RDD[(String, Int)] = wordRDD.map(word => (word, 1))
    // (hello,1),(hello,1),(hello,1),(hello,1)
    val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_, 1)) // _ stands for each word
    // 3.3 Aggregate by word
    // reduceByKey reduces (aggregates) the values of each key;
    // in _ + _, the first _ is the accumulated value so far and the second _ is the current value.
    // RDD[(hello,4)]....
    val resultRDD: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
    // 4./5. Write the result out instead of collecting and printing it on the driver
    // val result: Array[(String, Int)] = resultRDD.collect()
    // result.foreach(println)
    // println(result.toBuffer)
    resultRDD.saveAsTextFile(s"${args(1)}-${System.currentTimeMillis()}") // output path
    // Sleep so the web UI can be inspected while testing
    Thread.sleep(1000 * 120)
    // 6. Shut down
    sc.stop()
  }
}
Package the project into a jar
Rename the jar to wc.jar
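One typical way to build and rename the jar, assuming the Maven coordinates from the POM above (the shade plugin replaces the default artifact, so the shaded jar keeps the artifact name):
mvn clean package
mv target/spark_v8_bak-1.0-SNAPSHOT.jar /root/wc.jar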
Upload the jar
Upload it to the HDFS directory /spark/apps/ so that it can also be read when jobs are submitted from other machines.
Create the HDFS directory:
hdfs dfs -mkdir -p /spark/apps/
Upload the jar:
hdfs dfs -put /root/wc.jar /spark/apps/
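To verify the upload:
hdfs dfs -ls /spark/apps/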
Submit to YARN
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 1 \
--total-executor-cores 2 \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output
Monitor the application on the YARN web UI: http://node1:8088/cluster
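The command above submits to YARN; since the earlier text also mentions local mode and the Standalone cluster, here is a sketch of the corresponding commands (the Standalone master URL spark://node1:7077 is an assumption about the course cluster, not taken from the text):
# Local mode, 2 threads
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output

# Standalone cluster (assumed master URL)
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--class cn.itcast.hello.WordCount \
hdfs://node1:8020/spark/apps/wc.jar \
hdfs://node1:8020/wordcount/input/words.txt hdfs://node1:8020/wordcount/output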
WordCount in Java 8 [optional]
Notes:
In Scala, a function is essentially an object.
In Java 8, a function can be thought of as an anonymous inner-class object, so Java 8 functions are essentially objects as well.
Java 8's functional-programming syntax is the lambda expression:
(parameters) -> { body }
Writing rule: omit whatever the compiler can infer, and write out only what it cannot.
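For example, the flatMap lambda used in the listing below can be written either fully spelled out or with everything the compiler can infer omitted (the helper class exists only to illustrate the two forms):
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;

public class LambdaStyle { // illustrative only
    // Fully spelled out: parameter type, braces and return written explicitly.
    static FlatMapFunction<String, String> verbose =
            (String line) -> { return Arrays.asList(line.split(" ")).iterator(); };
    // The same function with everything the compiler can infer omitted.
    static FlatMapFunction<String, String> concise =
            line -> Arrays.asList(line.split(" ")).iterator();
}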
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class WordCountJava8 {
    public static void main(String[] args) {
        // 1. Create the JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("WARN");
        // 2. Read the file
        JavaRDD<String> fileRDD = jsc.textFile("data/input/words.txt");
        // 3. Process the data
        // 3.1 Split each line on " "
        // Java 8 lambda syntax: (parameters) -> { body; } -- again, omit whatever can be inferred
        // public interface FlatMapFunction<T, R> extends Serializable {
        //     Iterator<R> call(T t) throws Exception;
        // }
        // From the source we can see that the function flatMap expects takes a T (here a String)
        // and returns an Iterator, so the lambda body must return an Iterator.
        JavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // 3.2 Map each word to 1: (word, 1)
        // public interface PairFunction<T, K, V> extends Serializable {
        //     Tuple2<K, V> call(T t) throws Exception;
        // }
        JavaPairRDD<String, Integer> wordAndOneRDD = wordRDD.mapToPair(word -> new Tuple2<>(word, 1));
        // 3.3 Aggregate by key
        // public interface Function2<T1, T2, R> extends Serializable {
        //     R call(T1 v1, T2 v2) throws Exception;
        // }
        JavaPairRDD<String, Integer> wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
        // 4. Collect the result and print it
        List<Tuple2<String, Integer>> result = wordAndCountRDD.collect();
        // result.forEach(t -> System.out.println(t));
        result.forEach(System.out::println);
        // The idea of functional programming: behavior parameterization --
        // pass what you want to do as an argument.
        // 5. Shut down
        jsc.stop();
    }
}