使用python发送和接口kafka

2022-05-16 11:59:14 浏览数 (1)

使用python操作kafka

安装 pip install kafka-python==2.0.2

kafka 的Producer

如果是kafka集群则bootstrap_servers可传入多个,需要使用逗号隔开。需要主要传入的值,必须转换为byte类型。

代码语言:javascript复制
# TestKafkaProducer.py
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['12.23.34.56:9092'])

value = {"type": "test",
         "value": {"requestNo":"1","auditStatus":"2","failReason":"3"}}

bytesDict = bytes('{}'.format(value),'utf-8')

print(bytesDict)

topic='test_topic'

producer.send(topic, bytesDict)
producer.close()

kafka的Consumer

需要注意topicbootstrap_servers地址 同上面一致。

代码语言:javascript复制
# 安装 pip install  kafka-python==2.0.2
from kafka import KafkaConsumer
import time

topic='test_topic'

consumer = KafkaConsumer(topic, bootstrap_servers = ['12.23.34.56:9092'])

for m in consumer:
    print(m)
    print(m.topic)

运行

需要先执行Consumer脚本,再执行Producer脚本,就能看到发送的信息会被接收到:

原生kafka查看命令

需要登录到服务器的kafka安装目录下,找到kafka-topics.sh,然后执行,别忘了替换你对应的地址哦。

查看所有的topic

./kafka-topics.sh --list --zookeeper 172.1.1.1:2181

查询某个topic接收到的消息

./kafka-console-consumer.sh --zookeeper 172.1.1.1:2181 --topic test_topic --from-beginning

0 人点赞