这两天学习MQ在项目中的使用,就自己搭建了一个测试环境,在笔记本电脑搭建,使用的win10系统。不废话,开撸。
最先安装JDK,凡是java开发基本都会有,不再赘述;
一、安装zookeeper
1、下载安装文件,地址: http://zookeeper.apache.org/releases.html
2、解压。 打开zookeeper-3.4.13conf,把zoo_sample.cfg重命名成zoo.cf
3、修改 dataDir= C:/tmp/zookeeper/data ,改成自己的路径
4、添加系统变量
ZOOKEEPER_HOME: C:hakesoftapache-zookeeper-3.7.0 (zookeeper目录)
Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%bin;"
5、启动
6、测试,可下载 可视化的工具:https://link.zhihu.com/?target=https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip
二、下载安装 Kafka
1、下载地址: http://kafka.apache.org/downloads.html
2、解压文件。 打开kafka_2.12-2.8.0config
3、编辑server.properties,修改 log.dirs=C:/tmp/kafka-logs配置
4、 进入 C:hakesoftkafka_2.12-2.8.0binwindows(kafka目录)
5、执行命令:kafka-server-start.bat C:hakesoftkafka_2.12-2.8.0configserver.properties
三、Springboot2 集成 Kafka - 生产者
1、生产者,创建项目 kafka-producter
2、添加pom关键引用
代码语言:javascript复制 <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3、yml配置
代码语言:javascript复制server:
servlet:
context-path: /kafka-producter
port: 9004
#### kafka配置生产者 begin ####
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
retries: 0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
batch-size: 16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
buffer-memory: 33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: 1
# 指定消息key和消息体的序列化编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生产者 end ####
4、发送代码
代码语言:javascript复制/**
* @Author: Liu Yue
* @Descripition:
* @Date; Create in 2021/6/24 14:07
**/
@RestController
@RequestMapping
@Slf4j
public class KafkaController {
@Autowired
private KafkaTemplate kafkaTemplate;
@PostMapping("/message/send")
public boolean send(){
String msg = "kafka测试数据";
kafkaTemplate.send("topic_A",msg);
return true;
}
}
四、Springboot2 集成 Kafka - 消费者
1、pom配置同上
2、yml配置消费者
代码语言:javascript复制server:
servlet:
context-path: /kafka-consumer
port: 9003
#### kafka配置生产者 begin ####
spring:
kafka:
#### kafka配置消费者 begin ####
consumer:
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
group-id: user-log-group
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
auto-offset-reset: earliest
# enable.auto.commit:true --> 设置自动提交offset
enable-auto-commit: true
auto-commit-interval: 100
# 指定消息key和消息体的序列化编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置消费者 end ####
3、消费者代码
代码语言:javascript复制/**
* @Author: Liu Yue
* @Descripition:
* @Date; Create in 2021/6/24 14:19
**/
@Component
@Slf4j
public class ConsumerListener {
@KafkaListener(topics = "topic_A")
public void onMessage(String message){
//这里为插入数据库代码,业务逻辑等等
log.info("收到消息内容:{}",message);
}
}
至此,一个完整的生产者/消费者模型完成。
作用:削峰填谷、解耦、异步执行等等
每天进步一点点!