大数据框架(分区,分桶,分片)

2022-03-23 09:25:13 浏览数 (1)

前言

在大数据分布式中,分区,分桶,分片是设计框架的重点。此篇就来总结各个框架。建议收藏

目录

  • Hive分区与分桶
  • ES分片
  • Kafka分区
  • HBase分区
  • Kudu分区

Hive

Hive分区

是按照数据表的某列或者某些列分为多区,在hive存储上是hdfs文件,也就是文件夹形式。现在最常用的跑T 1数据,按当天时间分区的较多。

把每天通过sqoop或者datax拉取的一天的数据存储一个区,也就是所谓的文件夹与文件。在查询时只要指定分区字段的值就可以直接从该分区查找即可。创建分区表的时候,要通过关键字 partitioned by (column name string)声明该表是分区表,并且是按照字段column name进行分区,column name值一致的所有记录存放在一个分区中,分区属性name的类型是string类型。

当然,可以依据多个列进行分区,即对某个分区的数据按照某些列继续分区。

向分区表导入数据的时候,要通过关键字partition((column name="xxxx")显示声明数据要导入到表的哪个分区

设置分区的影响
  1. 首先是hive本身对分区数有限制,不过可以修改限制的数量;
代码语言:javascript复制
set hive.exec.dynamic.partition=true;
set hive.exec.max.dynamic.partitions=1000; 
set hive.exec.dynamic.partition.mode=nonstrict; 
set hive.exec.parallel.thread.number=264;
  1. hdfs对单个目录下的目录数量或者文件数量也是有限制的,也是可以修改的;
  2. NN的内存肯定会限制,这是最重要的,如果分区数很大,会影响NN服务,进而影响一系列依赖于NN的服务。所以最好合理设置分区规则,对小文件也可以定期合并,减少NN的压力。

Hive分桶

在分区数量过于庞大以至于可能导致文件系统崩溃时,我们就需要使用分桶来解决问题

分桶是相对分区进行更细粒度的划分。分桶则是指定分桶表的某一列,让该列数据按照哈希取模的方式随机、均匀的分发到各个桶文件中。因为分桶操作需要根据某一列具体数据来进行哈希取模操作,故指定的分桶列必须基于表中的某一列(字段) 要使用关键字clustered by 指定分区依据的列名,还要指定分为多少桶

代码语言:javascript复制
create table test(id int,name string) cluster by (id) into 5 buckets .......

insert into buck select id ,name from p cluster by (id)

Hive分区分桶区别

  • 分区是表的部分列的集合,可以为频繁使用的数据建立分区,这样查找分区中的数据时就不需要扫描全表,这对于提高查找效率很有帮助
  • 不同于分区对列直接进行拆分,桶往往使用列的哈希值对数据打散,并分发到各个不同的桶中从而完成数据的分桶过程
  • 分区和分桶最大的区别就是分桶随机分割数据库,分区是非随机分割数据库

ElasticSearch分片

主分片:用于解决数据水平扩展的问题,一个索引的所有数据是分布在所有主分片之上的(每个主分片承担一部分数据,主分片又分布在不同的节点上),一个索引的主分片数量只能在创建时指定,后期无法修改,除非对数据进行重新构建索引(reindex操作)。

副本分片:用于解决数据高可用的问题,一个副本分片即一个主分片的拷贝,其数量可以动态调整,通过增加副本分片也可以实现提升系统读性能的作用。

在集群中唯一一个空节点上创建一个叫做 blogs 的索引。默认情况下,一个索引被分配 5 个主分片

代码语言:javascript复制
{
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1
    }
}

到底分配到那个shard上呢?

代码语言:javascript复制
shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到余数 。这个在 0 到 number_of_primary_shards 之间的余数,就是所寻求的文档所在分片的位置。

如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了

  • 分片过少如15个节点,5个主分片,1个副本 会造成每个索引最多只能使用10个节点(5个主分片,5个从分片),剩余5节点并没有利用上;资源浪费如:3节点;3分主分片,1副本 当数据量较大的时,每个分片就会比较大
  • 分片过多
  1. 创建分片慢:es创建分片的速度会随着集群内分片数的增加而变慢。
  2. 集群易崩溃:在触发es 自动创建Index时,由于创建速度太慢,容易导致大量写入请求堆积在内存,从而压垮集群。
  3. 写入拒绝:分片过多的场景中,如果不能及时掌控业务的变化,可能经常遇到单分片记录超限、写入拒绝等问题。
分片的注意事项
  1. 避免使用非常大的分片,因为这会对群集从故障中恢复的能力产生负面影响。对分片的大小没有固定的限制,但是通常情况下很多场景限制在 30GB 的分片大小以内。
  2. 当在ElasticSearch集群中配置好你的索引后, 你要明白在集群运行中你无法调整分片设置. 既便以后你发现需要调整分片数量, 你也只能新建创建并对数据进行重新索引.
  3. 如果担心数据的快速增长, 建议根据这条限制: ElasticSearch推荐的最大JVM堆空间 是 30~32G, 所以把分片最大容量限制为 30GB, 然后再对分片数量做合理估算。例如, 如果的数据能达到 200GB, 则最多分配7到8个分片。

kafka分区

生产者

分区的原因

  1. 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
  2. 可以提高并发,因为可以以Partition为单位读写了。

分区的原则

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  3. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

消费者

分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费 Kafka有三种分配策略,一是RoundRobin,一是Range。高版本还有一个StickyAssignor策略 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:

  • 同一个 Consumer Group 内新增消费者
  • 消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
Range分区分配策略

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n 1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。假如有10个分区,3个消费者线程,把分区按照序号排列

0,1,2,3,4,5,6,7,8,9

消费者线程为

C1-0,C2-0,C2-1

那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

C1-0:0,1,2,3

C2-0:4,5,6

C2-1:7,8,9

如果有11个分区将会是:

C1-0:0,1,2,3

C2-0:4,5,6,7

C2-1:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1-0:T1(0,1,2,3) T2(0,1,2,3)

C2-0:T1(4,5,6) T2(4,5,6)

C2-1:T1(7,8,9) T2(7,8,9)

RoundRobinAssignor分区分配策略

RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者. 使用RoundRobin策略有两个前提条件必须满足:

  • 同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;
  • 每个消费者订阅的主题必须相同。

加入按照 hashCode 排序完的topic-partitions组依次为

T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9

我们的消费者线程排序为

C1-0, C1-1, C2-0, C2-1

最后分区分配的结果为:

C1-0 将消费 T1-5, T1-2, T1-6 分区

C1-1 将消费 T1-3, T1-1, T1-9 分区

C2-0 将消费 T1-0, T1-4 分区

C2-1 将消费 T1-8, T1-7 分区

StickyAssignor分区分配策略

Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  • 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个
  • 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目的,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。

假设消费组内有3个消费者

C0、C1、C2

它们都订阅了4个主题:

t0、t1、t2、t3

并且每个主题有2个分区,也就是说整个消费组订阅了

t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区

最终的分配结果如下:

消费者C0:t0p0、t1p1、t3p0

消费者C1:t0p1、t2p0、t3p1

消费者C2:t1p0、t2p1

这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同

此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:

消费者C0:t0p0、t1p0、t2p0、t3p0

消费者C2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:

消费者C0:t0p0、t1p1、t3p0、t2p0

消费者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有3个消费者:

C0、C1、C2

集群中有3个主题:

t0、t1、t2

这3个主题分别有

1、2、3个分区

也就是说集群中有

t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区

消费者C0订阅了主题t0

消费者C1订阅了主题t0和t1

消费者C2订阅了主题t0、t1和t2

如果此时采用RoundRobinAssignor策略:

消费者C0:t0p0

消费者C1:t1p0

消费者C2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是StickyAssignor策略:

消费者C0:t0p0

消费者C1:t1p0、t1p1

消费者C2:t2p0、t2p1、t2p2

此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:

消费者C1:t0p0、t1p1

消费者C2:t1p0、t2p0、t2p1、t2p2

StickyAssignor策略,那么分配结果为:

消费者C1:t1p0、t1p1、t0p0

消费者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:

t1p0、t1p1、t2p0、t2p1、t2p2。

从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。

注意

在实际开发过程中,kafka与spark或者flink对接的较多,一个分区对应的是一个并行度,如果并行度不够,这个时候会多个分区数据集中到一个并行度上。所以需要合理设置并行度

HBase分区

HBase每张表在底层存储上是由至少一个Region组成,Region实际上就是HBase表的分区。HBase新建一张表时默认Region即分区的数量为1,一般在生产环境中我们都会手动给Table提前做 “预分区”,使用合适的分区策略创建好一定数量的分区并使分区均匀分布在不同regionserver上。一个分区在达到一定大小时会自动Split,一分为二

