导语:我们来搭建一套java开发环境,用java实现一个生产者客户端和消费者客户端。
一、安装JDK1.8,并配置环境变量
二、安装Eclipse:
三、安装和配置Maven,并加入环境变量
四、安装的Eclipse自带了M2Eclipes
五、下载安装Tomcat,并在Eclipse中配置
六、创建Maven管理的Web工程
创建的maven会有错误,解决报错:
报错没有了。
构建项目:
七. 运行访问
八. 用Java实现生产者
代码语言:javascript复制import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0 Producer Example
* @author Fa
*
*/
public class ProducerDemo {
public static void main(String[] args) {
Random rnd = new Random();
int events=100;
// 设置配置属性
Properties props = new Properties();
props.put("bootstrap.servers","10.1.1.90:9092");
props.put("security.protocol","PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("session.timeout.ms",30000);
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username="yourinstance#yourusername" password="yourpasswd";");
props.put("acks", "1");
ProducerConfig config = new ProducerConfig(props);
// 创建producer
Producer<String, String> producer = new Producer<String, String>(config);
// 产生并发送消息
long start=System.currentTimeMillis();
for (long i = 0; i < events; i ) {
long runtime = new Date().getTime();
String ip = "192.168.2." i;//rnd.nextInt(255);
String msg = runtime ",www.example.com," ip;
//如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
System.out.println("耗时:" (System.currentTimeMillis() - start));
// 关闭producer
producer.close();
}
}
九. 用Java实现消费者
代码语言:javascript复制import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer Group Example
*
* @author Fa
*
*/
public class ConsumerDemo {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
}
public void run(int numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(numThreads);
// now create an object to consume the messages
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerMsgTask(stream, threadNumber));
threadNumber ;
}
}
private static ConsumerConfig createConsumerConfig(String a_bootservers,String a_groupId) {
Properties props = new Properties();
props.put("group.id", a_groupId);
props.put("auto.commit.interval.ms", "1000");
props.put("bootstrap.servers",a_bootservers);
props.put("security.protocol","PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("session.timeout.ms",30000);
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username="yourinstance#yourusername" password="yourpasswd";");
props.put("acks", "1");
return new ConsumerConfig(props);
}
public static void main(String[] arg) {
String[] args = { "10.1.1.90:9092", "console-consumer-4626", "bowenqiu_topic1", "3" };
String bootservers = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerDemo demo = new ConsumerDemo(bootservers, groupId, topic);
demo.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
demo.shutdown();
}
}