Hbase WAL日志数据实时增量推送至Kafka

2022-04-27 09:17:09 浏览数 (1)

实时同步Hbase WAL日志到kafka,笔者这边使用场景有以下两个:

  • 解决多个流Join关联(超过三个流以上),对关联字段作为rowkey,实时写入到Hbase组装成一张宽表,解析WAL日志,并把rowkey实时推送到kafka,Flink再反向查询Hbase并进行实时统计分析
  • 利用Hbase的列动态扩展能力,实时对数据进行预处理,组装宽表,解析WAL日志把rowkey实时推送到kafka,Flink再反向查询Hbase,并批量写入到clickhouse提供分钟级的数据OLAP分析加工处理

实现原理

Hbase提供了跨集群的数据同步方式Replication,可通过自定义Replication Endpoint,把消息写入kafka,先来了解Hbase Replication集群之间进行复制同步的过程,整体数据复制流程如下图:

  • 在创建Peer集群Replication链路时,每一个Regionserver会创建一个ReplicationSource线程,ReplicationSource首先把当前正在写入的HLog都保存在复制队列中,然后再Regionserver上注册一个Listener,用来监听HLog Roll操作,如果Regionserver做了HLog Roll操作,那么ReplicationSource收到这个操作后,会把这个HLog分到对应的walGroup-Queue里面,同时把HLog文件名持久化到Zookeeper上,这样重启后还可以接着复制未完成的HLog
  • 每个walGroup-Queue后端有一个ReplicationSourceWALReader的线程,不断的从Queue中取出一个Hlog,然后把HLog中的entry逐个读取出来,放到一个名为entryBatchQueue的队列中
  • 每个entryBatchQueue的队列后端有一个ReplicationSourceShipper的线程,不断的从Queue中读取Log Entry,交给Peer的ReplicationEndpoint,ReplicationEndpoint把这些entry打包成一个replicationWALEntry操作,通过RPC发送到Peer集群的某个RegionServer上
  • 对应Peer集群上的RegionServer把replicationWALEntry解析成若干个Batch操作,并调用batch接口执行。待RPC调用成功之后,ReplicationSourceShipper会更新最近一次成功复制的HLog Position到Zookeeper以便RegionServer重启后,下次能找到最新的Position开始复制

通过以上Hbase Replication的复制过程,可理解,可通过自定义ReplicationEndpoint把entry解析发送到kafka,即可实现实时解析WAL日志推送到消息系统。

Hbase默认RepliactionEndpoint实现

Hbase默认对应的RepliactionEndpoint实现是HBaseInterClusterReplicationEndpoint,其中封装replicationWALEntry通过RPC发送到Peer集群,对应方法replicateEntries,可参考该类自定义一个KafkaInterClusterReplicationEndpoint类,改写replicateEntries方法推送数据到kafka。

代码语言:javascript复制
    protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
        try {
            int entriesHashCode = System.identityHashCode(entries);
            if (LOG.isTraceEnabled()) {
                long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
                LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
                        logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
            }

            LOG.info("---entries size----"   entries.size());
            ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
            try {
                entries.forEach(entry -> {
                    TableName table = entry.getKey().getTableName();
                    WALEdit edit = entry.getEdit();

                    long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
                    long sequenceId = entry.getKey().getSequenceId();

                    long writeTime = entry.getKey().getWriteTime();
                    String nameSpace = Bytes.toString(table.getNamespace());
                    String tableName = Bytes.toString(table.getName());
                    String rowKey = "";

                    LOG.info("------------------");
                    LOG.info("----namespace---"   nameSpace);
                    LOG.info("----tableName---"   tableName);
                    LOG.info("----brokerServers---"   brokerServers);

                    ArrayList<Cell> cells = edit.getCells();
                    if (cells != null && cells.size() > 0) {
                        rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));
                    }

                    List<String> rowKeys = new ArrayList();
                    cells.forEach(cell -> {
                        String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                        if (!rowKeys.contains(rowkey)) {
                            rowKeys.add(rowkey);
                        }
                    });


                    HashMap<String, Object> map = new HashMap<>();
                    map.put("origLogSeqNum", origLogSeqNum);
                    map.put("sequenceId", sequenceId);
                    map.put("batchIndex", batchIndex);
                    map.put("nameSpace", nameSpace);
                    map.put("tableName", tableName);
                    map.put("rowKeys", rowKeys);
                    map.put("currentTime", System.currentTimeMillis());
                    map.put("writeTime", writeTime);
                    map.put("peerId", peerId);
                    LOG.info("----rowKey---"   CollUtil.join(rowKeys, ","));
                    String jsonStr = JSONUtil.toJsonStr(map);
                    produceUtils.send(topicName, rowKey, jsonStr);
                    LOG.info("------------------");
                });

                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
                }
            } catch (Exception e) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
                }
                throw e;
            } finally {
                produceUtils.flush();
                produceUtils.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return batchIndex;
    }

