Sampler类与4种采样方式

2022-09-02 13:31:34 浏览数 (1)

由于我们不能将大量数据一次性放入网络中进行训练,所以需要分批进行数据读取。这一过程涉及到如何从数据集中读取数据的问题,pytorch提供了Sampler基类【1】与多个子类实现不同方式的数据采样。子类包含:

  • Sequential Sampler(顺序采样)
  • Random Sampler(随机采样)
  • Subset Random Sampler(子集随机采样)
  • Weighted Random Sampler(加权随机采样)等等。

1、基类Sampler

代码语言:javascript复制
class Sampler(object):
    r"""Base class for all Samplers.
    """
    def __init__(self, data_source):
        pass
    def __iter__(self):
        raise NotImplementedError

对于所有的采样器来说,都需要继承Sampler类,必须实现的方法为__iter__(),也就是定义迭代器行为,返回可迭代对象。除此之外,Sampler类并没有定义任何其它的方法。

2、顺序采样Sequential Sampler

代码语言:javascript复制
class SequentialSampler(Sampler):
    r"""Samples elements sequentially, always in the same order.
    Arguments:
        data_source (Dataset): dataset to sample from
    """
    def __init__(self, data_source):
        self.data_source = data_source
    def __iter__(self):
        return iter(range(len(self.data_source)))
    def __len__(self):
        return len(self.data_source)

顺序采样类并没有定义过多的方法,其中初始化方法仅仅需要一个Dataset类对象作为参数。对于__len__()只负责返回数据源包含的数据个数;__iter__()方法负责返回一个可迭代对象,这个可迭代对象是由range产生的顺序数值序列,也就是说迭代是按照顺序进行的。前述几种方法都只需要self.data_source实现了__len__()方法,因为这几种方法都仅仅使用了len(self.data_source)函数。所以下面采用同样实现了__len__()的list类型来代替Dataset类型做测试:

代码语言:javascript复制
# 定义数据和对应的采样器
data = list([17, 22, 3, 41, 8])
seq_sampler = sampler.SequentialSampler(data_source=data)
# 迭代获取采样器生成的索引
for index in seq_sampler:
    print("index: {}, data: {}".format(str(index), str(data[index])))

得到下面的输出,说明Sequential Sampler产生的索引是顺序索引

代码语言:javascript复制
index: 0, data: 17
index: 1, data: 22
index: 2, data: 3
index: 3, data: 41
index: 4, data: 8

3、随机采样RandomSampler

代码语言:javascript复制
class RandomSampler(Sampler):
    r"""Samples elements randomly. If without replacement, then sample from a shuffled dataset.
    If with replacement, then user can specify :attr:`num_samples` to draw.
    Arguments:
        data_source (Dataset): dataset to sample from
        replacement (bool): samples are drawn with replacement if ``True``, default=``False``
        num_samples (int): number of samples to draw, default=`len(dataset)`. This argument
            is supposed to be specified only when `replacement` is ``True``.
    """
    def __init__(self, data_source, replacement=False, num_samples=None):
        self.data_source = data_source
        # 这个参数控制的应该为是否重复采样
        self.replacement = replacement
        self._num_samples = num_samples
    # 省略类型检查
    @property
    def num_samples(self):
        # dataset size might change at runtime
        # 初始化时不传入num_samples的时候使用数据源的长度
        if self._num_samples is None:
            return len(self.data_source)
        return self._num_samples
    # 返回数据集长度
    def __len__(self):
        return self.num_samples

最重要的是__iter__()方法,定义了核心的索引生成行为:

代码语言:javascript复制
def __iter__(self):
    n = len(self.data_source)
    if self.replacement:
        # 生成的随机数是可能重复的
        return iter(torch.randint(high=n, size=(self.num_samples,), dtype=torch.int64).tolist())
    # 生成的随机数是不重复的
    return iter(torch.randperm(n).tolist())

其中if判断处返回了两种随机值,根据是否在初始化中给出replacement参数决定是否重复采样,区别核心在于randint()函数生成的随机数学列是可能包含重复数值的,而randperm()函数生成的随机数序列是绝对不包含重复数值的,下面分别测试是否使用replacement作为输入的情况,首先是不使用时:

代码语言:javascript复制
ran_sampler = sampler.RandomSampler(data_source=data)
# 得到下面输出
ran_sampler = sampler.RandomSampler(data_source=data)
index: 0, data: 17
index: 2, data: 3
index: 3, data: 41
index: 4, data: 8
index: 1, data: 22

可以看出生成的随机索引是不重复的,下面是采用replacement参数的情况:

代码语言:javascript复制
ran_sampler = sampler.RandomSampler(data_source=data, replacement=True)
# 得到下面的输出
index: 0, data: 17
index: 4, data: 8
index: 3, data: 41
index: 4, data: 8
index: 2, data: 3

此时生成的随机索引是有重复的(4出现两次),也就是说第“4”条数据可能会被重复的采样。

4、子集随机采样Subset Random Sampler

