Kafka代码API

2022-07-03 13:06:30 浏览数 (1)

1.建立工程,导入相应的jar包

Procuder类

package cn.itcast.kafka;

import Java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;

import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;

public class ConsumerDemo {  //要读取的数据主题  private static final String topic = "kfc";  //消费者的数量  private static final Integer threads = 2;  public static void main(String[] args) {   Properties props = new Properties();   //指定zookeeper的地址   props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");   //消费组的编号   props.put("group.id", "1111");   //偏移量,从哪个位置读   props.put("auto.offset.reset", "smallest");   ConsumerConfig config = new ConsumerConfig(props);   ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);   HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();   topicCountmap.put(topic, threads);   //根据map获取所有的主题对应的消息流   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);   //获取某个主题的消息流   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);   //开启两个消费者进程,读取主题下的流   for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {    new Thread(new Runnable() {     @Override     public void run() {      for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {       System.err.println(new String(messageAndMetadata.message()));      }     }    }).start();   }  } }

consumer--消费者类

package cn.itcast.kafka;

import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;

import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;

public class ConsumerDemo {  //要读取的数据主题  private static final String topic = "kfc";  //消费者的数量  private static final Integer threads = 2;  public static void main(String[] args) {   Properties props = new Properties();   //指定zookeeper的地址   props.put("zookeeper.connect", "storm01:2181,storm02:2181,storm03:2181");   //消费组的编号   props.put("group.id", "1111");   //偏移量,从哪个位置读   props.put("auto.offset.reset", "smallest");   ConsumerConfig config = new ConsumerConfig(props);   ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);   HashMap<String, Integer> topicCountmap = new HashMap<String,Integer>();   topicCountmap.put(topic, threads);   //根据map获取所有的主题对应的消息流   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountmap);   //获取某个主题的消息流   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);   //开启两个消费者进程,读取主题下的流   for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {    new Thread(new Runnable() {     @Override     public void run() {      for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {       System.err.println(new String(messageAndMetadata.message()));      }     }    }).start();   }  } }

0 人点赞