kafka集群搭建及Java客户端使用
kafka简介
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。 优势:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
- 可扩展性:kafka集群支持热扩展;
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
- 容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);
- 高并发:支持数千个客户端同时读写。
术语
- Record(消息):Kafka处理的主要对象。
- Topic(主题):主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
- Partition(分区):一个有序不变的消息序列。每个Topic下可以有多个分区。
- Offset(消息位移):表示分区中每条消息的位置信息,是一个单调递增且不变的值。
- Replica(副本):Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者(leader)副本和追随者(follower)副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
- Broker(代理):Kafka以集群的方式运行,集群中的每一台服务器称之为一个代理(broker)Producer(生产者):消息生产者,向Broker发送消息的客户端。
- Consumer(消费者):消息消费者,从Broker读取消息的客户端。
- ConsumerOffset(消费者位移):表征消费者消费进度,每个消费者都有自己的消费者位移
- ConsumerGroup(消费者组):每个Consumer属于一个特定的ConsumerGroup,一条消息可以被多个不同的ConsumerGroup消费,但是一个ConsumerGroup中只能有一个Consumer能够消费该消息。每一个Topic,下面可以有多个分区(Partition)日志文件。Partition是一个有序的message序列,这些message按顺序添加到一个叫做commitlog的文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标示某个分区中的message。每个consumer是基于自己在commitlog中的消费进度(offset)来进行工作的。在kafka中,消费offset由consumer自己来维护;一般情况下我们按照顺序逐条消费commitlog中的消息,当然我可以通过指定offset来重复消费某些消息,或者跳过某些消息。
应用场景
- 日志收集:用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;
- 消息系统:解耦生产者和消费者、缓存消息等;
- 用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;
- 运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
- 流式处理:比如sparkstreaming和storm
kafka使用与集群搭建
环境准备 Kafka是用Scala语言开发的,运行在JVM上,在安装Kafka之前需要先安装JDK kafka依赖zookeeper,需要先安装zookeeper,下载地址
代码语言:javascript复制wgethttp://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-
3.4.9.tar.gz
tar-zxvfzookeeper-3.4.9.tar.gz
cdzookeeper-3.4.9
cpconf/zoo_sample.cfgconf/zoo.cfg
#启动zookeeper服务
bin/zkServer.shstart
#启动客户端
bin/zkCli.sh
下载kafka安装包 参考官网安装步骤
代码语言:javascript复制wgethttps://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
tar-zxvf kafka_2.11-1.1.1.tgz
cd kafka_2.11-1.1.1.tgz
启动kafka服务
代码语言:javascript复制bin/kafka-server-start.sh config/server.properties
server.properties是kafka核心配置文件(官网)
Property | Default | Description |
---|---|---|
broker.id | 0 | 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行 |
listeners | 9092 | server接受客户端连接的端口 |
zookeeper.connect | localhost:2181 | zooKeeper连接字符串的格式为hostname:port,此处hostname和port分别ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为hostname1:port1, hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否运行删除主题 |
创建主题
代码语言:javascript复制#创建分区数是1,副本数是1的主题为test的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看命令帮助
bin/kafka-topics.sh
启动Producer发送消息
代码语言:javascript复制bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动Consumer消费消息
代码语言:javascript复制# --group 指定消费组 --from-beginning 从头开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -- group consumer1 --from-beginning
# 查看消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
- 单播消息:kafka中,在同一个消费组里,一条消息只能被某一个消费者消费
- 多播消息:针对Kafka同一条消息只能被同一个消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组
集群配置 配置3个broker
代码语言:javascript复制> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
#启动broker
bin/kafka-server-start.sh -daemon config/server-1.properties bin/kafka-server-start.sh -daemon config/server-2.properties
#创建一个 副本为3,分区为3的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 - -partitions 3 --topic my-replicated-topic
# 查看topic的情况
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
- leader节点负责给定partition的所有读写请求。
- replicas表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
- isr是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
Java中kafka‐clients应用 Java中使用kafka,引入maven依赖
代码语言:javascript复制>
>org.apache.kafka>
>kafka-clients>
>1.1.1>
>
具体Java客户端学习源码地址:项目源码