HBase分区过多有哪些影响:
  • 频繁刷写:我们知道Region的一个列族对应一个MemStore,假设HBase表都有统一的1个列族配置,则每个Region只包含一个MemStore。通常HBase的一个MemStore默认大小为128 MB,见参数hbase.hregion.memstore.flush.size。当可用内存足够时,每个MemStore可以分配128 MB空间。当可用内存紧张时,假设每个Region写入压力相同,则理论上每个MemStore会平均分配可用内存空间。因此,当节点Region过多时,每个MemStore分到的内存空间就会很小。这个时候,写入很小的数据量就会被强制Flush到磁盘,将会导致频繁刷写。频繁刷写磁盘,会对集群HBase与HDFS造成很大的压力,可能会导致不可预期的严重后果。
  • 压缩风暴:因Region过多导致的频繁刷写,将在磁盘上产生非常多的HFile小文件,当小文件过多的时候HBase为了优化查询性能就会做Compaction操作,合并HFile减少文件数量。当小文件一直很多的时候,就会出现 “压缩风暴”。Compaction非常消耗系统io资源,还会降低数据写入的速度,严重的会影响正常业务的进行。
  • MSLAB内存消耗较大:MSLAB(MemStore-local allocation buffer)存在于每个MemStore中,主要是为了解决HBase内存碎片问题,默认会分配 2 MB 的空间用于缓存最新数据。如果Region数量过多,MSLAB总的空间占用就会比较大。比如当前节点有1000个包含1个列族的Region,MSLAB就会使用1.95GB的堆内存,即使没有数据写入也会消耗这么多内存。
  • Master assign region时间较长:HBase Region过多时Master分配Region的时间将会很长。特别体现在重启HBase时Region上线时间较长,严重的会达到小时级,造成业务长时间等待的后果。
  • 影响MapReduce并发数:当使用MapReduce操作HBase时,通常Region数量就是MapReduce的任务数,Region数量过多会导致并发数过多,产生过多的任务。任务太多将会占用大量资源,当操作包含很多Region的大表时,占用过多资源会影响其他任务的执行。
具体计算HBase合理分区数量
代码语言:javascript复制
((RS memory) * (total memstore fraction)) / ((memstore size)*(column families))

字段

解释

RS memory

表示regionserver堆内存大小,即HBASE_HEAPSIZE

total memstore fraction

表示所有MemStore占HBASE_HEAPSIZE的比例,HBase0.98版本以后由hbase.regionserver.global.memstore.size参数控制,老版本由hbase.regionserver.global.memstore.upperLimit参数控制,默认值0.4

memstore size

即每个MemStore的大小,原生HBase中默认128M

column families

即表的列族数量,通常情况下只设置1个,最多不超过3个

假如一个集群中每个regionserver的堆内存是32GB,那么节点上最理想的Region数量应该是32768*0.4/128 ≈ 102,所以,当前环境中单节点理想情况下大概有102个Region 最理想情况是假设每个Region上的填充率都一样,包括数据写入的频次、写入数据的大小,但实际上每个Region的负载各不相同,可能有的Region特别活跃负载特别高,有的Region则比较空闲。所以,通常我们认为2-3倍的理想Region数量也是比较合理的,针对上面举例来说,大概200-300个Region算是合理的。

如果实际的Region数量比2~3倍的计算值还要多,就要实际观察Region的刷写、压缩情况了,Region越多则风险越大。经验告诉我们,如果单节点Region数量过千,集群可能存在较大风险

Kudu分区

为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个 tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。kudu提供了3种分区方式:

  • Range Partitioning(范围分区) 范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象
代码语言:javascript复制
create table people(id Type.INT32, name Type.STRING , age Type.INT32)
RANGE (age) (
    PARTITION 0 <= VALUES < 10,
    PARTITION 10 <= VALUES < 20,
    PARTITION 20 <= VALUES < 30,
    PARTITION 30 <= VALUES < 40,
    PARTITION 40 <= VALUES < 50,
    PARTITION 50 <= VALUES < 60,
    PARTITION 60 <= VALUES < 70,
    PARTITION 70 <= VALUES < 80,
    PARTITION 80 <= VALUES < 120
)
  • Hash Partitioning(哈希分区) 哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一;哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。
代码语言:javascript复制
create table rangeTable(id Type.INT32, name Type.STRING , age Type.INT32)
HASH (id) PARTITIONS 5,
RANGE (id) (
    PARTITION UNBOUNDED
)
  • Multilevel Partitioning(多级分区)
代码语言:javascript复制
create table rangeTable(id Type.INT32, name Type.STRING , age Type.INT32)
HASH (age) PARTITIONS 5,
RANGE (age) (
    PARTITION 0 <= VALUES < 10,
    PARTITION 10 <= VALUES < 20,
    PARTITION 20 <= VALUES < 30,
    PARTITION 30 <= VALUES < 40,
    PARTITION 40 <= VALUES < 50,
    PARTITION 50 <= VALUES < 60,
    PARTITION 60 <= VALUES < 70,
    PARTITION 70 <= VALUES < 80,
    PARTITION 80 <= VALUES < 120

哈希分区有利于最大限度地提高写入吞吐量,而范围分区可避免 tablet 无限增长的问题;hash分区和range分区结合,可以极大提升kudu性能

总结

优秀的设计思想需要深入的研究与发现。工作实践总结是知识积累的很好途径。

0 人点赞