kafka(二)Kafka快速入门

2022-04-10 09:23:43 浏览数 (1)

集群部署

  1. 配置 server.properties
代码语言:javascript复制
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

其他服务器一样配置

  1. 启动集群
代码语言:javascript复制
bin/kafka-server-start.sh -daemon config/server.properties

其他服务器一样。

Kafka 命令行操作

topic 操作

脚本 kafka]$ binkafka-topics.sh 命令选项

选项

描述

--alter

更改分区数,副本分配,和/或主题的配置。

--at-min-isr-partitions

如果在描述主题时设置,则仅显示 isr 计数为的分区等于配置的最小值。 不是支持 --zookeeper 选项。

--bootstrap-server <String: server to connect to>

必需:要连接的 Kafka 服务器。 如果提供此项,则不需要直接的 Zookeeper 连接。

--command-config <String: command config property file>

包含要传递给管理客户端的配置的属性文件。 这仅与 --bootstrap-server 选项一起用于描述和更改代理配置。

--config <String: name=value>

--create

创建一个新的topic

--delete

删除一个topic

--delete-config <String: name>

要为现有主题删除的主题配置覆盖(请参阅 --config 选项下的配置列表)。 不支持 --bootstrap-server 选项。

--describe

列出给定主题的详细信息。

--disable-rack-aware

禁用机架感知副本分配

--exclude-internal

运行 list 或 describe 命令时排除内部主题。 默认会列出内部主题

--force

禁止控制台提示

--help

打印帮助信息。

--if-exists

如果在更改或删除或描述主题时设置,则该操作仅在主题存在时执行。 不支持 --bootstrap-server 选项。

--if-not-exists

如果在创建主题时设置,则只有在主题不存在时才会执行操作。 不支持 --bootstrap- 服务器选项。

--list

列出所有可用的topic。

--partitions <Integer: # of partitions>

设置topic 分区数

--replication-factor <Integer:replication factor>

指定topic的副本数

--topic <String: topic>

指定topic 名称

--topics-with-overrides

如果在描述主题时设置,则仅显示已覆盖配置的主题

--unavailable-partitions

如果在描述主题时设置,则只显示其领导者不可用的分区

--under-min-isr-partitions

如果在描述主题时设置,则仅显示 isr 计数小于配置的最小值的分区。 不支持 --zookeeper 选项。

--under-replicated-partitions

如果在描述主题时设置,则仅显示在复制分区下

--version

展示Kafka版本

--zookeeper <String: hosts>

已弃用,zookeeper 连接的连接字符串,格式为 host:port。 可以提供多个主机以允许故障转移。

案例

  1. 创建一个 topic 语法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本数> --partitions <分区数> --topic <副本名称>
代码语言:javascript复制
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#输出结果
Created topic test.
  1. 查看当前服务器中的所有 topic 语法: kafka-topics.sh --zookeeper <host>:<port> --list
代码语言:javascript复制
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 输出结果
__consumer_offsets
abc
test
  1. 删除一个topic 语法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test 需要server.properties中设置delete.topic.enable=true否则只是标记删除。
代码语言:javascript复制
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 输出结果
Topic test is marked for deletion. # 并不会马上删除,而是先对该topic做一个标记,后面再进行删除
#需要在 配置中设置 delete.topic.enable=true ,否则不会进行删除
Note: This will have no impact if delete.topic.enable is not set to true.
  1. 查看 topic 详情 语法:--describe
代码语言:javascript复制
[atguigu@hadoop102 bin]$ kafka-topics.sh  --describe --bootstrap-server hadoop102:9092 --topic abc
#  topic  abc 详细信息
Topic: abc  PartitionCount: 1   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: abc  Partition: 0    Leader: 1   Replicas: 2,0,1 Isr: 1,2,0

参数

描述

Topic

topic名称

PartitionCount

分区数

ReplicationFactor

定义的分区数

Configs

配置

Partition

当前分区位置

Leader

当前那个broker为Leader

Replicas

副本位置

Isr

lsr同步队列


producer 操作

脚本 kafka]$ binkafka-console-producer.sh 命令选项

选项

描述

--batch-size <Integer: size>

如果消息不是同步发送的,则要在单个批次中发送的消息数。 (默认值:200)

--broker-list <String: broker-list>

链接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。

