AMQP-RabbitMQ/1/概念/一对一简单模型

2020-06-29 12:09:12 浏览数 (1)

# JMS, AMQP, MQTT的区别与联系

  • JMS Java消息服务(Java Message Service)
  • AMQP 高级消息队列协议(Advanced Message Queuing Protocol)
  • MQTT 消息队列遥测传输(Message Queuing Telemetry Transport)

简单理解:

  • JMS是专门为Java设计的一套消息服务API,像 ActiveMQ就是对它的实现
  • AMQP为了解决不同平台之间的通信问题,定义了一种名为 AMQP 的通信协议,从而实现平台和语言无关性。
  • MQTT也是一种通信协议。相比于AMQP的复杂性,它简单得多。所以 AMQP 用于处理相对较重的任务,如两个系统平台之间的消息传输;而 MQTT 因为非常轻量,所以大量应用于物联网。

# RabbitMQ常用命令

  • 修改密码 rabbitmqctl change_password <username> <newpassword>
  • 显示所有用户 rabbitmqctl list_users
  • 启动 rabbitmq-server start
  • 关闭 rabbitmqctl stop
  • homebrew安装配置文件地址 /usr/local/etc/rabbitmq/rabbitmq-env.conf
  • 管理平台地址 http://localhost:15672/

1. 简单模型

The simplest thing that does something

  • 模型图示
  • 生产者
代码语言:java
package com.futao.springmvcdemo.mq.rabbit.simple;
 


 
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
 
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
 
import com.rabbitmq.client.Channel;
 
import com.rabbitmq.client.Connection;
 
import lombok.Cleanup;
 
import lombok.SneakyThrows;
 
import lombok.extern.slf4j.Slf4j;
 


 
/**
 * Minimal producer for the one-to-one ("simple") RabbitMQ model.
 *
 * <p>Declares the simple queue, publishes one text message to it and then
 * releases the channel and the connection.
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Send {

    /**
     * Publishes a single greeting to the simple queue.
     *
     * @param args command-line arguments; not used
     */
    @SneakyThrows
    public static void main(String[] args) {
        // Lombok's @Cleanup calls close() on each resource when this method exits.
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();

        String queueName = RabbitMqQueueEnum.SIMPLE.getQueueName();

        // Declare the queue: not durable, not exclusive, not auto-deleted, no extra arguments.
        channel.queueDeclare(queueName, false, false, false, null);

        String msg = "Hello RabbitMq!";
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
        // charset, so the payload would differ between machines.
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        // An empty exchange name selects the default exchange, which routes by queue name.
        channel.basicPublish("", queueName, null, payload);
        log.info("Send msg:[{}] success", msg);
    }
}
 
  • 消费者

代码语言:java
package com.futao.springmvcdemo.mq.rabbit.simple;
 


 
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
 
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
 
import com.rabbitmq.client.Channel;
 
import com.rabbitmq.client.DeliverCallback;
 
import lombok.SneakyThrows;
 
