记录一下php 调用 kafka 生产者代码但是消费者端口收不到数据的情况
PHP代码如下:
代码语言:javascript复制<?php
$objRdKafka = new RdKafkaProducer();
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("demo");
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, 'eeeeeeeeeeeeeeeee');
使用 php producer.php
执行文件成功后,发现不能在消费者终端接到数据。
经过调试,发现在代码结尾加上 sleep(3)
消费者终端可以接收到数据。
结论: 在生产者还未将 缓冲区数据数据发送到主题中时,该进程已经结束导致消费者终端不能接收到数据。
6/10 更新
代码语言:javascript复制调用flush() 函数来 ,使kafka生产者发送缓冲区中的消息记录(record)可以被立即发送。并且一直阻塞,直到这些消息记录都发送完成。
<?php
$objRdKafka = new RdKafkaProducer();
$objRdKafka->addBrokers("localhost:9092");
$oObjTopic = $objRdKafka->newTopic("demo");
$oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, 'eeeeeeeeeeeeeeeee');
$timeout_ms = 600;
$objRdKafka->flush($timeout_ms);