在Kafka的生产者模式主要详细的介绍了作为生产者的中间价,把消息数据写入到Kafka,这样消费者才可以消费数据,以及针对这些数据进行其他的如数据分析等。但是在实际的应用中,会有大批量的实时数据需要写入到Kafka的系统里面,因此作为单线程的模式很难满足实时数据的写入,需要使用多线程的方式来进行大批量的数据写入,当然作为消费者也是写多线程的方式来接收这些实时的数据。比如举一个案例,需要把日志系统的信息写入到Kafka的系统里面,这就是一个实时的过程,因为在程序执行的过程中,日志系统在进行大量的IO的读写,也就意味着这些数据都需要写入到Kafka里面。
下面主要演示使用多线程的方式把数据写入到Kafk里面,完善后的代码具体如下:
代码语言:javascript复制package MQ;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaThreadProducer extends Thread
{
private static Logger logger=LoggerFactory.getLogger(KafkaThreadProducer.class);
//创建一个最大的线程数
private final static int max_sise=10;
/*连接Kafka*/
public Properties configure()
{
Properties properties=new Properties();
//指定kafka的集群地址
properties.put("bootstrap.servers","localhost:9092");
//设置应答机制
properties.put("acks","1");
//批量提交大小
properties.put("batch.size",16384);
//延时提交
properties.put("linger.ms",1);
//缓充大小
properties.put("buffer.memory",33554432);
//序列化主键
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//序列化值
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
public void run()
{
Producer<String, String> objProducer=new org.apache.kafka.clients.producer.KafkaProducer<String, String>(this.configure());
//模拟发送批量的数据
for(int i=0;i<10000;i )
{
JSONObject jsonObject=new JSONObject();
jsonObject.put("id",i);
jsonObject.put("username","无涯");
jsonObject.put("city","西安");
jsonObject.put("age",18);
jsonObject.put("date",new Date().toString());
//异步发送,调用回调函数,给主题login写入数据
objProducer.send(new ProducerRecord<String, String>("login", jsonObject.toJSONString()), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e)
{
if(e!=null)
{
logger.error("发送错误,信息具体为:" e.getMessage());
}
else
{
logger.info("写入的数据为:" recordMetadata.offset());
}
}
});
}
try{
Thread.sleep(3000);
}catch(Exception e){
e.printStackTrace();
}
//关闭生产者的对象
objProducer.close();
}
public static void main(String[] args)
{
ExecutorService executorService=Executors.newFixedThreadPool(max_sise);
//提交任务批量执行
executorService.submit(new KafkaThreadProducer());
//关闭线程池
executorService.shutdown();
}
}
执行代码后,在消费者就可以看到消费了10000条的数据,具体信息如下:
使用多线程的方式其实是非常高效的,这个过程1万条的数据很快就写入到生产者里面,而不会因为单线程的模式因为写入导致吞吐量低。当然,同理,在Python里面我们也是可以使用线程池的方式来批量的提交任务,也是获取拉勾网的招聘数据(拉勾网使用了Cookie反爬虫的机制,所以需要动态的替换请求头里面的Cookie信息),然后写入到Kafka里面,案例代码如下:
代码语言:javascript复制#!/usr/bin/env python
#!coding:utf-8
from kafka import KafkaProducer
from threading import Thread
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import json
import requests
def laGou():
r=requests.post(
url='https://www.lagou.com/jobs/positionAjax.json?needAddtionalResult=false',
data={'first':False,'pn':2,'kd':'测试开发工程师','sid':'850031016ddf4030a88f6754e5dc006a'},
headers={
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
'referer':'https://www.lagou.com/jobs/list_测试开发工程师?labelWords=&fromSearch=true&suginput=',
'cookie':'JSESSIONID=ABAAAECAAEBABII00AC62E013C0625CA93D0EB4398C66D6; WEBTJ-ID=20210410下午7:57:51195751-178bba53a39117-08cbdf3583191a-33697c08-1296000-178bba53a3ae9b; RECOMMEND_TIP=true; PRE_UTM=; PRE_HOST=; PRE_LAND=https://www.lagou.com/; user_trace_token=20210410195751-f07d0d66-519e-48d4-877c-5387150e2a09; LGSID=20210410195751-db63c8f7-870f-4cbe-ac29-02dc960ab170; PRE_SITE=https://www.lagou.com; LGUID=20210410195751-67a56d41-e0b1-4cde-ba97-e277061bba86; privacyPolicyPopup=false; _ga=GA1.2.962131978.1618055871; _gat=1; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1618055871; sajssdk_2015_cross_new_user=1; sensorsdata2015session={}; _gid=GA1.2.1436722793.1618055871; index_location_city=全国; __lg_stoken__=923813bee6e58b745829da50ad9441f719dc153ffd1937f393cf3fa8fbafe370e01d13c554827a78150d672f8697c19d5842821d051bcd2ee1ce37be887549bcbd1f7469c674; X_MIDDLE_TOKEN=b6b1f78324fc9dd4509291efd5b2504d; SEARCH_ID=4b5c97dd7b084167b3007a6d87b7d95e; X_HTTP_TOKEN=3676ec642119da6d878550816138d25526c88f2008; sensorsdata2015jssdkcross={"distinct_id":"178bba53b758db-02a6a23b110e22-33697c08-1296000-178bba53b76cc0","first_id":"","props":{"$latest_traffic_source_type":"直接流量","$latest_search_keyword":"未取到值_直接打开","$latest_referrer":"","$os":"MacOS","$browser":"Chrome","$browser_version":"89.0.4389.114"},"$device_id":"178bba53b758db-02a6a23b110e22-33697c08-1296000-178bba53b76cc0"}; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1618055879; TG-TRACK-CODE=search_code; LGRID=20210410195804-8c8f7df0-646e-4133-98c0-63f0266fae20'
})
return r.json()
def sendData():
for i in range(3):
producer=KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send("login",laGou())
producer.flush()
producer.close()
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=10)
executor.submit(sendData)
#关闭线程池
executor.shutdown()
下面具体可以看到生产者的监控的信息,具体如下:
在案例过程中进行批量的执行了多次,在多线程的方式中,只有我们数据的来源获取速度足够快,那么写入的速度也是非常快的,因为在实际的使用中,我们先去调用来源的数据,然后把这些数据获取到再连接Kafka把数据写入到Kafka的系统里面,比如案例中获取拉勾网的数据,这个过程是需要耗时的,那么获取来源的数据也是可以从单线程修改为多线程的方式批量的获取到数据然后实时的写入到Kafka的系统里面。
感谢您的阅读和关注,后续持续更新。如果您想系统全栈的学习基于Python语言的服务端自动化测试和客户端自动化测试,可点击如下链接购买书籍。