HBase行锁与MVCC分析

2018-03-20 17:35:20 浏览数 (1)

四个部分分析:

  • 案例场景
  • 流程解析
  • 0.94-0.96实现方案分析
  • 模拟试验及分析

一、案例场景

代码语言:javascript复制
转发微博

抱歉,此微博已被作者删除。查看帮助:http://t.cn/zWSudZc

| 转发| 收藏| 评论

本来是不同的短链码,结果删除后,会只在同一个token上操作,也就是 被=zWSudZc  

引发几个操作:

delete zWSudZc mid

decr zWSudZc  shareCount

引起的问题是发现写操作堵死在 zWSudZc这个rowKey上

微博feed如果采用HBase,以mid为rowKey,热门微博的操作也会面临这种问题。分析这个问题前我们先要了解HBase 如何保证写一致性:

冲突预防:避免分布式的并发写操作,将对特定数据项的所有写操作路由到单个节点上(可以是全局主节点或者分区主节点)。为了避免冲突,数据库必须牺牲网络隔离情况下的可用性。这种方法常用于许多提供强一致性保证的系统(例如大多数关系数据库,HBase,MongoDB)。

可以做如下猜想,单节点更新时:

  • 写操作会lock住读锁
  • 写操作集中执行,排队等待耗时。

二、流程解析

checkAndPut append increment operation in HRegion (HBase 0.94.X)

  • startRegionOperation (lock.readLock().lock());
  • rowLock lock
  • updatesLock.readLock().lock()
  • mvcc begion
  • mvcc finish
  • updatesLock.unLock
  • closeRegionOperation
  • get scan
  • startRegionOperation
  • MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
  • closeRegionOperation

三种锁区别

region lock updatesLock 都是ReentrantReadWriteLock。ReentrantReadWriteLock 可多读,有写锁被占则阻塞其他所有操作。updatesLock 只在region flush时写锁被占用,region lock 没有出现writelock被占用情况,怀疑无用。rowlock  为MultiVersionConsistencyControl 中 ConcurrentHashMap<HashedBytes, CountDownLatch> 类型,变量名lockedRows  闭锁

MVCC  MultiVersionConsistencyControl

  • 管理memstore的读/写一致性。Use MVCC to make this set of increments/appends atomic to reads
  • 0.94  0.94.2 中是待实现。TODO  in  increment append checkAnd (少一次MVCC,后续流程会看到)
  • 0.96  realized 
  • put operation,目前项目用的比较多的操作
  • 0.94:  HRegion internalPut

三、0.94-0.96实现方案分析

0.94中

  • increment  append  checkAndPut都使用了行锁和mvcc,但put调用的internalPut没有使用行锁,只使用了mvcc
  • 流程:
  • startRegionOperation (lock.readLock().lock());
  • rowLock lock
  • updatesLock.readLock().lock()
  • mvcc begion
  • mvcc finish
  • updatesLock.unLock
  • closeRegionOperation

0.96:

流程:

    (1)  Acquire RowLock

    (1a) BeginMVCC Finish MVCC

    (2)  Begin MVCC

    (3)  Do work

    (4)  Release RowLock

    (5)  Append to WAL

    (6)  Finish MVCC

wait for all prior MVCC transactions to finish - while we hold the row lock (so that we are guaranteed to see the latest state)

如果版本升级到0.96  由于MVCC的介入  increment操作可能更慢

0.96预计做的改进:

commiter也认为两次mvcc没必要 ,改进流程  https://issues.apache.org/jira/browse/HBASE-7263

(1)  Acquire RowLock

(1a) Grab Release RowWriteLock (instead of BeginMVCC Finish MVCC)

(1b) Grab RowReadLock (new step!)

(2)  Begin MVCC

(3)  Do work

(4)  Release RowLock

(5)  Append to WAL

(6)  Finish MVCC

(7)  Release RowReadLock (new step!)

另外也去掉了client端无用的分配lockid方法

四、模拟试验及分析

  • 构造模拟代码

HBaseInsertTest1类,  TestKeyValueSkipListSet为提取 HBase的KeyValueSkipListSet作为公有类,存储数据使用

