EMR(弹性MapReduce)入门之kafka实战(十五)

2020-02-18 17:50:25 浏览数 (1)

kafka

介绍

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

主要应用场景是:日志收集系统和消息系统。

消息传递模式是:发布—订阅模式。

Kafka主要设计目标如下:

以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。

高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。

支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。

同时支持离线数据处理和实时数据处理。

Scale out:支持在线水平扩展

优点

解耦、冗余、扩展性、灵活性和峰值的处理能力、可恢复性、顺序保证、缓冲、异步通信

工作原理

消息传递模式:发布—订阅模式

发布订阅模式发布订阅模式

解释:

在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息

kafka中的术语

  •  broker:中间的kafka cluster,存储消息,是由多个server组成的集群。
  •  topic:kafka给消息提供的分类方式。broker用来存储不同topic的消息数据。

kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成。每个消息都被标识了一个递增序列号代表其进来的先后顺序,并按顺序存储在partition中。这样,消息就以一个个id的方式,组织起来。

 producer选择一个topic,生产消息,消息会通过分配策略append到某个partition末尾。

 consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。

id在kafka中称为offset,它的好处是

  1. 消费者可以根据需求,灵活制定offset消费。
  2. 保证了消息不变性,为并发消费提供了线程安全的保证。
  3. 消息访问的并行高效性。
  4. 增加消息系统的可伸缩性。
  5. 保证消息可靠性。
  6. 灵活的持久化策略。
  7. 备份高可用性。
  •  producer:往broker中某个topic里面生产数据。

producer生产消息需要如下参数:

  1. topic:往哪个topic生产消息。
  2. partition:往哪个partition生产消息。
  3. key:根据该key将消息分区到不同partition。
  4. message:消息。
  •  consumer:从broker中某个topic获取数据。

传统消息系统有两种模式:

点对点

 发布订阅

kafka通过consumer group将两种模式统一处理:每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。

  • Partitions

 每个Topics划分为一个或者多个Partition,并且Partition中的每条消息都被标记了一个sequential id ,也就是offset,并且存储的数据是可配置存储时间的 。

实战案例----kafka数据通过flume收集并存储到hbase

1、准备工作

  • 因为任务中需要访问腾讯云消息队列 CKafka,所以需要先创建一个 CKafka 实例,具体见 消息队列 CKafka。
  • 已经开通了腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置界面选择 Spark 组件。

2、在 EMR 集群使用 Kafka 工具包

首先需要查看 CKafka 的内网 IP 与端口号。登录消息队列 CKafka 的控制台,选择要使用的 CKafka 实例,在基本消息中查看其内网 IP 为 $kafkaIP,而端口号一般默认为 9092。在 topic 管理界面新建一个topic即可。

3、配置flume

  • 创建flume的配置文件hbase_kafka.properties
代码语言:java复制
vim hbase_kafka.properties
agent.sources = kafka_source
agent.channels = mem_channel
agent.sinks = hbase_sink
# 以下配置source
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.channels = mem_channel
agent.sources.kafka_source.batchSize = 5000
agent.sources.kafka_source.kafka.bootstrap.servers = $kafkaIP:9092
agent.sources.kafka_source.kafka.topics = kafka_test
# 以下配置sink
agent.sinks.hbase_sink.channel = mem_channel
agent.sinks.hbase_sink.table = foo_table
agent.sinks.hbase_sink.columnFamily = cf
agent.sinks.hbase_sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# 以下配置channel
agent.channels.mem_channel.type = memory
agent.channels.mem_channel.capacity = 100000
agent.channels.mem_channel.transactionCapacity = 10000
  • 创建hbase表
代码语言:javascript复制
hbase shell
create 'foo_table','cf'
  • 运行flume
代码语言:javascript复制
./bin/flume-ng agent --conf ./conf/-f hbase_kafka.properties -n agent -Dflume.root.logger=INFO,console
  • 运行kafka的producer
代码语言:javascript复制
./bin/kafka-console-producer.sh --broker-list $kafkaIP:9092 --topic kafka_test
hello
world

4、测试

  1. 在kafka生产者客户端数据信息并回车
  2. 观察hbase表中是否有相应数据

0 人点赞