Spark data processing -- data sampling (random sampling, stratified sampling, weighted sampling)

2021-12-08 09:35:35

Article outline

  • Introduction
    • What are the common simple sampling methods?
      • Random sampling
      • Stratified sampling
      • Weighted sampling
    • SMOTE oversampling
    • Undersampling
  • Is Spark's data sampling uniformly distributed?
  • Spark code examples
    • Scala version: sampleBy
    • Python version
  • Spark data type conversions
  • References

Introduction

What are the common simple sampling methods?

Simple sampling generally falls into:

  • RandomSampling - random sampling
  • StratifiedSampling - stratified sampling
  • WeightedSampling - weighted sampling

Calculation logic

  • Random sampling: the system draws samples from the dataset at random; different random-seed values produce different samples.
  • Stratified sampling: also called typed sampling. The population is divided into sub-populations (strata), and individuals are drawn at random from each stratum in a specified proportion. Its advantage is better sample representativeness and smaller sampling error; its drawback is that the procedure is somewhat more involved than simple random sampling. It is a well-established probability sampling method that is frequently used in quantitative surveys. Choose a stratification key column: if the key is gender, say, with a male-to-female ratio of 6:4, then the sampled result keeps that 6:4 ratio.
  • Weighted sampling: choose a weight column. If the weight column is the class number, and sample A's class number is 2 while sample B's is 1, then A is twice as likely to be sampled as B.
  • Sample size: the final number of sampled rows depends on how the sample size is specified. Suppose the original dataset has 100 rows: with the count option, the output has exactly the requested number of rows; with the ratio option set to 0.8, the output has 80 rows.

Random sampling
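
The DataFrame sample method covers this case (its full docstring is quoted in the Python section below). A minimal sketch, assuming an active SparkSession named spark:

Code (Python):
# Random sampling with DataFrame.sample -- a sketch, assuming an active
# SparkSession named `spark`.
df = spark.range(100)

# Draw roughly 20% of the rows without replacement; fixing the seed makes the draw reproducible.
sample_wo = df.sample(withReplacement=False, fraction=0.2, seed=42)

# With replacement, the same row may be drawn more than once.
sample_w = df.sample(withReplacement=True, fraction=0.2, seed=42)

# The returned size is only approximately fraction * count (see the docstring below).
print(sample_wo.count(), sample_w.count())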

Stratified sampling

Example:

https://www.cnblogs.com/itboys/p/9801489.html

PySpark example:

https://www.it1352.com/1933988.html

Code (Python):
from pyspark.sql.functions import lit

# A small DataFrame with two distinct values in the stratification column x1.
data = [(2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)]
df = spark.createDataFrame(data, ["x1", "x2", "x3"])
df.show()

df.count()
# 12

df.groupBy("x1").count().show()

# Sample 80% of every stratum: build a {stratum value: fraction} map for sampleBy.
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
print(fractions)
# {2147481832: 0.8, 214748183: 0.8}

seed = 12  # fix the seed so the draw is reproducible
sampled_df = df.stat.sampleBy("x1", fractions, seed)
sampled_df.show()

sampled_df.count()
# about 9 of the 12 rows; the exact count varies with the seed

sampled_df.groupBy("x1").count().show()

Reference:

  • https://stackoverflow.com/questions/32238727/stratified-sampling-in-spark/32241887

Weighted sampling

A set contains n elements, each with its own weight. The task is to draw m elements at random without replacement so that each element's probability of being drawn is proportional to its weight's share of the total weight.
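
A minimal, plain-Python sketch of this idea (the key-based method discussed in the links below, not a Spark API): each element gets the key u ** (1 / w) with u drawn uniformly from (0, 1), and the m largest keys are kept. The function name and the example data are hypothetical.

Code (Python):
import random

def weighted_sample_without_replacement(items, weights, m, seed=None):
    """Draw m distinct items with probability proportional to their weights.

    Sketch of the key-based approach: key = u ** (1 / w), keep the m largest keys.
    """
    rng = random.Random(seed)
    keyed = [(rng.random() ** (1.0 / w), item) for item, w in zip(items, weights)]
    keyed.sort(reverse=True)
    return [item for _, item in keyed[:m]]

# "b" carries twice the weight of "a" and "c", so it is the most likely pick.
print(weighted_sample_without_replacement(["a", "b", "c"], [1.0, 2.0, 1.0], m=2, seed=7))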

  • https://www.jianshu.com/p/ece671db9813
  • https://lotabout.me/2018/Weighted-Random-Sampling/
  • https://stackoverflow.com/questions/48558131/sampling-with-weight-using-pyspark
  • https://www.codenong.com/44352986/

SMOTE oversampling

  • For class-imbalanced datasets, set the label column, the label to oversample, and the oversampling rate; the SMOTE algorithm then oversamples the rows of that label and outputs the oversampled dataset.
  • SMOTE generates new samples for the selected minority class by interpolation (a simplified sketch follows).
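
A minimal NumPy sketch of the interpolation step, not the Spark implementation: the partner sample is picked at random here rather than from the k nearest neighbours that a full SMOTE would use. The function name and data are hypothetical.

Code (Python):
import numpy as np

def smote_like_oversample(minority, n_new, seed=None):
    """Create n_new synthetic rows by interpolating between minority-class samples."""
    rng = np.random.default_rng(seed)
    minority = np.asarray(minority, dtype=float)
    synthetic = []
    for _ in range(n_new):
        # Pick two distinct minority samples (a real SMOTE picks a k-nearest neighbour).
        i, j = rng.choice(len(minority), size=2, replace=False)
        gap = rng.random()  # position on the line segment between the two samples
        synthetic.append(minority[i] + gap * (minority[j] - minority[i]))
    return np.vstack(synthetic)

# Hypothetical 2-D minority class with 4 samples, augmented with 3 synthetic points.
print(smote_like_oversample([[0, 0], [1, 1], [1, 0], [0, 1]], n_new=3, seed=0))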

Undersampling
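
Undersampling reduces the majority class instead of synthesising minority samples. As a hedged sketch, it can be expressed with the sampleBy API covered below: keep the minority class and downsample every other class to roughly the same size (the DataFrame df and its label column are hypothetical).

Code (Python):
# Sketch of random undersampling via sampleBy, assuming a DataFrame `df`
# with a categorical `label` column.
counts = {row["label"]: row["count"] for row in df.groupBy("label").count().collect()}
minority_size = min(counts.values())

# Keep all minority rows; downsample the other classes to about the minority size.
fractions = {label: min(1.0, minority_size / n) for label, n in counts.items()}
balanced_df = df.stat.sampleBy("label", fractions, seed=42)
balanced_df.groupBy("label").count().show()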

Is Spark's data sampling uniformly distributed?

https://stackoverflow.com/questions/31633117/spark-is-sample-method-on-dataframes-uniform-sampling

There are a few code paths here:

  • If withReplacement = false && fraction > .4, it uses a souped up random number generator (rng.nextDouble() <= fraction) and lets that do the work. This seems like it would be pretty uniform.
  • If withReplacement = false && fraction <= .4, it uses a more complex algorithm (GapSamplingIterator) that also seems pretty uniform. At a glance, it looks like it should be uniform as well.
  • If withReplacement = true, it does close to the same thing, except it can duplicate rows by the looks of it, so it would not be as uniform as the first two.
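
An empirical sanity check, as a sketch rather than a proof: sample the same DataFrame many times with different seeds and count how often each row is kept; for a uniform sampler every row should be kept in roughly fraction of the runs. Assumes an active SparkSession named spark.

Code (Python):
from functools import reduce
from pyspark.sql import functions as F

df = spark.range(1000)
runs, fraction = 50, 0.3

# Stack 50 independent samples (one per seed) and count appearances per id.
samples = [df.sample(fraction=fraction, seed=seed) for seed in range(runs)]
stacked = reduce(lambda a, b: a.union(b), samples)

# For a uniform sampler, each id should appear about fraction * runs = 15 times.
stacked.groupBy("id").count().select(F.min("count"), F.avg("count"), F.max("count")).show()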

Spark code examples

Note in particular that sample performs random sampling and is mainly a Dataset method, while sampleBy performs stratified sampling and is mainly a DataFrame method (exposed through the stat functions).

Latest Spark Scala documentation:

  • http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameStatFunctions.html

Older Spark Scala documentation:

  • http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions

PySpark RDD documentation:

  • http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html?highlight=sample#pyspark.RDD.sample

PySpark DataFrame documentation:

  • http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sample.html?highlight=sample#pyspark.sql.DataFrame.sample

Scala version: sampleBy

Code (Scala):
def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame

Returns a stratified sample without replacement based on the fraction given on each stratum.

  • T stratum type
  • col column that defines strata
  • fractions sampling fraction for each stratum. If a stratum is not specified, we treat its fraction as zero.
  • seed random seed
  • returns a new DataFrame that represents the stratified sample
Code (Scala):
val df = spark.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2),
  (3, 3))).toDF("key", "value")
val fractions = Map(1 -> 1.0, 3 -> 0.5)
df.stat.sampleBy("key", fractions, 36L).show()
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  1|    2|
|  3|    2|
+---+-----+

Python version

Code (Python):
def sample(self, withReplacement=None, fraction=None, seed=None):
        """Returns a sampled subset of this :class:`DataFrame`.

        .. versionadded:: 1.3.0

        Parameters
        ----------
        withReplacement : bool, optional
            Sample with replacement or not (default ``False``).
        fraction : float, optional
            Fraction of rows to generate, range [0.0, 1.0].
        seed : int, optional
            Seed for sampling (default a random seed).

        Notes
        -----
        This is not guaranteed to provide exactly the fraction specified of the total
        count of the given :class:`DataFrame`.

        `fraction` is required and, `withReplacement` and `seed` are optional.

        Examples
        --------
        >>> df = spark.range(10)
        >>> df.sample(0.5, 3).count()
        7
        >>> df.sample(fraction=0.5, seed=3).count()
        7
        >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
        1
        >>> df.sample(1.0).count()
        10
        >>> df.sample(fraction=1.0).count()
        10
        >>> df.sample(False, fraction=1.0).count()
        10
        """

        # For the cases below:
        #   sample(True, 0.5 [, seed])
        #   sample(True, fraction=0.5 [, seed])
        #   sample(withReplacement=False, fraction=0.5 [, seed])
        is_withReplacement_set = \
            type(withReplacement) == bool and isinstance(fraction, float)

        # For the case below:
        #   sample(fraction=0.5 [, seed])
        is_withReplacement_omitted_kwargs = \
            withReplacement is None and isinstance(fraction, float)

        # For the case below:
        #   sample(0.5 [, seed])
        is_withReplacement_omitted_args = isinstance(withReplacement, float)

        if not (is_withReplacement_set
                or is_withReplacement_omitted_kwargs
                or is_withReplacement_omitted_args):
            argtypes = [
                str(type(arg)) for arg in [withReplacement, fraction, seed] if arg is not None]
            raise TypeError(
                "withReplacement (optional), fraction (required) and seed (optional)"
                " should be a bool, float and number; however, "
                "got [%s]." % ", ".join(argtypes))

        if is_withReplacement_omitted_args:
            if fraction is not None:
                seed = fraction
            fraction = withReplacement
            withReplacement = None

        seed = int(seed) if seed is not None else None
        args = [arg for arg in [withReplacement, fraction, seed] if arg is not None]
        jdf = self._jdf.sample(*args)
        return DataFrame(jdf, self.sql_ctx)

Returns a stratified sample without replacement, based on the fraction given for each stratum.

Code (Python):
    def sampleBy(self, col, fractions, seed=None):
        """
        Returns a stratified sample without replacement based on the
        fraction given on each stratum.

        .. versionadded:: 1.5.0

        Parameters
        ----------
        col : :class:`Column` or str
            column that defines strata

            .. versionchanged:: 3.0
               Added sampling by a column of :class:`Column`
        fractions : dict
            sampling fraction for each stratum. If a stratum is not
            specified, we treat its fraction as zero.
        seed : int, optional
            random seed

        Returns
        -------
        a new :class:`DataFrame` that represents the stratified sample

        Examples
        --------
        >>> from pyspark.sql.functions import col
        >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
        >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
        >>> sampled.groupBy("key").count().orderBy("key").show()
        +---+-----+
        |key|count|
        +---+-----+
        |  0|    3|
        |  1|    6|
        +---+-----+
        >>> dataset.sampleBy(col("key"), fractions={2: 1.0}, seed=0).count()
        33
        """
        if isinstance(col, str):
            col = Column(col)
        elif not isinstance(col, Column):
            raise ValueError("col must be a string or a column, but got %r" % type(col))
        if not isinstance(fractions, dict):
            raise ValueError("fractions must be a dict but got %r" % type(fractions))
        for k, v in fractions.items():
            if not isinstance(k, (float, int, str)):
                raise ValueError("key must be float, int, or string, but got %r" % type(k))
            fractions[k] = float(v)
        col = col._jc
        seed = seed if seed is not None else random.randint(0, sys.maxsize)
        return DataFrame(self._jdf.stat().sampleBy(col, self._jmap(fractions), seed), self.sql_ctx)

Spark data type conversions

DataFrame/Dataset to RDD:

Code (Scala):
val rdd1 = testDF.rdd
val rdd2 = testDS.rdd

RDD to DataFrame (typically pack each row's fields into a tuple, then name the columns in toDF):

Code (Scala):
import spark.implicits._
val testDF = rdd.map { line =>
  (line._1, line._2)
}.toDF("col1", "col2")

RDD to Dataset (the key point is defining a case class):

Code (Scala):
import spark.implicits._
case class Coltest(col1: String, col2: Int)
val testDS = rdd.map { line => Coltest(line._1, line._2) }.toDS

Dataset to DataFrame (a simple conversion: the case class rows are just wrapped as Row):

Code (Scala):
import spark.implicits._
val testDF = testDS.toDF

DataFrame to Dataset (given each column's type, call the as method with the case class, which is the key step, to obtain the Dataset):

Code (Scala):
import spark.implicits._
case class Coltest(col1: String, col2: Int)
val testDS = testDF.as[Coltest]
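
For completeness, the RDD <-> DataFrame round trip looks much the same in PySpark; this is only a sketch, and Datasets exist solely on the JVM side, so there is no toDS counterpart.

Code (Python):
# PySpark counterpart of the RDD <-> DataFrame conversions above,
# assuming an active SparkSession named `spark`.
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2)])

# RDD -> DataFrame: name the columns in toDF
test_df = rdd.toDF(["col1", "col2"])

# DataFrame -> RDD of Row objects
print(test_df.rdd.collect())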

Note in particular:

When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS will not be available.

One more trick: converting a DataFrame to a Dataset seems annoying because it appears to require writing a case class on the fly, but that is actually not necessary.

You can write it like this:

Code (Scala):
val df_dataset = df.asInstanceOf[Dataset[_]]  // a DataFrame is already a Dataset[Row], so this cast succeeds

References

  • https://gist.github.com/frne/391b809e3528efe6aac718e1a64f4603
  • https://gist.github.com/yoyama/ce83f688717719fc8ca145c3b3ff43fd
