软件准备
- Confluent安装包 下载地址:https://www.confluent.io/download/ Confluent有两个类型可以下载,企业版(Enterprise)需要付费,可以免费使用30天,我这里使用的是开源版(Open Source)版,版本号是4.1.1
1. Confluent 介绍
(1) Confluent 是什么?
Confluent平台是一个可靠的,高性能的流处理平台,你可以通过这个平台组织和管理各式各样的数据源中的数据。
image.png
(2) Confluent 中有什么?
- Confluent开源版
- Confluent Kafka Connectors
- Kafka Connect JDBC Connector
- Kafka Connect HDFS Connector
- Kafka Connect Elasticsearch Connector
- Kafka Connect S3 Connector
- Confluent Kafka Clients
- C/C Client Library
- Python Client Library
- Go Client Library
- .Net Client Library
- Confluent Schema Registry
- Confluent Kafka REST Proxy
- Confluent Kafka Connectors
- Confluent 企业版中增加的功能
- Automatic Data Balancing
- Multi-Datacenter Replication
- Confluent Control Center
- JMS Client
2. Confluent 开源版安装
(1) 解压安装包,可以看到以下目录:
代码语言:javascript复制[root@confluent confluent-4.1.1]# ll
total 24
drwxr-xr-x 3 1000 1000 4096 May 12 08:01 bin
drwxr-xr-x 14 1000 1000 4096 May 12 07:05 etc
drwxr-xr-x 3 1000 1000 4096 May 12 06:47 lib
-rw-r--r-- 1 1000 1000 871 May 12 08:02 README
drwxr-xr-x 6 1000 1000 4096 May 12 07:05 share
drwxr-xr-x 2 1000 1000 4096 May 12 08:02 src
(2) 启动confluent
代码语言:javascript复制[root@confluent confluent-4.1.1]# bin/confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
confluent start 会启动 confluent 全部组件,如果想要单独启动,比如单独启动 schema-registry,可以执行以下命令:
代码语言:javascript复制schema-registry-start
具体的单独启动各组件的命令,进入 bin 目录下,一看就能明白,不再赘述。
3. 简单使用
(1) 创建 topic
代码语言:javascript复制[root@confluent confluent-4.1.1]# bin/kafka-topics
> --create
> --zookeeper localhost:2181
> --replication-factor 1
> --partitions 1
> --topic confluent-test-001
Created topic "confluent-test-001".
说明: confluent 中内嵌了 Kafka 和 Zookeeper,你也可以通过指定不同的 zookeeper 在其他的 kafka 集群中创建 topic 或执行其他操作。
(2) 生产数据
代码语言:javascript复制[root@confluent confluent-4.1.1]# bin/ksql-datagen
> quickstart=users
> format=json
> topic=confluent-test-001
> maxInterval=1000
[2018-06-22 14:53:19,170] INFO AvroDataConfig values:
schemas.cache.config = 1
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:179)
User_5 --> ([ 1513083004885 | 'User_5' | 'Region_9' | 'OTHER' ])
User_8 --> ([ 1508770926089 | 'User_8' | 'Region_3' | 'OTHER' ])
User_9 --> ([ 1504006562725 | 'User_9' | 'Region_5' | 'FEMALE' ])
User_8 --> ([ 1490524175099 | 'User_8' | 'Region_2' | 'OTHER' ])
User_8 --> ([ 1489424770134 | 'User_8' | 'Region_8' | 'MALE' ])
User_1 --> ([ 1516449943408 | 'User_1' | 'Region_4' | 'OTHER' ])
......
以上命令是内嵌的一个kafka-producer脚本,生成随机的用户信息,可以通过 quickstart=[CLICKSTREAM_CODES, CLICKSTREAM, CLICKSTREAM_USERS, ORDERS, RATINGS, USERS, USERS_, PAGEVIEWS] 来生成不同的数据,这个脚本会运行很长时间(官网只说了很长时间,到底多长,没说),除非你手动停止
(3) 使用 KSQL 查询生产的数据
在另一个窗口中,进入KSQL命令行(上一个窗口继续发数据不要停)
代码语言:javascript复制[root@confluent confluent-4.1.1]# bin/ksql
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ | | =
= | ' /| (___ | | | | | =
= | < ___ | | | | | =
= | . ____) | |__| | |____ =
= |_|______/ __________| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017 Confluent Inc.
CLI v4.1.1, Server v4.1.1 located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
把生产过来的数据创建为user表:
代码语言:javascript复制ksql> CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR,
> userid VARCHAR, interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>)
> WITH (KAFKA_TOPIC='confluent-test-001', VALUE_FORMAT='JSON', KEY = 'userid');
Message
---------------
Table created
---------------
设置消费偏移量为 "earliest":
代码语言:javascript复制ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
查询:
代码语言:javascript复制ksql> select * from users;
1529651156298 | User_7 | 1497590434653 | OTHER | Region_7 | User_7 | null | null
1529651158082 | User_9 | 1508375625042 | OTHER | Region_1 | User_9 | null | null
1529651160496 | User_5 | 1501045879443 | MALE | Region_6 | User_5 | null | null
1529651161870 | User_6 | 1514541057484 | FEMALE | Region_5 | User_6 | null | null
1529651162248 | User_3 | 1498247501220 | MALE | Region_1 | User_3 | null | null
1529651162727 | User_1 | 1495368101769 | FEMALE | Region_3 | User_1 | null | null
1529651164048 | User_4 | 1508110530233 | MALE | Region_6 | User_4 | null | null
.....
# 只要生产数据的程序没有停止,这里会一直打印查询结果
4. 关闭服务
代码语言:javascript复制[root@confluent confluent-4.1.1]# bin/confluent stop
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]