超详细的Kafka教程-从部署到开发到原理都有讲解

2022-05-05 17:21:53 浏览数 (1)

在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。

为什么选择Kafka

消息中间件有很多。比如ActiveMQ,RabbitMQ,RocketMQ,Kafka。那你在选型的时候一般考虑哪些因素呢?我们来比较下这几个中间件的特点。

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

万级,吞吐量同ActiveMQ

10万级,可以支撑高吞吐量

10万级别,高吞吐量。适合日志采集,实时计算等场景

topic数量对吞吐量的影响

topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic

topic从几十个到几百个的时候,吞吐量会「大幅度下降」所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源

时效性

ms级

微秒级,这是rabbitmq的一大特点,延迟是最低的

ms级

延迟在ms级以内

可用性

高,基于主从架构实现高可用性

同ActiveMQ

非常高,分布式架构

非常高,同样也是分布式式

消息可靠性

有较低的概率丢失数据

经过参数优化配置,可以做到0丢失

同RocketMQ一样也可以做到消息零丢失

功能支持

MQ领域的功能极其完备

基于erlang开发,所以并发能力很强,性能极其好,延时很低

MQ功能较为完善,还是分布式的,扩展性好

功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

优劣总结

「ActiveMQ」

  • 优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用
  • 缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。

「RabbitMQ」

  • 优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。
  • 缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

「RocketMQ」

  • 优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。
  • 缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。

「Kafka」

  • 优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。
  • 缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。

从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。

什么是Kafka

Kafka本质还是一个存储容器,最初由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

Kafka组成架构

从上面的架构图我们获得几个词:

  • Producer :消息生产者,就是向kafka broker发消息的客户端;
  • Consumer :消息消费者,向kafka broker取消息的客户端;
  • Topic :可以理解为一个队列;
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

Kafka在Linux上的操作

你可能想,我把Kafka安装在Windows下不就完事了吗,为什么还要特意在Linux下面操作呢。但实际生产Kafka等中间件肯定是部署在Linux上面的,作为开发的我们可能也很少接触怎么部署,但是学习一下总归是有好处的。

下载

下载地址:http://kafka.apache.org/downloads

我们选取这个下载

单机部署

把下载的压缩包拷贝到Linux上,解压:

修改config/server.properties

修改Zookeeper的配置。

启动Kafka

注意:如果配置的是单独的Zookeeper,在启动Kafka之前需要启动Zookeeper。如果你有使用docker的经验,你可以使用docker-compose快速搭建一个zk集群。

发现有了kafka进程

端口为9092

集群部署

如果需要部署Kafka集群,我们需要设置多个Broker。

代码语言:javascript复制
 > cp server.properties config/server.properties config/server-1.properties
 > cp server.properties config/server.properties config/server-2.properties

编辑配置文件

代码语言:javascript复制
 config/server-1.properties:
   broker.id=1
   listeners=PLAINTEXT://:9093
   log.dir=/tmp/kafka-logs-1
 config/server-2.properties:
   broker.id=2
   listeners=PLAINTEXT://:9094
   log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。接下来我们只需要启动两个新的节点:

代码语言:javascript复制
 > bin/kafka-server-start.sh config/server-1.properties &
 ...
 > bin/kafka-server-start.sh config/server-2.properties &
 ...

现在创建一个副本为3的新topic:my-lvshen-topic

代码语言:javascript复制
 > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-lvshen-topic

运行命令describe topics查看集群中的topic信息

代码语言:javascript复制
 > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-lvshen-topic
 Topic:my-lvshen-topic  PartitionCount:1  ReplicationFactor:3 Configs:
 Topic: my-lvshen-topic Partition: 0  Leader: 1  Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释:第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。Leader是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

Replicas是复制分区日志的节点列表,不管这些节点是Leader还是仅仅活着。

isr是一组「同步」Replicas,是Replicas列表的子集,它活着并被指到Leader

命令操作

创建Topic

在安装目录下输入命令

代码语言:javascript复制
 bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --create --topic LVSHEN-TOPIC --partitions 1 --replication-factor 1

创建了一个topic:「LVSHEN_TOPIC」

查看Topic信息
代码语言:javascript复制
  bin/kafka-topics.sh --zookeeper 192.168.42.128:2181/kafka --describe --topic LVSHEN-TOPIC
