kafka介绍和常见操作

2023-05-19 17:52:31 浏览数 (1)

# 1.应用场景方面

RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。

kafka:用于处于活跃的流式数据,大数据量的数据处理上。

# 2.架构模型方面

producer,broker,consumer

RabbitMQ:以broker为中心,有消息的确认机制

kafka:以consumer为中心,无消息的确认机制

# 3.吞吐量方面

RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。

kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。

# 4.集群负载均衡方面

RabbitMQ:本身不支持负载均衡,需要loadbalancer的支持

kafka:采用zookeeper对集群中的broker,consumer进行管理,可以注册topic到zookeeper上,通过zookeeper的协调机制,producer保存对应的topic的broker信息,可以随机或者轮询发送到broker上,producer可以基于语义指定分片,消息发送到broker的某个分片上。

kafka单机版搭建

下载kafka_2.13-2.6.0.tgz安装包

tar -zxf kafka_2.13-2.6.0.tgz

mv kafka_2.13-2.6.0/ kafka

自带的Zookeeper程序脚本与配置文件名与原生Zookeeper稍有不同。kafka自带的Zookeeper程序在bin目录下的zookeeper-server-start.sh脚本进行启动,zookeeper-server-stop.sh脚本进行停止。另外Zookeeper的配制文件在路径config/zookeeper.properties,如果有需要可以修改其中的参数。

首先强调一点,kafka的日志目录和zookeeper数据目录,这两项默认放在tmp目录,而tmp目录中内容会随重启而丢失,所以我们遇到的时候最好自定义一个路径。 zookeeper.properties 配置为

dataDir=/data/kafka/data/kfkzookeeper clientPort=2181 admin.enableServer=false tickTime=2000 initLimit=10 syncLimit=5 server.1=172.168.199.223:2888:3888

zookeeper配置myid文件 三台服务器都要在其zookeeper数据目录dataDir下创建一个myid文件,文件内只需填入上述配置文件中broker id的值,作为集群识别标识。以其中一台192.168.11.21服务器为例:

[root@host-192-168-11-21 ~]# cd /data/kafka/data/kfkzookeeper [root@host-192-168-11-21 kfkzookeeper]# echo 1 > myid 创建好后,查看一下,没问题,myid里面只有一个数值

kafka配置文件

进入kafka/config目录下,参考如下修改server.properties文件

broker.id=1 delete.topic.enable=true num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/data/kafka num.partitions=3 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=172.168.199.223:2181 zookeeper.connection.timeout.ms=60000 group.initial.rebalance.delay.ms=0

启动zookeeper服务

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties

查看java项目进程

jps

# 5.kafka常见操作

启动Zookeeper

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

关闭 Zookeeper

bin/zookeeper-server-stop.sh -daemon config/zookeeper.properties

启动kafka

bin/kafka-server-start.sh -daemon config/server.properties

关闭kafka

bin/kafka-server-stop.sh config/server.properties

创建topic

--partitions指定分区数 --replication-factor 指定分区数的副本

bin/kafka-topics.sh --create --zookeeper 192.168.11.21:2181 --replication-factor 3 --partitions 3 --topic test

./kafka-topics.sh --create --zookeeper 172.168.199.223:2181 --replication-factor 1 --partitions 1 --topic fgbp-log-pro

查看topic列表

bin/kafka-topics.sh --list --zookeeper 192.168.11.21:2181

查看topic详情

bin/kafka-topics.sh --zookeeper 192.168.11.21:2181 --describe --topic test

创建生产者,在一台服务器

bin/kafka-console-producer.sh --broker-list 192.168.11.21:9092 --topic test

创建消费者,在另一台服务器

bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.22:9092 --topic test

删除topic

bin/kafka-topics.sh --zookeeper 192.168.11.22:2181 --delete --topic test

查看kafka topic数据内容

kafka-console-consumer.sh --bootstrap-server kafka-node-0:9093 kafka-node-1:9094 kafka-node-2:9095 --from-beginning --topic gaoyingfutures_bars_live

0 人点赞