代码语言:javascript复制
class SubsetRandomSampler(Sampler):
    r"""Samples elements randomly from a given list of indices, without replacement.
    Arguments:
        indices (sequence): a sequence of indices
    """
    def __init__(self, indices):
        # 数据集的切片,比如划分训练集和测试集
        self.indices = indices
    def __iter__(self):
        # 以元组形式返回不重复打乱后的“数据”
        return (self.indices[i] for i in torch.randperm(len(self.indices)))
    def __len__(self):
        return len(self.indices)

上述代码中__len__()的作用与前面几个类的相同,依旧是返回数据集的长度,区别在于__iter__()返回的并不是随机数序列,而是通过随机数序列作为indices的索引,进而返回打乱的数据本身。需要注意的仍然是采样是不重复的,也是通过randperm()函数实现的。按照网上可以搜集到的资料,Subset Random Sampler应该用于训练集、测试集和验证集的划分,下面将data划分为train和val两个部分,再次指出__iter__()返回的的不是索引,而是索引对应的数据:

代码语言:javascript复制
sub_sampler_train = sampler.SubsetRandomSampler(indices=data[0:2])
sub_sampler_val = sampler.SubsetRandomSampler(indices=data[2:])
# 下面是train输出
index: 17
index: 22
*************
# 下面是val输出
index: 8
index: 41
index: 3

5、加权随机采样WeightedRandomSampler

代码语言:javascript复制
class WeightedRandomSampler(Sampler):
    r"""Samples elements from ``[0,..,len(weights)-1]`` with given probabilities (weights)."""
    def __init__(self, weights, num_samples, replacement=True):
         # ...省略类型检查
         # weights用于确定生成索引的权重
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        # 用于控制是否对数据进行有放回采样
        self.replacement = replacement
    def __iter__(self):
        # 按照加权返回随机索引值
        return iter(torch.multinomial(self.weights, self.num_samples, self.replacement).tolist())

对于Weighted Random Sampler类的__init__()来说,replacement参数依旧用于控制采样是否是有放回的;num_sampler用于控制生成的个数;weights参数对应的是“样本”的权重而不是“类别的权重”。其中__iter__()方法返回的数值为随机数序列,只不过生成的随机数序列是按照weights指定的权重确定的,测试代码如下:

代码语言:javascript复制
# 位置[0]的权重为0,位置[1]的权重为10,其余位置权重均为1.1
weights = torch.Tensor([0, 10, 1.1, 1.1, 1.1, 1.1, 1.1])
wei_sampler = sampler.WeightedRandomSampler(weights, 6, True)
# 下面是输出:
index: 1
index: 2
index: 3
index: 4
index: 1
index: 1

从输出可以看出,位置[1]由于权重较大,被采样的次数较多,位置[0]由于权重为0所以没有被采样到,其余位置权重低所以都仅仅被采样一次。

6、批采样BatchSampler

代码语言:javascript复制
class BatchSampler(Sampler):
    r"""Wraps another sampler to yield a mini-batch of indices."""
    def __init__(self, sampler, batch_size, drop_last):、
        # ...省略类型检查
        # 定义使用何种采样器Sampler
        self.sampler = sampler
        self.batch_size = batch_size
        # 是否在采样个数小于batch_size时剔除本次采样
        self.drop_last = drop_last
    def __iter__(self):
        batch = []
        for idx in self.sampler:
            batch.append(idx)
            # 如果采样个数和batch_size相等则本次采样完成
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        # for结束后在不需要剔除不足batch_size的采样个数时返回当前batch        
        if len(batch) > 0 and not self.drop_last:
            yield batch
    def __len__(self):
        # 在不进行剔除时,数据的长度就是采样器索引的长度
        if self.drop_last:
            return len(self.sampler) // self.batch_size
        else:
            return (len(self.sampler)   self.batch_size - 1) // self.batch_size

在定义好各种采样器以后,需要进行“batch”的采样。BatchSampler类的__init__()函数中sampler参数对应前面介绍的XxxSampler类实例,也就是采样方式的定义;drop_last为“True”时,如果采样得到的数据个数小于batch_size则抛弃本个batch的数据。对于__init__()中的for循环,作用应该是以“生成器”的方式不断的从sampler中获取batch,比如sampler中包含1.5个batch_size的数据,那么在drop_last为False的情况下,for循环就会yield出个batch,下面的例子中batch sampler采用的采样器为顺序采样器:

代码语言:javascript复制
seq_sampler = sampler.SequentialSampler(data_source=data)
batch_sampler = sampler.BatchSampler(seq_sampler, 3, False)
# 下面是输出
batch: [0, 1, 2]
batch: [3, 4]

总结:

对于深度神经网络来说,数据的读取方式对网络的优化过程是有很大影响的。比如常见的策略为“打乱”数据集,使得每个epoch中各个batch包含的数据是不同的。在训练网络时,应该从上述的几种采样策略中进行选择,从而确定最优方式。

各位看官老爷,如果觉得对您有用麻烦赏个子,创作不易,0.1元就行了。下面是微信乞讨码:

0 人点赞