Kafka在php中的使用----生产者与消费者

2021-01-18 11:13:25 浏览数 (1)

安装扩展

安装教程 kafka和php的rdkafka扩展教程网上有很多,大家可以自行查询,例如:Kafka-php-使用 PHP 编写的 Kafka 客户端

Kafka文档推荐

不清楚里面的api的可以在文档中查询 kafka中文文档

composer 依赖

创建 composer.json填写内容

代码语言:javascript复制
{
  "require": {
        "nmred/kafka-php": "v0.2.0.8"
  }
}
异步调用生产者
代码语言:javascript复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

//异步方式调用
$config = ProducerConfig::getInstance();
//设置数据刷新时间(毫秒)
$config->setMetadataRefreshIntervalMs(10);
//地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//设置生产间隔
$config->setProduceInterval(500);
//生产者
$producer = new Producer(function () {
    return array([
        'topic' => 'test',//主题
        'value' => 'test message',
        'key' => 'testKey',//key
    ]);
});
$producer->success(function ($result) {
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
});
$producer->error(function ($result) {
    echo '投递失败' . json_encode($result, 256) . PHP_EOL;
});
$producer->send(true);
同步调用生产者
代码语言:javascript复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

$config = ProducerConfig::getInstance();
//这是元组数据刷新间隔毫秒
$config->setMetadataRefreshIntervalMs(10);
//代理地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//生产间隔
$config->setProduceInterval(10);
$producer = new Producer();
for ($i = 0; $i < 100; $i  ) {
    $result = $producer->send(array(
        [
            'topic' => 'test',//主题
            'value' => 'test message',
            'key' => '',//key
        ]
    ));
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
}
消费者
代码语言:javascript复制
<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaConsumerConfig;
use KafkaConsumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10);
$config->setMetadataBrokerList('localhost:9092');
//设置分组分区
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(['test']);
//设置偏移量
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});

0 人点赞