生产者生产数据
代码语言:javascript复制
  bin/kafka-console-producer.sh --broker-list 192.168.42.128:9092 --topic LVSHEN-TOPIC
消费者接收数据
代码语言:javascript复制
  bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.128:9092 --topic 'LVSHEN-TOPIC'

Kafka Tool

如果觉得用命令查看太过麻烦,我们可以用工具查看(前提是你的生产环境和你的本地已经打通)。这里推荐一个工具「Kafka Tool」

如图,左边会显示Brokers,Topics,Consumers,右边会显示相关的具体信息。

SpringBoot 默认方式开发Kafka Demo

这里我是采用SpringBoot开发,接下来写一个Java的demo。

Maven导入

代码语言:javascript复制
 <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     <version>2.4.3.RELEASE</version>
 </dependency>

配置文件

代码语言:javascript复制
 ## kafka ##
 spring.kafka.bootstrap-servers=192.168.42.128:9092
 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.consumer.group-id=test
 spring.kafka.consumer.enable-auto-commit=true
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
 #定义Topic
 spring.kafka.topic=lvshen_demo_test
 
 spring.kafka.listener.missing-topics-fatal=false

生产者类

代码语言:javascript复制
 @Component
 @Slf4j
 public class KafkaProducer {
 
     @Autowired
     private KafkaTemplate kafkaTemplate;
 
     @Value("${spring.kafka.topic}")
     private String topic;
 
     /**
      * 发送kafka消息
      *
      * @param jsonString
      */
     public void send(String jsonString) {
         ListenableFuture future = kafkaTemplate.send(topic, jsonString);
         future.addCallback(o -> log.info("kafka消息发送成功:"   jsonString), throwable -> log.error("kafka消息发送失败:"   jsonString));
     }
 
 }

消费者类

代码语言:javascript复制
 @Component
 @Slf4j
 public class KafkaConsumer {
     @KafkaListener(topics = "${spring.kafka.topic}")
     public void listen(ConsumerRecord<?, ?> record) {
         log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
     }
 }

测试

代码语言:javascript复制
 @Test
 public void testDemo() throws InterruptedException {
    log.info("start send");
    kafkaProducer.send("I am Lvshen");
    log.info("end send");
    // 休眠10秒,为了使监听器有足够的时间监听到topic的数据
    Thread.sleep(10);
 }

如上图,控制台消费接收到了数据:

代码语言:javascript复制
 c.l.d.k.kafka.consumer.KafkaConsumer     : topic=lvshen_demo_test, offset=1, message=I am Lvshen

Kafka Tool也显示接收到了消息:

自定义Kafka demo开发

假如你不想使用application.properties里面kafka的配置,我们可以采用第二种开发方法。

定义配置文件

config/kafka-config.properties

代码语言:javascript复制
 #consumer
 kafka.bootstrapServers=192.168.42.128:9092
 kafka.groupId=bootKafka
 kafka.enableAutoCommit=true
 kafka.autoCommitIntervalMs=100
 kafka.sessionTimeoutMs=15000
 
 #producer
 kafka.retries=1
 kafka.batchSize=16384
 kafka.lingerMs=1
 kafka.bufferMemory=1024000

配置类

