Java RabbitMQ demo: simulating a producer and a consumer

2024-10-09 08:50:14

1. RabbitMQ exchange types

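fanout: routes every message sent to the exchange to all queues bound to it; the routing key is ignored.

direct: routes a message to the queues whose BindingKey exactly equals the message's RoutingKey.

topic: similar to direct, but the BindingKey may contain wildcards. Keys are split into words by dots; the asterisk (*) matches exactly one word, and the hash (#) matches zero or more words.

headers: does not use the routing key at all and instead matches on the headers attribute of the message. Headers exchanges tend to perform poorly and are rarely used in practice.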
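The topic wildcard rules are easy to get wrong, so here is a minimal sketch of the matching logic in plain Java. It is not the broker's implementation; the class name `TopicMatcher` and its `matches` method are hypothetical names introduced only for illustration, and edge cases such as empty words are ignored.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mimics topic-exchange matching; not part of the RabbitMQ client.
final class TopicMatcher {

    // Words are separated by '.', "*" matches exactly one word, "#" matches zero or more words.
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // "#" may absorb zero or more words, so try every possible continuation point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // A literal word or "*" needs one word, but the key has none left.
            return false;
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "com.order.create.success", "com.order.sms", "com.order.pay.ok");
        for (String key : keys) {
            System.out.println(key
                    + "  com.order.*=" + matches("com.order.*", key)
                    + "  com.order.#=" + matches("com.order.#", key));
        }
    }
}
```

With the three routing keys used later in this post, only `com.order.sms` matches the binding `com.order.*`, while all three match `com.order.#`.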

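2. Code

EndPoint.java: sets up the connection, channel, and queue, and binds the queue to an exchange with a routing key.

Producer.java: the producer; publishes messages to an exchange with a routing key.

QueueConsumer.java: the consumer; receives messages from a queue.

MqTest.java: the test class that runs the consumer and the producer together.

EndPoint.java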
package com.redis.demo.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public abstract class EndPoint {

    protected Channel channel;
    protected Connection connection;
    protected String endPointName;

    public EndPoint(String endpointName) throws Exception {
        this.endPointName = endpointName;
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        //hostname of your rabbitmq server
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        //getting a connection
        connection = factory.newConnection();

        //creating a channel
        channel = connection.createChannel();

        //declaring a queue for this channel. If queue does not exist,
        //it will be created on the server.
        channel.queueDeclare(endpointName, false, false, false, null);

        //fanout exchange: the routing key is ignored, so the queue is bound with an empty key
//        channel.exchangeDeclare("fanout_test","fanout");
//        channel.queueBind(endpointName,"fanout_test","");

        //direct exchange: the binding key must equal the routing key exactly
//        channel.exchangeDeclare("direct_test","direct");
//        channel.queueBind(endpointName,"direct_test","com.login.message");

        //topic exchange: "*" matches exactly one word, "#" matches zero or more words
        channel.exchangeDeclare("topic_test","topic");
//        channel.queueBind(endpointName,"topic_test","com.order.*");
        channel.queueBind(endpointName,"topic_test","com.order.#");
    }

    public void close() throws Exception{
        this.channel.close();
        this.connection.close();
    }

}

Producer.java

package com.redis.demo.rabbitmq;

import org.springframework.util.SerializationUtils;

import java.io.Serializable;

public class Producer extends EndPoint{

    public Producer(String endpointName) throws Exception {
        super(endpointName);
    }

    /**
     * Sends a message.
     * @param object the serializable payload to publish
     * @throws Exception if serialization or publishing fails
     */
    public void sendMessage(Serializable object) throws Exception {
        //publish straight to the queue through the default exchange
//        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));

        //publish to the direct exchange with a routing key
//        channel.basicPublish("direct_test","com.login.message",null,SerializationUtils.serialize(object));

        //publish to the topic exchange; every queue whose binding pattern matches gets a copy
        channel.basicPublish("topic_test","com.order.create.success",null,SerializationUtils.serialize(object));
//        channel.basicPublish("topic_test","com.order.sms",null,SerializationUtils.serialize(object));
//        channel.basicPublish("topic_test","com.order.pay.ok",null,SerializationUtils.serialize(object));

    }
}

QueueConsumer.java

package com.redis.demo.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.springframework.util.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class QueueConsumer extends EndPoint implements Runnable, Consumer {
    public QueueConsumer(String endpointName) throws Exception {
        super(endpointName);
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        System.out.println("Consumer " + consumerTag + " registered");
    }

    @Override
    public void handleCancelOk(String s) {

    }

    @Override
    public void handleCancel(String s) throws IOException {

    }

    @Override
    public void handleShutdownSignal(String s, ShutdownSignalException e) {

    }

    @Override
    public void handleRecoverOk(String s) {
        System.out.println("handleRecoverOk " + s + " registered");
    }

    @Override
    public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
        System.out.println("handleDelivery " + s + " registered, type=" + basicProperties.getType());
        //the producer sends a serialized HashMap, so the body is cast back to that type
        Map map = (HashMap) SerializationUtils.deserialize(body);
        System.out.println("Message Number " + map.get("message number") + " received.");

    }

    @Override
    public void run() {
        try {
            //start consuming messages. Auto acknowledge messages.
            channel.basicConsume(endPointName, true,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
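
Both Producer and QueueConsumer hand the message body to Spring's SerializationUtils, which is built on standard Java serialization. To make that step concrete, the following is a minimal sketch of the same round trip using only the JDK. The class name `PayloadCodec` is hypothetical and is not part of the demo project; it only shows how a `HashMap` turns into the `byte[]` that travels through the broker and back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the serialize/deserialize calls used in the demo.
final class PayloadCodec {

    // Writes the object to a byte array with standard Java serialization.
    static byte[] serialize(Serializable object) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        return bytes.toByteArray();
    }

    // Reads a single object back from bytes produced by serialize().
    static Object deserialize(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Integer> message = new HashMap<>();
        message.put("message number", 3);

        // Producer side: object to bytes.
        byte[] body = serialize(message);

        // Consumer side: bytes back to a map.
        Map<?, ?> received = (Map<?, ?>) deserialize(body);
        System.out.println("Message Number " + received.get("message number") + " received.");
    }
}
```

Java serialization is convenient for a local demo like this one. Deserializing bytes from an untrusted source is risky, though, so a text format such as JSON is often preferred when messages cross service boundaries.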

MqTest.java

package com.redis.demo.rabbitmq;

import java.util.HashMap;

public class MqTest {
    public static void main(String[] args) throws Exception{
        //start a thread that receives messages
        QueueConsumer consumer = new QueueConsumer("queue");
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        System.out.println("Consumer started ===============");

        Producer producer = new Producer("queue");

        //send messages
        for (int i = 0; i < 10; i++) {
            HashMap<String, Integer> message = new HashMap<>();
            message.put("message number", i);
            //send the map object
            producer.sendMessage(message);
            System.out.println("Message Number " + i + " sent successfully.");
        }
        }

    }
}

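3. Maven dependency (pom.xml)

Producer and QueueConsumer also import org.springframework.util.SerializationUtils, so spring-core needs to be on the classpath in addition to the client library below.
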
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>