集群部署
- 配置 server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能,当前版本此配置默认为true,已从配置文件移除
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
其他服务器一样配置
- 启动集群
bin/kafka-server-start.sh -daemon config/server.properties
其他服务器一样。
Kafka 命令行操作
topic 操作
脚本
kafka]$ binkafka-topics.sh
命令选项
选项 | 描述 |
---|---|
--alter | 更改分区数,副本分配,和/或主题的配置。 |
--at-min-isr-partitions | 如果在描述主题时设置,则仅显示 isr 计数为的分区等于配置的最小值。 不是支持 --zookeeper 选项。 |
--bootstrap-server <String: server to connect to> | 必需:要连接的 Kafka 服务器。 如果提供此项,则不需要直接的 Zookeeper 连接。 |
--command-config <String: command config property file> | 包含要传递给管理客户端的配置的属性文件。 这仅与 --bootstrap-server 选项一起用于描述和更改代理配置。 |
--config <String: name=value> | |
--create | 创建一个新的topic |
--delete | 删除一个topic |
--delete-config <String: name> | 要为现有主题删除的主题配置覆盖(请参阅 --config 选项下的配置列表)。 不支持 --bootstrap-server 选项。 |
--describe | 列出给定主题的详细信息。 |
--disable-rack-aware | 禁用机架感知副本分配 |
--exclude-internal | 运行 list 或 describe 命令时排除内部主题。 默认会列出内部主题 |
--force | 禁止控制台提示 |
--help | 打印帮助信息。 |
--if-exists | 如果在更改或删除或描述主题时设置,则该操作仅在主题存在时执行。 不支持 --bootstrap-server 选项。 |
--if-not-exists | 如果在创建主题时设置,则只有在主题不存在时才会执行操作。 不支持 --bootstrap- 服务器选项。 |
--list | 列出所有可用的topic。 |
--partitions <Integer: # of partitions> | 设置topic 分区数 |
--replication-factor <Integer:replication factor> | 指定topic的副本数 |
--topic <String: topic> | 指定topic 名称 |
--topics-with-overrides | 如果在描述主题时设置,则仅显示已覆盖配置的主题 |
--unavailable-partitions | 如果在描述主题时设置,则只显示其领导者不可用的分区 |
--under-min-isr-partitions | 如果在描述主题时设置,则仅显示 isr 计数小于配置的最小值的分区。 不支持 --zookeeper 选项。 |
--under-replicated-partitions | 如果在描述主题时设置,则仅显示在复制分区下 |
--version | 展示Kafka版本 |
--zookeeper <String: hosts> | 已弃用,zookeeper 连接的连接字符串,格式为 host:port。 可以提供多个主机以允许故障转移。 |
案例
- 创建一个 topic
语法:
kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本数> --partitions <分区数> --topic <副本名称>
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#输出结果
Created topic test.
- 查看当前服务器中的所有 topic
语法:
kafka-topics.sh --zookeeper <host>:<port> --list
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 输出结果
__consumer_offsets
abc
test
- 删除一个topic
语法:
kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 输出结果
Topic test is marked for deletion. # 并不会马上删除,而是先对该topic做一个标记,后面再进行删除
#需要在 配置中设置 delete.topic.enable=true ,否则不会进行删除
Note: This will have no impact if delete.topic.enable is not set to true.
- 查看 topic 详情
语法:
--describe
[atguigu@hadoop102 bin]$ kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic abc
# topic abc 详细信息
Topic: abc PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: abc Partition: 0 Leader: 1 Replicas: 2,0,1 Isr: 1,2,0
参数 | 描述 |
---|---|
Topic | topic名称 |
PartitionCount | 分区数 |
ReplicationFactor | 定义的分区数 |
Configs | 配置 |
Partition | 当前分区位置 |
Leader | 当前那个broker为Leader |
Replicas | 副本位置 |
Isr | lsr同步队列 |
producer 操作
脚本
kafka]$ binkafka-console-producer.sh
命令选项
选项 | 描述 |
---|---|
--batch-size <Integer: size> | 如果消息不是同步发送的,则要在单个批次中发送的消息数。 (默认值:200) |
--broker-list <String: broker-list> | 链接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。 |
--compression-codec [String: compression-codec] | 支持的压缩方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默认 'gzip' |
--help | 打印帮助信息 |
--line-reader <String: reader_class> | 用于从标准输入读取行的类的类名。默认情况下,每行都作为单独的消息读取。 (默认:kafka.tools.ConsoleProducer$LineMessageReader) |
--max-block-ms <Long: > | 生产者发送的最大时间(默认:60000) |
--max-memory-bytes <Long: > | 缓冲大小,以字节为单位 (默认:33554432) |
--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> | 合并数据的最小数 (默认: 16384) |
--message-send-max-retries <Integer> | 退休数,默认为3 |
--metadata-expiry-ms <Long:> | 强制刷新数据条数默认为300000,元数据以毫秒为单位的过期间隔时间段 |
--producer-property <String> | 传递用户定义的Producer_Prop的机制 |
--producer.config <String: config file> | 指定配置文件。 请注意, [producer-property] 优先于此配置。 |
--property <String: prop> | 一种将用户定义的属性以 key=value 的形式传递给消息阅读器的机制。 这允许对用户定义的消息阅读器进行自定义配置。 |
--request-required-acks <String:> | 设置ack(确认收到)的三种模式(0,1,-1),默认为1 |
--request-timeout-ms <Integer:> | 设置ack 的超时时间(单位毫秒)默认为 1500 |
--retry-backoff-ms <Integer> | 等待选举时间,默认为100) |
--socket-buffer-size <Integer: size> | 设置 tcp RECV 大小(默认: 102400) |
--sync | 设置为同步的 |
--timeout <Integer: timeout_ms> | 如果设置和生产者运行异步模式,这给一条消息的最长时间是否有足够的队列等待批处理大小。该值以ms为单位。(默认:1000) |
--topic <String: topic> | 生产的消息发送给定的主题 |
--version | 显示Kafka版本 |
- 发送消息
语法:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名称>
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#输出
>hello
hadoop102 接收 topic abc 消息
代码语言:javascript复制[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生产者推送的消息
hello
hadoop103 接收 topic abc 消息
代码语言:javascript复制[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello
consumer操作
脚本
kafka]$ bin/kafka-console-consumer.sh
命令选项
选项 | 描述 |
---|---|
--bootstrap-server <String: server to connect to> | 需:要连接的服务器。 |
--consumer-property <String: consumer_prop> | 一种将用户定义的属性以 key=value 的形式传递给消费者的机制。 |
--consumer.config <String: config file> | consumer配置属性文件。 请注意, [consumer-property] 优先于此配置。 |
--enable-systest-events | 记录消费者的消息及生命周期,用于系统测试 |
--formatter <String: class> | 用于格式化 kafka 消息以供显示的类的名称。 (默认:kafka.tools.DefaultMessageFormatter) |
--from-beginning | 如果消费者还没有一个既定的偏移量来消费,那么从日志中出现的最早的消息而不是最新的消息开始。 |
--group <String: consumer group id> | 消费者的消费者组ID。 |
--help | 打印帮助信息 |
--isolation-level <String> | 设置为 read_committed 以过滤掉未提交的事务消息。 设置为 read_uncommitted 以读取所有消息。 (默认值:read_uncommitted) |
--key-deserializer <String: deserializer for key> | 设置 密钥的解串器 |
--max-messages <Integer: num_messages> | 退出前消费的最大消息数。 如果未设置,则消耗是连续的。 |
--offset <String: consume offset> | 要消耗的偏移量 id(非负数),或 'earliest' 表示从开始,或 'latest' 表示从结束(默认值:latest) |
--partition <Integer: partition> | 要消费的分区。 除非指定了“--offset”,否则消耗从分区的末尾开始。 |
--property <String: prop> | 初始化消息格式化程序的属性 |
--skip-message-on-error | 如果在处理消息时出现错误,请跳过而不是暂停。 |
--timeout-ms <Integer: timeout_ms> | 如果指定,则在指定的时间间隔内没有可供消费的消息时退出。要消费的主题 ID。 |
--value-deserializer <String: deserializer for values> | 值的解串器 |
--version | 显示Kafka版本 |
--whitelist <String: whitelist> | 指定要包含以供使用的主题白名单的正则表达式。 |
案例
- 消费消息
语法:
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称>
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生产者推送的消息
hello
- 消费所有的消息
语法:
--from-beginning
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生产者推送的消息
sh
nihao
发哦那旮
ka
niha
hdalfajkl
你好
股东大法师
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf