TBase如何接入Kafka组件进行数据消费

2020-11-26 16:25:01 浏览数 (1)

TBase如何接入Kafka组件进行数据消费

TBase是腾讯云数据库团队维护的HTAP分布式数据库集群。

TBaseTBase
代码语言:txt复制
    分布式HTAP数据库 TBase(TencentDB for TBase,TBase)是基于postgresql-xc的BSD开源协议 ,进行自主研发的分布式数据库系统。TBase 集高扩展性、SQL 高兼容度、完整的分布式事务支持、多级容灾及多维度资源隔离等功能于一身,目TBaseV2.15完全兼容pgV10。采用无共享的集群架构,提供容灾、备份、恢复、监控、安全、审计等全套解决方案,适用于TB- PB级的数据应用场景。
代码语言:txt复制
    同时TBase支持异构数据的同步和迁移,在对应的运维管理平台OSS系统中留有对应的KAFKA接口,我们可以使用kafka来做其他数据库到TBase或反向TBase到其他数据的数据迁移或者同步工作。接下来我们就来简单看下,TBase是如何接入和使用kafka组件来进行数据处理的。

kafka简介:Kafka是一个开源流处理平台,Kafka是通过解析数据库端日志来进行发布订阅消息的系统,它可以处理消费者在网站中的所有动作流数据。

KAFKAKAFKA

本次我将kafka接入TBase平台,进行TBase数据的数据消费,即我们将其作为如下图中producer的角色来生产数据,然后接入kafka平台经过加工,将数据转换为json格式读取出来再进行处理,这个过程叫消费consumer。这次我们先简单的用kafka 的单机来进行本次实验。

KAFKA工作流程KAFKA工作流程

本次实验一共分为以下几个部分:

第一部分:KAFKA的主机配置

第二部分:KAFKA接入TBase 的OSS管理平台

第三部分:连接TBase进行实验数据的创建

第四部分:消费TBase生产的数据查看效果

具体实验操作如下:

实验环境:

操作系统centos 7.6

1、已安装TBase分布式数据库,2个dn节点

cn001:172.21.16.17 :11345

dn001:172.21.16.17 :11000

dn002:172.21.16.21 :11000

2、kafka独立服务器:172.21.16.12

使用端口:2181 、9092、8083

第一部分:KAFKA的主机配置

kafka主机配置如下:

1、获取软件包和依赖的jdk包地址如下: 在kafka主机上下载

wget https://tasev2-1300276124.cos.ap-beijing.myzijiebao.com/kafka_2.11-2.4.0.tar.gz

wget https://tasev2-1300276124.cos.ap-beijing.myzijiebao.com/jdk-8u40-linux-x64.gz

2、解压并配置相关环境变量

tar -zxvf jdk-8u40-linux-x64.gz -C /usr/local/ ; tar - xvf kafka_2.11-2.4.0.tar.gz -C /usr/local/

使用root账号,配置环境变量如下:

cat /etc/profile

export JAVAHOME=/usr/local/jdk1.8.0_40

export CLASSPATH=/usr/local/jdk1.8.0_40/lib/dt.jar:/usr/local/jdk1.8.0_40/lib/tools.ja

export KAFKAHOME=/usr/local/kafka_2.11-2.4.0

export PATH=$JAVAHOME/bin:$KAFKAHOME/bin:$PATH

使变量生效:source /etc/profile

  1. kafka参数文件修改和目录创建,修改3个文件,创建2个目录
KAFKA修改zookeeper配置文件KAFKA修改zookeeper配置文件

cd /usr/local/kafka_2.11-2.4.0/config

egrep dataDir * 查看对应的zookeeper组件的数据目录,并创建

mkdir -p /data/tbase/kafka/zookeeper

查看zookeeper目录查看zookeeper目录

2.2配置kafka服务

cd /data/tbase/kafka/kafka_2.11-2.4.0/config

vim server.properties

log.dirs=/data/tbase/kafka/kafka-logs

listeners=PLAINTEXT://172.21.16.12:9092 <----- 修改IP为kafka主机IP

mkdir -p /data/tbase/kafka/kafka-logs

kafka-log目录kafka-log目录

2.3 配置connector服务

vim connect-distributed.properties

bootstrap.servers=172.21.16.12:9092

connector 配置connector 配置

检查一下配置内容,然后启动对应的三个配置的服务

1)启动zookeeper-server 服务

zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/zookeeper.properties

#确认2181端口是否被监听

lsof -i:2181

2)启动kafka服务

kafka-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/server.properties

确认9092端口是否被监听

lsof -i:9092

3)启动connector服务

