RDD序列化

2022-05-06 15:27:03 浏览数 (1)

序列化介绍

在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

为什么需要序列化?

通过几个案例演示,讲解spark开发中常见的几个关于序列化问题(org.apache.spark.SparkException: Task not serializable),然后引出为什么需要进行序列化。

案例一演示:外部变量

代码语言:javascript复制
class SerializableRDD {

  val x=10

  @Test
  def demo01(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)

    // 定义一个集合,
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))


    // 使用map算子,对每个元素都*x的值
    val rdd2: RDD[Int] = rdd1.map(m => m * x)
    // 结果展示
    println(rdd2.collect.toList)
  }
}

结果:报错了,意思是说SerializableRDD没有序列化。

代码语言:javascript复制
org.apache.spark.SparkException: Task not serializable
Serialization stack:
    - object not serializable (class: SerializableRDD, value: SerializableRDD@3c904f1e)

知道问题后,SerializableRDD 实现序列化

代码语言:javascript复制
class SerializableRDD extends Serializable {
 // 内部代码和上面一样,为了不影响阅读,就忽略掉了。
}

结果:

代码语言:javascript复制
List(10, 20, 30, 40, 50, 60, 70, 80, 90)

案例二演示:局部变量

  • SerializableRDD 未实现序列化接口
  • 将变量x放到函数中。
代码语言:javascript复制
class SerializableRDD{

  @Test
  def demo01(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)

    val x=10
    // 定义一个集合,
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // 使用map算子,对每个元素都*x的值
    val rdd2: RDD[Int] = rdd1.map(m => m * x)
    // 结果展示
    println(rdd2.collect.toList)

  }

}

此时运行:会有问题吗? 结果没有:其原因是因为x属于局部变量,可以直接进行序列化。而放到外部,那么就需要与SerializableRDD关联,序列化x变量前肯定要序列化SerializableRDD,否则就会报错。

代码语言:javascript复制
List(10, 20, 30, 40, 50, 60, 70, 80, 90)

案例三演示:自定义对象

代码语言:javascript复制
class SerializableRDD{

  @Test
  def demo01(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)

    // 定义一个集合,
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    
    // 创建对象
    val p=new Person()
    // 使用map算子,对每个元素都*x的值
    val rdd2: RDD[Int] = rdd1.map(m => p.m1(m))
    // 结果展示
    println(rdd2.collect.toList)

  }

}

/**
 * 随便定义一个变量,加啥无所谓
 */
class Person(){

  val x=10

  def m1(m:Int):Int={
    m*x
  }
}

运行结果如何?是否有问题?,你可能会说Person没有序列化。 是的报错了。

代码语言:javascript复制
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: Person

Person实现序列化

代码语言:javascript复制
class Person() extends Serializable {

  val x=10

  def m1(m:Int):Int={
    m*x
  }
}

再次运行,结果就正常了

代码语言:javascript复制
List(10, 20, 30, 40, 50, 60, 70, 80, 90)

也许你还在想为啥不把Person类定义到SerializableRDD中,其原因和变量x定义到SerializableRDD作用是一样的。肯定需要先将SerializableRDD序列化。

案例四演示:样例类

代码语言:javascript复制
class SerializableRDD{



  @Test
  def demo01(): Unit ={
    val conf=new SparkConf().setMaster("local[4]").setAppName("test")
    val sc = new SparkContext(conf)

    val x=10
    // 定义一个集合,
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

    // 创建对象
    val p=new Person2(x)


    // 使用map算子,对每个元素都*x的值
    val rdd2: RDD[Int] = rdd1.map(m => p.m1(m))
    // 结果展示
    println(rdd2.collect.toList)

  }

}

/**
 * 定义一个样例类
 */
case class Person2(x:Int){
  def m1(m:Int):Int={
    m*x
  }
}

结果:

代码语言:javascript复制
List(10, 20, 30, 40, 50, 60, 70, 80, 90)

咦?为什么样例类可以?其实样例类默认就实现了序列化接口。

问题思考:为什么需要序列化呢?

  • park中 算子里面代码是在executor中执行的
  • 算子外面的代码是在Driver中执行的
  • 所以如果算子里面的函数使用了Driver的对象,此时要求该对象必须能够序列化
  • 样例类默认已经实现了序列化接口

spark是分布式计算引擎,是需要在不同的服务器或线程中运行。若不进行序列化怎么传递数据?明白这句话,在看看上面的总结就明白了。


spark中的序列化

了解序列化之后,再看看spark中哪些序列化,每种序列化有什么优势。

在spark中有两种序列化方式

  1. java的序列化方式(Serializable)
  2. Kryo 序列化方式。

spark默认使用的是Java序列化

  • java序列化: 会序列化对象包的信息、属性的类型信息、继承信息等
  • Kryo序列化: 只序列化基础的信息
  • Kryo序列化整体性能要比java序列化高10倍左右

spark中要想使用kryo序列化:

  1. 在sparkconf中通过set方法配置: set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
  2. 注册待序列化的类[可选]: registerKryoClasses(Array(classOf[Dog]))

注册与不注册的区别:

  • 注册后的类在后续kryo序列化的时候,不会序列化包的信息
  • 类没有注册的话后续在kryo序列化的时候,会序列化包的信息

java序列化方式和

建立一个Student对象

代码语言:javascript复制
class Student extends Serializable {

  @BeanProperty
  var name:String=_

}

java 序列化

