前言
如果看过博主之前的文章,也可以了解到我正在搭建一个大数据的集群,所以花了血本弄了几台服务器。终于在flume将日志收集到日志主控flume节点上后,下一步要进行消息队列的搭建了。中间遇到过很多坎坷和坑,下面就为大家讲解一下搭建过程和注意事项,最终的成果是kafka搭建成功并接受flume主控传来的数据。
环境
服务器:CentOS7.2
JDK: jdk1.8.0_161
flume: apache-flume-1.6.0-cdh5.7.0
zookeeper: zookeeper-3.4.5-cdh5.7.0
kafka: kafka_2.11-0.9.0.0
步骤
1. 解压安装zookeeper,配置conf/zoo.cfg
代码语言:javascript复制cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
修改日志保存目录:
代码语言:javascript复制# example sakes.
dataDir=日志存放目录 #如不存在,则需要创建
2. 解压kafka, 配置config/server.properties
代码语言:javascript复制log.dirs=日志存放路径 #如不存在,需要手动创建
advertised.host.name=本机IP #访问kafka时返回的下一个请求地址
socket.request.max.bytes=1205725856 #此处需要改大一点,此处是已经修改过的数值
zookeeper.connect=localhost:2181 # Zookeeper地址
broker.id=0 # broker.id 保证每一个Broker都是唯一的ID号即可,避免冲突
listeners=PLAINTEXT://:9092 # 此broker 监听的端口号,同样要避免冲突
注:advertised.host.name一定要设置为本地IP,因为之后flume会充当生产者的角色而向kafka进行请求,kafka会直接返回此属性设置的值,flume则根据返回的值进行再次请求,如果此处设置为localhost,则flume则会在本地进行寻找broker,导致连接失败。
PS: socket.request.max.bytes最好设置的大一点,因为flume传来的数据会比设置的默认值大,会产生报错现象,不过不影响运行,此处为修改后的数值,可以直接使用。
3. 进入到zookeeper主目录bin下启动zookeeper
代码语言:javascript复制./zkServer.sh start
注:建议将ZK_HOME和KAFKA_HOME配置到系统变量中,会简化操作:
代码语言:javascript复制zkServer.sh start
4. 在kafka主目录config目录下启动kafka
代码语言:javascript复制../bin/kafka-server-start.sh server.properties
注:配置了KAFKA_HOME和系统变量后:
代码语言:javascript复制kafka-server-start.sh $KAFKA_HOME/config/server.properties
5. 创建topic
代码语言:javascript复制kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic blog
注:此次的讲解和以下的代码均为设置了系统变量后的操作
6. 测试是否成功
代码语言:javascript复制kafka-topics.sh --describe --zookeeper localhost:2181
注:如果查询不成功,报错的话,注意看一下自己的云服务器主机名是否带_下划线。如果带下划线的话,注意修改hostname,因为kafka没办法解析带下划线的主机名。
7. 设置flume,将日志信息sink到kafka上
代码语言:javascript复制vim avro-memory-kafka.conf
编写配置:
代码语言:javascript复制avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = 0.0.0.0 #此处是监听的IP,切记不要写成localhost,那样只会允许本地访问
avro-memory-kafka.sources.avro-source.port = 8000 #flume日志收集的端口号
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = kafkaIP地址:9092 #9092是broker的监听端口
avro-memory-kafka.sinks.kafka-sink.topic = blog #topic名称
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks =1
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
7. 运行flume,并在kafka服务端打开消费者进行测试:
代码语言:javascript复制flume-ng agent
--name avro-memory-kafka
--conf $FLUME_HOME/conf
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf
-Dflume.root.logger=INFO,console
kafka端消费者开启:
代码语言:javascript复制kafka-console-consumer.sh --zookeeper localhost:2181 --topic blog --from-beginning
消费者接收数据,测试成功。至此环境已经搭建完毕