import lombok.extern.slf4j.Slf4j;
 


 
/**
 * Minimal consumer for the one-to-one ("simple") RabbitMQ model.
 *
 * <p>Declares the simple queue and registers a callback that logs every
 * message delivered from it.
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Recv {

    /**
     * Starts consuming from the simple queue with automatic acknowledgement.
     *
     * @param args command-line arguments; not used
     */
    @SneakyThrows
    public static void main(String[] args) {
        // The channel is not closed here: the consumer has to stay registered
        // in order to keep receiving deliveries.
        Channel channel = RabbitMqConnectionTools.getChannel();
        String queueName = RabbitMqQueueEnum.SIMPLE.getQueueName();

        // Same declaration as the producer: not durable, not exclusive, not auto-deleted.
        channel.queueDeclare(queueName, false, false, false, null);
        log.info("Waiting for message...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            log.info("收到消息:[{}],tag:[{}]", body, consumerTag);
        };

        // Second argument true = auto-ack; the cancel callback intentionally does nothing.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
 
  • 特点:一对一。一个生产者,一个消费者。

# 通用代码

  • 工具类 - 链接工厂

代码语言:java
package com.futao.springmvcdemo.mq.rabbit;
 


 
import com.rabbitmq.client.Channel;
 
import com.rabbitmq.client.Connection;
 
import com.rabbitmq.client.ConnectionFactory;
 
import lombok.SneakyThrows;
 


 
import java.io.IOException;
 
import java.util.concurrent.TimeoutException;
 


 
/**
 * Static helpers that open RabbitMQ connections and channels with the
 * settings hard-coded in {@link #getConnectionFactory()}.
 *
 * @author futao
 * Created on 2019-04-19.
 */
public class RabbitMqConnectionTools {

    /**
     * Opens a new connection to the broker.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws RuntimeException wrapping the underlying {@link IOException} or
     *                          {@link TimeoutException} if the connection cannot be opened
     */
    public static Connection getConnection() {
        try {
            return getConnectionFactory().newConnection();
        } catch (IOException | TimeoutException e) {
            // Rethrow with the cause attached; the caller decides how to report it
            // (no printStackTrace here, which would report the same failure twice).
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /**
     * Builds a connection factory with the broker settings used by these examples.
     *
     * <p>NOTE(review): host, port, credentials and virtual host are hard-coded;
     * consider moving them to external configuration.
     *
     * @return a configured factory
     */
    private static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("futao");
        factory.setPassword("123456");
        factory.setVirtualHost("/springmvc");
        return factory;
    }

    /**
     * Opens a new connection and creates a channel on it.
     *
     * <p>The connection is not returned to the caller; it stays open together
     * with the channel. If the channel cannot be created, the connection is
     * closed before the failure is propagated so that it does not leak.
     *
     * @return the channel created on the new connection
     */
    @SneakyThrows
    public static Channel getChannel() {
        Connection connection = RabbitMqConnectionTools.getConnection();
        try {
            return connection.createChannel();
        } catch (IOException | RuntimeException e) {
            // Nobody else holds a reference to this connection, so release it here.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
 
  • 枚举类 - RabbitMqQueueEnum队列名称

代码语言:java
package com.futao.springmvcdemo.mq.rabbit;
 


 
import lombok.AllArgsConstructor;
 
import lombok.Getter;
 


 
/**
 * Enumeration of the queue names used by the RabbitMQ examples.
 *
 * <p>Lombok generates the constructor taking {@code queueName} and the
 * {@code getQueueName()} accessor.
 *
 * @author futao
 * Created on 2019-04-19.
 */
@Getter
@AllArgsConstructor
public enum RabbitMqQueueEnum {

    /**
     * Simple queue.
     */
    SIMPLE("simple-queue"),

    /**
     * Work queue.
     */
    WORK_QUEUE("work-queue"),

    /**
     * Publish/subscribe - fanout.
     */
    EXCHANGE_QUEUE_FANOUT_ONE("exchange-queue-fanout-1"),

    EXCHANGE_QUEUE_FANOUT_TWO("exchange-queue-fanout-2"),

    EXCHANGE_QUEUE_DIRECT_ONE("exchange-queue-direct-1"),
    EXCHANGE_QUEUE_DIRECT_TWO("exchange-queue-direct-2"),

    EXCHANGE_QUEUE_TOPIC_ONE("exchange-queue-topic-1"),
    EXCHANGE_QUEUE_TOPIC_TWO("exchange-queue-topic-2"),
    EXCHANGE_QUEUE_TOPIC_THREE("exchange-queue-topic-3"),
    EXCHANGE_QUEUE_TOPIC_FOUR("exchange-queue-topic-4"),

    /**
     * RPC queue.
     */
    RPC_QUEUE("rpc-queue");

    /**
     * Name of the queue. Final so that a shared enum constant cannot be mutated.
     */
    private final String queueName;
}
 

0 人点赞