注意java客户端如批量写入Hbase,对应WAL日志是一条记录。

如何配置自定义的ReplicationEndpoint

  • 创建Hbase表
代码语言:javascript复制
create 'person', {NAME=>'info',REPLICATION_SCOPE => '1'}

注意REPLICATION_SCOPE属于设置为1,表示开启复制

  • 创建复制链路
代码语言:javascript复制
add_peer '111', ENDPOINT_CLASSNAME => 'cn.com.legend.hbase.replication.KafkaInterClusterReplicationEndpoint',CONFIG => { "brokerServers" => "192.168.111.129:9092,192.168.111.130:9092,192.168.111.131:9092", "topicName" => "test" },TABLE_CFS => { "person" => ["info"]}

注意ENDPOINT_CLASSNAME属性,修改成自定义的ReplicationEndpoint,CONFIG 属性可配置自定义的参数,可在自定义的ReplicationEndpoint类init方法中通过以下方式获取.

代码语言:javascript复制
 @Override
    public void init(Context context) throws IOException {
        super.init(context);
        ReplicationPeer replicationPeer = context.getReplicationPeer();
        Configuration configuration = replicationPeer.getConfiguration();
        this.brokerServers = configuration.get("brokerServers");
        this.topicName = configuration.get("topicName");
        peerId = replicationPeer.getId();

        this.conf = HBaseConfiguration.create(ctx.getConfiguration());
  • 设置串行复制
代码语言:javascript复制
set_peer_serial '111',true 

串行复制和费串行复制有啥区别,可自行查找资料.

  • 删除复制链路
代码语言:javascript复制
remove_peer '111'

附上涉及的完整类实现:

  • KafkaReplicationEndpoint类
代码语言:javascript复制
package cn.com.legend.hbase.replication;

import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;

/**
 * A {@link BaseReplicationEndpoint} for replication endpoints whose
 * target cluster is an HBase cluster.
 */
public abstract class KafkaReplicationEndpoint extends BaseReplicationEndpoint
        implements Abortable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaReplicationEndpoint.class);


    @Override
    public void start() {
        startAsync();
    }

    @Override
    public void stop() {
        stopAsync();
    }

    @Override
    protected void doStart() {
        try {
            notifyStarted();
        } catch (Exception e) {
            notifyFailed(e);
        }
    }

    @Override
    protected void doStop() {
        notifyStopped();
    }

    @Override
    // Synchronize peer cluster connection attempts to avoid races and rate
    // limit connections when multiple replication sources try to connect to
    // the peer cluster. If the peer cluster is down we can get out of control
    // over time.
    public synchronized UUID getPeerUUID() {
        return UUID.randomUUID();
    }


    @Override
    public void abort(String why, Throwable e) {
        LOG.error("The HBaseReplicationEndpoint corresponding to peer "   ctx.getPeerId()
                  " was aborted for the following reason(s):"   why, e);
    }

    @Override
    public boolean isAborted() {
        // Currently this is never "Aborted", we just log when the abort method is called.
        return false;
    }

}

  • KafkaInterClusterReplicationEndpoint类
代码语言:javascript复制
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cn.com.legend.hbase.replication;

import cn.com.legend.hbase.kafka.ProduceUtils;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
 * implementation for replicating to another HBase cluster.
 * For the slave cluster it selects a random number of peers
 * using a replication ratio. For example, if replication ration = 0.1
 * and slave cluster has 100 region servers, 10 will be selected.
 * <p>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * </p>
 */
@InterfaceAudience.Private
public class KafkaInterClusterReplicationEndpoint extends KafkaReplicationEndpoint {
    private static final Logger LOG =
            LoggerFactory.getLogger(KafkaInterClusterReplicationEndpoint.class);

    private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;

    private ClusterConnection conn;
    private Configuration localConf;
    private Configuration conf;
    // How long should we sleep for each retry
    private long sleepForRetries;
    // Maximum number of retries before taking bold actions
    private int maxRetriesMultiplier;
    // Socket timeouts require even bolder actions since we don't want to DDOS
    private int socketTimeoutMultiplier;
    // Amount of time for shutdown to wait for all tasks to complete
    private long maxTerminationWait;
    // Size limit for replication RPCs, in bytes
    private int replicationRpcLimit;
    //Metrics for this source
    private MetricsSource metrics;
    private String replicationClusterId = "";
    private ThreadPoolExecutor exec;
    private int maxThreads;
    private Path baseNamespaceDir;
    private Path hfileArchiveDir;
    private boolean replicationBulkLoadDataEnabled;
    private Abortable abortable;
    private boolean dropOnDeletedTables;
    private boolean isSerial = false;
    //kafka 地址
    private String brokerServers;
    // topic名称
    private String topicName;
    // peerId
    private String peerId;

    @Override
    public void init(Context context) throws IOException {
        super.init(context);
        ReplicationPeer replicationPeer = context.getReplicationPeer();
        Configuration configuration = replicationPeer.getConfiguration();
        this.brokerServers = configuration.get("brokerServers");
        this.topicName = configuration.get("topicName");
        peerId = replicationPeer.getId();

        this.conf = HBaseConfiguration.create(ctx.getConfiguration());
        this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
                maxRetriesMultiplier);
        // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
        // tasks to terminate when doStop() is called.
        long maxTerminationWaitMultiplier = this.conf.getLong(
                "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
        this.maxTerminationWait = maxTerminationWaitMultiplier *
                this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
        // TODO: This connection is replication specific or we should make it particular to
        // replication and make replication specific settings such as compression or codec to use
        // passing Cells.
        this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
        this.sleepForRetries =
                this.conf.getLong("replication.source.sleepforretries", 1000);
        this.metrics = context.getMetrics();
        // per sink thread pool
        this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
        this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkKafkaThread-%d").build());
        this.abortable = ctx.getAbortable();
        // Set the size limit for replication RPCs to 95% of the max request size.
        // We could do with less slop if we have an accurate estimate of encoded size. Being
        // conservative for now.
        this.replicationRpcLimit = (int) (0.95 * conf.getLong(RpcServer.MAX_REQUEST_SIZE,
                RpcServer.DEFAULT_MAX_REQUEST_SIZE));
        this.dropOnDeletedTables =
                this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);

