大家好,又见面了,我是你们的朋友全栈君。
需要注意:参考的网站要与你的kafka的版本一致,因为里面的字段会不一致 例如:http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html#kafka-sink 这是1.6版本的,如果需要查看1.9版本的直接就将1.6.0改为1.9.0即可
代码语言:javascript复制# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop000
avro-memory-kafka.sources.avro-source.port = 44444
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
# batchSize 当达到5个日志才会处理,所以消费者出现的消息会慢
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
代码语言:javascript复制flume-ng agent
--name avro-memory-kafka
--conf $FLUME_HOME/conf
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf
-Dflume.root.logger=INFO,console
代码语言:javascript复制flume-ng agent
--name exec-memory-avro
--conf $FLUME_HOME/conf
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf
-Dflume.root.logger=INFO,console
启动消费者: kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic hello_topic
向data.log写入数据,发现消费者出现消息,成功
代码语言:javascript复制[hadoop@hadoop000 data]$ echo hellospark1111 >> data.log
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/152379.html原文链接:https://javaforall.cn