1.建立工程,导入相应的jar包
Procuder类
package cn.itcast.kafka;
import Java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;
public class ConsumerDemo { //要读取的数据主题 private static final String topic = "kfc"; //消费者的数量 private static final Integer threads = 2; public static void main(String[] args) { Properties props = new Properties(); //指定zookeeper的地址 props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181"); //消费组的编号 props.put("group.id", "1111"); //偏移量,从哪个位置读 props.put("auto.offset.reset", "smallest"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config); HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>(); topicCountmap.put(topic, threads); //根据map获取所有的主题对应的消息流 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap); //获取某个主题的消息流 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); //开启两个消费者进程,读取主题下的流 for (final KafkaStream<byte[], byte[]> kafkaStream : streams) { new Thread(new Runnable() { @Override public void run() { for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) { System.err.println(new String(messageAndMetadata.message())); } } }).start(); } } }
consumer--消费者类
package cn.itcast.kafka;
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;
import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;
public class ConsumerDemo { //要读取的数据主题 private static final String topic = "kfc"; //消费者的数量 private static final Integer threads = 2; public static void main(String[] args) { Properties props = new Properties(); //指定zookeeper的地址 props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181"); //消费组的编号 props.put("group.id", "1111"); //偏移量,从哪个位置读 props.put("auto.offset.reset", "smallest"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config); HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>(); topicCountmap.put(topic, threads); //根据map获取所有的主题对应的消息流 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap); //获取某个主题的消息流 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); //开启两个消费者进程,读取主题下的流 for (final KafkaStream<byte[], byte[]> kafkaStream : streams) { new Thread(new Runnable() { @Override public void run() { for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) { System.err.println(new String(messageAndMetadata.message())); } } }).start(); } } }