Outline
- Introduction
- What are the common simple sampling methods?
- Random sampling
- Stratified sampling
- Weighted sampling
- SMOTE oversampling
- Undersampling
- Is Spark's data sampling uniformly distributed?
- Spark code examples
- Scala version: sampleBy
- Python version
- Spark data type conversions
- References
Introduction
What are the common simple sampling methods?
Simple sampling generally falls into:
- RandomSampling - random sampling
- StratifiedSampling - stratified sampling
- WeightedSampling - weighted sampling
Computation logic
- Random sampling: the system draws samples from the dataset at random; different random-seed values produce different sampling results.
- Stratified sampling: also called type sampling, it draws samples (individuals) at a prescribed ratio from the different sub-populations (strata) into which the overall population can be divided. Its advantage is better sample representativeness and a smaller sampling error; its drawback is that the procedure is somewhat more involved than simple random sampling. It is an excellent probability-sampling method for quantitative surveys and is used frequently. Choose a stratum key column; if the key is gender and the male-to-female ratio is 6:4, the sampled result will also have a 6:4 ratio.
- Weighted sampling: choose a weight column; if the weight column is the class number, sample A's class number is 2 and sample B's is 1, then A is twice as likely to be drawn as B.
- Sample size: the final number of sampled rows depends on how the sample size is specified. With 100 rows in the original dataset, the count option returns exactly the requested number of rows, while the ratio option with a ratio of 0.8 returns 80 rows.
Random sampling
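The original article gives no example here, so below is a minimal PySpark sketch (the SparkSession name spark and the toy data are assumptions, not from the source) showing that DataFrame.sample draws rows at random and that the seed controls reproducibility:

# Random sampling: keeps roughly `fraction` of the rows, chosen at random
df = spark.range(0, 100)                                    # toy dataset, 100 rows

sample1 = df.sample(withReplacement=False, fraction=0.2, seed=42)
sample2 = df.sample(withReplacement=False, fraction=0.2, seed=42)
print(sample1.count(), sample2.count())                     # same seed -> identical samples

sample3 = df.sample(withReplacement=False, fraction=0.2, seed=7)
print(sample3.count())                                      # a different seed gives a different subset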
Stratified sampling
Example:
https://www.cnblogs.com/itboys/p/9801489.html
PySpark example:
https://www.it1352.com/1933988.html
from pyspark.sql.functions import lit
list = [(2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)]
df = spark.createDataFrame(list, ["x1","x2","x3"])
df.show()
df.count()
df.groupBy("x1").count().show()
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
print(fractions)
# {2147481832: 0.8, 214748183: 0.8}
seed = 1234  # any fixed seed; the original snippet left it undefined
sampled_df = df.stat.sampleBy("x1", fractions, seed)
sampled_df.show()
sampled_df.count()
# about 9 (the exact count depends on the seed)
sampled_df.groupBy("x1").count().show()
Reference:
- https://stackoverflow.com/questions/32238727/stratified-sampling-in-spark/32241887
Weighted sampling
Given a set of n elements, each with a different weight, draw m elements at random without replacement so that each element's probability of being selected equals its weight's share of the total weight (see the sketch after the links below).
- https://www.jianshu.com/p/ece671db9813
- https://lotabout.me/2018/Weighted-Random-Sampling/
- https://stackoverflow.com/questions/48558131/sampling-with-weight-using-pyspark
- https://www.codenong.com/44352986/
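The links above mostly describe the Efraimidis–Spirakis weighted reservoir idea: assign every element the key u^(1/w), with u drawn uniformly from (0, 1), and keep the m elements with the largest keys. A minimal pure-Python sketch of that idea (the function name and the toy data are illustrative, not from the article):

import heapq
import random

def weighted_sample_without_replacement(items, weights, m, seed=None):
    # A-Res: key = u ** (1 / w); the m largest keys form the sample
    rng = random.Random(seed)
    keyed = [(rng.random() ** (1.0 / w), x) for x, w in zip(items, weights)]
    return [x for _, x in heapq.nlargest(m, keyed)]

# "A" (weight 2) is twice as likely to be drawn as "B" or "C" (weight 1)
print(weighted_sample_without_replacement(["A", "B", "C"], [2, 1, 1], m=2, seed=0))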
SMOTE oversampling
- For class-imbalanced datasets, you set the label column, the label to oversample and the oversampling ratio; the SMOTE algorithm then oversamples the data of that label and outputs the oversampled dataset.
- SMOTE generates new samples for the selected minority class by interpolation (see the sketch below).
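The interpolation step can be sketched in a few lines of NumPy: for a minority sample x, pick one of its k nearest minority neighbours x_n and emit x + rand(0, 1) * (x_n - x). The code below only illustrates that formula (in practice a library such as imbalanced-learn would be used); every name in it is illustrative:

import numpy as np

def smote_like_oversample(X_minority, n_new, k=2, seed=0):
    # Generate n_new synthetic rows by interpolating towards a random near neighbour
    rng = np.random.default_rng(seed)
    X = np.asarray(X_minority, dtype=float)
    out = []
    for _ in range(n_new):
        i = rng.integers(len(X))
        d = np.linalg.norm(X - X[i], axis=1)          # distances to every minority row
        neighbours = np.argsort(d)[1:k + 1]           # k nearest, skipping X[i] itself
        j = rng.choice(neighbours)
        out.append(X[i] + rng.random() * (X[j] - X[i]))
    return np.array(out)

X_min = np.array([[1.0, 1.0], [1.2, 0.9], [0.8, 1.1], [1.1, 1.3]])
print(smote_like_oversample(X_min, n_new=3))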
Undersampling
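The original section is empty; for completeness, undersampling is the mirror image of SMOTE: instead of synthesising minority rows you randomly drop majority rows until the classes are roughly balanced. A minimal PySpark sketch (the column name label, the class codes and the seed are assumptions) using sampleBy:

# Assumption: `df` has a binary `label` column where 0 is the majority class and 1 the minority class
counts = {row["label"]: row["count"] for row in df.groupBy("label").count().collect()}

# Keep every minority row; down-sample the majority class to roughly the minority size
ratio = counts[1] / counts[0]
balanced = df.stat.sampleBy("label", fractions={0: ratio, 1: 1.0}, seed=42)
balanced.groupBy("label").count().show()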
Is Spark's data sampling uniformly distributed?
https://stackoverflow.com/questions/31633117/spark-is-sample-method-on-dataframes-uniform-sampling
There are a few code paths here:
- If withReplacement = false && fraction > .4 then it uses a souped up random number generator (rng.nextDouble() <= fraction) and lets that do the work. This seems like it would be pretty uniform.
- If withReplacement = false && fraction <= .4 then it uses a more complex algorithm (GapSamplingIterator) that also seems pretty uniform. At a glance, it looks like it should be uniform also.
- If withReplacement = true it does close to the same thing, except it can duplicate by the looks of it, so this looks to me like it would not be as uniform as the first two.
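One quick empirical check (the toy data, fraction and seed below are my own, not from the thread): build a DataFrame whose keys have a known 9:1 ratio, sample it, and verify that the per-key counts of the sample stay close to 9:1.

from pyspark.sql import functions as F

# 9 rows of key "a" for every row of key "b"
df = spark.range(100000).withColumn(
    "key", F.when(F.col("id") % 10 == 0, "b").otherwise("a"))

sampled = df.sample(withReplacement=False, fraction=0.1, seed=1)
sampled.groupBy("key").count().show()   # counts should stay roughly 9:1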
Spark code examples
Note in particular: the sample function is for random sampling and is mainly used with Datasets, while sampleBy is for stratified sampling and is mainly used with DataFrames; a short sketch contrasting the two follows.
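This sketch uses toy data and arbitrary fractions (none of the values come from the article):

df = spark.createDataFrame([(i % 3, i) for i in range(30)], ["key", "value"])

# sample: one global fraction, no notion of strata
random_subset = df.sample(fraction=0.5, seed=10)

# sampleBy: a separate fraction for each value of the stratum column
stratified_subset = df.sampleBy("key", fractions={0: 0.1, 1: 0.5, 2: 0.9}, seed=10)

random_subset.groupBy("key").count().show()
stratified_subset.groupBy("key").count().show()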
Latest Spark Scala docs:
- http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameStatFunctions.html
Older Spark Scala docs:
- http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions
PySpark RDD docs:
- http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sample.html?highlight=sample#pyspark.RDD.sample
PySpark DataFrame docs:
- http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sample.html?highlight=sample#pyspark.sql.DataFrame.sample
Scala version: sampleBy
def sampleBy[T](col: String, fractions: Map[T, Double], seed: Long): DataFrame
Returns a stratified sample without replacement based on the fraction given on each stratum.
- T stratum type
- col column that defines strata
- fractions sampling fraction for each stratum. If a stratum is not specified, we treat its fraction as zero.
- seed random seed
- returns a new DataFrame that represents the stratified sample
val df = spark.createDataFrame(Seq((1, 1), (1, 2), (2, 1), (2, 1), (2, 3), (3, 2),
(3, 3))).toDF("key", "value")
val fractions = Map(1 -> 1.0, 3 -> 0.5)
df.stat.sampleBy("key", fractions, 36L).show()
+---+-----+
|key|value|
+---+-----+
|  1|    1|
|  1|    2|
|  3|    2|
+---+-----+
Python version
def sample(self, withReplacement=None, fraction=None, seed=None):
    """Returns a sampled subset of this :class:`DataFrame`.

    .. versionadded:: 1.3.0

    Parameters
    ----------
    withReplacement : bool, optional
        Sample with replacement or not (default ``False``).
    fraction : float, optional
        Fraction of rows to generate, range [0.0, 1.0].
    seed : int, optional
        Seed for sampling (default a random seed).

    Notes
    -----
    This is not guaranteed to provide exactly the fraction specified of the total
    count of the given :class:`DataFrame`.

    `fraction` is required and, `withReplacement` and `seed` are optional.

    Examples
    --------
    >>> df = spark.range(10)
    >>> df.sample(0.5, 3).count()
    7
    >>> df.sample(fraction=0.5, seed=3).count()
    7
    >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
    1
    >>> df.sample(1.0).count()
    10
    >>> df.sample(fraction=1.0).count()
    10
    >>> df.sample(False, fraction=1.0).count()
    10
    """

    # For the cases below:
    #   sample(True, 0.5 [, seed])
    #   sample(True, fraction=0.5 [, seed])
    #   sample(withReplacement=False, fraction=0.5 [, seed])
    is_withReplacement_set = \
        type(withReplacement) == bool and isinstance(fraction, float)

    # For the case below:
    #   sample(faction=0.5 [, seed])
    is_withReplacement_omitted_kwargs = \
        withReplacement is None and isinstance(fraction, float)

    # For the case below:
    #   sample(0.5 [, seed])
    is_withReplacement_omitted_args = isinstance(withReplacement, float)

    if not (is_withReplacement_set
            or is_withReplacement_omitted_kwargs
            or is_withReplacement_omitted_args):
        argtypes = [
            str(type(arg)) for arg in [withReplacement, fraction, seed] if arg is not None]
        raise TypeError(
            "withReplacement (optional), fraction (required) and seed (optional)"
            " should be a bool, float and number; however, "
            "got [%s]." % ", ".join(argtypes))

    if is_withReplacement_omitted_args:
        if fraction is not None:
            seed = fraction
        fraction = withReplacement
        withReplacement = None

    seed = int(seed) if seed is not None else None
    args = [arg for arg in [withReplacement, fraction, seed] if arg is not None]
    jdf = self._jdf.sample(*args)
    return DataFrame(jdf, self.sql_ctx)
Returns a stratified sample without replacement based on the fraction given for each stratum.
def sampleBy(self, col, fractions, seed=None):
    """
    Returns a stratified sample without replacement based on the
    fraction given on each stratum.

    .. versionadded:: 1.5.0

    Parameters
    ----------
    col : :class:`Column` or str
        column that defines strata

        .. versionchanged:: 3.0
           Added sampling by a column of :class:`Column`
    fractions : dict
        sampling fraction for each stratum. If a stratum is not
        specified, we treat its fraction as zero.
    seed : int, optional
        random seed

    Returns
    -------
    a new :class:`DataFrame` that represents the stratified sample

    Examples
    --------
    >>> from pyspark.sql.functions import col
    >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
    >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
    >>> sampled.groupBy("key").count().orderBy("key").show()
    +---+-----+
    |key|count|
    +---+-----+
    |  0|    3|
    |  1|    6|
    +---+-----+
    >>> dataset.sampleBy(col("key"), fractions={2: 1.0}, seed=0).count()
    33
    """
    if isinstance(col, str):
        col = Column(col)
    elif not isinstance(col, Column):
        raise ValueError("col must be a string or a column, but got %r" % type(col))
    if not isinstance(fractions, dict):
        raise ValueError("fractions must be a dict but got %r" % type(fractions))
    for k, v in fractions.items():
        if not isinstance(k, (float, int, str)):
            raise ValueError("key must be float, int, or string, but got %r" % type(k))
        fractions[k] = float(v)
    col = col._jc
    seed = seed if seed is not None else random.randint(0, sys.maxsize)
    return DataFrame(self._jdf.stat().sampleBy(col, self._jmap(fractions), seed), self.sql_ctx)
Spark data type conversions
DataFrame/Dataset to RDD:
val rdd1 = testDF.rdd
val rdd2 = testDS.rdd
RDD to DataFrame:
// Usually pack each row into a tuple, then name the columns in toDF
import spark.implicits._
val testDF = rdd.map { line =>
  (line._1, line._2)
}.toDF("col1", "col2")
RDD to Dataset:
// The key point is to define a case class
import spark.implicits._
case class Coltest(col1: String, col2: Int)
val testDS = rdd.map { line => Coltest(line._1, line._2) }.toDS
Dataset to DataFrame:
// This conversion is simple: it just wraps the case class into Row
import spark.implicits._
val testDF = testDS.toDF
DataFrame to Dataset:
// Once the type of each column is known, use the as method (as is still followed by the case class, which is the key point) to convert to a Dataset
import spark.implicits._
case class Coltest ... ...
val testDS = testDF.as[Coltest]
Note:
When using these operations, be sure to add import spark.implicits._, otherwise toDF and toDS will not be available.
A trick I learned today: converting a DataFrame to a Dataset looks annoying because it seems to require writing a case class on the fly, but in fact that is not necessary.
You can write it like this:
val df_dataset = df.asInstanceOf[Dataset[_]]
References
- https://gist.github.com/frne/391b809e3528efe6aac718e1a64f4603
- https://gist.github.com/yoyama/ce83f688717719fc8ca145c3b3ff43fd