代码语言:javascript复制
package com.daodao.hbase;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @author guanpu
 *         Date: 13-1-9
 *         Time: 下午5:53
 *         分析0.94 insert操作性能
 */
public class HBaseInsertTest1 {
    volatile TestKeyValueSkipListSet kvset;
    final ReentrantReadWriteLock lock =
            new ReentrantReadWriteLock();
    final ReentrantReadWriteLock updatesLock =
            new ReentrantReadWriteLock();
    private final MultiVersionConsistencyControl mvcc =
            new MultiVersionConsistencyControl();
    private static AtomicInteger finishedCount;
    private static AtomicLong mvccTime = new AtomicLong(0l);
    private static AtomicLong rowlockTime = new AtomicLong(0l);
    private static AtomicLong lockTime = new AtomicLong(0l);
    private static AtomicLong updateLockTime = new AtomicLong(0l);
    private static AtomicLong insertTime = new AtomicLong(0l);
    private static AtomicLong releaseTime = new AtomicLong(0l);

    private final ConcurrentHashMap<String, CountDownLatch> lockedRows =
            new ConcurrentHashMap<String, CountDownLatch>();

    public HBaseInsertTest1() {
        kvset = new TestKeyValueSkipListSet(new KeyValue.KVComparator());
        finishedCount = new AtomicInteger(0);
    }

    class HBaseInsertTask implements Runnable {

        public void run() {
            for (int i = 0; i < 100000; i  ) {
                String key = "key"   i;
                long time = System.nanoTime();
                MultiVersionConsistencyControl.WriteEntry localizedWriteEntry = null;
                try {


                    lock.readLock().lock();   // like startRegionOperation do
                    lockTime.set(lockTime.get()   (System.nanoTime() - time));

                    time = System.nanoTime();
                    Integer lid = getLock(key);     //get rowKey lock
                    lockTime.set(System.nanoTime() - time);

                    time = System.nanoTime();
                    updatesLock.readLock().lock();
                    updateLockTime.set(updateLockTime.get()   (System.nanoTime() - time));

                    time = System.nanoTime();
                    localizedWriteEntry = mvcc.beginMemstoreInsert();
                    mvccTime.set(mvccTime.get()   (System.nanoTime() - time));

                    time = System.nanoTime();
                    kvset.add(new KeyValue(Bytes.toBytes(key), Bytes.toBytes("f"), Bytes.toBytes("column"),
                            1l, Bytes.toBytes(1l)));
                    insertTime.set(insertTime.get()   (System.nanoTime() - time));

                    time = System.nanoTime();
                    mvcc.completeMemstoreInsert(localizedWriteEntry);
                    mvccTime.set(mvccTime.get()   (System.nanoTime() - time));
                } catch (Exception e) {
                    System.out.println(e);
                } finally {
                    time = System.nanoTime();
                    updatesLock.readLock().unlock();

                    CountDownLatch rowLatch = lockedRows.remove(key);
                    rowLatch.countDown();

                    lock.readLock().unlock();
                    releaseTime.set(releaseTime.get()   (System.nanoTime() - time));

                }
            }
            finishedCount.set(finishedCount.get()   1);
            return;
        }

        private Integer getLock(String key) {
            CountDownLatch rowLatch = new CountDownLatch(1);

            // loop until we acquire the row lock (unless !waitForLock)
            while (true) {

                CountDownLatch existingLatch = lockedRows.putIfAbsent(key, rowLatch);
                if (existingLatch == null) {
                    break;
                } else {
                    try {
                        if (!existingLatch.await(30000,
                                TimeUnit.MILLISECONDS)) {
                            System.out.println("some thing wrong in waiting");
                            return null;
                        }
                    } catch (InterruptedException ie) {
                        // Empty
                    }
                }
            }
            return 1;
        }
    }

    private class DaodaoTestWatcher implements Runnable {

