消费生产者样例,kafka用的版本:
pom文件
代码语言:javascript复制<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
代码:
自定义分区:
代码语言:javascript复制import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;
MyLogPartitioner
private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
public MyLogPartitioner(VerifiableProperties props) {}
public MyLogPartitioner() {}
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
return 1;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
----------------------------------------------
代码语言:javascript复制KafkaProducerSimple
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public static void main(String[] args) {
/**
* 1.指定当前kafka producer生产数据的目的地
*
*/
String TOPIC="test";
/**
* 2.读取配置文件
*/
Properties properties = new Properties();
/**
* key 序列号方式
*/
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
/**
* value 序列号方式
*/
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
/**
* 自定义客户端id
*/
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerSimple");
/**
* kafka broker对应的主机
*/
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093,ip:9094");
/**
* request.required.acks,设置发送是否需要服务端的反馈,有三个值0,1,-1
* 0,意味着producer永远不会等待一个来自broker的ack,这是0.7版本的行为
* 这个选项提供了最低延迟,但是持久化的保证是最弱,当server挂掉的时候会丢失一些数据。
* 1.意味着在leader上,replica已经接收到数据后,producer会得到一个ack.
* 这个选项提供了更好的持久性,因为server确认请求成功处理后client才会返回。
* -1,意味着在所有的isr(同步副本列表)都接收到数据后,producer才得到一个ack。
* 这选项提供最好持久性,只要还有一个replica存活,那么数据不会丢失。
*/
properties.put(ProducerConfig.ACKS_CONFIG, "1");
/**
* 可选配置,如果不配置,则使用默认的partitioner
* 默认值:kafka.producer.DefaultPartitioner
* 用来把消息分到各个partition中,默认行为是对key进行hash。
*/
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyLogPartitioner.class.getCanonicalName());
/**
* 3.通过配置文件,创建生产者
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
/**
* 生产数据
*/
String messageStr="";
for(int messageNo=1;messageNo < 100000;messageNo ){
messageStr=messageNo "_partitionKey:" "越过山丘 虽然已白了头,喋喋不休 时不我予的哀愁,还未如愿见着不朽 就把自己先搞丢 ";
producer.send(new ProducerRecord<String,String>(TOPIC,String.valueOf(messageNo),messageStr));
}
}