kafka
介绍
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
消息传递模式是:发布—订阅模式。
Kafka主要设计目标如下:
以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
Scale out:支持在线水平扩展
优点
解耦、冗余、扩展性、灵活性和峰值的处理能力、可恢复性、顺序保证、缓冲、异步通信
工作原理
消息传递模式:发布—订阅模式
解释:
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
kafka中的术语
- broker:中间的kafka cluster,存储消息,是由多个server组成的集群。
- topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据。
kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。这样,消息就以一个个id的方式,组织起来。
producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾。
consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。
id在kafka中称为offset,它的好处是
- 消费者可以根据需求,灵活制定offset消费。
- 保证了消息不变性,为并发消费提供了线程安全的保证。
- 消息访问的并行高效性。
- 增加消息系统的可伸缩性。
- 保证消息可靠性。
- 灵活的持久化策略。
- 备份高可用性。
- producer:往broker中某个topic里面生产数据。
producer生产消息需要如下参数:
- topic:往哪个topic生产消息。
- partition:往哪个partition生产消息。
- key:根据该key将消息分区到不同partition。
- message:消息。
- consumer:从broker中某个topic获取数据。
传统消息系统有两种模式:
点对点
发布订阅
kafka通过consumer group将两种模式统一处理:每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。
- Partitions
每个Topics划分为一个或者多个Partition,并且Partition中的每条消息都被标记了一个sequential id ,也就是offset,并且存储的数据是可配置存储时间的 。
实战案例----kafka数据通过flume收集并存储到hbase
1、准备工作
- 因为任务中需要访问腾讯云消息队列 CKafka,所以需要先创建一个 CKafka 实例,具体见 消息队列 CKafka。
- 已经开通了腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置界面选择 Spark 组件。
2、在 EMR 集群使用 Kafka 工具包
首先需要查看 CKafka 的内网 IP 与端口号。登录消息队列 CKafka 的控制台,选择要使用的 CKafka 实例,在基本消息中查看其内网 IP 为 $kafkaIP,而端口号一般默认为 9092。在 topic 管理界面新建一个topic即可。
3、配置flume
- 创建flume的配置文件hbase_kafka.properties
vim hbase_kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hbase_sink
# 以下配置source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# 以下配置sink
agent.sinks.hbase_sink.channel = mem_channel
agent.sinks.hbase_sink.table = foo_table
agent.sinks.hbase_sink.columnFamily = cf
agent.sinks.hbase_sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 以下配置channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
- 创建hbase表
hbase shell
create 'foo_table','cf'
- 运行flume
./bin/flume-ng agent --conf ./conf/-f hbase_kafka.properties -n agent -Dflume.root.logger=INFO,console
- 运行kafka的producer
./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
hello
world
4、测试
- 在kafka生产者客户端数据信息并回车
- 观察hbase表中是否有相应数据