connect-distributed.sh -daemon /usr/local/kafka_2.11-2.4.0/config/connect-distributed.properties

#确认8083端口是否被监听

lsof -i:8083

如果有服务无法启动,可以查看kafka 的日志目录/usr/local/kafka_2.11-2.4.0/logs中分析日志进行排查。

第二部分:KAFKA接入TBase 的OSS管理平台

1、接下来登录TBase分布式数据的管控平台,进行kafka的接入配置。

TBase 管理控制台OSSTBase 管理控制台OSS

2、将配置好的kafka服务器接入到TBase 的数据同步模块中

接入kafka数据同步接入kafka数据同步

3、开启同步开关

打开数据同步开关打开数据同步开关

4、配置TBase允许访问的主机IP,添加KAFKA主机的ip到dn001、dn002 两个节点的名单中,并下发配置。

dn001添加鉴权配置dn001添加鉴权配置
dn002数据节点鉴权配置dn002数据节点鉴权配置

5、查看配置过之后的数据同步配置详情信息。

查看数据同步配置详情查看数据同步配置详情

第三部分:连接TBase进行实验数据的创建

1、连接到TBase 的命令行界面创建测试表t1

创建TBase分布式表t1:

create table t1 (id int primary key,name varchar(20)) distribute by shard (id);

插入测试数据:

insert into t1 values(100,'张三' ),(200,'李四' ),(300,'王五' );

创建测试数据创建测试数据
插入测试数据插入测试数据

第四部分:消费TBase生产的数据查看效果

切换到kafka 主机上进行数据的消费测试:

1、 查询生成的topic(相当于数据库中的表)

kafka-topics.sh --list --zookeeper 172.21.16.12:2181

列出topic信息列出topic信息

消费刚刚创建的t1表的全部数据

2、kafka-console-consumer.sh --bootstrap-server 172.21.16.12:9092 --topic tbase_zhao_1.postgres.public.t1 --from-beginning

注:tbase_zhao_1.postgres.public.t1 格式:实例名字:数据库名:模式名:表名

消费TBase 表t1的数据消费TBase 表t1的数据

3、消费出来或叫做读取出来的结果如下:

第一行数据第一行数据
第二行数据第二行数据
第三行数据第三行数据

同时我们在进行TBase端的数据插入时,数据会被实时的消费出来。

postgres=# insert into t1 values(400,'张飞');

INSERT 0 1

postgres=# insert into t1 values(500,'刘备');

INSERT 0 1

postgres=# insert into t1 values(600,'关羽');

INSERT 0 1

============================消费记录如下============================================

"payload":{"before":null,"after":{"id":400,"name":"张飞"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377445676,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887184,"xmin":null},"op":"c","ts_ms":1606377445679,"transaction":null}}

"payload":{"before":null,"after":{"id":500,"name":"刘备"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377457368,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887896,"xmin":null},"op":"c","ts_ms":1606377457372,"transaction":null}}

"payload":{"before":null,"after":{"id":600,"name":"关羽"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377470187,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":683,"lsn":234888832,"xmin":null},"op":"c","ts_ms":1606377470191,"transaction":null}}

因为目前多种数据库在数据类型中支json数据类型,我们可以将kafka消费的数据接入到对应的数据库中加载使用。或者借助应用程序将其处理为纯文本的数据,进而可以进行跨平台或版本的异构数据迁移的同步或迁移操作。

可以使用kafka 将异构平台数据迁到TBase中或反向迁移等,同时也可将TBase数据消费使用,如果异构平台如Oracle,mysql,postgresql,等数据如果有需求迁到TBase中的话,也可以借助腾讯云的DTS中的DB bridge工具进行异构平台数据迁移评估,兼容性语句语法改造,全量/增量同步等功能的一个迁移方案。

Kafka是分布式流平台。

有3个主要特征:

  • 发布和订阅消息流,这一点与传统的消息队列相似。
  • 以容灾持久化方式的消息流存储。
  • 在消息流发生时处理消息流。

Kafka通常使用在两大类应用中:

  • 在系统或应用之间,构建实时、可靠的消息流管道。
  • 构建实时流应用程序,用于转换或响应数据流

Kafka的几个基本概念:

  • Kafka可以作为一个集群运行在跨越多个数据中心的多个服务上。
  • Kafka集群按照分类存储的消息流叫做topic
  • 每一个消息由一个主键、一个值、和一个时间戳组成。

具体更多的kafka相关原理和使用介绍,请访问腾讯云社区Kafka精进

1、https://cloud.tencent.com/developer/article/1476637

2、https://cloud.tencent.com/developer/article/1645644

3、https://cloud.tencent.com/developer/article/1607445

谢谢!

0 人点赞