An Introduction to RabbitMQ Basic Concepts

2022-01-20 10:27:06

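1 Comparison of Mainstream Message Queues

  • Kafka
    • Pros: born in the big-data era and the most mainstream option; high throughput and fast; easy to scale horizontally
    • Cons: complex to set up, prone to problems in production, high operational cost
  • Pulsar
    • Pros: fast; well suited to cloud-native environments
    • Cons: no less complex than Kafka
  • RocketMQ
    • Open-sourced by Alibaba
  • RabbitMQ
    • Pros: lightweight, yet its building blocks can be combined into advanced features; fast message push; simple to deploy
    • Cons: clustered deployment reduces throughput and speed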
[Figure: comparison of mainstream message queues]

2 RabbitMQ Basic Concepts

2.1 AMQP Model Overview

[Figure: AMQP Model]
  • A producer/publisher sends a message to an exchange
  • The exchange distributes the message to queues according to its rules (bindings) and the routing key
  • The broker delivers the message to the subscribers of the queue
  • Alternatively, a consumer fetches (pulls) the messages it needs from the queue

2.2 Exchanges

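In RabbitMQ, a producer never sends a message directly to a queue. It sends the message to an exchange, and the exchange, acting as a routing intermediary, uses its exchange type and bindings to decide whether to distribute the message to zero or more queues.

RabbitMQ provides four exchange types: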

  • Direct - the exchange forwards the message to the queues whose binding key equals the message's routing key
  • Fanout - the exchange ignores the routing key and forwards the message to all bound queues
  • Topic - the exchange routes the message to bound queues by matching the message's routing key against the pattern each queue was bound with
  • Headers - the message header attributes, instead of the routing key, are matched against the bindings between the exchange and its queues

Besides the exchange type, an exchange has several other important attributes:

  • Name
  • Durability (durable or transient): durable exchanges survive a broker restart, whereas transient exchanges do not (they have to be redeclared when the broker comes back online)
  • Auto-delete: the exchange is deleted when the last queue is unbound from it
  • Arguments: optional, used by plugins and broker-specific features

2.2.1 Default Exchange

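The default exchange is a direct exchange with no name (an empty string) that the broker declares in advance. It has one very useful property: every queue is automatically bound to it, with a routing key equal to the queue name.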

For example, when you declare a queue with the name of "search-indexing-online", the AMQP 0-9-1 broker will bind it to the default exchange using "search-indexing-online" as the routing key (in this context sometimes referred to as the binding key). Therefore, a message published to the default exchange with the routing key "search-indexing-online" will be routed to the queue "search-indexing-online". In other words, the default exchange makes it seem like it is possible to deliver messages directly to queues, even though that is not technically what is happening.
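A toy in-memory model can make this behaviour concrete. The sketch below is not RabbitMQ code: `ToyBroker`, `declare_queue`, and `publish` are hypothetical names invented for illustration, and the only rule it mimics is the one described above, namely that declaring a queue implicitly binds it to the nameless default exchange under its own name.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical, not a RabbitMQ API)."""

    DEFAULT_EXCHANGE = ""  # the default exchange has an empty name

    def __init__(self):
        self.queues = {}
        # exchange name -> routing key -> set of queue names
        self.bindings = defaultdict(lambda: defaultdict(set))

    def declare_queue(self, name):
        self.queues.setdefault(name, deque())
        # Implicit binding to the default exchange: routing key == queue name.
        self.bindings[self.DEFAULT_EXCHANGE][name].add(name)

    def publish(self, exchange, routing_key, body):
        # .get() avoids creating empty entries for unknown exchanges or keys.
        targets = self.bindings.get(exchange, {}).get(routing_key, set())
        for queue_name in targets:
            self.queues[queue_name].append(body)
        return sorted(targets)


broker = ToyBroker()
broker.declare_queue("search-indexing-online")
routed = broker.publish("", "search-indexing-online", "reindex doc 42")
print(routed)  # ['search-indexing-online']
```

Publishing to `""` with the queue name as the routing key looks like sending straight to the queue, but the message still passes through an exchange and a binding.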

2.2.2 Direct Exchange

A direct exchange delivers messages to queues based on the message routing key:

  • A queue binds to the exchange with a routing key K
  • When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R

Direct exchanges are often used to distribute tasks between multiple workers (instances of the same application) in a round robin manner. When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.
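The K = R rule can be sketched in a few lines. This is a simplified illustration, not the broker's implementation; the function `route_direct`, the `(queue, binding_key)` pair layout, and the queue names are all assumptions made for the example.

```python
def route_direct(bindings, routing_key):
    """Return the queues whose binding key K equals the routing key R.

    `bindings` is a list of (queue_name, binding_key) pairs; this layout
    is an illustrative assumption, not how a broker stores bindings.
    """
    return [queue for queue, key in bindings if key == routing_key]


bindings = [
    ("resize_jobs", "image.resize"),
    ("audit_log", "image.resize"),  # several queues may share one key
    ("email_jobs", "email.send"),
]

print(route_direct(bindings, "image.resize"))  # ['resize_jobs', 'audit_log']
print(route_direct(bindings, "email.send"))    # ['email_jobs']
print(route_direct(bindings, "unknown"))       # []
```

Note that every queue bound with a matching key receives its own copy of the message. Load balancing happens later, between the consumers of a single queue.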

2.2.3 Fanout Exchange

A fanout exchange delivers messages to all of the queues bound to it and ignores the routing key. Typical use cases:

  • Massively multi-player online (MMO) games can use it for leaderboard updates or other global events
  • Sport news sites can use fanout exchanges for distributing score updates to mobile clients in near real-time
  • Distributed systems can broadcast various state and configuration updates
  • Group chats can distribute messages between participants using a fanout exchange (although AMQP does not have a built-in concept of presence, so XMPP may be a better choice)
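As a rough sketch of the broadcast behaviour, the snippet below copies one message into every bound queue. The function `publish_fanout` and the queue names are hypothetical, and plain `deque` objects stand in for real queues.

```python
from collections import deque


def publish_fanout(bound_queues, body, routing_key=""):
    """Append `body` to every bound queue; `routing_key` is accepted but ignored."""
    for queue in bound_queues.values():
        queue.append(body)
    return len(bound_queues)


queues = {"mobile_push": deque(), "web_ticker": deque(), "archive": deque()}
delivered = publish_fanout(queues, "score: 2-1", routing_key="anything")
print(delivered)  # 3
```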

2.2.4 Topic Exchange

A topic exchange delivers a message to one or more queues depending on whether the message routing key matches the pattern that was set when each queue was bound to the exchange. Topic exchanges are often used to implement the publish/subscribe model and its variations. Example uses:

  • Distributing data relevant to a specific geographic location, for example, points of sale
  • Background task processing done by multiple workers, each capable of handling a specific set of tasks
  • Stock price updates (and updates on other kinds of financial data)
  • News updates that involve categorization or tagging (for example, only for a particular sport or team)
  • Orchestration of services of different kinds in the cloud
  • Distributed architecture/OS-specific software builds or packaging where each builder can handle only one architecture or OS
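The text above does not spell out the pattern syntax. In AMQP 0-9-1 topic exchanges, routing keys are dot-separated words, `*` in a binding pattern stands for exactly one word, and `#` stands for zero or more words. The function below is a minimal sketch of that matching rule; `topic_matches` is a name made up for this example, and the broker's own matcher is not shown here.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a binding pattern.

    '*' matches exactly one word, '#' matches zero or more words.
    """

    def match(p, k):
        if not p:
            return not k  # pattern exhausted: key must be exhausted too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb anywhere from 0 to len(k) words.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("stock.*.nyse", "stock.ibm.nyse"))  # True
print(topic_matches("stock.*", "stock.ibm.nyse"))       # False
print(topic_matches("stock.#", "stock.ibm.nyse"))       # True
```

A queue bound with `#` alone receives every message, which makes the topic exchange behave like a fanout exchange for that binding.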

2.2.5 Headers Exchange

A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.

It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.

Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.

Note that headers beginning with the string x- will not be used to evaluate matches.
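The "any" versus "all" rule can be sketched as a small predicate. This is an illustration only: `headers_match` is a hypothetical helper, the header names are invented, and the fallback to "all" when "x-match" is absent is stated here as an assumption about the broker's default.

```python
def headers_match(binding_args, message_headers):
    """Toy check of a headers-exchange binding against message headers."""
    mode = binding_args.get("x-match", "all")  # assumed default: "all"
    # Binding keys starting with "x-" are not used to evaluate matches.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [
        key in message_headers and message_headers[key] == value
        for key, value in wanted.items()
    ]
    return any(hits) if mode == "any" else all(hits)


message = {"format": "pdf", "type": "log"}

any_binding = {"x-match": "any", "format": "pdf", "type": "report"}
all_binding = {"x-match": "all", "format": "pdf", "type": "report"}

print(headers_match(any_binding, message))  # True: "format" matches
print(headers_match(all_binding, message))  # False: "type" differs
```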

2.3 Queues

When creating a queue, we can define several of its properties:

  • Name: the name of the queue. If not defined, the broker will generate one
  • Durable: the queue will survive a broker restart
  • Exclusive: if enabled, the queue is used by only one connection and is deleted when that connection closes
  • Auto-delete: if enabled, a queue that has had at least one consumer is deleted when the last consumer unsubscribes
  • Optional arguments: used by plugins and broker-specific features such as message TTL, queue length limit, etc.

Before a queue can be used it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration will have no effect if the queue does already exist and its attributes are the same as those in the declaration. When the existing queue attributes are not the same as those in the declaration a channel-level exception with code 406 (PRECONDITION_FAILED) will be raised.
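The declare semantics in the paragraph above (create if missing, no-op if identical, error 406 if different) can be modelled with a small registry. Everything in this sketch is hypothetical: `ToyQueueRegistry`, `PreconditionFailed`, and the return strings exist only to illustrate the three outcomes.

```python
class PreconditionFailed(Exception):
    """Stand-in for the channel-level 406 PRECONDITION_FAILED error."""

    code = 406


class ToyQueueRegistry:
    """Remembers queue attributes so that redeclarations can be checked."""

    def __init__(self):
        self._queues = {}

    def declare(self, name, durable=False, exclusive=False, auto_delete=False):
        attrs = {
            "durable": durable,
            "exclusive": exclusive,
            "auto_delete": auto_delete,
        }
        existing = self._queues.get(name)
        if existing is None:
            self._queues[name] = attrs  # first declaration creates the queue
            return "created"
        if existing == attrs:
            return "unchanged"  # identical redeclaration has no effect
        raise PreconditionFailed(f"queue '{name}' exists with different attributes")


registry = ToyQueueRegistry()
print(registry.declare("tasks", durable=True))  # created
print(registry.declare("tasks", durable=True))  # unchanged
try:
    registry.declare("tasks", durable=False)
except PreconditionFailed as exc:
    print(exc.code)  # 406
```

In practice this means every application that uses a queue can declare it on startup, as long as all of them agree on the attributes.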

Queues can be declared as durable or transient. Metadata of a durable queue is stored on disk, while metadata of a transient queue is stored in memory when possible.

2.4 Bindings

A binding is the routing rule that an exchange uses to distribute messages to queues. Some bindings have a routing key attached, which certain exchange types use to filter messages and route them to the bound queue.

Bindings are rules that exchanges use (among other things) to route messages to queues. To instruct an exchange E to route messages to a queue Q, Q has to be bound to E. Bindings may have an optional routing key attribute used by some exchange types. The purpose of the routing key is to select certain messages published to an exchange to be routed to the bound queue. In other words, the routing key acts like a filter.

To draw an analogy:

  • Queue is like your destination in New York city
  • Exchange is like JFK airport
  • Bindings are routes from JFK to your destination. There can be zero or many ways to reach it

Having this layer of indirection enables routing scenarios that are impossible or very hard to implement when publishing directly to queues, and it also eliminates a certain amount of duplicated work that application developers would otherwise have to do.

If a message cannot be routed to any queue (for example, because there are no bindings for the exchange it was published to) it is either dropped or returned to the publisher, depending on message attributes the publisher has set.
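In AMQP 0-9-1 the attribute that controls this choice is the `mandatory` flag on publish: when it is set, an unroutable message is returned to the publisher, otherwise it is dropped. The sketch below models only that decision; `publish_or_return`, the tuple results, and the binding layout are assumptions made for illustration.

```python
def publish_or_return(bindings, routing_key, body, mandatory=False):
    """Route `body`, or report what happens when no binding matches.

    `bindings` is a list of (queue_name, binding_key) pairs.
    """
    targets = [queue for queue, key in bindings if key == routing_key]
    if targets:
        return ("routed", targets)
    # No matching binding: the mandatory flag decides the outcome.
    return ("returned", body) if mandatory else ("dropped", None)


bindings = [("orders", "order.created")]

print(publish_or_return(bindings, "order.created", "o-1"))
# ('routed', ['orders'])
print(publish_or_return(bindings, "order.deleted", "o-2"))
# ('dropped', None)
print(publish_or_return(bindings, "order.deleted", "o-3", mandatory=True))
# ('returned', 'o-3')
```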

2.5 Consumers

Storing messages in queues is useless unless applications can consume them. In the AMQP 0-9-1 Model, there are two ways for applications to do this:

  • Subscribe to have messages delivered to them ("push API"): this is the recommended option
  • Polling ("pull API"): this way is highly inefficient and should be avoided in most cases

With the "push API", applications have to indicate interest in consuming messages from a particular queue. When they do so, we say that they register a consumer or, simply put, subscribe to a queue. It is possible to have more than one consumer per queue or to register an exclusive consumer (excludes all other consumers from the queue while it is consuming).

Each consumer (subscription) has an identifier called a consumer tag. It can be used to unsubscribe from messages. Consumer tags are just strings.

  1. A Connection represents a real TCP connection to the message broker, whereas a Channel is a virtual connection (AMQP connection) inside it. This way you can use as many (virtual) connections as you want inside your application without overloading the broker with TCP connections.
  2. You can use one Channel for everything. However, if you have multiple threads, it is suggested to use a different Channel for each thread. On Channel thread-safety, the Java Client API Guide says: "Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads." There is no direct relation between Channel and Queue. A Channel is used to send AMQP commands to the broker, such as the command that creates a queue, but the two concepts are not tied together.
  3. Each Consumer runs in its own thread allocated from the consumer thread pool. If multiple Consumers are subscribed to the same Queue, the broker uses round-robin to distribute the messages between them equally (see Tutorial two: "Work Queues"). It is also possible to attach the same Consumer to multiple Queues. You can think of Consumers as callbacks that are called every time a message arrives on a Queue the Consumer is bound to. In the Java Client, each Consumer has a method handleDelivery(...), which is the callback method. Typically you subclass DefaultConsumer and override handleDelivery(...). Note: if you attach the same Consumer instance to multiple queues, this method will be called by different threads, so take care of synchronization if necessary.
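The round-robin distribution mentioned in point 3 can be illustrated with a short simulation. It is a deliberate simplification: `dispatch_round_robin` is a hypothetical function, and it assumes every consumer is always ready, whereas a real broker's distribution also depends on settings such as prefetch and on pending acknowledgements.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumers):
    """Assign messages to consumers in turn, one message at a time."""
    assignment = {consumer: [] for consumer in consumers}
    for message, consumer in zip(messages, cycle(consumers)):
        assignment[consumer].append(message)
    return assignment


result = dispatch_round_robin(["m1", "m2", "m3", "m4", "m5"], ["A", "B"])
print(result)  # {'A': ['m1', 'm3', 'm5'], 'B': ['m2', 'm4']}
```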

2.6 Acknowledgements

Networks are unreliable and applications may fail to process messages, so the AMQP 0-9-1 model has a notion of message acknowledgements: when a message is delivered to a consumer, the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so. When message acknowledgements are in use, a broker will only completely remove a message from a queue when it receives a notification for that message (or group of messages).
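The bookkeeping behind acknowledgements can be sketched with a toy queue that keeps delivered messages in an "unacked" set until they are acknowledged. `ToyAckQueue` and its methods are hypothetical names. The `requeue_unacked` step models a behaviour not described above, namely that unacknowledged messages go back to the queue when their consumer disappears, and it is included here as an assumption about how the broker reacts.

```python
from collections import deque


class ToyAckQueue:
    """Toy queue that only forgets a message once it has been acknowledged."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}  # delivery tag -> message
        self._next_tag = 1

    def deliver(self):
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = message  # delivered, but not yet removed for good
        return tag, message

    def ack(self, tag):
        del self.unacked[tag]  # only now is the message completely removed

    def requeue_unacked(self):
        # Put unacknowledged messages back at the front, oldest first.
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


queue = ToyAckQueue(["a", "b", "c"])
tag_a, _ = queue.deliver()
tag_b, _ = queue.deliver()
queue.ack(tag_a)         # "a" was processed successfully
queue.requeue_unacked()  # consumer vanished before acknowledging "b"
print(list(queue.ready))  # ['b', 'c']
```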

In certain situations, for example, when a message cannot be routed, messages may be returned to publishers, dropped, or, if the broker implements an extension, placed into a so-called "dead letter queue". Publishers choose how to handle situations like this by publishing messages using certain parameters.
