安装JDK1.8
1、搜索jdk安装包
代码语言:shell复制yum search java|grep jdk
2、下载jdk1.8,下载之后默认的目录为: /usr/lib/jvm/
代码语言:shell复制yum install java-1.8.0-openjdk
安装zookeeper
安装zookeeper
kafka依赖zookeeper,所以需要下载安装zookeeper
代码语言:shell复制# 下载压缩包
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz
修改配置文件
代码语言:shell复制cd apache-zookeeper-3.7.0-bin/conf/
mv zoo_sample.cfg zoo.cfg
启动zookeeper
代码语言:shell复制cd ../bin/
./zkServer.sh start
出现以下信息表示启动成功
代码语言:shell复制[root@localhost apache-zookeeper-3.7.0-bin]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
启动异常
如果出现 already running as process
错误,这个一般是因为机器异常关闭缓存目录中残留PID文件导致的(为关闭进程强行关机等导致的)
解决方案:到配置文件 conf/zoo.cfg
查找 dataDir
配置的目录
dataDir=/tmp/zookeeper
到 dataDir
目录下,清理缓存文件
cd /tmp/zookeeper
rm -rf zookeeper_server.pid
安装kafka
下载并解压
代码语言:shell复制wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -zxvf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1
修改配置 config/server.properties
中 listeners
配置项
# 默认为:
#listeners=PLAINTEXT://:9092
# 修改为:
listeners=PLAINTEXT://192.168.10.232:9092
代码语言:shell复制这里需要修改监听地址,否则无法在另外的主机中连接kafka 修改后,监听地址需改为:
IP地址:端口
,否则会出现如下错误:
[2022-08-05 10:40:56,361] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-08-05 10:40:56,362] WARN [Consumer clientId=console-consumer, groupId=console-consumer-65957] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
启动kafka
代码语言:shell复制bin/kafka-server-start.sh config/server.properties
创建主题
代码语言:shell复制bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --bootstrap-server 192.168.10.232:9092
生产者发送消息
代码语言:shell复制bin/kafka-console-producer.sh --topic topic1 --bootstrap-server 192.168.10.232:9092
消费者接收消息
代码语言:shell复制bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server 192.168.10.232:9092
golang中使用kafka
安装golang客户端
代码语言:shell复制go get github.com/Shopify/sarama
go get github.com/bsm/sarama-cluster
使用golang创建同步消息生产者
代码语言:go复制package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"time"
)
var address = []string{"192.168.10.232:9092"}
func main() {
// 配置
config := sarama.NewConfig()
// 设置属性
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
producer, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Printf("new sync producer error: %s n", err.Error())
return
}
// 关闭生产者
defer producer.Close()
// 循环发送消息
for i := 0; i < 10; i {
// 创建消息
value := fmt.Sprintf("sync message, index = %d", i)
msg := &sarama.ProducerMessage{
Topic: "topic1", // 主题名称
Value: sarama.ByteEncoder(value), // 消息内容
}
// 发送消息
part, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("send message error: %s n", err.Error())
} else {
fmt.Printf("SUCCESS: value=%s, partition=%d, offset=%d n", value, part, offset)
}
// 每隔两秒发送一条消息
time.Sleep(2 * time.Second)
}
}
使用golang创建异步消息生产者
代码语言:go复制package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"time"
)
var address = []string{"192.168.10.232:9092"}
func main() {
// 配置
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// 设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用,需要消费和生产同时配置
// 注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
config.Version = sarama.V0_10_0_1
fmt.Println("start make producer")
//使用配置,新建一个异步生产者
producer, err := sarama.NewAsyncProducer(address, config)
if err != nil {
log.Printf("new async producer error: %s n", err.Error())
return
}
defer producer.AsyncClose()
// 循环判断哪个通道发送过来数据
fmt.Println("start goroutine")
go func(p sarama.AsyncProducer) {
for {
select {
case suc := <-p.Successes():
fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
case fail := <-p.Errors():
fmt.Println("error: ", fail.Error())
}
}
}(producer)
var value string
for i := 0; ; i {
// 每隔两秒发送一条消息
time.Sleep(2 * time.Second)
// 创建消息
value = fmt.Sprintf("async message, index = %d", i)
// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系
msg := &sarama.ProducerMessage{
Topic: "topic1",
Value: sarama.ByteEncoder(value),
}
// 使用通道发送
producer.Input() <- msg
}
}
使用golang创建消息消费者
代码语言:go复制package main
import (
"fmt"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// 配置
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Offsets.Initial = -2
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Group.Return.Notifications = true
// 创建消费者
brokers := []string{"192.168.10.232:9092"}
topics := []string{"topic1"}
consumer, err := cluster.NewConsumer(brokers, "consumer-group", topics, config)
if err != nil {
fmt.Printf("new consumer error: %sn", err.Error())
return
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go func() {
for err := range consumer.Errors() {
fmt.Printf("consumer error: %s", err.Error())
}
}()
go func() {
for ntf := range consumer.Notifications() {
fmt.Printf("consumer notification error: %v n", ntf)
}
}()
// 循环从通道中获取消息
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
fmt.Printf("%s/%d/%dt%st%sn", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // 上报offset
} else {
fmt.Println("监听服务失败")
}
case <-signals:
return
}
}
}
链接
DEMO:https://github.com/cqcqs/go-kafka-demo