Confluent 入门

2018-09-13 10:33:39 浏览数 (1)


软件准备

  • Confluent安装包 下载地址:https://www.confluent.io/download/ Confluent有两个类型可以下载,企业版(Enterprise)需要付费,可以免费使用30天,我这里使用的是开源版(Open Source)版,版本号是4.1.1

1. Confluent 介绍

(1) Confluent 是什么?

Confluent平台是一个可靠的,高性能的流处理平台,你可以通过这个平台组织和管理各式各样的数据源中的数据。

image.png

(2) Confluent 中有什么?

  • Confluent开源版
    • Confluent Kafka Connectors
      • Kafka Connect JDBC Connector
      • Kafka Connect HDFS Connector
      • Kafka Connect Elasticsearch Connector
      • Kafka Connect S3 Connector
    • Confluent Kafka Clients
      • C/C Client Library
      • Python Client Library
      • Go Client Library
      • .Net Client Library
    • Confluent Schema Registry
    • Confluent Kafka REST Proxy
  • Confluent 企业版中增加的功能
    • Automatic Data Balancing
    • Multi-Datacenter Replication
    • Confluent Control Center
    • JMS Client

2. Confluent 开源版安装

(1) 解压安装包,可以看到以下目录:

代码语言:javascript复制
[root@confluent confluent-4.1.1]# ll
total 24
drwxr-xr-x  3 1000 1000 4096 May 12 08:01 bin
drwxr-xr-x 14 1000 1000 4096 May 12 07:05 etc
drwxr-xr-x  3 1000 1000 4096 May 12 06:47 lib
-rw-r--r--  1 1000 1000  871 May 12 08:02 README
drwxr-xr-x  6 1000 1000 4096 May 12 07:05 share
drwxr-xr-x  2 1000 1000 4096 May 12 08:02 src

(2) 启动confluent

代码语言:javascript复制
[root@confluent confluent-4.1.1]# bin/confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]

confluent start 会启动 confluent 全部组件,如果想要单独启动,比如单独启动 schema-registry,可以执行以下命令:

代码语言:javascript复制
schema-registry-start

具体的单独启动各组件的命令,进入 bin 目录下,一看就能明白,不再赘述。

3. 简单使用

(1) 创建 topic

代码语言:javascript复制
[root@confluent confluent-4.1.1]# bin/kafka-topics 
> --create 
> --zookeeper localhost:2181 
> --replication-factor 1 
> --partitions 1 
> --topic confluent-test-001
Created topic "confluent-test-001".

说明: confluent 中内嵌了 Kafka 和 Zookeeper,你也可以通过指定不同的 zookeeper 在其他的 kafka 集群中创建 topic 或执行其他操作。

(2) 生产数据

代码语言:javascript复制
[root@confluent confluent-4.1.1]# bin/ksql-datagen 
> quickstart=users 
> format=json 
> topic=confluent-test-001 
> maxInterval=1000
[2018-06-22 14:53:19,170] INFO AvroDataConfig values: 
    schemas.cache.config = 1
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:179)
User_5 --> ([ 1513083004885 | 'User_5' | 'Region_9' | 'OTHER' ])
User_8 --> ([ 1508770926089 | 'User_8' | 'Region_3' | 'OTHER' ])
User_9 --> ([ 1504006562725 | 'User_9' | 'Region_5' | 'FEMALE' ])
User_8 --> ([ 1490524175099 | 'User_8' | 'Region_2' | 'OTHER' ])
User_8 --> ([ 1489424770134 | 'User_8' | 'Region_8' | 'MALE' ])
User_1 --> ([ 1516449943408 | 'User_1' | 'Region_4' | 'OTHER' ])
......

以上命令是内嵌的一个kafka-producer脚本,生成随机的用户信息,可以通过 quickstart=[CLICKSTREAM_CODES, CLICKSTREAM, CLICKSTREAM_USERS, ORDERS, RATINGS, USERS, USERS_, PAGEVIEWS] 来生成不同的数据,这个脚本会运行很长时间(官网只说了很长时间,到底多长,没说),除非你手动停止

(3) 使用 KSQL 查询生产的数据

在另一个窗口中,进入KSQL命令行(上一个窗口继续发数据不要停)

代码语言:javascript复制
[root@confluent confluent-4.1.1]# bin/ksql
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ | |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  ___ | |  | | |            =
                  =       | .  ____) | |__| | |____        =
                  =       |_|______/ __________|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017 Confluent Inc.

CLI v4.1.1, Server v4.1.1 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

把生产过来的数据创建为user表:

代码语言:javascript复制
ksql> CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, 
> userid VARCHAR, interests array<VARCHAR>, contact_info map<VARCHAR, VARCHAR>) 
> WITH (KAFKA_TOPIC='confluent-test-001', VALUE_FORMAT='JSON', KEY = 'userid');

 Message       
---------------
 Table created 
---------------

设置消费偏移量为 "earliest":

代码语言:javascript复制
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

查询:

代码语言:javascript复制
ksql> select * from users;
1529651156298 | User_7 | 1497590434653 | OTHER | Region_7 | User_7 | null | null
1529651158082 | User_9 | 1508375625042 | OTHER | Region_1 | User_9 | null | null
1529651160496 | User_5 | 1501045879443 | MALE | Region_6 | User_5 | null | null
1529651161870 | User_6 | 1514541057484 | FEMALE | Region_5 | User_6 | null | null
1529651162248 | User_3 | 1498247501220 | MALE | Region_1 | User_3 | null | null
1529651162727 | User_1 | 1495368101769 | FEMALE | Region_3 | User_1 | null | null
1529651164048 | User_4 | 1508110530233 | MALE | Region_6 | User_4 | null | null
.....
# 只要生产数据的程序没有停止,这里会一直打印查询结果

4. 关闭服务

代码语言:javascript复制
[root@confluent confluent-4.1.1]# bin/confluent stop
Using CONFLUENT_CURRENT: /tmp/confluent.I5Y1nzpT
Stopping ksql-server
ksql-server is [DOWN]
Stopping connect
connect is [DOWN]
Stopping kafka-rest
kafka-rest is [DOWN]
Stopping schema-registry
schema-registry is [DOWN]
Stopping kafka
kafka is [DOWN]
Stopping zookeeper
zookeeper is [DOWN]

0 人点赞