使用python操作kafka
安装 pip install kafka-python==2.0.2
kafka 的Producer
如果是kafka集群则bootstrap_servers
可传入多个,需要使用逗号隔开。需要主要传入的值,必须转换为byte类型。
# TestKafkaProducer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['12.23.34.56:9092'])
value = {"type": "test",
"value": {"requestNo":"1","auditStatus":"2","failReason":"3"}}
bytesDict = bytes('{}'.format(value),'utf-8')
print(bytesDict)
topic='test_topic'
producer.send(topic, bytesDict)
producer.close()
kafka的Consumer
需要注意topic
和bootstrap_servers地址
同上面一致。
# 安装 pip install kafka-python==2.0.2
from kafka import KafkaConsumer
import time
topic='test_topic'
consumer = KafkaConsumer(topic, bootstrap_servers = ['12.23.34.56:9092'])
for m in consumer:
print(m)
print(m.topic)
运行
需要先执行Consumer脚本,再执行Producer脚本,就能看到发送的信息会被接收到:
原生kafka查看命令
需要登录到服务器的kafka安装目录下,找到kafka-topics.sh
,然后执行,别忘了替换你对应的地址哦。
查看所有的topic
./kafka-topics.sh --list --zookeeper 172.1.1.1:2181
查询某个topic接收到的消息
./kafka-console-consumer.sh --zookeeper 172.1.1.1:2181 --topic test_topic --from-beginning