代码语言:javascript复制
 @Component
 @ConfigurationProperties(prefix="kafka")
 @PropertySource(value = {"classpath:config/kafka-config.properties"}, encoding = "utf-8")
 @Getter
 @Setter
 @AllArgsConstructor
 @NoArgsConstructor
 public class KafkaConfigProperties {
     private String bootstrapServers;
     private String groupId;
     private String enableAutoCommit;
     private String autoCommitIntervalMs;
     private String sessionTimeoutMs;
     private String retries;
     private String batchSize;
     private String lingerMs;
     private String bufferMemory;
 
 }
 
 //文件配置类
 @Component("kafkaConfigurations")
 @EnableKafka
 public class KafkaConfiguration {
     @Autowired
     private KafkaConfigProperties kafkaConfigProperties;
     /**
      * ConcurrentKafkaListenerContainerFactory为创建Kafka监听器的工程类,这里只配置了消费者
      */
     @Bean
     public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory());
         factory.getContainerProperties().setPollTimeout(1500);
         factory.setMissingTopicsFatal(false);
         return factory;
     }
 
     /**
      * 根据consumerProps填写的参数创建消费者工厂
      */
     @Bean
     public ConsumerFactory<Integer, String> consumerFactory() {
         return new DefaultKafkaConsumerFactory<>(consumerProps());
     }
 
     /**
      * 根据senderProps填写的参数创建生产者工厂
      */
     @Bean
     public ProducerFactory<Integer, String> producerFactory() {
         return new DefaultKafkaProducerFactory<>(senderProps());
     }
 
     /**
      * kafkaTemplate实现了Kafka发送接收等功能
      */
     @Bean("kafkaTemplates")
     public KafkaTemplate<Integer, String> kafkaTemplate() {
         KafkaTemplate template = new KafkaTemplate<>(producerFactory());
         return template;
     }
 
     /**
      * 消费者配置参数
      */
     private Map<String, Object> consumerProps() {
         Map<String, Object> props = new HashMap<>();
         // 连接地址
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
         // GroupID
         props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getGroupId());
         // 是否自动提交
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
         // 自动提交的频率
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfigProperties.getAutoCommitIntervalMs());
         // Session超时设置
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getSessionTimeoutMs());
         // 键的反序列化方式
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         // 值的反序列化方式
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         return props;
     }
 
     /**
      * 生产者配置
      */
     private Map<String, Object> senderProps() {
         Map<String, Object> props = new HashMap<>();
         // 连接地址
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
         // 重试,0为不启用重试机制
         props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
         // 控制批处理大小,单位为字节
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaConfigProperties.getBatchSize());
         // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
         props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaConfigProperties.getLingerMs());
         // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaConfigProperties.getBufferMemory());
         // 键的序列化方式
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         // 值的序列化方式
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return props;
     }
 }

获取监听

代码语言:javascript复制
 @Component
 @Slf4j
 public class DemoListener {
     /**
      * 声明consumerID为demo,监听topicName为topic.quick.demo的Topic
      */
     @KafkaListener(id = "demo", topics = "topic.quick.demo")
     public void listen(String msgData) {
         log.info("demo receive : " msgData);
     }
 
 }

测试

代码语言:javascript复制
 @Test
     public void testDemoDepth() throws InterruptedException {
         log.info("start send");
         kafkaTemplate.send("topic.quick.demo", "this is a test for depth kafka");
         log.info("end send");
         // 休眠10秒,为了使监听器有足够的时间监听到topic的数据
         Thread.sleep(500000);
     }

Listener监听到kafka里面的数据。

Kafka存储机制

经过上面的描述,我们发现「Partition」很重要。其实「Partition」还可以细分为「Segment」。至于什么是「Segment」,下面会有详细说明。

Partition中文件的存储方式:

每个partion(目录)相当于一个巨型文件被平均分配到多个大小相segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。(默认情况下每个文件大小为1G)。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

好了以上就是关于Kafka的简短的介绍了,如果想要深入学习,可以去官网多多了解相关知识。

Kafka为什么性能如此优越

读写快

kafka会将数据顺序写入磁盘,我们用的磁盘大部分用的是机械磁盘。机械结构的银盘,寻址是最耗时的。所以硬盘随机I/O是很耗性能的,如果是顺序I/O,那么性能会有很大的改善。

MMFile

Kafka的数据并不是实时的写入磁盘(「Memory Mapped Files」),它充分利用了现代操作系统「分页存储」来提高I/O效率。操作系统会选择适当的时机将数据写入硬盘。但这样也会不可靠,写到「mmap」中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。

Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到「mmap」之后就立即flush然后再返回Producer叫 同步 (sync);写入「mmap」之后立即返回Producer不调用flush叫 异步 (async)。

零拷贝

消费者向broker索要消息时,「kafka」使用 零拷贝(zero-copy) ,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”,直接复制到socket缓冲区。

一般的读写是这样的,会有用户态和内核态的切换,这个切换也是比较耗时的。

如果采用零拷贝,不会经过用户态。

关于零拷贝的详细描述,可以看看我的另一篇文章:【使用了零拷贝技术的Kafka,当然很快】。

0 人点赞