我们直接使用docker来安装RabbitMq,安装的命令为:
代码语言:javascript复制docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
执行后,就会进行下载images以及进行安装,具体如下:
代码语言:javascript复制3-management: Pulling from library/rabbitmq
345e3491a907: Pull complete
57671312ef6f: Pull complete
5e9250ddb7d0: Pull complete
87dda86b6fa0: Pull complete
848ffb5f028a: Pull complete
90a071418b65: Pull complete
c19f691510d5: Pull complete
7aef9f2fae06: Pull complete
a5a33e63d14b: Pull complete
68655d63808f: Pull complete
13b25112878a: Pull complete
e3506709d158: Pull complete
6f81b4a80ae6: Pull complete
Digest: sha256:f0be9e47ec42081a36593dfc6604274a623caed074fc043e0a927fbd1533dc20
Status: Downloaded newer image for rabbitmq:3-management
Configuring logger redirection
下来就会自动安装以及安装相应的插件,监听的端口是5672和15672,如下所示:
在浏览器输入http://localhost:15672就可以访问RabbitMq的WEB管理界面了(登录账户和密码都是guest),如下所示:
安装成功后,使用Python来对RabbitMq来进行操作,需要安装第三方的库pika,安装命令为:pip3 install pika。下面使用生产者消费者的模式来实现数据的传输,也就是说生产者发送数据,消费者接收到数据后把接收的数据输出,这地方使用的队列是“hello”,生产者的代码为:
代码语言:javascript复制#!/usr/bin/env python
#!coding:utf-8
import pika
def send():
'''生产者发送数据'''
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
#创建队列es,发送数据:Hello RabbitMQ
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello RabbitMQ')
print('send data: Hello RabbitMQ')
connection.close()
if __name__ == '__main__':
send()
消费者的代码为:
代码语言:javascript复制#!/usr/bin/env python
#!coding:utf-8
import pika,sys,os
def recv():
'''消费者接收数据'''
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
#设置回调函数
def callback(ch,method,properties,body):
print('recv data:{0}'.format(body))
channel.basic_consume(
queue='hello',
on_message_callback=callback,
auto_ack=True)
channel.start_consuming()
if __name__ == '__main__':
try:
recv()
except Exception as e:
print(e.args)
try:
sys.exit(0)
except SystemExit:
os._exit(0)
下来先启动消费者的应用程序,然后再执行生产者的应用程序,消费者的应用程序就会接收到生产者发送的数据了,不过接收到的数据是byte的类型,如下所示:
下面我们实现获取第三方的数据,然后通过生产者发送给消费者,第三方的数据主要是获取拉勾网测试开发职位搜索后服务端返回的响应数据,生产者完整的代码如下:
代码语言:javascript复制#!/usr/bin/env python
#!coding:utf-8
import pika,requests
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def laGou(page):
'''
获取拉勾网搜索后的数据
:param page: 页数
:return:
'''
r=requests.post(
url='https://www.lagou.com/jobs/positionAjax.json?needAddtionalResult=false',
data={'first':False,'pn':page,'kd':'测试开发工程师','sid':'850031016ddf4030a88f6754e5dc006a'},
headers={
'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.114 Safari/537.36',
'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
'referer':'https://www.lagou.com/jobs/list_测试开发工程师?labelWords=&fromSearch=true&suginput=',
'cookie':'RECOMMEND_TIP=true; user_trace_token=20210612193812-7056f0f1-562c-4760-8805-d4a8d9695e91; LGUID=20210612193812-43079adb-23ec-41b5-84f3-86a3fbaac457; privacyPolicyPopup=false; _ga=GA1.2.2098744891.1623497893; _gid=GA1.2.181935330.1623497893; index_location_city=全国; __lg_stoken__=050b9c14216caaecf04f05a60eed1dd07b757a309a7718fd029be51146e09231eff343e0b453238b6ce814f69fa791e274cc714224da0dec17d70601ab51948dea31c3307b43; JSESSIONID=ABAAABAABAGABFA30412D5E1F0603D2F0DE5D2F5A13BE14; WEBTJ-ID=20210613上午11:37:34113734-17a037236a56d7-025fa61b07ca0f-1f386255-1296000-17a037236a6f2f; PRE_UTM=; PRE_HOST=; PRE_LAND=https://www.lagou.com/; LGSID=20210613113734-a42644fe-596f-45b7-b3ed-1cb51c7dec45; PRE_SITE=https://www.lagou.com; sensorsdata2015session={}; _gat=1; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1623497893,1623555455; TG-TRACK-CODE=index_search; SEARCH_ID=944ca4ca8c6540c492cecffa9752ddf9; X_HTTP_TOKEN=f9bc5bdf02d9bd4e0645553261bc88f238facccde5; sensorsdata2015jssdkcross={"distinct_id":"17a0003e51c1f-044ed69b3fd283-1f386255-1296000-17a0003e51d5d2","first_id":"","props":{"$latest_traffic_source_type":"直接流量","$latest_search_keyword":"未取到值_直接打开","$latest_referrer":"","$os":"MacOS","$browser":"Chrome","$browser_version":"91.0.4472.77"},"$device_id":"17a0003e51c1f-044ed69b3fd283-1f386255-1296000-17a0003e51d5d2"}; Hm_lpvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1623555460; LGRID=20210613113740-41bfbd2f-f701-481e-8cab-cfb6379a6c6e'
})
return r.text
def send():
'''生产者发送数据'''
for i in range(1,6):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
#创建队列es,发送数据:Hello RabbitMQ
channel.basic_publish(
exchange='',
routing_key='hello',
body=laGou(page=i))
connection.close()
if __name__ == '__main__':
send()
executor=ThreadPoolExecutor(max_workers=5)
executor.submit(send)
#关闭线程池
executor.shutdown()
执行后,消费者就会获取到发送的数据,如下所示:
见队列的截图信息:
可以看到未被消费的数据条数(多般是消费者的应用被堵塞等情况)。