1、安装RabbitMQ
1)下载和安装erlang
下载erlang
代码语言:javascript复制wget http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el6.x86_64.rpm
安装erlang,root用户使用rpm安装
代码语言:javascript复制rpm -ihv erlang-18.1-1.el6.x86_64.rpm
2)下载和安装RabbitMQ
下载RabbitMQ
代码语言:javascript复制wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_12/rabbitmq-server-3.6.12-1.el6.noarch.rpm
安装RabbitMQ,root用户使用rpm安装
代码语言:javascript复制rpm -ihv rabbitmq-server-3.6.12-1.el6.noarch.rpm
一般来说不会有什么问题,如果安装RabbitMQ过程中遇到如下错误,清空rpmdb然后重试。
我遇到的问题如下是
代码语言:javascript复制[root@bigdata-arch-client11 yangfan]# rpm -ihv erlang-18.1-1.el6.x86_64.rpm
rpmdb: Thread/process 72423/139858093815712 failed: Thread died in Berkeley DB library
error: db3 error(-30974) from dbenv->failchk: DB_RUNRECOVERY: Fatal error, run database recovery
error: cannot open Packages index using db3 - (-30974)
error: cannot open Packages database in /var/lib/rpm
rpmdb: Thread/process 72423/139858093815712 failed: Thread died in Berkeley DB library
error: db3 error(-30974) from dbenv->failchk: DB_RUNRECOVERY: Fatal error, run database recovery
error: cannot open Packages database in /var/lib/rpm
百度了一下,请按顺序执行,然后重试安装。
代码语言:javascript复制rm -f /var/lib/rpm/__db*
rpm --rebuilddb
yum clean all
当安装完成之后,可以使用缺省配置启动一下,如果打印如下,那么安装就成功了。
代码语言:javascript复制[root@bigdata-arch-client11 yangfan]# rabbitmq-server
RabbitMQ 3.6.12. Copyright (C) 2007-2017 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@bigdata-arch-client11.log
###### ## /var/log/rabbitmq/rabbit@bigdata-arch-client11-sasl.log
##########
Starting broker...
completed with 0 plugins.
2、配置RabbitMQ
1)创建RabbitMQ账号
rabbitmqctl add_user admin bigdata123
代码语言:javascript复制[root@bigdata-arch-client11 yangfan]# rabbitmqctl add_user admin bigdata123
Creating user "admin"
2)将admin账号赋予管理员权限
rabbitmqctl set_user_tags admin administrator
代码语言:javascript复制[root@bigdata-arch-client11 yangfan]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator]
3)设置权限
rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
代码语言:javascript复制[root@bigdata-arch-client09 ~]# rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
Setting permissions for user "admin" in vhost "/"
4)启用web管理插件
rabbitmq-plugins enable rabbitmq_management
代码语言:javascript复制[root@bigdata-arch-client11 yangfan]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
amqp_client
cowlib
cowboy
rabbitmq_web_dispatch
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@bigdata-arch-client11... started 6 plugins.
这样你可以通过web页面观察rabbitmq的status,端口号是15672,例如http://ip:15672
3、配置RabbitMQ集群
我们这里会展示如何配置一个RabbitMQ集群,集群由以下节点组成。
要保证集群在同一个局域网,IP能通。
1)安装好RabbitMQ
安装方法同上文。
2)保证相同的Erlang Cookie
我这里是把client09上的.erlang.cookie以scp的方式拷贝到另外两台机器。
代码语言:javascript复制[root@bigdata-arch-client09 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@10.93.18.34:/var/lib/rabbitmq
[root@bigdata-arch-client09 ~]# scp /var/lib/rabbitmq/.erlang.cookie rootr@10.93.21.21:/var/lib/rabbitmq
3)运行各个RabbitMQ节点
代码语言:javascript复制rabbitmqctl stop
rabbitmq-server -detached
运行成功后可以查看一下节点当前的集群状态,当然这个时候还没有组成集群。
代码语言:javascript复制[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status
[root@bigdata-arch-client10 ~]# rabbitmqctl cluster_status
[root@bigdata-arch-client11 ~]# rabbitmqctl cluster_status
4)将节点连接成集群
client10:加入到集群rabbit@bigdata-arch-client09
代码语言:javascript复制[root@bigdata-arch-client10 ~]#rabbitmqctl stop_app
[root@bigdata-arch-client10 ~]#rabbitmqctl join_cluster rabbit@bigdata-arch-client09
[root@bigdata-arch-client10 ~]#rabbitmqctl start_app
client11:加入到集群rabbit@bigdata-arch-client09
代码语言:javascript复制[root@bigdata-arch-client11 ~]#rabbitmqctl stop_app
[root@bigdata-arch-client11 ~]#rabbitmqctl join_cluster rabbit@bigdata-arch-client09
[root@bigdata-arch-client11 ~]#rabbitmqctl start_app
client09:不用加入自己
查看集群状态,我们可以在任意一台机器上查看,我们选择在client09上看。
代码语言:javascript复制[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@bigdata-arch-client09'
[{nodes,[{disc,['rabbit@bigdata-arch-client09',
'rabbit@bigdata-arch-client11',
'rabbit@bigdata-arch-client10']}]},
{running_nodes,['rabbit@bigdata-arch-client10',
'rabbit@bigdata-arch-client11',
'rabbit@bigdata-arch-client09']},
{cluster_name,<<"rabbit@bigdata-arch-client09.xg01">>},
{partitions,[]},
{alarms,[{'rabbit@bigdata-arch-client10',[]},
{'rabbit@bigdata-arch-client09',[]},
{'rabbit@bigdata-arch-client11', []}]}]
可以看到,3个实例已经组成了集群。
5)试一下容错
我们关掉client10上的实例
代码语言:javascript复制[root@bigdata-arch-client10 ~]# rabbitmqctl stop
然后我们再看集群情况
代码语言:javascript复制[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status
Cluster status of node 'rabbit@bigdata-arch-client09'
[{nodes,[{disc,['rabbit@bigdata-arch-client09',
'rabbit@bigdata-arch-client11']}]},
{running_nodes,['rabbit@bigdata-arch-client11',
'rabbit@bigdata-arch-client09']},
{cluster_name,<<"rabbit@bigdata-arch-client11.xg01">>},
{partitions,[]},
{alarms,[{'rabbit@bigdata-arch-client11',[]},
{'rabbit@bigdata-arch-client09',[]}]}]
可以发现client10已经成功摘除。
4、HA配置
我们使用haproxy来代理配置高可用。
haproxy可以用来做代理,进行负载均衡和backend探活。支持TCP和HTTP模式。
关于haproxy的内容就不展开说了。
这里仅仅给出配置。
代码语言:javascript复制########tcp配置#################
listen rabbitmq
bind 10.93.21.21:5077
mode tcp
option tcplog #日志类别,采用tcplog
maxconn 4086
#log 127.0.0.1 local0 debug
server rabbit1 10.93.18.34:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3
server rabbit2 10.93.18.35:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3
server rabbit3 10.93.21.21:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3
实验一下,下面是实验验证的程序,你可以挂掉一个实例试试。
send.py
代码语言:javascript复制# -*- coding:utf-8 -*-
import pika
credentials = pika.PlainCredentials('admin','bigdata123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'10.93.21.21',5077, '/', credentials))
channel = connection.channel()
# 声明queue
channel.queue_declare(queue='balance')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='balance',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py
代码语言:javascript复制# _*_coding:utf-8_*_
import pika
credentials = pika.PlainCredentials('admin','bigdata123')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'10.93.21.21',5077,'/',credentials))
channel = connection.channel()
# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='balance')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='balance',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL C')
channel.start_consuming()