我们进行 Spark 进行编程的时候, 初始化工作是在 driver
端完成的, 而实际的运行程序是在executor
端进行的. 所以就涉及到了进程间的通讯, 数据是需要序列化的.
RDD 中函数的传递
1. 传递函数
- 1. 创建传递函数
package day03
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
**
@author 不温卜火
**
* @create 2020-07-24 19:31
**
* MyCSDN :https://buwenbuhuo.blog.csdn.net/
*/
// 创建的主程序
object Demo01 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo01").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xia0li", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
//需求: 在 RDD 中查找出来包含 query 子字符串的元素
// 创建的类
// query 为需要查找的子字符串
class Searcher(val query: String){
// 判断 s 中是否包括子字符串 query
def isMatch(s : String) ={
s.contains(query)
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD1(rdd: RDD[String]) ={
rdd.filter(isMatch) //
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD2(rdd: RDD[String]) ={
rdd.filter(_.contains(query))
}
}
- 2. 运行查看结果(会报错)
注意
:
直接运行程序会发现报错: 没有初始化. 因为rdd.filter(isMatch)
用到了对象this
的方法isMatch
, 所以对象this
需要序列化,才能把对象从driver
发送到executor
.
- 3. 解决方案: 让
Searcher
类实现序列化接口:Serializable
2. 传递变量
- 创建函数
package day03
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
**
*
*@author 不温卜火
**
* @create 2020-07-24 20:12
**
* MyCSDN :https://buwenbuhuo.blog.csdn.net/
*/
object Demo02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo02").setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xiaoli", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD2(rdd)
result.collect.foreach(println)
}
// query 为需要查找的子字符串
class Searcher(val query: String) {
// 判断 s 中是否包括子字符串 query
def isMatch(s: String) = {
s.contains(query)
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD2(rdd: RDD[String]) = {
rdd.filter(_.contains(query))
}
}
}
- 2. 运行查看结果(会报错)
报错原因
:
这次没有传递函数, 而是传递了一个属性过去. 仍然会报错没有序列化. 因为this
仍然没有序列化.
- 3. 解决方案:
- 1.让类实现序列化接口:
Serializable
- 2.传递局部变量而不是属性
3. kryo 序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java 的序列化比较重, 能够序列化任何的类. 比较灵活,但是相当的慢, 并且序列化后对象的体积也比较大.
Spark 出于性能的考虑, 支持另外一种序列化机制: kryo (2.0开始支持). kryo 比较快和简洁.(速度是Serializable
的10倍). 想获取更好的性能应该使用 kryo 来序列化.
从2.0开始, Spark 内部已经在使用 kryo 序列化机制: 当 RDD 在 Shuffle
数据的时候, 简单数据类型, 简单数据类型的数组和字符串类型已经在使用 kryo 来序列化.
有一点需要注意的是: 即使使用 kryo 序列化, 也要继承 Serializable 接口.
- 1.代码案例
package day03
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
**
*
*@author 不温卜火
**
* @create 2020-07-24 20:36
**
* MyCSDN :https://buwenbuhuo.blog.csdn.net/
*/
object Demo03 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("Demo03")
.setMaster("local[*]")
// 替换默认的序列化机制 可以省(如果调用registerKryoClasses
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello buwenbuhuo", "xiaoli", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
// 判断 s 中是否包括子字符串 query
def isMatch(s: String) = {
s.contains(query)
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch) //
}
// 过滤出包含 query字符串的字符串组成的新的 RDD
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
- 2.运行案例
本次的分享就到这里了