一、RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
二、go-amqp
对于Go语言来说,rabbitMQ的开源实现,往往采用go-amqp,下面章节主要是对go-amqp的常用API做些简单介绍。
1.创建新连接函数 1)func Dial(url string) (*Connection,error)
功能描述:创建一个基于明文认证的Tcp新连接。
入参:url是AMQPURI的格式,例如:amqp://guest:guest@localhost:5672/qname
amqp:是传输协议。
guest:guest:登录RabbitMQ Server的账号密码均为guest。
localhost:5672: 地址是localhost:5672,表示的是需要连接的RabbitMQ的服务器地址。
qname:表示的是Virtual Hosts的名字,如果没有设置这个名字,可以不写。
英文:Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth.
备注:这个连接的心跳默认值是10s,握手的最大时间为30s。这个函数与调用DialTLS(amqp, nil)是一样的。
英文:Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.
Dial uses the zero value of tls.Config when it encounters an amqps:// scheme. It is equivalent to calling DialTLS(amqp, nil).
2) func DialTLS(urlstring, amqps *tls.Config) (*Connection,error)
功能与1)一致都是为了创建一个新的连接,区别在于可以设置TLS相关的配置信息。
DialTLS accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the initial read deadline to 30 seconds.
DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
2. 在一个connection上新建一个channel
func (c *Connection) Channel() (*Channel,error)
创建一个唯一的并发服务器channel去处理这些AMQP相关的消息。
Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened.
3.关闭连接相关函数
func (c *Connection) Close()error // 关闭连接
func (c *Connection) IsClosed()bool // 判断连接conn是否处于关闭状态,true表示关闭
4. Exchange相关
1)func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table)error
声明一个exchange,没有的话会新建一个,存在的话校验。
英文:ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
参数:
name: exchange的名称,格式为:以"amq."作为开头,字符包括字母,数字,连字符,下划线,.和:。(英文:Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.)
kind: exchange的类型,direct、fanout、topic或者headers,一旦被设置之后,这个exchange中的该类型不可以更改。
durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,Server重启后,会自动加载交换器。备注:一旦durable被设置成true,与该exchange绑定的queue也需要是durable的。(Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable.)
autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
internal:是否为内部交换器,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。(英文:Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broke.)
noWait:是否等收到ACK之后,才能继续执行,true表示不等待Ack。(英文:When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions.)
args:可选参数,一般填nil,主要用于函数扩展设计的。(英文:Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters.)
2)func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool)error
删除一个exchange,交换器被删除之后,与之绑定的queue也会变被删除,同时与exchange相关的channel也会返回失败。(英文:ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.)
参数:
name: exchange的名字。
ifUnused:true的时候,在有queue绑定的时候不会被删除,没有queue绑定的时候才会删除。false的时候,不管queue绑定与否都删除掉。(英文:When ifUnused is true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but close the channel with an exception instead. Set this to true if you are not the sole owner of the exchange.)
noWait:是否等待ACK之后,才会去删除exchange,true表示在没有收到ack确认,就可以之间删除,false则不需要等待ack。(英文:When noWait is true, do not wait for a server confirmation that the exchange has been deleted. Failing to delete the channel could close the channel. Add a NotifyClose listener to respond to these channel exceptions.)
5.Queue相关
1)func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue,error)
用来创建一个不存在的队列,或者确认已经存在的这个队列多对应的参数都是一致的。
英文:QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
参数:
name: 希望生成的queue的名称,可以为空的,可以自己定义值。空的时候,会被自动生成一个名字,并且存储在返回值中。英文:The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.
durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,Server重启后,会自动加载交换器。
autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
exclusive:只被一个连接(connection)使用,而且当连接关闭后队列即被删除。英文:Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.
noWait:是否等收到ACK之后,才能继续执行,true表示不等待Ack。
args:可选参数,一般填nil,主要用于函数扩展设计的(一些消息代理用他来完成类似与 TTL 的某些额外功能)。
2)func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table)error
建立Exchange与Queue之间的绑定关系。英文:QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
参数:
name:需要绑定的队列名称。
key: exchange转发到queue所使用的规则符。
exchange:需要绑定的路由exchange的名称。
noWait:是否等收到ACK之后,才能继续执行,true表示不等待Ack。
args:可选参数,一般填nil,主要用于函数扩展设计的(一些消息代理用他来完成类似与 TTL 的某些额外功能)。
3)func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
英文:Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.
With a prefetch size greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers. This option is ignored when consumers are started with noAck.
When global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel.
6. 消息处理
1)func (d Delivery) Ack(multiple bool) error
英文:Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery.
All deliveries in AMQP must be acknowledged. If you called Channel.Consume with autoAck true then the server will be automatically ack each message and this method should not be called. Otherwise, you must call Delivery.Ack after you have successfully processed this delivery.
When multiple is true, this delivery and all prior unacknowledged deliveries on the same channel will be acknowledged. This is useful for batch processing of deliveries.
2) func (d Delivery) Nack(multiple, requeue bool) error
英文:Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server.
When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel.
When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue.
This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time.
三、参考文档:
RabbitMQ的go语言驱动:https://github.com/streadway/amqp
RabbitMQ里面的常用API介绍:
https://godoc.org/github.com/streadway/amqp#Dial