安装扩展
安装教程 kafka
和php的rdkafka
扩展教程网上有很多,大家可以自行查询,例如:Kafka-php-使用 PHP 编写的 Kafka 客户端
Kafka文档推荐
不清楚里面的api的可以在文档中查询 kafka中文文档
composer 依赖
创建 composer.json
填写内容
{
"require": {
"nmred/kafka-php": "v0.2.0.8"
}
}
异步调用生产者
代码语言:javascript复制<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaProducerConfig;
use KafkaProducer;
//异步方式调用
$config = ProducerConfig::getInstance();
//设置数据刷新时间(毫秒)
$config->setMetadataRefreshIntervalMs(10);
//地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//设置生产间隔
$config->setProduceInterval(500);
//生产者
$producer = new Producer(function () {
return array([
'topic' => 'test',//主题
'value' => 'test message',
'key' => 'testKey',//key
]);
});
$producer->success(function ($result) {
echo '投递成功' . json_encode($result, 256) . PHP_EOL;
});
$producer->error(function ($result) {
echo '投递失败' . json_encode($result, 256) . PHP_EOL;
});
$producer->send(true);
同步调用生产者
代码语言:javascript复制<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaProducerConfig;
use KafkaProducer;
$config = ProducerConfig::getInstance();
//这是元组数据刷新间隔毫秒
$config->setMetadataRefreshIntervalMs(10);
//代理地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//生产间隔
$config->setProduceInterval(10);
$producer = new Producer();
for ($i = 0; $i < 100; $i ) {
$result = $producer->send(array(
[
'topic' => 'test',//主题
'value' => 'test message',
'key' => '',//key
]
));
echo '投递成功' . json_encode($result, 256) . PHP_EOL;
}
消费者
代码语言:javascript复制<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaConsumerConfig;
use KafkaConsumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10);
$config->setMetadataBrokerList('localhost:9092');
//设置分组分区
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(['test']);
//设置偏移量
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
var_dump($message);
});