ClickHouse整合Kafka(写数据)

2020-12-23 17:55:21 浏览数 (2)

本文章主要讲解如何将ClickHouse中的消息写回到Kafka。

ClickHouse读取Kafka数据详见ClickHouse整合Kafka(读数据)

Kafka相关操作


  • 在Kafka中创建kafka_writersTopic用于接收ClickHouse写入的数据
代码语言:txt复制
kafka-topics 
--zookeeper localhost:2181 
--topic kafka_writers 
--create --partitions 2 
--replication-factor 2

执行命令后返回如下响应

代码语言:txt复制
Created topic "kafka_writers".

这标志着topic已经创建成功。

ClickHouse相关操作


  • 创建kafka_writers_reader表,用于标记读取kafka数据此处也不可以操作
代码语言:txt复制
CREATE TABLE kafka_writers_reader 
( 
    `id` Int, 
    `platForm` String, 
    `appname` String, 
    `time` DateTime 
) 
ENGINE = Kafka 
SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';
  • 我们需要使用Kafka表引擎定义一个表,该表指向我们的kafka_writers主题。该表可以读取和写入Kafka消息(我们在此只做写入操作)。
代码语言:txt复制
CREATE TABLE kafka_writers_queue ( 
    id Int, 
    platForm String, 
    appname String, 
    time DateTime 
) 
ENGINE = Kafka 
SETTINGS kafka_broker_list = 'localhost:9092', 
       kafka_topic_list = 'kafka_writers', 
       kafka_group_name = 'kafka_writers_group', 
       kafka_format = 'CSV', 
       kafka_max_block_size = 1048576;

此处我们为了方便使用了CSV格式化数据格式,具体的数据格式根据数据而定。

  • 创建kafka_writers_view物化视图用于将ID大于5的数据输入到kafka_writersTopic中
代码语言:txt复制
CREATE MATERIALIZED VIEW kafka_writers_view TO 
kafka_writers_queue AS 
SELECT id, platForm, appname FROM kafka_writers_reader 
WHERE id >= 20;

验证Kafka数据的写入


  • 登录到Kafka集群中消费kafka_writers数据
代码语言:txt复制
kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka_writers
  • 新开另一个窗口对kafka_writers_readerKafka主题做生产数据操作
代码语言:txt复制
kafka-console-producer --broker-list kafka:9092 --topic kafka_writers_reader <<END
4,"Data","Test","2020-12-23 14:45:31"
5,"Plan","Test1","2020-12-23 14:47:32"
22,"Plan","Test2","2020-12-23 14:52:15"
7,"Data","Test3","2020-12-23 14:54:39"
END

如果我们没有创建kafka_writers_reader主题的话,我们可以忽略此步骤使用下一步方式

  • 插入ClickHouse数据到表中
代码语言:txt复制
INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (4,'Data','Test','2020-12-23 14:45:31'), 
(5,'Plan','Test1','2020-12-23 14:47:32'), 
(22,'Plan','Test2','2020-12-23 14:52:15'), 
(7,'Data','Test3','2020-12-23 14:54:39');

经过短暂的时候后,我们会在消费kafka_writers窗口下看到以下信息的输出

代码语言:txt复制
"22","Plan","Test2","1970-01-01 08:00:00"

这标志着我们的数据已经成功写入Kakfa中。

0 人点赞