Apache Kafka 消息队列

2022-01-19 14:00:01 浏览数 (1)

各大厂商选择的消息队列的应用不尽相同,市面上也有很多的产品,为了更好的适应就业,自己必须靠自己去学习,本篇文章讲述的就是,Kafka 消息队列

网络找的 :黑马Kafka笔记代码下载

Kafka 简介:

是一款分布式,基于 发布订阅模式的 消息队列产品,主要应用于大数据实时处理领域。

使用Kafka的好处?

好处就是使用消息队列的好处:削峰填谷、异步解耦

使用kafka的条件

依赖Zookeeper(帮助Kafka 集群存储信息,帮助消费者存储消费的位置信息)

下载Kafka

kafka_2.12-2.7.0下载

安装Kafka

启动 zookeper

进入bin目标,直接

代码语言:javascript复制
启动
./kafka-server-start.sh -daemon ../config/server.properties
参数说明
    -daemon 的作用是后台启动,不占用当前终端打印台
    ../config/server.properties 是指定配置文件,不指定配置文件不行

停止 Kafka
./kafka-server-stop.sh 

查看是否启动成功

代码语言:javascript复制
jps

启动成功了!

尚硅谷 在这里 提到了 shell 脚本 https://www.bilibili.com/video/BV1a4411B7V9?p=6&spm_id_from=pageDriver 不会,需要补充学习一下 16分钟之后

这里 补充一下配置文件的说明

  • zookeeper.connect 指明Zookeeper主机地址,如果zookeeper是集群则以逗号隔开,如: 172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:2181
  • listeners 监听列表,broker对外提供服务时绑定的IP和端口。多个以逗号隔开,如果监听器名称不是一个安全的 协议, listener.security.protocol.map也必须设置。主机名称设置0.0.0.0绑定所有的接口,主机名称为 空则绑定默认的接口。如:PLAINTEXT://myhost:9092、SSL://:9091 CLIENT://0.0.0.0:9092、REPLICATION://localhost:9093
  • broker.id broker的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-1
  • log.dirs 日志数据存放的目录,如果没有配置则使用log.dir,建议此项配置。
  • message.max.bytes 服务器接受单个消息的最大大小,默认1000012 约等于976.6KB。

命令行操作

一台机器只能拥有一个副本 即replication-factor

topic 主题名称,partitions 分区数,replication-factor 备份数

代码语言:javascript复制
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic first --partitions 2 --replication-factor 1

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic heima001 --partitions 2 --replication-factor 1

代码语言:javascript复制
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181

此时 日志里就会出现数据

代码语言:javascript复制
[root@localhost bin]# ./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic first 
下面提示
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

此时 去看数据

当在此添加主题相同名字 相同分区的、相同的备份 主题时些数据才会被清除

查看tipics信息

代码语言:javascript复制
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic first1

读数据

代码语言:javascript复制
/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima

发送数据

代码语言:javascript复制
./kafka-console-producer.sh --broker-list localhost:9092 --topic heima

读、写使用如图

代码语言:javascript复制
listeners=PLAINTEXT://0.0.0.0:9092

生产者详解:

①、首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic分区Partition键 Key以及 值 Value,主题和值是必须要声明的,分区和键可以不用指定。

②、调用send() 方法进行消息发送。

③、因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和 value对象序列化成字节数组。

④、接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做 任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之 后,生产者就知道该往哪个主题和分区发送记录了。

⑤、接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和 分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。

⑥、Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在 分区里的偏移量)。发送失败,可以选择重试或者直接抛出异常。

同步发送
代码语言:javascript复制
producer.send(record)
异步发送 (相当于单独开线程去发送,不会影响主线程)
代码语言:javascript复制
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println(metadata.partition()   ":"   metadata.offset());
                    }
                }
            });
序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。 Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer), 还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。

特殊说明: 解决问题的光鲜,藏着磕Bug的痛苦。 万物皆入轮回,谁也躲不掉! 以上文章,均是我实际操作,写出来的笔记资料,不会出现全文盗用别人文章!烦请各位,请勿直接盗用!

0 人点赞