        @Override
        public void run() {
            long time = System.nanoTime();
            while (finishedCount.get() != 50) {

            }
            System.out.println("cost time:"   (System.nanoTime() - time) / 1000000000.0);
            System.out.println("cost time:  mvcc"   mvccTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  lock"   lockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  update"   updateLockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  rowlock"   rowlockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  release"   releaseTime.get() / 1000000000.0 / 50);
        }
    }

    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(200);
        for (int i = 0; i < 50; i  )
            executorService.execute(new HBaseInsertTask());
        executorService.execute(new DaodaoTestWatcher());


    }

    public static void main(String[] args) {
        new HBaseInsertTest1().test();
    }


}

耗时:

代码语言:javascript复制
cost time:24.727145
cost time: mvcc22.98698292
cost time: lock0.0
cost time: update0.009690879999999999
cost time: rowlock0.0
cost time: release0.05001874

去掉mvcc

代码语言:javascript复制
cost time:5.190751
cost time:  mvcc0.0073236
cost time:  lock0.0
cost time:  update0.017533220000000002
cost time:  rowlock0.0
cost time:  release1.3753079

0.96代码,在 updatesLock.readLock().lock(); 之后 增加:

代码语言:javascript复制
                     time = System.nanoTime();
                    // wait for all prior MVCC transactions to finish - while we hold the row lock
                    // (so that we are guaranteed to see the latest state)
                    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
                    mvccTime.set(mvccTime.get()   (System.nanoTime() - time));

耗时:

代码语言:javascript复制
cost time:43.04134
cost time:  mvcc40.70520202
cost time:  lock0.0
cost time:  update0.00937416
cost time:  rowlock0.0
cost time:  release0.05023072

0.94中 increment  append  checkAndPut都使用了行锁和mvcc,但put调用的internalPut没有使用行锁,只使用了mvcc

优化方案:对于单版本服务,可以都更改为加行锁,去掉mvcc,写性能会获得进一步提升。

如果rowkey改为固定单个rowkey

0.94版本 耗时 (各个均为总耗时):

cost time:27.660935 cost time: mvcc3.888678 cost time: lock0.0 cost time: insert9.319777 cost time: update0.964697 cost time: rowlock0.0 cost time: release16.997803

但实际跑HBase插入时key变化耗时比不变key 快4倍,

跑standalone单机测试,两者速度基本相同。性能消耗应该在寻找region或网络传输,需要进一步验证。

总结:

  • region更新耗时主要集中在MVCC
  • 单版本的数据库,我认为可以去掉各种更新操作的MVCC,在修改操作中预先获取rowkey的写锁即可,避免全Region范围的MVCC
  • 从客户端到HBase的单rowkey 整体流程瓶颈 还需要进一步探索真实分布式环境下的状况。

----------------------------------------扩展----------------------------------

MySQL MVCC  by @曾经的阿飞(军伟)

MySQL5.6对与read-trasanction的优化,http://t.cn/zjnPhdq,将trx_list拆分成ro_trx_list和rw_trx_list,创建read-view只需对rw_trx_list进行snapshot,因此读事务不会影响到read-view的snapshot从而不会制约trx_sys_t::mutex,可以提高性能。@yangwm @慢半拍de刀刀 @启盼cobain @jolestar @蔚1984

mvcc 原理

1、与行级锁对应

行级锁 悲观锁

   R  W

R  y  n

W  n  n

MVCC

保存version

更新10 v,读取9 v

扩展知识:乐观锁

select -》  update  =》 再select看 是否有改动,如果有则rollback; 适用于冲突比较少的情况。

redis服务器端 是否也 实现了乐观锁。  ---- 待确认  单线程串行方式是否需要加锁?

2、 innodb mvcc

每行纪录有tx_id  rollback_point  两个字段去做控制,从而实现。

table  : row  c1 c2  tx_id  rollback_point 

rollback_point 指向上一个版本的纪录。

mysql 隔离级别  四种:read onCommit(读到没有提交的事务) 、read Committed(只能读到已提交的数据,从当前active transaction list中判断,从指针回溯)、 repeatable read(可重复读)、Serializable(串行化,所有语句加 select for update,后台加锁)

Read View 小于 active transaction 则正常读。  Read View有间隙 ,读到中间版本也时正确的。

 非Serializable 时,需要手动调用

@蔚1984 的 http://boneylw.sinaapp.com/?p=16  MVCC分析也可以对比阅读一下。

0 人点赞