浅谈kafka 一

2022-05-16 14:47:44 浏览数 (1)

Kafka官方给出的定义是:Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. (Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。)

大数据领域中我们常用kakfa来构建流处理数据管道,与Spark或者Flink对接。

搭建Kafka集群,我们选用的kafka版本是kafka_2.12-2.4.1,Zookeeper版本为3.6.3。

zookeeper配置

代码语言:javascript复制
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.0=10.60.**.**:2888:3888
server.1=10.60.**.**:2888:3888
server.2=10.60.**.**:2888:3888

默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录。

第四步,在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。

启动

代码语言:javascript复制
./zkServer.sh start

查看zookeeper服务状态

代码语言:javascript复制
./zkServer.sh status

三个节点成功部署的话,jps后会有一个Leader和两个follower。

Kafka配置

修改 server.properties

代码语言:javascript复制
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/usr/local/kafka/data
# 配置zk的三个节点
zookeeper.connect=10.60.**.**:2181,10.60.**.**:2181,10.60.**.**:2181

将安装好的kafka复制到另外两台服务器,修改另外两个节点的broker.id分别为1和2

配置KAFKA_HOME环境变量

代码语言:javascript复制
export KAFKA_HOME=/usr/local/kafka
export PATH=:$PATH:${KAFKA_HOME}

启动服务器

代码语言:javascript复制
nohup bin/kafka-server-start.sh config/server.properties &

由于集群的服务器可能很多,手动启动比较麻烦,可以写一个一键启动和关闭的shell脚本:

在kafkalist文件中写入需要启动的服务器ip

代码语言:javascript复制
cat /root/myshell/kafkalist | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done

示例demo

我们创建一个topic,用java代码创建一个kafka producer向topic中发送数据。

代码语言:javascript复制
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --create --topic topic-demo --replication-factor 3 --partitions 4

查看该topic

代码语言:javascript复制
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --describe --topic topic-demo

Java Producer:

代码语言:javascript复制
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 *
 * 1. 创建用于连接Kafka的Properties配置
 * 2. 创建一个生产者对象KafkaProducer
 * 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
 * 4. 再调用一个Future.get()方法等待响应
 * 5. 关闭生产者
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 3. 发送1-100的消息到指定的topic中
        for(int i = 0; i < 10000000;   i) {
            // 一、使用同步等待的方式发送消息
            // // 构建一条消息,直接new ProducerRecord
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i   "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // // 调用Future的get方法等待响应
            // future.get();
            // System.out.println("第"   i   "条消息写入成功!");

            // 二、使用异步回调的方式发送消息
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i   "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // 1. 判断发送消息是否成功
                    if(exception == null) {
                        // 发送成功
                        // 主题
                        String topic = metadata.topic();
                        // 分区id
                        int partition = metadata.partition();
                        // 偏移量
                        long offset = metadata.offset();
                        System.out.println("topic:"   topic   " 分区id:"   partition   " 偏移量:"   offset);
                    }
                    else {
                        // 发送出现错误
                        System.out.println("生产消息出现异常!");
                        // 打印异常消息
                        System.out.println(exception.getMessage());
                        // 打印调用栈
                        System.out.println(exception.getStackTrace());
                    }
                }
            });
        }

        // 4.关闭生产者
        kafkaProducer.close();
    }
}

消费数据

代码语言:javascript复制
[root@master kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic-demo --from-beginning
441665
441666
441667
441668
441669
441670
441671
441672
441673
441674
441675
441676
441677
441678
441679
441680
441681
441682
441683

可以看到数据已经写入。

0 人点赞