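This article shows how to sync HBase WAL (write-ahead log) entries to Kafka in real time. I use it in two scenarios:
- Joining multiple streams (more than three). The join field is used as the rowkey, and each stream writes to HBase in real time so that the rows assemble into one wide table. The WAL is parsed and the rowkeys are pushed to Kafka in real time; Flink then queries HBase by rowkey and runs real-time statistics and analysis.
- Using HBase's dynamic columns to preprocess data in real time and assemble a wide table. The WAL is parsed and the rowkeys are pushed to Kafka in real time; Flink then looks the rows up in HBase and batch-writes them to ClickHouse, which serves minute-level OLAP analysis.
How it works
HBase provides Replication for syncing data across clusters. By supplying a custom ReplicationEndpoint, the same mechanism can write messages to Kafka instead. First, here is how HBase Replication copies data between clusters. The overall flow is as follows:
- When a replication link to a peer cluster is created, every RegionServer creates a ReplicationSource thread. The ReplicationSource first puts the HLogs that are currently being written into the replication queue, then registers a listener on the RegionServer to watch for HLog roll operations. When the RegionServer rolls an HLog, the ReplicationSource is notified, places the new HLog into the corresponding walGroup queue, and persists the HLog file name to ZooKeeper, so that replication of unfinished HLogs can resume after a restart.
- Behind each walGroup queue is a ReplicationSourceWALReader thread, which keeps taking HLogs off the queue, reads the entries in each HLog one by one, and puts them into a queue named entryBatchQueue.
- Behind each entryBatchQueue is a ReplicationSourceShipper thread, which keeps reading log entries from the queue and hands them to the peer's ReplicationEndpoint. The ReplicationEndpoint packs the entries into a replicationWALEntry operation and sends it over RPC to a RegionServer in the peer cluster.
- The RegionServer in the peer cluster parses the replicationWALEntry into a number of batch operations and executes them through the batch interface. Once the RPC call succeeds, the ReplicationSourceShipper updates the most recently replicated HLog position in ZooKeeper, so that after a RegionServer restart replication resumes from the latest position.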
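The hand-off between the shipper and the endpoint is the part a custom endpoint plugs into, so it helps to see it in miniature. The following is a minimal sketch in plain Java, not HBase code: `ShipperSketch`, `nextBatch`, and `shipOnce` are hypothetical names, a `List<String>` stands in for an HLog, and an `int` field stands in for the position stored in ZooKeeper. It only illustrates that the position advances after the endpoint reports success.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the ship-then-checkpoint loop (not HBase code).
class ShipperSketch {
    private final List<String> wal;     // stands in for the entries of one HLog
    private final int batchSize;        // how many entries the reader hands over at once
    private int committedPosition = 0;  // stands in for the position persisted in ZooKeeper

    ShipperSketch(List<String> wal, int batchSize) {
        this.wal = wal;
        this.batchSize = batchSize;
    }

    // Reader role: return the next batch of entries after the committed position.
    List<String> nextBatch() {
        int end = Math.min(wal.size(), committedPosition + batchSize);
        return new ArrayList<>(wal.subList(committedPosition, end));
    }

    // Shipper role: hand one batch to the endpoint and advance the position
    // only if the endpoint reports success.
    boolean shipOnce(Predicate<List<String>> endpoint) {
        List<String> batch = nextBatch();
        if (batch.isEmpty()) {
            return false; // nothing left to ship
        }
        if (!endpoint.test(batch)) {
            return false; // position unchanged, so the same batch is read again
        }
        committedPosition += batch.size();
        return true;
    }

    int committedPosition() {
        return committedPosition;
    }

    public static void main(String[] args) {
        ShipperSketch shipper = new ShipperSketch(Arrays.asList("e1", "e2", "e3"), 2);
        shipper.shipOnce(batch -> false);                 // endpoint fails
        System.out.println(shipper.committedPosition());  // prints 0
        shipper.shipOnce(batch -> true);                  // endpoint succeeds
        System.out.println(shipper.committedPosition());  // prints 2
    }
}
```

Under this model, an endpoint that reports success for a batch it failed to deliver causes that batch to be skipped, so an endpoint that writes to Kafka should report failure whenever a send fails.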
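From this process it follows that a custom ReplicationEndpoint can parse the entries and send them to Kafka, which gives real-time WAL parsing and delivery to a messaging system.
HBase's default ReplicationEndpoint implementation
HBase's default ReplicationEndpoint implementation is HBaseInterClusterReplicationEndpoint. Its replicateEntries method wraps the entries into a replicationWALEntry and sends them to the peer cluster over RPC. Using that class as a reference, you can write a custom KafkaInterClusterReplicationEndpoint class and rewrite replicateEntries to push data to Kafka.
protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {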
try {
int entriesHashCode = System.identityHashCode(entries);
if (LOG.isTraceEnabled()) {
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
LOG.info("---entries size----" + entries.size());
ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
try {
entries.forEach(entry -> {
TableName table = entry.getKey().getTableName();
WALEdit edit = entry.getEdit();
long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
long sequenceId = entry.getKey().getSequenceId();
long writeTime = entry.getKey().getWriteTime();
String nameSpace = Bytes.toString(table.getNamespace());
String tableName = Bytes.toString(table.getName());
String rowKey = "";
LOG.info("------------------");
LOG.info("----namespace---" + nameSpace);
LOG.info("----tableName---" + tableName);
LOG.info("----brokerServers---" + brokerServers);
ArrayList<Cell> cells = edit.getCells();
if (cells != null && cells.size() > 0) {
rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));
}
List<String> rowKeys = new ArrayList<>();
cells.forEach(cell -> {
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
if (!rowKeys.contains(rowkey)) {
rowKeys.add(rowkey);
}
});
HashMap<String, Object> map = new HashMap<>();
map.put("origLogSeqNum", origLogSeqNum);
map.put("sequenceId", sequenceId);
map.put("batchIndex", batchIndex);
map.put("nameSpace", nameSpace);
map.put("tableName", tableName);
map.put("rowKeys", rowKeys);
map.put("currentTime", System.currentTimeMillis());
map.put("writeTime", writeTime);
map.put("peerId", peerId);
LOG.info("----rowKey---" + CollUtil.join(rowKeys, ","));
String jsonStr = JSONUtil.toJsonStr(map);
produceUtils.send(topicName, rowKey, jsonStr);
LOG.info("------------------");
});
if (LOG.isTraceEnabled()) {
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
}
throw e;
} finally {
produceUtils.flush();
produceUtils.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return batchIndex;
}
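Note that when a Java client writes to HBase in a batch, the corresponding WAL log is a single record, which is why the code above collects every distinct rowkey from the entry's cells and not only the first one.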
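Because one WAL entry can carry cells for several rows, the endpoint has to gather each distinct rowkey from the entry. A minimal sketch of that step in plain Java follows. `RowKeyCollector` and `distinctRowKeys` are hypothetical names, and the `byte[]` values stand in for what `CellUtil.cloneRow(cell)` returns. It swaps the `List.contains` check for a `LinkedHashSet`, which keeps first-seen order while avoiding a linear scan per cell.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; byte[] values stand in for the row bytes of HBase cells.
class RowKeyCollector {
    // Returns the distinct row keys in the order they were first seen.
    static List<String> distinctRowKeys(List<byte[]> cellRows) {
        Set<String> seen = new LinkedHashSet<>();
        for (byte[] row : cellRows) {
            // Assumes row keys are UTF-8 strings, as Bytes.toString does.
            seen.add(new String(row, StandardCharsets.UTF_8));
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<byte[]> rows = Arrays.asList(
            "r1".getBytes(StandardCharsets.UTF_8),
            "r1".getBytes(StandardCharsets.UTF_8),
            "r2".getBytes(StandardCharsets.UTF_8),
            "r1".getBytes(StandardCharsets.UTF_8));
        System.out.println(distinctRowKeys(rows)); // prints [r1, r2]
    }
}
```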
How to configure the custom ReplicationEndpoint
- Create the HBase table
create 'person', {NAME=>'info',REPLICATION_SCOPE => '1'}
Note that the REPLICATION_SCOPE attribute is set to 1, which enables replication.
- Create the replication peer
add_peer '111', ENDPOINT_CLASSNAME => 'cn.com.legend.hbase.replication.KafkaInterClusterReplicationEndpoint',CONFIG => { "brokerServers" => "192.168.111.129:9092,192.168.111.130:9092,192.168.111.131:9092", "topicName" => "test" },TABLE_CFS => { "person" => ["info"]}
Note the ENDPOINT_CLASSNAME attribute: set it to the custom ReplicationEndpoint class. CONFIG can carry custom parameters, which the custom ReplicationEndpoint can read in its init method as follows:
@Override
public void init(Context context) throws IOException {
super.init(context);
ReplicationPeer replicationPeer = context.getReplicationPeer();
Configuration configuration = replicationPeer.getConfiguration();
this.brokerServers = configuration.get("brokerServers");
this.topicName = configuration.get("topicName");
peerId = replicationPeer.getId();
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
// ... remaining initialization omitted here; see the full class below
}
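Since `brokerServers` and `topicName` come from free-form peer CONFIG, a typo in the add_peer command would leave them null. One possible guard is sketched below in plain Java. `PeerConfigCheck` and `require` are hypothetical names, and a `Map<String, String>` stands in for the Hadoop `Configuration` object so the example runs without HBase on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; a Map stands in for the peer's Configuration object.
class PeerConfigCheck {
    // Returns the trimmed value for key, or throws if it is missing or blank.
    static String require(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing peer CONFIG entry: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("brokerServers", "192.168.111.129:9092");
        config.put("topicName", "test");
        System.out.println(require(config, "topicName")); // prints test
    }
}
```

Failing in init keeps a misconfigured peer from silently starting and then erroring on every batch.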
- Enable serial replication
set_peer_serial '111',true
For the difference between serial and non-serial replication, consult the HBase documentation.
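Within the endpoint shown in this article, the visible effect of the serial flag is on batching: createSerialBatches groups entries by region, and createParallelBatches hashes regions into a bounded number of buckets. The sketch below reproduces just that grouping in plain Java and leaves out the size-limit splitting. `BatchingSketch` and its `Entry` class are hypothetical stand-ins for the HBase types, and region names are plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for WAL entries; not the HBase classes.
class BatchingSketch {
    static class Entry {
        final String region;
        final long seq;

        Entry(String region, long seq) {
            this.region = region;
            this.seq = seq;
        }
    }

    // Serial mode: one batch per region, entries kept in arrival order.
    static List<List<Entry>> serialBatches(List<Entry> entries) {
        Map<String, List<Entry>> byRegion = new TreeMap<>();
        for (Entry e : entries) {
            byRegion.computeIfAbsent(e.region, k -> new ArrayList<>()).add(e);
        }
        return new ArrayList<>(byRegion.values());
    }

    // Non-serial mode: regions are hashed into n buckets that can be sent concurrently.
    static List<List<Entry>> parallelBatches(List<Entry> entries, int n) {
        List<List<Entry>> buckets = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Entry e : entries) {
            buckets.get(Math.floorMod(e.region.hashCode(), n)).add(e);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
            new Entry("regionA", 1), new Entry("regionB", 2), new Entry("regionA", 3));
        System.out.println(serialBatches(entries).size());      // prints 2
        System.out.println(parallelBatches(entries, 1).size()); // prints 1
    }
}
```

In the listing, the serial path also sends the sub-batches of one region one after another (serialReplicateRegionEntries), whereas the non-serial path may split an oversized bucket into extra batches that are submitted to the thread pool independently.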
- Remove the replication peer
remove_peer '111'
The complete classes involved are attached below:
- KafkaReplicationEndpoint class
package cn.com.legend.hbase.replication;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
/**
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target is a Kafka cluster rather than an HBase cluster.
*/
public abstract class KafkaReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaReplicationEndpoint.class);
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override
protected void doStart() {
try {
notifyStarted();
} catch (Exception e) {
notifyFailed(e);
}
}
@Override
protected void doStop() {
notifyStopped();
}
@Override
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public synchronized UUID getPeerUUID() {
return UUID.randomUUID();
}
@Override
public void abort(String why, Throwable e) {
LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+ " was aborted for the following reason(s):" + why, e);
}
@Override
public boolean isAborted() {
// Currently this is never "Aborted", we just log when the abort method is called.
return false;
}
}
- KafkaInterClusterReplicationEndpoint class
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.com.legend.hbase.replication;
import cn.com.legend.hbase.kafka.ProduceUtils;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
* implementation, adapted from HBaseInterClusterReplicationEndpoint, that
* publishes the row keys of replicated WAL entries to a Kafka topic instead
* of shipping the entries to another HBase cluster.
*/
@InterfaceAudience.Private
public class KafkaInterClusterReplicationEndpoint extends KafkaReplicationEndpoint {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaInterClusterReplicationEndpoint.class);
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
private ClusterConnection conn;
private Configuration localConf;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
// Amount of time for shutdown to wait for all tasks to complete
private long maxTerminationWait;
// Size limit for replication RPCs, in bytes
private int replicationRpcLimit;
//Metrics for this source
private MetricsSource metrics;
private String replicationClusterId = "";
private ThreadPoolExecutor exec;
private int maxThreads;
private Path baseNamespaceDir;
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
private Abortable abortable;
private boolean dropOnDeletedTables;
private boolean isSerial = false;
// Kafka broker addresses
private String brokerServers;
// Kafka topic name
private String topicName;
// peerId
private String peerId;
@Override
public void init(Context context) throws IOException {
super.init(context);
ReplicationPeer replicationPeer = context.getReplicationPeer();
Configuration configuration = replicationPeer.getConfiguration();
this.brokerServers = configuration.get("brokerServers");
this.topicName = configuration.get("topicName");
peerId = replicationPeer.getId();
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier);
// A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
// tasks to terminate when doStop() is called.
long maxTerminationWaitMultiplier = this.conf.getLong(
"replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
this.maxTerminationWait = maxTerminationWaitMultiplier *
this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkKafkaThread-%d").build());
this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded size. Being
// conservative for now.
this.replicationRpcLimit = (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
RpcServer.DEFAULT_MAX_REQUEST_SIZE));
this.dropOnDeletedTables =
this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false);
if (this.replicationBulkLoadDataEnabled) {
replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
}
// Construct base namespace directory and hfile archive directory path
Path rootDir = FSUtils.getRootDir(conf);
Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
baseNamespaceDir = new Path(rootDir, baseNSDir);
hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
isSerial = context.getPeerConfig().isSerial();
}
/**
* Do the sleeping logic
*
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}",
logPeerId(), msg, sleepForRetries, sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
}
}
return sleepMultiplier < maxRetriesMultiplier;
}
private int getEstimatedEntrySize(Entry e) {
long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
return (int) size;
}
private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
int numSinks = 10;
int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
List<List<Entry>> entryLists =
Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
int[] sizes = new int[n];
for (Entry e : entries) {
int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
int entrySize = getEstimatedEntrySize(e);
// If this batch has at least one entry and is over sized, move it to the tail of list and
// initialize the entryLists[index] to be a empty list.
if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
entryLists.add(entryLists.get(index));
entryLists.set(index, new ArrayList<>());
sizes[index] = 0;
}
entryLists.get(index).add(e);
sizes[index] += entrySize;
}
return entryLists;
}
private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Entry e : entries) {
regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
.add(e);
}
return new ArrayList<>(regionEntries.values());
}
/**
* Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
* concurrently. Note that, for serial replication, we need to make sure that entries from the
* same region to be replicated serially, so entries from the same region consist of a batch, and
* we will divide a batch into several batches by replicationRpcLimit in method
* serialReplicateRegionEntries()
*/
private List<List<Entry>> createBatches(final List<Entry> entries) {
if (isSerial) {
return createSerialBatches(entries);
} else {
return createParallelBatches(entries);
}
}
private TableName parseTable(String msg) {
// ... TableNotFoundException: '<table>'/n...
Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
Matcher m = p.matcher(msg);
if (m.find()) {
String table = m.group(1);
try {
// double check that table is a valid table name
TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
return TableName.valueOf(table);
} catch (IllegalArgumentException ignore) {
}
}
return null;
}
// Filter a set of batches by TableName
private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
return oldEntryList
.stream().map(entries -> entries.stream()
.filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
.collect(Collectors.toList());
}
private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
List<List<Entry>> batches) throws IOException {
int futures = 0;
for (int i = 0; i < batches.size(); i++) {
List<Entry> entries = batches.get(i);
if (!entries.isEmpty()) {
LOG.info("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
replicateContext.getSize());
if (LOG.isTraceEnabled()) {
LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
pool.submit(createReplicator(entries, i));
futures++;
}
}
IOException iox = null;
long lastWriteTime = 0;
for (int i = 0; i < futures; i++) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
Future<Integer> f = pool.take();
int index = f.get();
List<Entry> batch = batches.get(index);
batches.set(index, Collections.emptyList()); // remove successful batch
// Find the most recent write time in the batch
long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
if (writeTime > lastWriteTime) {
lastWriteTime = writeTime;
}
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
// cause must be an IOException
iox = (IOException) ee.getCause();
}
}
if (iox != null) {
// if we had any exceptions, try again
throw iox;
}
LOG.info("----lastWriteTime----" + lastWriteTime);
return lastWriteTime;
}
/**
* Do the shipping logic
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
try {
long lastWriteTime;
// replicate the batches to sink side.
lastWriteTime = parallelReplicate(pool, replicateContext, batches);
// update metrics
if (lastWriteTime > 0) {
this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
}
return true;
} catch (IOException ioe) {
// Didn't ship anything, but must still age the last time we did
this.metrics.refreshAgeOfLastShippedOp(walGroupId);
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
return false; // in case we exited before replicating
}
protected boolean isPeerEnabled() {
return ctx.getReplicationPeer().isPeerEnabled();
}
@Override
protected void doStop() {
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("{} Failed to close the connection", logPeerId());
}
}
// Allow currently running replication tasks to finish
exec.shutdown();
try {
exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
// Abort if the tasks did not terminate in time
if (!exec.isTerminated()) {
String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
+ "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
+ "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
abortable.abort(errMsg, new IOException(errMsg));
}
notifyStopped();
}
@VisibleForTesting
protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
try {
int entriesHashCode = System.identityHashCode(entries);
if (LOG.isTraceEnabled()) {
long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
}
LOG.info("---entries size----" + entries.size());
ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
try {
entries.forEach(entry -> {
TableName table = entry.getKey().getTableName();
WALEdit edit = entry.getEdit();
long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
long sequenceId = entry.getKey().getSequenceId();
long writeTime = entry.getKey().getWriteTime();
String nameSpace = Bytes.toString(table.getNamespace());
String tableName = Bytes.toString(table.getName());
String rowKey = "";
LOG.info("------------------");
LOG.info("----namespace---" + nameSpace);
LOG.info("----tableName---" + tableName);
LOG.info("----brokerServers---" + brokerServers);
ArrayList<Cell> cells = edit.getCells();
if (cells != null && cells.size() > 0) {
rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));
}
List<String> rowKeys = new ArrayList<>();
cells.forEach(cell -> {
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
if (!rowKeys.contains(rowkey)) {
rowKeys.add(rowkey);
}
});
HashMap<String, Object> map = new HashMap<>();
map.put("origLogSeqNum", origLogSeqNum);
map.put("sequenceId", sequenceId);
map.put("batchIndex", batchIndex);
map.put("nameSpace", nameSpace);
map.put("tableName", tableName);
map.put("rowKeys", rowKeys);
map.put("currentTime", System.currentTimeMillis());
map.put("writeTime", writeTime);
map.put("peerId", peerId);
LOG.info("----rowKey---" + CollUtil.join(rowKeys, ","));
String jsonStr = JSONUtil.toJsonStr(map);
produceUtils.send(topicName, rowKey, jsonStr);
LOG.info("------------------");
});
if (LOG.isTraceEnabled()) {
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
}
throw e;
} finally {
produceUtils.flush();
produceUtils.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return batchIndex;
}
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
throws IOException {
int batchSize = 0, index = 0;
List<Entry> batch = new ArrayList<>();
for (Entry entry : entries) {
int entrySize = getEstimatedEntrySize(entry);
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
replicateEntries(batch, index++);
batch.clear();
batchSize = 0;
}
batch.add(entry);
batchSize += entrySize;
}
if (batchSize > 0) {
replicateEntries(batch, index);
}
return batchIndex;
}
@VisibleForTesting
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
: () -> replicateEntries(entries, batchIndex);
}
private String logPeerId() {
return "[Source for peer " + this.ctx.getPeerId() + "]:";
}
}