Spark案例库
案例一:使用SparkRDD实现词频统计
pom.xml文件
代码语言:xml
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
实现代码
代码语言:scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
    val sc: SparkContext = {
      // 其一、构建SparkConf对象,设置应用名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName("SparkWordCount")
        .setMaster("local[2]")
      // 其二、创建SparkContext实例,传递sparkConf对象
      new SparkContext(sparkConf)
    }
    // TODO: 第一步、读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
    // TODO: 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 按照分隔符分割单词
      .flatMap(line => line.split("\\s+"))
      // 转换单词为二元组,表示每个单词出现一次
      .map(word => word -> 1)
      // 按照单词分组,对组内进行聚合reduce操作,求和
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 第三步、将最终处理结果打印控制台
    resultRDD.foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
}
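除了打印到控制台,也可以把统计结果直接写回文件系统。下面是一个最小示意(输出路径为假设值,需按实际环境调整,且该目录不能已存在),放在上面代码的 sc.stop() 之前即可:
代码语言:scala
// 将词频统计结果保存为文本文件(输出目录必须不存在,否则报错)
resultRDD.saveAsTextFile("/datas/wordcount-output")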
案例二:WordCount程序,按照词频降序排序取Top3
pom.xml
与案例一的 pom.xml 相同。
代码实现
代码语言:scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkTopKey {
  def main(args: Array[String]): Unit = {
    // TODO: 创建SparkContext实例对象,首先构建SparkConf实例,设置应用基本信息
    val sc: SparkContext = {
      // 其一、构建SparkConf对象,设置应用名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName("SparkTopKey")
        .setMaster("local[2]")
      // 其二、创建SparkContext实例,传递sparkConf对象
      new SparkContext(sparkConf)
    }
    // TODO: 第一步、读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("/datas/wordcount.data")
    // TODO: 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 按照分隔符分割单词
      .flatMap(line => line.split("\\s+"))
      // 转换单词为二元组,表示每个单词出现一次
      .map(word => word -> 1)
      // 按照单词分组,对组内进行聚合reduce操作,求和
      .reduceByKey((tmp, item) => tmp + item)
    // 按照词频降序排序,取前3个
    resultRDD
      .sortBy(tuple => tuple._2, ascending = false)
      // 打印结果
      .take(3)
      .foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
}
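取Top N也可以直接使用RDD的top算子,传入按词频比较的Ordering,等价于先降序排序再take,示意如下(接在上面的resultRDD之后):
代码语言:scala
// 按二元组的第二个元素(词频)取Top3
resultRDD
  .top(3)(Ordering.by(tuple => tuple._2))
  .foreach(println)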
案例三:采用并行化的方式构建集合Seq中的数据为RDD,进行词频统计
pom.xml
与案例一的 pom.xml 相同。
代码实现
代码语言:scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _01SparkParallelizeTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // sparkConf对象
      val sparkConf = new SparkConf()
        // _01SparkParallelizeTest$ -> (.stripSuffix("$")) -> _01SparkParallelizeTest
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // sc 实例对象
      SparkContext.getOrCreate(sparkConf)
    }
    // TODO: 1、Scala中集合Seq序列存储数据
    val linesSeq: Seq[String] = Seq(
      "hadoop scala hive spark scala sql sql",
      "hadoop scala spark hdfs hive spark",
      "spark hdfs spark hdfs scala hive spark"
    )
    // TODO: 2、并行化集合
    val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
    // TODO: 3、词频统计
    val resultRDD = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 4、输出结果
    resultRDD.foreach(println)
    // 应用结束,关闭资源
    sc.stop()
  }
}
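并行化集合也可以使用makeRDD方法,对Seq而言其底层同样调用parallelize,效果等价,示意如下:
代码语言:scala
// makeRDD同样可以指定分区数,与parallelize效果一致
val inputRDD2: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)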
案例四:采用wholeTextFiles()方法读取小文件
pom.xml
与案例一的 pom.xml 相同。
代码实现
代码语言:scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _02SparkWholeTextFileTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = {
      // sparkConf对象
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // sc 实例对象
      SparkContext.getOrCreate(sparkConf)
    }
    /*
      def wholeTextFiles(
          path: String,
          minPartitions: Int = defaultMinPartitions
      ): RDD[(String, String)]
      Key: 每个小文件名称路径
      Value: 每个小文件的内容
     */
    val inputRDD: RDD[(String, String)] = sc.wholeTextFiles("datas/ratings100", minPartitions = 2)
    println(s"RDD 分区数目 = ${inputRDD.getNumPartitions}")
    inputRDD.take(2).foreach(tuple => println(tuple))
    // 应用结束,关闭资源
    sc.stop()
  }
}
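wholeTextFiles返回的是(文件路径, 文件内容)二元组,如果要在此基础上做词频统计,可以只取内容部分再按空白符拆分,示意如下(接在上面的inputRDD之后):
代码语言:scala
// 只取文件内容,按空白符拆分后统计词频
val wordCountRDD: RDD[(String, Int)] = inputRDD
  .flatMap { case (_, content) => content.split("\\s+") }
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordCountRDD.foreach(println)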
案例五:RDD中缓存函数,将数据缓存到内存或磁盘、释放缓存
pom.xml
与案例一的 pom.xml 相同。
代码实现
代码语言:scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object _05SparkCacheTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    // 读取文本文件数据
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data", minPartitions = 2)
    // 缓存数据:将数据缓存至内存
    inputRDD.persist()
    // 使用Action函数触发缓存
    inputRDD.count()
    // 释放缓存
    inputRDD.unpersist()
    // 缓存数据:选择缓存级别
    inputRDD.persist(StorageLevel.MEMORY_AND_DISK)
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}
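补充一点:不带参数的persist()与cache()等价,默认级别为StorageLevel.MEMORY_ONLY;同一个RDD若要更换缓存级别,需要先unpersist()再重新persist。下面是一个小示意(接在读取inputRDD之后,级别按内存情况选择):
代码语言:scala
// cache()即persist(StorageLevel.MEMORY_ONLY)
inputRDD.cache()
inputRDD.count()
// 更换为内存+磁盘且序列化存储的级别,需先释放原有缓存
inputRDD.unpersist()
inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
inputRDD.count()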
案例六:RDD数据Checkpoint设置案例
pom.xml
与案例一的 pom.xml 相同。
代码实现
代码语言:scala
import org.apache.spark.{SparkConf, SparkContext}

object _06SparkCkptTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }
    // TODO: 设置检查点目录,RDD数据将保存到该目录
    sc.setCheckpointDir("datas/ckpt/")
    // 读取文件数据
    val datasRDD = sc.textFile("datas/wordcount.data")
    // TODO: 调用checkpoint函数,将RDD进行备份,需要RDD中Action函数触发
    datasRDD.checkpoint()
    datasRDD.count()
    // TODO: 再次执行count函数,此时从checkpoint读取数据
    println(datasRDD.count())
    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}
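checkpoint被Action触发时会把该RDD重新计算一遍再写入检查点目录,因此通常建议先缓存再checkpoint,避免从数据源重算,示意如下(放在上面checkpoint调用之前):
代码语言:scala
// 先缓存再checkpoint:写检查点数据时直接读缓存
datasRDD.cache()
datasRDD.checkpoint()
datasRDD.count()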
案例七:广播变量和累加器案例
基于Spark框架,使用Scala语言编程实现词频统计WordCount程序:将标点符号数据过滤掉,并统计其出现的次数。
a. 过滤标点符号数据:使用广播变量
b. 统计标点符号数据出现次数:使用累加器
代码实现
代码语言:scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object _05SparkSharedVariableTest {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark应用程序中,入口为SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/filter/datas.input", minPartitions = 2)
    // TODO: 字典数据,只要出现这些单词就过滤:特殊字符存储在列表List中
    val list: List[String] = List(",", ".", "!", "#", "$", "%")
    // TODO: 将字典数据声明为广播变量
    val broadcastList: Broadcast[List[String]] = sc.broadcast(list)
    // TODO: 定义计数器
    val accumulator: LongAccumulator = sc.longAccumulator("number_accu")
    // 3. 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤空行数据
      .filter(line => null != line && line.trim.length > 0)
      // 分割为单词
      .flatMap(line => line.trim.split("\\s+"))
      // TODO: 过滤非单词字符
      .filter { word =>
        // 获取广播变量的值
        val wordsList: List[String] = broadcastList.value
        // 判断每个单词是否为非单词字符
        val flag: Boolean = wordsList.contains(word)
        if (flag) {
          // 如果是非单词字符,累加器加1
          accumulator.add(1L)
        }
        // 返回
        !flag
      }
      // 按照单词分组,进行聚合操作
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // 4. 第三步、将最终处理结果RDD保存到HDFS或打印控制台
    resultRDD.foreach(println)
    // 获取累加器的值,必须先使用RDD Action函数进行触发
    println("Accumulator: " + accumulator.value)
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}
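需要注意:累加器的值只有在Action触发后读取才有意义;如果包含累加逻辑的RDD被多个Action反复重算,累加器可能重复计数。一个常见的做法是在触发Action之前先缓存结果RDD,可以降低这种风险(示意如下,放在foreach之前):
代码语言:scala
// 结果RDD会被多个Action使用时先缓存,降低重算导致累加器重复计数的风险
resultRDD.cache()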
案例八:将RDD数据保存至MySQL表中(一般模式)
要求:
a. 对结果数据降低分区数目
b. 针对每个分区数据进行操作:每个分区数据插入数据库时,创建一个连接Connection
pom.xml
代码语言:xml
<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <hbase.version>1.2.0-cdh5.16.2</hbase.version>
    <mysql.version>8.0.19</mysql.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-hadoop2-compat</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.7.7</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
代码实现:
代码语言:scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _04SparkWriteMySQL {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark应用程序中,入口为SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: 过滤
      .filter(line => null != line && line.trim.length > 0)
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: 将结果数据resultRDD保存至MySQL表中
    resultRDD
      // 降低RDD分区数目
      .coalesce(1)
      .foreachPartition { iter =>
        // val xx: Iterator[(String, Int)] = iter
        // 直接调用保存分区数据到MySQL表的方法
        saveToMySQL(iter)
      }
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }

  /**
   * 定义一个方法,将RDD中分区数据保存至MySQL表
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver")
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try {
      // step2. 创建连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      pstmt = conn.prepareStatement("INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
      // step3. 插入数据
      iter.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        pstmt.execute()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // step4. 关闭连接
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }
}
案例九:将RDD数据保存至MySQL表中(高级模式)
要求:
a. 对结果数据降低分区数目
b. 针对每个分区数据进行操作:每个分区数据插入数据库时,创建一个连接Connection
c. 批次插入每个分区数据:使用addBatch、executeBatch
d. 事务性:手动提交事务,并还原原来的事务提交方式
e. 考虑主键已存在时如何保存数据:存在则更新,不存在则插入
pom.xml
与案例八的 pom.xml 相同。
代码实现:
代码语言:scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _04SparkWriteMySQLV3 {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark应用程序中,入口为SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // TODO: 过滤
      .filter(line => null != line && line.trim.length > 0)
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: 将结果数据resultRDD保存至MySQL表中
    resultRDD.coalesce(1).foreachPartition(saveToMySQL)
    // 4. 当应用运行结束以后,关闭资源
    sc.stop()
  }

  /**
   * 定义一个方法,将RDD中分区数据保存至MySQL表
   */
  def saveToMySQL(iter: Iterator[(String, Int)]): Unit = {
    // step1. 加载驱动类
    Class.forName("com.mysql.cj.jdbc.Driver")
    // 声明变量
    var conn: Connection = null
    var pstmt: PreparedStatement = null
    try {
      // step2. 创建连接
      conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "root",
        "123456"
      )
      pstmt = conn.prepareStatement("REPLACE INTO db_test.tb_wordcount (word, count) VALUES(?, ?)")
      // TODO: 考虑事务性,一个分区数据要么全部保存,要么都不保存
      val autoCommit: Boolean = conn.getAutoCommit // 获取数据库默认事务提交方式
      conn.setAutoCommit(false)
      // step3. 插入数据
      iter.foreach { case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setInt(2, count)
        // TODO: 加入一个批次中
        pstmt.addBatch()
      }
      // TODO: 批量执行批次
      pstmt.executeBatch()
      conn.commit() // 手动提交事务,进行批量插入
      // 还原数据库原来的事务提交方式
      conn.setAutoCommit(autoCommit)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // step4. 关闭连接
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }
}
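这里用REPLACE INTO实现“存在则更新、不存在则插入”,前提是假设db_test.tb_wordcount表以word作为主键(或唯一键);如果只想在主键冲突时更新计数列,也可以换成ON DUPLICATE KEY UPDATE的写法,示意如下(基于同样的主键假设):
代码语言:scala
// upsert的另一种写法:主键冲突时只更新count列
pstmt = conn.prepareStatement(
  "INSERT INTO db_test.tb_wordcount (word, count) VALUES(?, ?) " +
    "ON DUPLICATE KEY UPDATE count = VALUES(count)"
)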
案例十:从HBase 表中读取数据,封装到RDD数据集
pom.xml
与案例八的 pom.xml 相同。
代码实现:
代码语言:scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _03SparkReadHBase {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark应用程序中,入口为SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
        // TODO: 设置使用Kryo序列化方式
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // TODO: 注册序列化的数据类型
        .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[Result]))
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // TODO: 从HBase表读取数据,调用RDD方法:newAPIHadoopRDD
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置要读取数据的HBase表的名称
    conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
    // 打印HBase表样本数据
    hbaseRDD
      .take(6)
      .foreach { case (rowKey, result) =>
        result.rawCells().foreach { cell =>
          println(s"RowKey = ${Bytes.toString(result.getRow)}")
          println(s"\t${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
            s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
            s"${Bytes.toString(CellUtil.cloneValue(cell))}")
        }
      }
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}
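如果想把读取到的数据还原成(word, count)形式的RDD,可以按列族和列名直接从Result中取值,示意如下(假设计数是像案例十一那样以字符串形式写入info:count列的):
代码语言:scala
// 从Result中取出info:count列,还原为(单词, 词频)
val wordCountRDD: RDD[(String, Int)] = hbaseRDD.map { case (_, result) =>
  val word: String = Bytes.toString(result.getRow)
  val count: Int = Bytes.toString(
    result.getValue(Bytes.toBytes("info"), Bytes.toBytes("count"))
  ).toInt
  (word, count)
}
wordCountRDD.foreach(println)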
案例十一:将RDD数据保存至HBase表中
pom.xml
与案例八的 pom.xml 相同。
代码实现:
代码语言:scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _02SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    // 1. 在Spark应用程序中,入口为SparkContext,必须创建实例对象,加载数据和调度程序执行
    val sc: SparkContext = {
      // 创建SparkConf对象,设置应用相关信息,比如名称和master
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 构建SparkContext实例对象,传递SparkConf
      new SparkContext(sparkConf)
    }
    // 2. 第一步、从LocalFS读取文件数据,sc.textFile方法,将数据封装到RDD中
    val inputRDD: RDD[String] = sc.textFile("datas/wordcount.data")
    // 3. 第二步、调用RDD中高阶函数进行转换处理,函数:flatMap、map和reduceByKey
    val resultRDD: RDD[(String, Int)] = inputRDD
      // 过滤
      .filter(line => null != line && line.trim.length > 0)
      // a. 对每行数据按照分割符分割
      .flatMap(line => line.trim.split("\\s+"))
      // b. 将每个单词转换为二元组,表示出现一次
      .map(word => (word, 1))
      .reduceByKey((temp, item) => temp + item)
    // TODO: step1. 转换RDD为RDD[(RowKey, Put)]
    /*
     * HBase表的设计:
     * 表的名称:htb_wordcount
     * RowKey: word
     * 列簇: info
     * 字段名称: count
     * 建表语句:create 'htb_wordcount', 'info'
     */
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
      // 其一、构建RowKey对象
      val rowKey: ImmutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(word))
      // 其二、构建Put对象
      val put: Put = new Put(rowKey.get())
      // 设置字段的值
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count + ""))
      // 其三、返回二元组(RowKey, Put)
      rowKey -> put
    }
    // TODO: step2. 调用RDD中saveAsNewAPIHadoopFile保存数据
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
    putsRDD.saveAsNewAPIHadoopFile(
      "datas/hbase/htb_wordcount/",
      classOf[ImmutableBytesWritable],
      classOf[Put],
      classOf[TableOutputFormat[ImmutableBytesWritable]],
      conf
    )
    // 5. 当应用运行结束以后,关闭资源
    sc.stop()
  }
}
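saveAsNewAPIHadoopFile这里传入的输出路径通常不会被TableOutputFormat真正使用,数据是直接写入HBase表的;另一种常见写法是通过Job配置输出格式后调用saveAsNewAPIHadoopDataset,示意如下:
代码语言:scala
import org.apache.hadoop.mapreduce.Job

// 通过Job对象设置输出表的格式与键值类型,再以Dataset方式保存
val job: Job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
putsRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)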