--compression-codec [String: compression-codec]

支持的压缩方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默认 'gzip'

--help

打印帮助信息

--line-reader <String: reader_class>

用于从标准输入读取行的类的类名。默认情况下,每行都作为单独的消息读取。 (默认:kafka.tools.ConsoleProducer$LineMessageReader)

--max-block-ms <Long: >

生产者发送的最大时间(默认:60000)

--max-memory-bytes <Long: >

缓冲大小,以字节为单位 (默认:33554432)

--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition>

合并数据的最小数 (默认: 16384)

--message-send-max-retries <Integer>

退休数,默认为3

--metadata-expiry-ms <Long:>

强制刷新数据条数默认为300000,元数据以毫秒为单位的过期间隔时间段

--producer-property <String>

传递用户定义的Producer_Prop的机制

--producer.config <String: config file>

指定配置文件。 请注意, [producer-property] 优先于此配置。

--property <String: prop>

一种将用户定义的属性以 key=value 的形式传递给消息阅读器的机制。 这允许对用户定义的消息阅读器进行自定义配置。

--request-required-acks <String:>

设置ack(确认收到)的三种模式(0,1,-1),默认为1

--request-timeout-ms <Integer:>

设置ack 的超时时间(单位毫秒)默认为 1500

--retry-backoff-ms <Integer>

等待选举时间,默认为100)

--socket-buffer-size <Integer: size>

设置 tcp RECV 大小(默认: 102400)

--sync

设置为同步的

--timeout <Integer: timeout_ms>

如果设置和生产者运行异步模式,这给一条消息的最长时间是否有足够的队列等待批处理大小。该值以ms为单位。(默认:1000)

--topic <String: topic>

生产的消息发送给定的主题

--version

显示Kafka版本

  1. 发送消息 语法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名称>
代码语言:javascript复制
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#输出
>hello

hadoop102 接收 topic abc 消息

代码语言:javascript复制
[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生产者推送的消息
hello

hadoop103 接收 topic abc 消息

代码语言:javascript复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello

consumer操作

脚本 kafka]$ bin/kafka-console-consumer.sh 命令选项

选项

描述

--bootstrap-server <String: server to connect to>

需:要连接的服务器。

--consumer-property <String: consumer_prop>

一种将用户定义的属性以 key=value 的形式传递给消费者的机制。

--consumer.config <String: config file>

consumer配置属性文件。 请注意, [consumer-property] 优先于此配置。

--enable-systest-events

记录消费者的消息及生命周期,用于系统测试

--formatter <String: class>

用于格式化 kafka 消息以供显示的类的名称。 (默认:kafka.tools.DefaultMessageFormatter)

--from-beginning

如果消费者还没有一个既定的偏移量来消费,那么从日志中出现的最早的消息而不是最新的消息开始。

--group <String: consumer group id>

消费者的消费者组ID。

--help

打印帮助信息

--isolation-level <String>

设置为 read_committed 以过滤掉未提交的事务消息。 设置为 read_uncommitted 以读取所有消息。 (默认值:read_uncommitted)

--key-deserializer <String: deserializer for key>

设置 密钥的解串器

--max-messages <Integer: num_messages>

退出前消费的最大消息数。 如果未设置,则消耗是连续的。

--offset <String: consume offset>

要消耗的偏移量 id(非负数),或 'earliest' 表示从开始,或 'latest' 表示从结束(默认值:latest)

--partition <Integer: partition>

要消费的分区。 除非指定了“--offset”,否则消耗从分区的末尾开始。

--property <String: prop>

初始化消息格式化程序的属性

--skip-message-on-error

如果在处理消息时出现错误,请跳过而不是暂停。

--timeout-ms <Integer: timeout_ms>

如果指定,则在指定的时间间隔内没有可供消费的消息时退出。要消费的主题 ID。

--value-deserializer <String: deserializer for values>

值的解串器

--version

显示Kafka版本

--whitelist <String: whitelist>

指定要包含以供使用的主题白名单的正则表达式。

案例

  1. 消费消息 语法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称>
代码语言:javascript复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello
  1. 消费所有的消息 语法:--from-beginning
代码语言:javascript复制
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生产者推送的消息
sh
nihao
发哦那旮
ka
niha
hdalfajkl
你好
股东大法师
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf

0 人点赞