kafka相关操作

2019-11-22 00:12:09 浏览数 (1)

调试工具操作

代码语言:javascript复制
- 启动
bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

- 列出topic
./kafka-topics.sh  --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list 

- 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1

- 查看topic的状态
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

- 消费者 读数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig

- 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

- 删除topic
./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult

go客户端,读消息

代码语言:javascript复制
/*
Required imports:
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
*/

// Consumer-group read loop. ctx, cancel, topic, kafkaAddress, kafkaGroupID and
// logging are expected to be provided by the enclosing scope (not shown here).
ctx, cancel = context.WithCancel(context.Background())

config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Version = sarama.V2_0_0_0

// This setting is misleading: it only takes effect the first time a consumer
// is created for the group. Once a partition already has a committed offset
// for the group, it is ignored. To skip the messages produced while the
// consumer was down on every restart, a different group ID must be used.
config.Consumer.Offsets.Initial = sarama.OffsetNewest

var topicArr = []string{topic}
// KafkaAddresses=["kafka.service.consul:9092"]
consumer, err := cluster.NewConsumer(kafkaAddress,
    kafkaGroupID, topicArr, config)

if err != nil {
    logging.Errorf("cluster.NewConsumer  err:%s", err)
    return nil
}

// Drain the error channel; it is enabled by Consumer.Return.Errors above.
go func() {
    for err := range consumer.Errors() {
        logging.Errorf("consumer.Error: groupId:%s:Error: %s\n", kafkaGroupID, err.Error())
    }
}()
// Drain the rebalance notifications; enabled by Group.Return.Notifications above.
go func() {
    for ntf := range consumer.Notifications() {
        logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v\n", kafkaGroupID, ntf)
    }
}()

logging.Infof("NewKafka loop before")
Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if !ok {
                // The messages channel was closed: receiving from it would
                // now succeed immediately forever, so leave the loop instead
                // of spinning and logging on every iteration.
                logging.Errorf("read msg not ok %v", topic)
                break Loop
            }
            //logging.Debugf("read msg %v", msg)
            // do sth
            // Author's note: with sarama.OffsetNewest, committing is of little use.
            consumer.MarkOffset(msg, "")
        case <-ctx.Done():
            logging.Infof("kafka done %v", topic)
            break Loop
        case <-time.After(time.Second * 3):
            // Idle wake-up after 3s without a message; nothing to do.
            //logging.Debugf("NewKafka %v timeout", topic)
        }
    }
    logging.Infof("NewKafka kafka exit %v", topic)
    if err := consumer.Close(); err != nil {
        logging.Errorf("consumer.Close err:%s", err)
    }

go客户端,写消息

代码语言:javascript复制
// Producer config: retry up to 5 times, wait for all in-sync replicas to
// acknowledge, and report successes (required by the sync producer).
config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

// Sync producer. addr, topics and checkerr come from the enclosing scope
// (not shown here).
producer, err := sarama.NewSyncProducer(addr, config)
// Check the construction error before the producer is used; otherwise a
// failed connection would surface as a nil-producer call below.
checkerr.CheckError(err)
// Release the producer's resources if this function ever unwinds.
defer func() {
    checkerr.CheckError(producer.Close())
}()

// Send an increasing counter as the message value, twice per second.
i := 0
for {
    i++
    msg := &sarama.ProducerMessage{
        Topic: topics[0],
        Value: sarama.StringEncoder(strconv.Itoa(i)),
    }
    _, _, err = producer.SendMessage(msg)
    checkerr.CheckError(err)
    time.Sleep(time.Millisecond * 500)
}

0 人点赞