最近项目的用户日志达到了上亿条,之前图方便,直接存储到MySQL,然后大数据的技术让我把这些日志都存储到Kafka
安装
- 因为我的开发环境是Windows,测试环境用的不是编译安装,生产环境由运维负责维护
- 得到你的PHP环境
- Linux
- 确保有
pecl
,运行下面的命令,没有报错那么就是已安装pecl help version
- 执行通过
pecl
安装 sudo pecl install rdkafka
- 编译安装
php-rdkafka
依赖php-rdkafka based on librdkafka
- 找一个目录用于放扩展源码
参考(PHP 安装 Kafka 扩展)
代码语言:javascript复制## 前提条件,先确定好自己的环境目录
# phpize 目录
whereis phpize
### phpize: /www/server/php/71/bin/phpize
php -i | grep php.ini
### Configuration File (php.ini) Path => /www/server/php/71/etc
### Loaded Configuration File => /www/server/php/71/etc/php.ini
### 从上面输出找到 php-config 目录: /www/server/php/71/bin/php-config
# 先编译 librdkafka
wget -c https://github.com/edenhill/librdkafka/archive/v0.11.0.tar.gz
tar xvzf v0.11.0.tar.gz
cd librdkafka-0.11.0/
./configure
make
sudo make install
# 下载扩展源代码(需要确定 kafka 服务的版本以下载对应的版本)
wget -c https://github.com/arnaud-lb/php-rdkafka/archive/3.0.4.tar.gz
# 解压
tar xvzf 3.0.4.tar.gz
cd php-rdkafka-3.0.4
# 编译 (用前面找到的 phpize 目录)
/usr/bin/phpize
# 用前面找到的 php-config 配置
./configure --with-php-config=/www/server/php/71/bin/php-config --with-rdkafka
# 安装
$ make all -j 5
$ make install
# 根据前面的 php.ini 路径写入
extension = rdkafka.so
- Windows
php -v
PHP 7.1.9 (cli) (built: Aug 30 2017 18:33:57) ( NTS MSVC14 (Visual C 2015) x64 )
Copyright (c) 1997-2017 The PHP Group
Zend Engine v3.1.0, Copyright (c) 1998-2017 Zend Technologies
- 去到这里下载对应的动态链接文件(
PHP
版本,X86,x64, NTS,TS
都要对应上) https://windows.php.net/downloads/pecl/releases/rdkafka/ - 解压之后找到两个文件
librdkafka.dll
,php_rdkafka.dll
librdkafka.dll
丢进PHP
安装根目录,php_rdkafka.dll
丢进PHP
安装目录下的ext
- 然后在
php.ini
加入
php_rdkafka.dll
- 运行
php -m
如果出现下面的警告,那就是librdkafka.dll
没有放对目录,参考此项https://github.com/arnaud-lb/php-rdkafka/issues/152
Warning: PHP Startup: Unable to load dynamic library 'C:phpstudy_proExtensionsphpphp7.1.9ntsextphp_rdkafka.dll' - 找不到指定的模块。
in Unknown on line 0
- 一切正常的话,运行
php -m
你就能看到
D:wwwxxx>php -m
[PHP Modules]
bcmath
calendar
Core
ctype
curl
date
dom
fileinfo
filter
gd
hash
iconv
json
libxml
mbstring
mcrypt
mysqli
mysqlnd
openssl
pcre
PDO
pdo_mysql
pdo_sqlite
Phar
## 此项为 kafka
rdkafka
readline
redis
Reflection
session
SimpleXML
SPL
standard
tokenizer
wddx
xml
xmlreader
xmlwriter
zip
zlib
[Zend Modules]
通过一下命令查看扩展的版本
代码语言:javascript复制php --ri rdkafka
# rdkafka
# rdkafka support => enabled
# version => 3.0.1
# build date => May 19 2020 20:02:07
# librdkafka version (runtime) => 0.9.4
# librdkafka version (build) => 0.9.4.0
开始使用
代码语言:javascript复制####################################################
# 生产者 producer.php
$max = 100;
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';
$conf = new RdKafkaConf();
// 注册发送消息事件
$conf->setDrMsgCb(function ($kafka, $message) {
var_dump('msg:', $kafka, $message);
});
// 注册错误发送的事件回调
$conf->setErrorCb(function ($kafka, $err, $reason) {
dump('error', $kafka, $err, $reason);
});
// 实例化生产者
$producer = new RdKafkaProducer($conf);
$producer->addBrokers($configBrokers);
$configObj = new RdKafkaTopicConf();
$topic = $producer->newTopic($configTopic, $configObj);
// 尝试发送几个消息, 这里注意,发送是异步的
for ($i = 0; $i < $max; $i) {
// RD_KAFKA_PARTITION_UA 让 kafka 自由选择分区
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
}
// 这里必须 poll 消息发送完毕
while (($len = $producer->getOutQLen()) > 0) {
$producer->poll(1);
}
####################################################
# 消费者 consumer.php
$configBrokers = '127.0.0.1:9092';
$configTopic = 'test';
$consumer = new RdKafkaConsumer();
$consumer->addBrokers($configBrokers);
$topic = $consumer->newTopic($configTopic);
// 从上一次记录的偏移量消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_END);
while (true) {
// 连接的超时时间, 如果常驻内存消费, 设置时间长点
$message = $topic->consume(0, 60*60);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message, Carbon::now()->toDateTimeString(), 'count:' . $count);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
var_dump("No more messages; will wait for moren");
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
var_dump("Timed outn");
break;
default:
var_dump('error', $message);
break;
}
}