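1. Introduction
BufferedMutator is used to write data to a single HBase table asynchronously and in batches, much like HTable. An instance is obtained from a Connection.
MapReduce jobs are a good use case for BufferedMutator. They benefit from batching but have no natural point at which to call flush. BufferedMutator accepts puts from the job and submits them in batches according to a heuristic, such as the accumulated size of the pending puts. Because the batches are submitted asynchronously, the MapReduce logic is not blocked by each submission. In a MapReduce batch job, each thread has its own BufferedMutator. A single BufferedMutator can also be used efficiently in high-volume online systems to batch puts into an HBase table.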
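To make the batching heuristic above concrete, here is a minimal JDK-only toy model of a size-triggered write buffer. It is a sketch, not HBase code: the class name `ToyWriteBuffer`, the `sink` callback, and the synchronous flush are all assumptions made for illustration, whereas the real BufferedMutator sends its batches asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a size-triggered write buffer (illustration only, not HBase code). */
class ToyWriteBuffer {
    private final long writeBufferSize;          // flush threshold in bytes
    private final Consumer<List<byte[]>> sink;   // hypothetical stand-in for "send this batch"
    private final List<byte[]> pending = new ArrayList<>();
    private long currentSize = 0;

    ToyWriteBuffer(long writeBufferSize, Consumer<List<byte[]>> sink) {
        this.writeBufferSize = writeBufferSize;
        this.sink = sink;
    }

    /** Buffers one payload and flushes once the buffered bytes exceed the threshold. */
    synchronized void mutate(byte[] payload) {
        pending.add(payload);
        currentSize += payload.length;
        if (currentSize > writeBufferSize) {
            flush();
        }
    }

    /** Hands everything buffered so far to the sink as one batch. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
        currentSize = 0;
    }

    /** Closing flushes whatever is still buffered. */
    void close() {
        flush();
    }
}
```

With a threshold of 10 bytes, three 4-byte payloads produce a single batch of three on the third call to `mutate`, and anything left over goes out when `close` is called.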
2. BufferedMutator usage examples
Two batch-write scenarios are covered here.
2.1 Batch-writing to a single table in one pass
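```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;

// The statements below belong inside a method. TABLE (a TableName), FAMILY (a byte[])
// and LOG are assumed to be defined by the enclosing class.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");

// Called when a batch still fails after all retries are exhausted.
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
  @Override
  public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      LOG.info("Failed to send put " + e.getRow(i) + ".");
    }
  }
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
    .listener(listener)
    .writeBufferSize(123123L);

// try-with-resources closes the mutator (flushing buffered puts) and then the connection.
try (Connection conn = ConnectionFactory.createConnection(conf);
     BufferedMutator mutator = conn.getBufferedMutator(params)) {
  Put p = new Put(Bytes.toBytes("someRow"));
  p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
  mutator.mutate(p);
} catch (IOException e1) {
  e1.printStackTrace();
}
```
2.2 Repeated batch writes to multiple tables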
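A Map can hold the BufferedMutator for each table. A thread-safe ConcurrentHashMap is used here; in a single-threaded scenario it can be replaced with a HashMap to reduce overhead.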
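```java
private static final Map<String, BufferedMutator> tableConnectionMgr = new ConcurrentHashMap<>();

// `connection` is assumed to be a field holding one shared Connection, created once via
// ConnectionFactory.createConnection(config). A Connection is heavyweight, so it is
// reused here instead of being created again for every table.
private BufferedMutator getTableConnection(String tableName) throws IOException {
  BufferedMutator cached = tableConnectionMgr.get(tableName);
  if (cached != null) {
    return cached;
  }
  BufferedMutator created = connection.getBufferedMutator(TableName.valueOf(tableName));
  // putIfAbsent keeps the first instance if another thread registered one concurrently.
  BufferedMutator existing = tableConnectionMgr.putIfAbsent(tableName, created);
  if (existing != null) {
    created.close();
    return existing;
  }
  log.info("hbase table: {} connect established!", tableName);
  return created;
}
```
3. Source code walkthrough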
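3.1 Main classes
BufferedMutatorParams
The parameters needed to instantiate a BufferedMutator.
The main parameters are TableName (the table name), writeBufferSize (the write buffer size), maxKeyValueSize (the maximum KeyValue size), ExecutorService (the thread pool used for execution), and ExceptionListener (a listener for BufferedMutator exceptions).
BufferedMutatorImpl
Used to communicate with an HBase table. It is similar to HTable but is meant for batched, asynchronous puts. An instance is obtained from HConnectionImplementation through the following method: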
```java
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
  if (params.getTableName() == null) {
    throw new IllegalArgumentException("TableName cannot be null.");
  }
  if (params.getPool() == null) {
    params.pool(HTable.getDefaultExecutor(getConfiguration()));
  }
  if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
    params.writeBufferSize(connectionConfig.getWriteBufferSize());
  }
  if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
    params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
  }
  return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
}
```
AsyncProcess
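AsyncProcess maintains a thread pool internally. Each operation is wrapped in a Runnable and handed to that pool for execution. Submission stays asynchronous until the number of in-flight tasks reaches the configured maximum, after which further submissions have to wait.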
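The submission pattern described above, a thread pool plus a cap on in-flight tasks, can be sketched with plain JDK classes. This is only a toy model: `ToyAsyncSubmitter`, its Semaphore-based cap, and the parameter names are assumptions for illustration and do not mirror the internals of AsyncProcess.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of asynchronous submission with a cap on in-flight tasks (not HBase code). */
class ToyAsyncSubmitter {
    private final ExecutorService pool;
    private final Semaphore slots;                       // one permit per allowed in-flight task
    private final AtomicInteger completed = new AtomicInteger();

    ToyAsyncSubmitter(int threads, int maxInFlight) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.slots = new Semaphore(maxInFlight);
    }

    /** Returns immediately while a slot is free; blocks once maxInFlight tasks are pending. */
    void submit(Runnable task) {
        slots.acquireUninterruptibly();
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    completed.incrementAndGet();
                    slots.release();
                }
            });
        } catch (RuntimeException rejected) {
            slots.release(); // the task never started, so give the slot back
            throw rejected;
        }
    }

    int completed() {
        return completed.get();
    }

    /** Stops accepting tasks and waits for the submitted ones; returns false on timeout. */
    boolean shutdownAndWait(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caller only notices the cap when it outpaces the pool: with `maxInFlight` set to 4, the fifth `submit` blocks until one of the earlier tasks finishes.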
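HConnectionImplementation
Represents the connection to a cluster. Through it the client can find the master, locate regions, keep a cache of region locations, and know how to refresh those locations when they become stale.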
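As an illustration of the location cache mentioned above, the following JDK-only sketch caches regions by start key and queries a stand-in for the meta lookup again after an entry is invalidated. `ToyLocationCache`, `Region`, and `metaLookup` are hypothetical names, and String keys are used for readability; the real client works with byte[] row keys and RegionLocations objects.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Toy model of a client-side region location cache (illustration only, not HBase code). */
class ToyLocationCache {
    static final class Region {
        final String startKey; // inclusive
        final String endKey;   // exclusive; an empty string means "no upper bound"
        final String server;

        Region(String startKey, String endKey, String server) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.server = server;
        }

        boolean contains(String row) {
            return row.compareTo(startKey) >= 0
                && (endKey.isEmpty() || row.compareTo(endKey) < 0);
        }
    }

    // Cached regions keyed by start key, so floorEntry(row) finds the candidate region.
    private final TreeMap<String, Region> cache = new TreeMap<>();
    // Hypothetical stand-in for asking the cluster which region holds a row.
    private final Function<String, Region> metaLookup;
    private int metaLookups = 0;

    ToyLocationCache(Function<String, Region> metaLookup) {
        this.metaLookup = metaLookup;
    }

    /** Returns the cached region for the row, consulting metaLookup only on a miss. */
    synchronized Region locate(String row) {
        Map.Entry<String, Region> e = cache.floorEntry(row);
        if (e != null && e.getValue().contains(row)) {
            return e.getValue();
        }
        Region fresh = metaLookup.apply(row);
        metaLookups++;
        cache.put(fresh.startKey, fresh);
        return fresh;
    }

    /** Drops the cached region covering the row, e.g. after a request to it failed. */
    synchronized void invalidate(String row) {
        Map.Entry<String, Region> e = cache.floorEntry(row);
        if (e != null && e.getValue().contains(row)) {
            cache.remove(e.getKey());
        }
    }

    synchronized int metaLookups() {
        return metaLookups;
    }
}
```

Repeated lookups for rows in the same region hit the cache, and only a miss or an explicit `invalidate` triggers another call to `metaLookup`.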
3.2 Source code flow
3.2.1 Building a BufferedMutator
- First, build an HBaseConfiguration
```java
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");
```
- Next, build the BufferedMutatorParams
```java
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
  @Override
  public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      LOG.info("Failed to send put " + e.getRow(i) + ".");
    }
  }
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
    .listener(listener);
params.writeBufferSize(123);
```
- Then create the Connection
```java
Connection conn = ConnectionFactory.createConnection(getConf());
```
- Finally, build the BufferedMutator
```java
BufferedMutator mutator = conn.getBufferedMutator(params);
```
3.2.2 Sending data
- Build a Put or a List<Put>
- Call BufferedMutator.mutate
- Flush to HBase
> There are three ways the data gets flushed to HBase:
> 1. Calling BufferedMutator.flush explicitly
> 2. Calling BufferedMutator.close when sending is finished
> 3. Automatically, once the current buffer size exceeds the configured write buffer size:
```java
while (undealtMutationCount.get() != 0 && currentWriteBufferSize.get() > writeBufferSize) {
  backgroundFlushCommits(false);
}
```
All three paths end up calling backgroundFlushCommits.
- The RPC process
The entry point is backgroundFlushCommits, where ap is an instance of AsyncProcess.
```java
ap.submit(tableName, taker, true, null, false);
```
First, a HashMap keyed by server is built, so that the actions destined for each server can be looked up by server.
```java
// Look up, by server, the actions that will be sent to that server
Map<ServerName, MultiAction<Row>> actionsByServer =
    new HashMap<ServerName, MultiAction<Row>>();
```
Next, the region locations for the row are fetched; all replicas of the region are included.
```java
RegionLocations locs = connection.locateRegion(
    tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
```
Then the default region location is taken. At this point only the single location identified by the default replica id is returned for a region.
```java
loc = locs.getDefaultRegionLocation();
```
The Row operation is converted into an Action and added to actionsByServer.
```java
// Wrap the Row operation in an Action
Action<Row> action = new Action<Row>(r, posInList);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
```
The next step is AsyncProcess.submitMultiActions, which calls:
```java
AsyncRequestFutureImpl<CResult>
.sendMultiAction(actionsByServer, 1, null, false);
```
Internally, this takes the MultiAction for each server and builds Runnables from it (excerpt):
```java
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
  ServerName server = e.getKey();
  MultiAction<Row> multiAction = e.getValue();
  Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
      numAttempt);
  // make sure we correctly count the number of runnables before we try to reuse the send
  // thread, in case we had to split the request into different runnables because of backoff
  if (runnables.size() > actionsRemaining) {
    actionsRemaining = runnables.size();
  }
```
It then iterates over the Runnables and executes them (excerpt):
```java
for (Runnable runnable : runnables) {
  if ((--actionsRemaining == 0) && reuseThread
      && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
    runnable.run();
  } else {
    try {
      pool.submit(runnable);
```
- Building the Runnable and its run method
The main work happens in getNewMultiActionRunnable (excerpt):
```java
List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
for (DelayingRunner runner : actions.values()) {
  incTaskCounters(runner.getActions().getRegions(), server);
  String traceText = "AsyncProcess.sendMultiAction";
  Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
  // use a delay runner only if we need to sleep for some time
  if (runner.getSleepTime() > 0) {
    runner.setRunner(runnable);
    traceText = "AsyncProcess.clientBackoff.sendMultiAction";
    runnable = runner;
    if (connection.getConnectionMetrics() != null) {
      connection.getConnectionMetrics().incrDelayRunners();
      connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
    }
  } else {
    if (connection.getConnectionMetrics() != null) {
      connection.getConnectionMetrics().incrNormalRunners();
    }
  }
  runnable = Trace.wrap(traceText, runnable);
  toReturn.add(runnable);
```
Moving into SingleServerRequestRunnable, its run method looks like this (excerpt):
```java
// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
  callable = createCallable(server, tableName, multiAction);
}
RpcRetryingCaller<MultiResponse> caller = createCaller(callable, rpcTimeout);
try {
  if (callsInProgress != null) {
    callsInProgress.add(callable);
  }
  res = caller.callWithoutRetries(callable, operationTimeout);
```
RpcRetryingCaller then invokes the call method of MultiServerCallable, which mainly builds the request and issues the RPC. From there, execution moves to the server side, namely the multi method of RSRpcServices.
```java
responseProto = getStub().multi(controller, requestProto);
```
3.2.3 HRegionServer-side processing
RSRpcServices is the server side. The server-side implementation relevant to this article is RSRpcServices.multi.
```java
if (request.hasCondition()) {
  Condition condition = request.getCondition();
  byte[] row = condition.getRow().toByteArray();
  byte[] family = condition.getFamily().toByteArray();
  byte[] qualifier = condition.getQualifier().toByteArray();
  CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
  ByteArrayComparable comparator =
      ProtobufUtil.toComparator(condition.getComparator());
  processed = checkAndRowMutate(region, regionAction.getActionList(),
      cellScanner, row, family, qualifier, compareOp,
      comparator, regionActionResultBuilder);
} else {
  mutateRows(region, regionAction.getActionList(), cellScanner,
      regionActionResultBuilder);
  processed = Boolean.TRUE;
}
```
Depending on whether the request carries a condition, execution enters checkAndRowMutate or mutateRows.
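The branch above can be pictured with a toy, JDK-only model of a single row: a conditional path that applies the mutations only when a cell currently holds an expected value, and an unconditional path. `ToyRow` and its methods are hypothetical and exist only for illustration; the real checkAndRowMutate takes a compare operator and a comparator rather than plain equality.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy single-row store (illustration only, not HBase code). */
class ToyRow {
    private final Map<String, String> cells = new HashMap<>();

    /** Unconditional path: apply all puts as one unit. */
    synchronized void mutateRow(Map<String, String> puts) {
        cells.putAll(puts);
    }

    /** Conditional path: apply the puts only if the qualifier currently equals expected. */
    synchronized boolean checkAndMutate(String qualifier, String expected,
                                        Map<String, String> puts) {
        if (!Objects.equals(cells.get(qualifier), expected)) {
            return false; // condition not met, nothing is written
        }
        cells.putAll(puts);
        return true;
    }

    synchronized String get(String qualifier) {
        return cells.get(qualifier);
    }
}
```

Because the check and the write happen under the same lock, no other writer can change the cell between the comparison and the update, which is the property the conditional path is meant to provide.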
Each action is handled according to its mutation type, and then the actual mutation is executed.
```java
  // Excerpt from the body of the loop over the request's actions (loop header omitted).
  MutationType type = action.getMutation().getMutateType();
  if (rm == null) {
    rm = new RowMutations(action.getMutation().getRow().toByteArray());
  }
  switch (type) {
    case PUT:
      rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
      break;
    case DELETE:
      rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
      break;
    default:
      throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
  }
  // To unify the response format with doNonAtomicRegionMutation and read through client's
  // AsyncProcess we have to add an empty result instance per operation
  resultOrExceptionOrBuilder.clear();
  resultOrExceptionOrBuilder.setIndex(i++);
  builder.addResultOrException(
      resultOrExceptionOrBuilder.build());
} // end of the loop over the actions
region.mutateRow(rm);
```
HRegion.mutateRow method
HRegion.mutateRowsWithLocks
```java
public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock) throws IOException {
  mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
  MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
  processRowsWithLocks(proc, -1, nonceGroup, nonce);
}
```
The remaining processing details are left for the reader to explore; the comments in the source code are clear and well organized.
4. Summary
The HBase Java client API offers three ways to perform writes:
- HTablePool
For the source code, see HBase: The Definitive Guide.
- HConnection
With this approach you have to implement a thread pool yourself.
```java
Connection conn = ConnectionFactory.createConnection(conf);
TableName tabName = TableName.valueOf("tableName");
Table table = conn.getTable(tabName);
```
- BufferedMutator
This is the recommended approach for put operations: it provides batched, asynchronous puts.
5. References
- https://cloud.tencent.com/developer/article/1032502
- HBase: The Definitive Guide