        this.replicationBulkLoadDataEnabled =
                conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false);
        if (this.replicationBulkLoadDataEnabled) {
            replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
        }
        // Construct base namespace directory and hfile archive directory path
        Path rootDir = FSUtils.getRootDir(conf);
        Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
        baseNamespaceDir = new Path(rootDir, baseNSDir);
        hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
        isSerial = context.getPeerConfig().isSerial();
    }


    /**
     * Do the sleeping logic
     *
     * @param msg             Why we sleep
     * @param sleepMultiplier by how many times the default sleeping time is augmented
     * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
     */
    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} {}, sleeping {} times {}",
                        logPeerId(), msg, sleepForRetries, sleepMultiplier);
            }
            Thread.sleep(this.sleepForRetries * sleepMultiplier);
        } catch (InterruptedException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{} Interrupted while sleeping between retries", logPeerId());
            }
        }
        return sleepMultiplier < maxRetriesMultiplier;
    }

    private int getEstimatedEntrySize(Entry e) {
        long size = e.getKey().estimatedSerializedSizeOf()   e.getEdit().estimatedSerializedSizeOf();
        return (int) size;
    }

    private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
        int numSinks = 10;
        int n = Math.min(Math.min(this.maxThreads, entries.size() / 100   1), numSinks);
        List<List<Entry>> entryLists =
                Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
        int[] sizes = new int[n];
        for (Entry e : entries) {
            int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
            int entrySize = getEstimatedEntrySize(e);
            // If this batch has at least one entry and is over sized, move it to the tail of list and
            // initialize the entryLists[index] to be a empty list.
            if (sizes[index] > 0 && sizes[index]   entrySize > replicationRpcLimit) {
                entryLists.add(entryLists.get(index));
                entryLists.set(index, new ArrayList<>());
                sizes[index] = 0;
            }
            entryLists.get(index).add(e);
            sizes[index]  = entrySize;
        }
        return entryLists;
    }

    private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
        Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        for (Entry e : entries) {
            regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
                    .add(e);
        }
        return new ArrayList<>(regionEntries.values());
    }

    /**
     * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
     * concurrently. Note that, for serial replication, we need to make sure that entries from the
     * same region to be replicated serially, so entries from the same region consist of a batch, and
     * we will divide a batch into several batches by replicationRpcLimit in method
     * serialReplicateRegionEntries()
     */
    private List<List<Entry>> createBatches(final List<Entry> entries) {
        if (isSerial) {
            return createSerialBatches(entries);
        } else {
            return createParallelBatches(entries);
        }
    }

    private TableName parseTable(String msg) {
        // ... TableNotFoundException: '<table>'/n...
        Pattern p = Pattern.compile("TableNotFoundException: '([\S]*)'");
        Matcher m = p.matcher(msg);
        if (m.find()) {
            String table = m.group(1);
            try {
                // double check that table is a valid table name
                TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
                return TableName.valueOf(table);
            } catch (IllegalArgumentException ignore) {
            }
        }
        return null;
    }

    // Filter a set of batches by TableName
    private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
        return oldEntryList
                .stream().map(entries -> entries.stream()
                        .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }


    private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
                                   List<List<Entry>> batches) throws IOException {
        int futures = 0;
        for (int i = 0; i < batches.size(); i  ) {
            List<Entry> entries = batches.get(i);
            if (!entries.isEmpty()) {

                LOG.info("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
                        replicateContext.getSize());

                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
                            replicateContext.getSize());
                }
                // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
                pool.submit(createReplicator(entries, i));
                futures  ;
            }
        }

        IOException iox = null;
        long lastWriteTime = 0;
        for (int i = 0; i < futures; i  ) {
            try {
                // wait for all futures, remove successful parts
                // (only the remaining parts will be retried)
                Future<Integer> f = pool.take();
                int index = f.get();
                List<Entry> batch = batches.get(index);
                batches.set(index, Collections.emptyList()); // remove successful batch
                // Find the most recent write time in the batch
                long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
                if (writeTime > lastWriteTime) {
                    lastWriteTime = writeTime;
                }
            } catch (InterruptedException ie) {
                iox = new IOException(ie);
            } catch (ExecutionException ee) {
                // cause must be an IOException
                iox = (IOException) ee.getCause();
            }
        }
        if (iox != null) {
            // if we had any exceptions, try again
            throw iox;
        }


        LOG.info("----lastWriteTime----"   lastWriteTime);
        return lastWriteTime;
    }

    /**
     * Do the shipping logic
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
        CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
        String walGroupId = replicateContext.getWalGroupId();
        int sleepMultiplier = 1;

        List<List<Entry>> batches = createBatches(replicateContext.getEntries());
        while (this.isRunning() && !exec.isShutdown()) {
            if (!isPeerEnabled()) {
                if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
                    sleepMultiplier  ;
                }
                continue;
            }

            try {
                long lastWriteTime;

                // replicate the batches to sink side.
                lastWriteTime = parallelReplicate(pool, replicateContext, batches);

                // update metrics
                if (lastWriteTime > 0) {
                    this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
                }
                return true;
            } catch (IOException ioe) {
                // Didn't ship anything, but must still age the last time we did
                this.metrics.refreshAgeOfLastShippedOp(walGroupId);
                if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                    sleepMultiplier  ;
                }
            }
        }
        return false; // in case we exited before replicating
    }

    protected boolean isPeerEnabled() {
        return ctx.getReplicationPeer().isPeerEnabled();
    }

    @Override
    protected void doStop() {
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            } catch (IOException e) {
                LOG.warn("{} Failed to close the connection", logPeerId());
            }
        }
        // Allow currently running replication tasks to finish
        exec.shutdown();
        try {
            exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
        // Abort if the tasks did not terminate in time
        if (!exec.isTerminated()) {
            String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "  
                    "ThreadPoolExecutor failed to finish all tasks within "   maxTerminationWait   "ms. "  
                    "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
            abortable.abort(errMsg, new IOException(errMsg));
        }
        notifyStopped();
    }

    @VisibleForTesting
    protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
        try {
            int entriesHashCode = System.identityHashCode(entries);
            if (LOG.isTraceEnabled()) {
                long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
                LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
                        logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
            }

            LOG.info("---entries size----"   entries.size());
            ProduceUtils produceUtils = ProduceUtils.getInstance(brokerServers);
            try {
                entries.forEach(entry -> {
                    TableName table = entry.getKey().getTableName();
                    WALEdit edit = entry.getEdit();

                    long origLogSeqNum = entry.getKey().getOrigLogSeqNum();
                    long sequenceId = entry.getKey().getSequenceId();

                    long writeTime = entry.getKey().getWriteTime();
                    String nameSpace = Bytes.toString(table.getNamespace());
                    String tableName = Bytes.toString(table.getName());
                    String rowKey = "";

                    LOG.info("------------------");
                    LOG.info("----namespace---"   nameSpace);
                    LOG.info("----tableName---"   tableName);
                    LOG.info("----brokerServers---"   brokerServers);

                    ArrayList<Cell> cells = edit.getCells();
                    if (cells != null && cells.size() > 0) {
                        rowKey = Bytes.toString(CellUtil.cloneRow(cells.get(0)));
                    }

                    List<String> rowKeys = new ArrayList();
                    cells.forEach(cell -> {
                        String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                        if (!rowKeys.contains(rowkey)) {
                            rowKeys.add(rowkey);
                        }
                    });


                    HashMap<String, Object> map = new HashMap<>();
                    map.put("origLogSeqNum", origLogSeqNum);
                    map.put("sequenceId", sequenceId);
                    map.put("batchIndex", batchIndex);
                    map.put("nameSpace", nameSpace);
                    map.put("tableName", tableName);
                    map.put("rowKeys", rowKeys);
                    map.put("currentTime", System.currentTimeMillis());
                    map.put("writeTime", writeTime);
                    map.put("peerId", peerId);
                    LOG.info("----rowKey---"   CollUtil.join(rowKeys, ","));
                    String jsonStr = JSONUtil.toJsonStr(map);
                    produceUtils.send(topicName, rowKey, jsonStr);
                    LOG.info("------------------");
                });

                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
                }
            } catch (Exception e) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
                }
                throw e;
            } finally {
                produceUtils.flush();
                produceUtils.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return batchIndex;
    }

    private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
            throws IOException {
        int batchSize = 0, index = 0;
        List<Entry> batch = new ArrayList<>();
        for (Entry entry : entries) {
            int entrySize = getEstimatedEntrySize(entry);
            if (batchSize > 0 && batchSize   entrySize > replicationRpcLimit) {
                replicateEntries(batch, index  );
                batch.clear();
                batchSize = 0;
            }
            batch.add(entry);
            batchSize  = entrySize;
        }
        if (batchSize > 0) {
            replicateEntries(batch, index);
        }
        return batchIndex;
    }

    @VisibleForTesting
    protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
        return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
                : () -> replicateEntries(entries, batchIndex);
    }

    private String logPeerId() {
        return "[Source for peer "   this.ctx.getPeerId()   "]:";
    }

}

0 人点赞