Debugging with the command-line tools
- Start ZooKeeper and the Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- List topics
./kafka-topics.sh --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list
- Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
- Describe a topic (show its status)
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
- Console consumer: read messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig
- Console producer: send messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- Delete a topic
./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult
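Note: from Kafka 2.2 onward kafka-topics.sh can talk to the brokers directly, so on newer clusters replace --zookeeper host:2181 with --bootstrap-server host:9092 in the commands above; the --zookeeper flag was removed entirely in Kafka 3.x.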
Go client: reading messages
/*
    "context"
    "time"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
*/
ctx, cancel = context.WithCancel(context.Background())

config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Version = sarama.V2_0_0_0
// Misleading option: it only takes effect the first time the consumer group starts.
// Once the partition already has a committed offset, it is ignored.
// If you want every restart to skip the messages produced in the meantime,
// you have to switch to a new group ID.
config.Consumer.Offsets.Initial = sarama.OffsetNewest

var topicArr = []string{topic}
// KafkaAddresses=["kafka.service.consul:9092"]
consumer, err := cluster.NewConsumer(kafkaAddress, kafkaGroupID, topicArr, config)
if err != nil {
    logging.Errorf("cluster.NewConsumer err:%s", err)
    return nil
}

// Drain the error channel.
go func() {
    for err := range consumer.Errors() {
        logging.Errorf("consumer.Error: groupId:%s Error: %s\n", kafkaGroupID, err.Error())
    }
}()
// Drain the rebalance notification channel.
go func() {
    for ntf := range consumer.Notifications() {
        logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v\n", kafkaGroupID, ntf)
    }
}()

logging.Infof("NewKafka loop before")
Loop:
for {
    select {
    case msg, ok := <-consumer.Messages():
        if ok {
            //logging.Debugf("read msg %v", msg)
            // do sth
            // With sarama.OffsetNewest the commit does not matter much.
            consumer.MarkOffset(msg, "")
        } else {
            logging.Errorf("read msg not ok %v", topic)
        }
    case <-ctx.Done():
        logging.Infof("kafka done %v", topic)
        break Loop
    case <-time.After(time.Second * 3):
        //logging.Debugf("NewKafka %v timeout", topic)
    }
}
logging.Infof("NewKafka kafka exit %v", topic)
consumer.Close()
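As the comment above notes, Offsets.Initial only matters for a group with no committed offsets. If the goal really is to start from the newest messages on every restart, one debugging-only workaround is to join a brand-new group each time. A minimal sketch under that assumption (makeGroupID is a hypothetical helper, not part of sarama):

/*
    "fmt"
    "time"
*/
// Hypothetical helper: a fresh group ID per process start means the group has no
// committed offsets, so Offsets.Initial = sarama.OffsetNewest applies every time.
func makeGroupID(base string) string {
    return fmt.Sprintf("%s-%d", base, time.Now().Unix())
}

// usage: kafkaGroupID := makeGroupID("superpig")

The trade-off is that every restart leaves an abandoned consumer group behind on the brokers, so this is only reasonable while debugging.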
Go client: writing messages
// producer config
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

// sync producer
producer, err := sarama.NewSyncProducer(addr, config)
checkerr.CheckError(err)

i := 0
for {
    i++
    msg := &sarama.ProducerMessage{
        Topic: topics[0],
        Value: sarama.StringEncoder(strconv.Itoa(i)),
    }
    _, _, err = producer.SendMessage(msg)
    checkerr.CheckError(err)
    time.Sleep(time.Millisecond * 500)
}
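When throughput matters more than blocking on every send, sarama also offers an async producer that accepts messages on a channel and reports results on separate channels. A minimal sketch, reusing addr, topics, logging and checkerr from the fragments above:

// async producer: push into Input(), read results from Successes()/Errors()
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true

producer, err := sarama.NewAsyncProducer(addr, config)
checkerr.CheckError(err)
defer producer.Close()

go func() {
    for i := 0; ; i++ {
        producer.Input() <- &sarama.ProducerMessage{
            Topic: topics[0],
            Value: sarama.StringEncoder(strconv.Itoa(i)),
        }
        time.Sleep(time.Millisecond * 500)
    }
}()

for {
    select {
    case msg := <-producer.Successes():
        logging.Infof("produced partition:%d offset:%d", msg.Partition, msg.Offset)
    case perr := <-producer.Errors():
        logging.Errorf("produce failed: %v", perr.Err)
    }
}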