代码语言:javascript复制
 /**
   * java 序列化的方式
   */
  def javaSerializable(): Unit ={
    val stu=new Student
    stu.setName("张三")

    val path="C:\Users\123456\Desktop\javaSerializable.txt"
    // 进行序列化

    val out=new FileOutputStream(path)

    val objectOutput=new ObjectOutputStream(out)
    objectOutput.writeObject(stu)
    objectOutput.flush()
    objectOutput.close()


    //进行反序列化
    val objectInput:ObjectInputStream=new ObjectInputStream(new FileInputStream(path))
    val stu2=   objectInput.readObject().asInstanceOf[Student]
    println(stu2.getName)
  }

文件大小(为了美观,就不截图了)

代码语言:javascript复制
javaSerializable.txt 文件大小
65 字节 (65 字节)

反序列化 name 值

代码语言:javascript复制
张三

Kryo序列化 导入相关依赖

代码语言:javascript复制
import com.twitter.chill.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
代码语言:javascript复制
  /**
   * kryo 序列化的方式
   */
  def kryoSerializable(): Unit ={

    val stu=new Student
    stu.setName("张三")

    val path="C:\Users\123456\Desktop\kryoSerializable.txt"

    val kryo=new Kryo()

    //序列化
    val output = new Output(new FileOutputStream(path))
    kryo.writeObject(output,stu)
    //关闭资源;否则会出现 .KryoException: Buffer underflow.
    output.close()

    // 反序列化
    val input = new Input(new FileInputStream(path))
    val stu2= kryo.readObject(input,classOf[Student])
    println(stu2.getName)

  }

文件大小(为了美观,就不截图了)

代码语言:javascript复制
kryoSerializable.txt 文件大小
9 字节 (9 字节)

反序列化 name 值

代码语言:javascript复制
张三

使用kryo 可以大大提高性能

代码语言:javascript复制
>>> 65/9
7.222222222222222

若有兴趣,可以下去测试一下,上面代码直接可以copy修改一下直接使用。


上面介绍了,spark默认是使用java的序列化方式,如何在spark中使用Kryo的序列化方式呢?

我们从spark官网上进行查看相关配置。

spark.serializer

org.apache.spark.serializer.JavaSerializer

Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using org.apache.spark.serializer.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of org.apache.spark.Serializer.

0.5.0

默认使用的是org.apache.spark.serializer.JavaSerializer,也就是java的序列化方式,若我们使用Kryo的序列化方式,只需要将配置改成org.apache.spark.serializer.KryoSerializer即可

如何配置? 在创建SparkContext 是将spark.serializer 添加到配置中(如下),即可。

代码语言:javascript复制
val conf=new SparkConf()
 .setMaster("local[4]")
 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
 .setAppName("test")
val sc = new SparkContext(conf)

除此之外还可以对某些类进行注册; 配置方式:.registerKryoClasses(Array(classOf[Student]))

代码语言:javascript复制
val conf=new SparkConf()
 .setMaster("local[4]")
 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
 .registerKryoClasses(Array(classOf[Student]))
 .setAppName("test")
val sc = new SparkContext(conf)

表示后续可以对Student 类可以进行kryo序列化,也可以进行多个类注册.registerKryoClasses(Array(classOf[Student],...)),该方式是可选的。

注册与不注册的区别:

  • 注册后的类在后续kryo序列化的时候,不会序列化包的信息
  • 类没有注册的话后续在kryo序列化的时候,会序列化包的信息

在spark每个算子都会进行一次闭包检查和处理 如:map算子

代码语言:javascript复制
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

val cleanF = sc.clean(f):就是再进行闭包检查和处理。 然后一直往里点,直到进入clean 中(如下)

代码语言:javascript复制
  private def clean(
      func: AnyRef,
      checkSerializable: Boolean,
      cleanTransitively: Boolean,
      accessedFields: Map[Class[_], Set[String]]): Unit = {}

这个函数代码很长,往下翻

代码语言:javascript复制
if (checkSerializable) {
  ensureSerializable(func)
}

进入ensureSerializable函数中

代码语言:javascript复制
  private def ensureSerializable(func: AnyRef): Unit = {
    try {
      if (SparkEnv.get != null) {
        //根据序列化完成创建
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      // 若未进行序列化,直接报错
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }

点击进入.serialize(func)

代码语言:javascript复制
@DeveloperApi
@NotThreadSafe
abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer

  def deserialize[T: ClassTag](bytes: ByteBuffer): T

  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T

  def serializeStream(s: OutputStream): SerializationStream

  def deserializeStream(s: InputStream): DeserializationStream
}

SerializerInstance 是一个抽象类,他有三个子类;

  1. DummySerializerInstance(我们用不到)
  2. JavaSerializerInstance(java 的方式)
  3. KryoSerializerInstance(kryo的方式)

具体使用什么根据spark.serializer 默认为org.apache.spark.serializer.JavaSerializer

最后

序列化

  1. 场景:
    • 如果在Driver中定义了对象,该对象后续需要在executor中使用的时候,此时要求该对象必须能够序列化
    • 如果算子里面的代码是在executor中执行的
    • 算子外面的代码是在Driver中执行的
  2. spark里面默认使用是java序列化,java序列化性能比较低
    • 而kryo序列化性能比java高10倍左右
    • 所以工作中一般使用kryo序列化
  3. spark如何使用kryo序列化
    • 在sparkconf中通过set方法进行设置: .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

0 人点赞