TBase如何接入Kafka组件进行数据消费
TBase是腾讯云数据库团队维护的HTAP分布式数据库集群。
代码语言:txt复制 分布式HTAP数据库 TBase(TencentDB for TBase,TBase)是基于postgresql-xc的BSD开源协议 ,进行自主研发的分布式数据库系统。TBase 集高扩展性、SQL 高兼容度、完整的分布式事务支持、多级容灾及多维度资源隔离等功能于一身,目TBaseV2.15完全兼容pgV10。采用无共享的集群架构,提供容灾、备份、恢复、监控、安全、审计等全套解决方案,适用于TB- PB级的数据应用场景。
代码语言:txt复制 同时TBase支持异构数据的同步和迁移,在对应的运维管理平台OSS系统中留有对应的KAFKA接口,我们可以使用kafka来做其他数据库到TBase或反向TBase到其他数据的数据迁移或者同步工作。接下来我们就来简单看下,TBase是如何接入和使用kafka组件来进行数据处理的。
kafka简介:Kafka是一个开源流处理平台,Kafka是通过解析数据库端日志来进行发布订阅消息的系统,它可以处理消费者在网站中的所有动作流数据。
本次我将kafka接入TBase平台,进行TBase数据的数据消费,即我们将其作为如下图中producer的角色来生产数据,然后接入kafka平台经过加工,将数据转换为json格式读取出来再进行处理,这个过程叫消费consumer。这次我们先简单的用kafka 的单机来进行本次实验。
本次实验一共分为以下几个部分:
第一部分:KAFKA的主机配置
第二部分:KAFKA接入TBase 的OSS管理平台
第三部分:连接TBase进行实验数据的创建
第四部分:消费TBase生产的数据查看效果
具体实验操作如下:
实验环境:
操作系统centos 7.6
1、已安装TBase分布式数据库,2个dn节点
cn001:172.21.16.17 :11345
dn001:172.21.16.17 :11000
dn002:172.21.16.21 :11000
2、kafka独立服务器:172.21.16.12
使用端口:2181 、9092、8083
第一部分:KAFKA的主机配置
kafka主机配置如下:
1、获取软件包和依赖的jdk包地址如下: 在kafka主机上下载
wget https://tasev2-1300276124.cos.ap-beijing.myzijiebao.com/kafka_2.11-2.4.0.tar.gz
wget https://tasev2-1300276124.cos.ap-beijing.myzijiebao.com/jdk-8u40-linux-x64.gz
2、解压并配置相关环境变量
tar -zxvf jdk-8u40-linux-x64.gz -C /usr/local/ ; tar - xvf kafka_2.11-2.4.0.tar.gz -C /usr/local/
使用root账号,配置环境变量如下:
cat /etc/profile
export JAVAHOME=/usr/local/jdk1.8.0_40
export CLASSPATH=/usr/local/jdk1.8.0_40/lib/dt.jar:/usr/local/jdk1.8.0_40/lib/tools.ja
export KAFKAHOME=/usr/local/kafka_2.11-2.4.0
export PATH=$JAVAHOME/bin:$KAFKAHOME/bin:$PATH
使变量生效:source /etc/profile
- kafka参数文件修改和目录创建,修改3个文件,创建2个目录
cd /usr/local/kafka_2.11-2.4.0/config
egrep dataDir * 查看对应的zookeeper组件的数据目录,并创建
mkdir -p /data/tbase/kafka/zookeeper
2.2配置kafka服务
cd /data/tbase/kafka/kafka_2.11-2.4.0/config
vim server.properties
log.dirs=/data/tbase/kafka/kafka-logs
listeners=PLAINTEXT://172.21.16.12:9092 <----- 修改IP为kafka主机IP
mkdir -p /data/tbase/kafka/kafka-logs
2.3 配置connector服务
vim connect-distributed.properties
bootstrap.servers=172.21.16.12:9092
检查一下配置内容,然后启动对应的三个配置的服务
1)启动zookeeper-server 服务
zookeeper-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/zookeeper.properties
#确认2181端口是否被监听
lsof -i:2181
2)启动kafka服务
kafka-server-start.sh -daemon /usr/local/kafka_2.11-2.4.0/config/server.properties
确认9092端口是否被监听
lsof -i:9092
3)启动connector服务
connect-distributed.sh -daemon /usr/local/kafka_2.11-2.4.0/config/connect-distributed.properties
#确认8083端口是否被监听
lsof -i:8083
如果有服务无法启动,可以查看kafka 的日志目录/usr/local/kafka_2.11-2.4.0/logs中分析日志进行排查。
第二部分:KAFKA接入TBase 的OSS管理平台
1、接下来登录TBase分布式数据的管控平台,进行kafka的接入配置。
2、将配置好的kafka服务器接入到TBase 的数据同步模块中
3、开启同步开关
4、配置TBase允许访问的主机IP,添加KAFKA主机的ip到dn001、dn002 两个节点的名单中,并下发配置。
5、查看配置过之后的数据同步配置详情信息。
第三部分:连接TBase进行实验数据的创建
1、连接到TBase 的命令行界面创建测试表t1
创建TBase分布式表t1:
create table t1 (id int primary key,name varchar(20)) distribute by shard (id);
插入测试数据:
insert into t1 values(100,'张三' ),(200,'李四' ),(300,'王五' );
第四部分:消费TBase生产的数据查看效果
切换到kafka 主机上进行数据的消费测试:
1、 查询生成的topic(相当于数据库中的表)
kafka-topics.sh --list --zookeeper 172.21.16.12:2181
消费刚刚创建的t1表的全部数据
2、kafka-console-consumer.sh --bootstrap-server 172.21.16.12:9092 --topic tbase_zhao_1.postgres.public.t1 --from-beginning
注:tbase_zhao_1.postgres.public.t1 格式:实例名字:数据库名:模式名:表名
3、消费出来或叫做读取出来的结果如下:
同时我们在进行TBase端的数据插入时,数据会被实时的消费出来。
postgres=# insert into t1 values(400,'张飞');
INSERT 0 1
postgres=# insert into t1 values(500,'刘备');
INSERT 0 1
postgres=# insert into t1 values(600,'关羽');
INSERT 0 1
============================消费记录如下============================================
"payload":{"before":null,"after":{"id":400,"name":"张飞"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377445676,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887184,"xmin":null},"op":"c","ts_ms":1606377445679,"transaction":null}}
"payload":{"before":null,"after":{"id":500,"name":"刘备"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377457368,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":682,"lsn":234887896,"xmin":null},"op":"c","ts_ms":1606377457372,"transaction":null}}
"payload":{"before":null,"after":{"id":600,"name":"关羽"},"source":{"version":"1.1.1.RELEASE-TBase","connector":"postgresql","name":"tbase_zhao_1.postgres","ts_ms":1606377470187,"snapshot":"false","db":"postgres","schema":"public","table":"t1","txId":683,"lsn":234888832,"xmin":null},"op":"c","ts_ms":1606377470191,"transaction":null}}
因为目前多种数据库在数据类型中支json数据类型,我们可以将kafka消费的数据接入到对应的数据库中加载使用。或者借助应用程序将其处理为纯文本的数据,进而可以进行跨平台或版本的异构数据迁移的同步或迁移操作。
可以使用kafka 将异构平台数据迁到TBase中或反向迁移等,同时也可将TBase数据消费使用,如果异构平台如Oracle,mysql,postgresql,等数据如果有需求迁到TBase中的话,也可以借助腾讯云的DTS中的DB bridge工具进行异构平台数据迁移评估,兼容性语句语法改造,全量/增量同步等功能的一个迁移方案。
Kafka是分布式流平台。
有3个主要特征:
- 发布和订阅消息流,这一点与传统的消息队列相似。
- 以容灾持久化方式的消息流存储。
- 在消息流发生时处理消息流。
Kafka通常使用在两大类应用中:
- 在系统或应用之间,构建实时、可靠的消息流管道。
- 构建实时流应用程序,用于转换或响应数据流
Kafka的几个基本概念:
- Kafka可以作为一个集群运行在跨越多个数据中心的多个服务上。
- Kafka集群按照分类存储的消息流叫做topic。
- 每一个消息由一个主键、一个值、和一个时间戳组成。
具体更多的kafka相关原理和使用介绍,请访问腾讯云社区Kafka精进
1、https://cloud.tencent.com/developer/article/1476637
2、https://cloud.tencent.com/developer/article/1645644
3、https://cloud.tencent.com/developer/article/1607445
谢谢!