【应用进阶】Kafka的部署和案例

2021-06-29 11:15:01 浏览数 (1)

这两天学习MQ在项目中的使用,就自己搭建了一个测试环境,在笔记本电脑搭建,使用的win10系统。不废话,开撸。

最先安装JDK,凡是java开发基本都会有,不再赘述;

一、安装zookeeper

1、下载安装文件,地址: http://zookeeper.apache.org/releases.html

2、解压。 打开zookeeper-3.4.13conf,把zoo_sample.cfg重命名成zoo.cf

3、修改 dataDir= C:/tmp/zookeeper/data ,改成自己的路径

4、添加系统变量

ZOOKEEPER_HOME: C:hakesoftapache-zookeeper-3.7.0 (zookeeper目录)

Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%bin;"

5、启动

6、测试,可下载 可视化的工具:https://link.zhihu.com/?target=https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

二、下载安装 Kafka

1、下载地址: http://kafka.apache.org/downloads.html

2、解压文件。 打开kafka_2.12-2.8.0config

3、编辑server.properties,修改 log.dirs=C:/tmp/kafka-logs配置

4、 进入 C:hakesoftkafka_2.12-2.8.0binwindows(kafka目录)

5、执行命令:kafka-server-start.bat C:hakesoftkafka_2.12-2.8.0configserver.properties

三、Springboot2 集成 Kafka - 生产者

1、生产者,创建项目 kafka-producter

2、添加pom关键引用

代码语言:javascript复制
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

3、yml配置

代码语言:javascript复制
server:
  servlet:
    context-path: /kafka-producter
  port: 9004
#### kafka配置生产者 begin ####
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 每次批量发送消息的数量,produce积累到一定数据,一次发送
      batch-size: 16384
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      buffer-memory: 33554432
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: 1
      # 指定消息key和消息体的序列化编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生产者 end ####

4、发送代码

代码语言:javascript复制
/**
 * @Author: Liu Yue
 * @Descripition:
 * @Date; Create in 2021/6/24 14:07
 **/
@RestController
@RequestMapping
@Slf4j
public class KafkaController {

    @Autowired
    private KafkaTemplate  kafkaTemplate;

    @PostMapping("/message/send")
    public boolean send(){
        String msg = "kafka测试数据";
        kafkaTemplate.send("topic_A",msg);
        return true;
    }
}

四、Springboot2 集成 Kafka - 消费者

1、pom配置同上

2、yml配置消费者

代码语言:javascript复制
server:
  servlet:
    context-path: /kafka-consumer
  port: 9003
#### kafka配置生产者 begin ####
spring:
  kafka:
#### kafka配置消费者 begin ####
    consumer:
      # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: user-log-group
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: earliest
      # enable.auto.commit:true --> 设置自动提交offset
      enable-auto-commit: true
      auto-commit-interval: 100
      # 指定消息key和消息体的序列化编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置消费者 end ####

3、消费者代码

代码语言:javascript复制
/**
 * @Author: Liu Yue
 * @Descripition:
 * @Date; Create in 2021/6/24 14:19
 **/
@Component
@Slf4j
public class ConsumerListener {

    @KafkaListener(topics = "topic_A")
    public void onMessage(String message){
        //这里为插入数据库代码,业务逻辑等等
        log.info("收到消息内容:{}",message);
    }

}

至此,一个完整的生产者/消费者模型完成。

作用:削峰填谷、解耦、异步执行等等

每天进步一点点!

0 人点赞